Implement multipart for netty-future, netty-cats & netty-zio#5315
Conversation
78ea9a8 to
8d9e390
Compare
|
PR is ready. It can contains more changes - just on master, because I squashed commits with just merged (to master) changes .... Main part is based on MonadSubscriber - implementation of Subscriber - who subscribe to Netty publisher. In implementation java.util.concurrent.Condition is used. |
265995f to
03fedda
Compare
Code ReviewOverviewThis PR adds multipart request body support to the three async Netty backends ( Request side (decoding multipart):
Response side (encoding multipart):
Tests: Enables The de-duplication is the strongest part of this PR — the response-body multipart logic was genuinely copy-pasted before, and consolidating it is a clear win. 🔴 Significant concerns1. Lost-wakeup deadlock / NPE race in
|
03fedda to
645dd86
Compare
347f47f to
b669a8c
Compare
Re-review (after "Restore traits, call onError when buffer exceed")Thanks for the updates — both of the significant concerns from the previous review are resolved. ✅ Resolved: the blocking lock/condition raceThe val promise = Promise[F[RawValue[Seq[RawPart]]]]()
val subscriber = new MonadSubscriber(...)(promise)
nettyRequest.subscribe(subscriber)
monad.flatten(fromFuture(promise.future)) // no blockingThe subscriber completes the promise from its
The maxBytes-exceeded path now routes through ✅ Resolved: formatting churnThe non-standard reformatting in Remaining minor notes
The core architecture concern is fully addressed — this is now a clean, non-blocking implementation. What's left is minor robustness polish. 🤖 Generated with Claude Code |
| import sttp.capabilities.Streams | ||
| import sttp.tapir.TapirFile | ||
|
|
||
| private[netty] trait NettyRequestBodyWithMultipart[F[_], S <: Streams[S]] extends NettyRequestBody[F, S] { |
There was a problem hiding this comment.
do we have any NettyRequestBody impls that are NOT multpart? maybe we can simply fold this into the base trait, flatenning the hierarhcy
Reactive Streams spec compliance —
|
|
|
||
| import java.io.InputStream | ||
|
|
||
| trait NettyToResponseBodyBase[S] extends NettyToResponseBodyCommon[S] { |
There was a problem hiding this comment.
same with the base/common/toresponsebody traits - it would be great to document why they are separate, and what are their roles
There was a problem hiding this comment.
Renamed to NettyToResponseBodyWrap and comment added
§2.2 (recommendation, not a MUST) - not corrected. All other corrected. Implementation of §2.2 needs to create "executor". Calling generic: works on Future, but hanging on cats/zio (wrong context). The solution is to create separate executor pull (how to configure it and manage? what size of thread pool?) |
writeBytesToFile only used scala-java8-compat to convert a manually-built CompletableFuture to a scala.concurrent.Future. Complete a Promise directly in the AsynchronousFileChannel CompletionHandler instead, which works on 2.12/2.13/3 without the extra module dependency. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Every NettyRequestBody implementation (future/cats/zio via NettyMonadRequestBody, and sync) uses the multipart httpDataFactory, so the intermediate trait added no real separation. Move the factory and its multipartTempDirectory/multipartMinSizeForDisk config into the base trait and drop NettyRequestBodyWithHttpDataFactory, flattening the hierarchy by one level. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks for working on this! :) |
Proposal of multipart implementation for netty - see #4851 - only multipart.
Implementation based on
nettyServerSyncimplementation - but without OxStreams.According to Note: Netty's multipart decoder does not expose other part headers, nor other disposition params. the test with netty should ignoring part headers -
partOtherHeaderSupport = falseIssue for support headers has been requested: FileUpload or HttpData should contain HTTP-Headers
What is missing?
For sync implementation multipart depends on OxStreams - what is not needed and can be moved to other submodule.