Implement multipart for netty-future, netty-cats & netty-zio #5315

Merged
adamw merged 6 commits into softwaremill:master from ajozwik:netty-multipart-future-4851
Jun 19, 2026

@ajozwik ajozwik commented Jun 14, 2026

Contributor

A proposed multipart implementation for the Netty backends (see #4851); it covers multipart only.

The implementation is based on the nettyServerSync one, but without OxStreams.

Per the note "Netty's multipart decoder does not expose other part headers, nor other disposition params", the Netty tests should ignore part headers: partOtherHeaderSupport = false.

An upstream issue requesting header support has been filed: "FileUpload or HttpData should contain HTTP-Headers".

What is missing?

  • Part headers, if the Netty API exposes them
  • Configuration for the multipart temp directory and minimum disk size
  • Streaming, although it is not part of multipart

In the sync implementation, multipart depends on OxStreams. That dependency is not needed, and the multipart code can be moved to another submodule.

@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch 3 times, most recently from 78ea9a8 to 8d9e390 Compare June 15, 2026 15:54
@ajozwik

ajozwik commented Jun 15, 2026

Contributor Author

The PR is ready. The diff may show more changes than expected, because I squashed my commits together with changes that had just been merged to master.

The main part is based on MonadSubscriber, a Subscriber implementation that subscribes to the Netty publisher. The implementation uses java.util.concurrent.locks.Condition.

@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch 2 times, most recently from 265995f to 03fedda Compare June 16, 2026 15:02
@adamw

adamw commented Jun 16, 2026

Member

Code Review

Overview

This PR adds multipart request body support to the three async Netty backends (netty-future, netty-cats, netty-zio), which previously threw UnsupportedOperationException("Multipart requests are not supported"). It does so largely by extracting and generalizing the multipart logic that already existed in the sync backend (which depended on OxStreams) into a reusable, effect-polymorphic hierarchy:

Request side (decoding multipart):

  • NettyRequestBodyWithMultipart — holds the Netty HttpDataFactory (temp-dir / min-disk-size config).
  • NettyRequestBodyWithMultipartF[F, S] — generic publisherToMultipart + writeBytesToFile implemented via CompletableFuture / scala-java8-compat. Each backend supplies a listMonadToMonadOfList (Future.sequence / _.sequence / ZIO.collectAll).
  • NettyStreamingRequestBody changed from a trait to an abstract class extending the new F base; sync now extends the non-F base.

Response side (encoding multipart):

  • NettyToResponseBodyCommon / NettyToResponseBodyBase — pulls the fromRawValue + MultipartEntityBuilder logic (previously duplicated in NettySyncToResponseBody) into shared traits used by all backends.

Tests: Enables multipart = true for the Future backend and flips partOtherHeaderSupport = false for all three (Netty's decoder doesn't expose per-part headers — tracked upstream in netty/netty#15445).

The de-duplication is the strongest part of this PR — the response-body multipart logic was genuinely copy-pasted before, and consolidating it is a clear win.

🔴 Significant concerns

1. Lost-wakeup deadlock / NPE race in publisherToMultipart

In NettyRequestBodyWithMultipartF:

nettyRequest.subscribe(subscriber)   // runs on a Netty event-loop thread
locked {
  condition.await()
  result.get()
}

where the subscriber calls release() → locked(condition.signal()) on onComplete/onError.

There's a classic lost-wakeup race: subscribe(...) may run the subscriber to completion on the Netty I/O thread before the calling thread acquires the lock and reaches condition.await(). If signal() fires before anyone is await()-ing, the wakeup is lost and the calling thread blocks forever.

Even when it doesn't deadlock, condition.await() is not guarded by a while (!done) loop, so a spurious wakeup can return early, after which result.get() returns null → NPE. Conditions should always be awaited in a loop that re-checks a state predicate.

Suggested: set a done flag (or check result.get() != null) under the lock and loop: while (result.get() == null) condition.await(), with the subscriber setting the result before signalling, under the same lock.
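
A minimal sketch of the guarded wait suggested above, assuming a one-shot result holder. `GuardedResult`, `complete` and `await` are illustrative names, not taken from the PR. The state is published under the lock before signalling, and the waiter re-checks it in a loop, so an early signal or a spurious wakeup is harmless.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical one-shot result holder; not the PR's code.
final class GuardedResult[A] {
  private val lock = new ReentrantLock()
  private val ready = lock.newCondition()
  private var result: Option[A] = None // guarded by `lock`

  // Producer side (e.g. called from onComplete): publish the state first, then signal.
  def complete(value: A): Unit = {
    lock.lock()
    try {
      if (result.isEmpty) result = Some(value)
      ready.signalAll()
    } finally lock.unlock()
  }

  // Consumer side: loop on the predicate instead of trusting a single wakeup.
  def await(): A = {
    lock.lock()
    try {
      while (result.isEmpty) ready.await()
      result.get
    } finally lock.unlock()
  }
}
```

If `complete` runs before `await`, the loop condition is already false and `await` returns without ever parking the thread, which is the lost-wakeup case.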

2. Blocking a thread in async/non-blocking backends

Because NettyStreamingRequestBody (cats & zio) now extends NettyRequestBodyWithMultipartF, this synchronous condition.await() blocks the thread that invokes publisherToMultipart for cats-effect and ZIO as well as Future. Blocking a compute/event-loop thread defeats the purpose of those runtimes and risks thread-pool starvation under load.

The idiomatic approach is to thread the result through the effect system instead of a lock+condition — e.g. complete a Deferred (cats), a Promise (zio), or a scala.concurrent.Promise (Future) from the subscriber callbacks, and return that. That also makes concern #1 disappear.
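
For the Future case, the idea might look roughly like the sketch below. It uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the Reactive Streams types that the Netty publisher works with, and `CollectingSubscriber` is a hypothetical name. Nothing waits on a lock: the callbacks complete a `scala.concurrent.Promise`, and the caller only ever sees its `Future`.

```scala
import java.util.concurrent.Flow
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for the PR's subscriber: collects elements, exposes a Future.
final class CollectingSubscriber[A] extends Flow.Subscriber[A] {
  private val promise = Promise[Vector[A]]()
  private var subscription: Flow.Subscription = null
  private var buffer = Vector.empty[A]

  // The caller composes on this Future instead of blocking a thread.
  def result: Future[Vector[A]] = promise.future

  override def onSubscribe(s: Flow.Subscription): Unit = { subscription = s; s.request(1) }

  override def onNext(item: A): Unit = { buffer :+= item; subscription.request(1) }

  // The try* variants return false instead of throwing if already completed.
  override def onError(t: Throwable): Unit = { promise.tryFailure(t); () }

  override def onComplete(): Unit = { promise.trySuccess(buffer); () }
}
```

A cats-effect or ZIO backend would complete its own primitive (or lift this `Future`) in the same callbacks; that part is left out here because it needs the respective libraries.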

It's worth confirming whether the passing tests exercise concurrency at all, or just sequential single-request cases that happen to dodge the race.

🟡 Minor issues

  • Formatting likely fails scalafmt CI. NettyToResponseBody.scala has non-standard reformatting: the Scaladoc block was re-aligned from * to *, and method parameters use a deep continuation indent that looks like IntelliJ's default rather than the repo's scalafmt. Worth running sbt scalafmtAll before merge.
  • Stray blank lines introduced in NettySyncRequestBody.scala (double blank after the extends clause) and NettySyncToResponseBody.scala.
  • scala.compat.java8.FutureConverters is deprecated in favor of scala.jdk.FutureConverters on 2.13; since netty-future still cross-builds 2.12 this is probably deliberate — a short comment would help.
  • Unrelated cleanups bundled in: URL → URI, NioEventLoopGroup → EventLoopGroup, SAM conversions for Sleeper, removed unused imports, the new testSettings warning suppression. All reasonable, but they enlarge the diff and mix concerns with the feature.
  • part.fileName.get in rawValueToContentBody (moved, not new) will throw if a ByteArrayBody/InputStreamBody part has no filename. Pre-existing behavior carried over from sync, so not a regression, but worth hardening since this path now serves more backends.

✅ Strengths

  • Removes real duplication (response multipart logic was copy-pasted in sync).
  • Cleanly generalizes over effect types via a single listMonadToMonadOfList hook per backend.
  • maxBytes enforcement and decoder cleanup (RawValue.cleanup = decoder.destroy(), httpContent.release() after decoder.offer) are handled — relevant given this repo's history of Netty ByteBuf leaks.
  • Honest scoping in the description: part headers, temp-dir config, and streaming are explicitly called out as not-yet-done.

Recommendation

The feature direction and refactoring are good, but the lock/condition.await() mechanism in NettyRequestBodyWithMultipartF (#1 + #2) is the crux: it has a real lost-wakeup/NPE race and blocks async runtimes. Reworking it to complete a promise/deferred from the subscriber (rather than blocking on a condition) would address both, plus a scalafmt pass. As a follow-up, the sync backend's multipart need no longer depend on OxStreams and could move to a shared module.

🤖 Generated with Claude Code

@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch from 03fedda to 645dd86 Compare June 17, 2026 19:44
@ajozwik ajozwik force-pushed the netty-multipart-future-4851 branch from 347f47f to b669a8c Compare June 18, 2026 06:51
@adamw

adamw commented Jun 18, 2026

Member

Re-review (after "Restore traits, call onError when buffer exceed")

Thanks for the updates — both of the significant concerns from the previous review are resolved.

✅ Resolved: the blocking lock/condition race

The ReentrantLock + condition.await() mechanism is gone, replaced by a scala.concurrent.Promise:

val promise = Promise[F[RawValue[Seq[RawPart]]]]()
val subscriber = new MonadSubscriber(...)(promise)
nettyRequest.subscribe(subscriber)
monad.flatten(fromFuture(promise.future))   // no blocking

The subscriber completes the promise from its onComplete/onError callbacks, so this is fully non-blocking — no more lost-wakeup deadlock, no NPE-on-spurious-wakeup, and no thread blocked in the cats/zio runtimes. The new per-backend fromFuture bridges all look correct:

  • future: identity (f)
  • cats: Async[F].fromFuture(Async[F].delay(f))
  • zio: ZIO.fromFuture(_ => f)

The maxBytes-exceeded path now routes through onError(StreamMaxLengthExceededException(max)) rather than setting the result directly, and NettyStreamingRequestBody is back to a trait — nice, keeps the change less invasive.

✅ Resolved: formatting churn

The non-standard reformatting in NettyToResponseBody.scala (realigned Scaladoc, deep param indents) is gone; the added lines are now standard scalafmt. Note a scalafmtAll run appears to have pulled in one unrelated file — observability/zio-opentelemetry/.../ZIOpenTelemetryTracingSpec.scala has two purely cosmetic reformats. Harmless, just unrelated to the feature.

Remaining minor notes

  • Promise double-completion: promise.success/promise.failure throw IllegalStateException if called twice. The maxBytes path does subscription.cancel() then onError; if the publisher still emits a terminal onComplete/onError afterwards, the second completion throws. Using trySuccess/tryFailure would be idempotent and safer.
  • decoder.offer exceptions: in addContentSafe, an exception from offer propagates out of onNext rather than being routed to onError(t) — a reactive-streams contract nit.
  • Pre-existing carry-overs still stand: part.fileName.get can throw for byte-array/input-stream parts without a filename; scala.compat.java8.FutureConverters is deprecated on 2.13 (likely intentional for the 2.12 cross-build).
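
The double-completion note can be checked with the standard library alone. The sketch below (the object name `PromiseCompletionDemo` is made up for illustration) contrasts `failure`, which throws on an already-completed promise, with `trySuccess`, which simply reports `false`.

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseCompletionDemo {
  // Returns (did the second `failure` throw?, did the late `trySuccess` complete the promise?).
  def run(): (Boolean, Boolean) = {
    val strict = Promise[Int]()
    strict.failure(new RuntimeException("first"))
    // A second completion attempt throws IllegalStateException, captured here by Try.
    val secondThrows = Try(strict.failure(new RuntimeException("second"))).isFailure

    val lenient = Promise[Int]()
    lenient.tryFailure(new RuntimeException("first"))
    // trySuccess returns false because the promise is already completed.
    val lateSuccessApplied = lenient.trySuccess(1)

    (secondThrows, lateSuccessApplied)
  }
}
```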

The core architecture concern is fully addressed — this is now a clean, non-blocking implementation. What's left is minor robustness polish.

🤖 Generated with Claude Code

Comment thread build.sbt Outdated
import sttp.capabilities.Streams
import sttp.tapir.TapirFile

private[netty] trait NettyRequestBodyWithMultipart[F[_], S <: Streams[S]] extends NettyRequestBody[F, S] {

Member

Do we have any NettyRequestBody impls that are NOT multipart? Maybe we can simply fold this into the base trait, flattening the hierarchy.

Contributor Author

Updated

@adamw

adamw commented Jun 18, 2026

Member

Reactive Streams spec compliance — MonadSubscriber

I checked MonadSubscriber against the Reactive Streams JVM spec (Subscriber rules §2.x). It works for the well-behaved single-shot publisher it's actually given (Play's StreamedHttpRequest), but it's not spec-robust: there are two §2.13 throw-paths, and it isn't prepared for the signals a compliant publisher is explicitly allowed to send after cancel().

✅ Compliant

  • §2.1 (signal demand): request(1) in onSubscribe and after each non-terminal onNext — correct one-at-a-time backpressure.
  • §2.7 (request/cancel serial): all request/cancel calls happen inside serially-delivered signals. (The AtomicReference/AtomicLong aren't strictly needed given §1.3 serial delivery, but they're harmless.)
  • Buffer ownership: httpContent.release() in the finally is correct, and decoder.destroy() cleanup wiring is fine — but see the leak note below.

❌ §2.13 — signals MUST return normally and MUST NOT throw

"Calling onSubscribe, onNext, onError or onComplete MUST return normally… The only legal way to signal failure is by calling onError; the Subscriber MUST NOT throw."

Two paths violate this:

  1. decoder.offer can throw. In addContentSafe, offer (e.g. ErrorDataDecoderException on malformed multipart) is wrapped in try/finally that releases the buffer but does not catch — so the exception propagates out of onNext. Malformed input throws into the publisher instead of failing the promise. It should be caught and routed to onError.
  2. Promise double-completion throws IllegalStateException. promise.failure/promise.success throw if the promise is already completed (see §2.8–2.10), and that ISE escapes the signal method.
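
A sketch of the first fix, under simplifying assumptions: `Chunk` is a hypothetical stand-in for a reference-counted Netty buffer, `offer` stands in for decoder.offer, and `fail` for the subscriber's own failure path. The point is only the shape of the try/catch/finally: the exception is reported, nothing is thrown out of the signal method, and the buffer is released on every path.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for a reference-counted chunk; not a Netty type.
final class Chunk(val bytes: Array[Byte]) {
  var released = false
  def release(): Unit = released = true
}

// `offer` plays the role of decoder.offer, `fail` the role of routing to onError.
final class SafeOffer(offer: Chunk => Unit, fail: Throwable => Unit) {
  def addContentSafe(chunk: Chunk): Unit =
    try offer(chunk)
    catch { case NonFatal(t) => fail(t) } // report the failure instead of rethrowing
    finally chunk.release()               // release the buffer whether or not offer threw
}
```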

❌ §2.8 / §2.9 / §2.10 — not prepared for post-cancel / extra terminal signals

§2.8: "A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel()… cancel does not guarantee to perform the underlying cleaning operations immediately."
§2.9 / §2.10: "MUST be prepared to receive an onComplete/onError signal with or without a preceding request."

The maxBytes path does subscription.cancel() then onError(...) (failing the promise). A compliant publisher may still deliver:

  • a late onNext → currentRead still > max → cancel() + onError(...) again → promise.failure on an already-failed promise → ISE;
  • a late onComplete → promise.success on an already-failed promise → ISE;
  • a late onError → promise.failure again → ISE.

Root cause: no "already terminated" guard. Two changes together close all of these:

  • use promise.tryFailure/trySuccess (idempotent), and
  • track a terminal flag (or early-return once terminated) so post-termination signals are ignored and you don't keep re-cancelling / touching a destroy()-ed decoder.
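
Combined, the two changes could look like the sketch below. It is written against the JDK's `java.util.concurrent.Flow` interfaces rather than the types used in the PR, and `GuardedSubscriber` with its byte-counting result is a hypothetical simplification (no decoder, no buffers). Signals that arrive after termination are ignored, and completion is idempotent.

```scala
import java.util.concurrent.Flow
import scala.concurrent.{Future, Promise}

// Hypothetical size-limited subscriber; the result is just the number of bytes seen.
final class GuardedSubscriber(maxBytes: Long) extends Flow.Subscriber[Array[Byte]] {
  private val promise = Promise[Long]()
  private var subscription: Flow.Subscription = null
  private var read = 0L
  // Assumes the publisher delivers signals serially, so a plain var is enough.
  private var terminated = false

  def result: Future[Long] = promise.future

  override def onSubscribe(s: Flow.Subscription): Unit = { subscription = s; s.request(1) }

  override def onNext(chunk: Array[Byte]): Unit =
    if (!terminated) { // late elements after cancel() are dropped
      read += chunk.length
      if (read > maxBytes) {
        subscription.cancel()
        onError(new IllegalStateException(s"body exceeds $maxBytes bytes"))
      } else subscription.request(1)
    }

  // try* never throws, so a late terminal signal cannot escape as an exception.
  override def onError(t: Throwable): Unit = { terminated = true; promise.tryFailure(t); () }

  override def onComplete(): Unit = { terminated = true; promise.trySuccess(read); () }
}
```

With this guard, a late onNext, onComplete or onError after the limit is exceeded leaves the first failure in place and does not call cancel() again.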

⚠️ §2.2 (recommendation, not a MUST) — work on the publisher's thread

onNext decodes synchronously, and with DefaultHttpDataFactory a large part can spill to disk inside decoder.offer — potentially blocking I/O on Netty's event-loop thread. §2.2 recommends async dispatch when processing could harm publisher responsiveness. Not a violation, but a real consideration for large uploads.

⚠️ Buffer leak on the over-limit chunk

In the maxBytes branch the incoming httpContent is not released (it skips addContentSafe), so the final over-limit chunk leaks its buffer. Given this repo's history with Netty ByteBuf leaks, worth releasing it before cancelling.

Bottom line

Happy path and backpressure are correct. The gaps: (1) decoder.offer exceptions escaping onNext (§2.13), (2) non-idempotent promise completion that throws if the publisher emits anything after cancel()/termination (§2.8–2.10 → §2.13), and (3) the missed buffer release on the over-limit chunk. All three are addressed by: a catch around offer → onError, a terminal guard + tryFailure/trySuccess, and releasing the chunk in the maxBytes branch. None will likely fire with Play's current publisher, but the subscriber isn't robust to a spec-compliant one.

🤖 Generated with Claude Code


import java.io.InputStream

trait NettyToResponseBodyBase[S] extends NettyToResponseBodyCommon[S] {

Member

Same with the base/common/toresponsebody traits: it would be great to document why they are separate and what their roles are.

Contributor Author

Renamed to NettyToResponseBodyWrap and comment added

@ajozwik

ajozwik commented Jun 19, 2026

Contributor Author

§2.2 (a recommendation, not a MUST) has not been addressed. All the other points have been corrected.

Implementing §2.2 requires a dedicated executor. A generic call such as:

override def onNext(httpContent: HttpContent): Unit =
  monad.eval(onNextHandle(httpContent))

private def onNextHandle(httpContent: HttpContent): Unit = // the original synchronous onNext body

works with Future, but hangs with cats/zio (wrong context). The solution would be a separate executor pool, which raises open questions: how should it be configured and managed, and how large should the thread pool be?

adamw and others added 3 commits June 19, 2026 10:10
writeBytesToFile only used scala-java8-compat to convert a manually-built
CompletableFuture to a scala.concurrent.Future. Complete a Promise directly
in the AsynchronousFileChannel CompletionHandler instead, which works on
2.12/2.13/3 without the extra module dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
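
The approach described in the first commit message above can be sketched with only the JDK and the Scala standard library. This is an illustration of the idea, not the merged code: the object name, the open options and the partial-write handling are assumptions.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

object AsyncFileWrite {
  // Writes `bytes` to `path`; the Future is completed from the NIO callback thread.
  def writeBytesToFile(bytes: Array[Byte], path: Path): Future[Unit] = {
    val promise = Promise[Unit]()
    val channel = AsynchronousFileChannel.open(
      path,
      StandardOpenOption.CREATE,
      StandardOpenOption.WRITE,
      StandardOpenOption.TRUNCATE_EXISTING
    )
    val buffer = ByteBuffer.wrap(bytes)

    def finish(outcome: Try[Unit]): Unit = {
      Try(channel.close()) // best effort; a close failure must not escape the callback
      promise.tryComplete(outcome)
      ()
    }

    def writeFrom(position: Long): Unit =
      channel.write[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
        override def completed(written: Integer, attachment: AnyRef): Unit =
          if (buffer.hasRemaining) writeFrom(position + written.intValue) // partial write
          else finish(Success(()))
        override def failed(t: Throwable, attachment: AnyRef): Unit = finish(Failure(t))
      })

    writeFrom(0L)
    promise.future
  }
}
```

Because the handler completes a `scala.concurrent.Promise` directly, no `CompletableFuture` conversion is involved, which is what lets the extra compatibility module be dropped.
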
Every NettyRequestBody implementation (future/cats/zio via NettyMonadRequestBody,
and sync) uses the multipart httpDataFactory, so the intermediate trait added no
real separation. Move the factory and its multipartTempDirectory/multipartMinSizeForDisk
config into the base trait and drop NettyRequestBodyWithHttpDataFactory, flattening
the hierarchy by one level.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@adamw adamw merged commit b3fbc1c into softwaremill:master Jun 19, 2026
16 checks passed
@adamw

adamw commented Jun 19, 2026

Member

Thanks for working on this! :)

@ajozwik ajozwik deleted the netty-multipart-future-4851 branch June 19, 2026 11:12