drpcstream: make invoke sequence atomic under write lock #52

Open

shubhamdhama wants to merge 15 commits into shubham/enable-stream-multiplexing from shubham/make-invoke-sequence-atomic

Conversation

@shubhamdhama

Add Stream.WriteInvoke that writes InvokeMetadata and Invoke frames
under a single write lock acquisition. This prevents SendCancel from
slipping in between the two frames when a context is cancelled during
stream setup.

Without this, the following race is possible:

  1. Client creates stream, starts manageStream goroutine.
  2. doNewStream sends InvokeMetadata, releases write lock.
  3. Context cancels. manageStream calls SendCancel, acquires write lock,
    sends KindCancel, terminates the stream.
  4. doNewStream tries to send Invoke, sees stream terminated, returns error.

The server receives InvokeMetadata then Cancel, but never the Invoke.
It has no registered stream to cancel, so the Cancel is dropped and
the partial invokeAssembler entry leaks until the connection closes.

With WriteInvoke, SendCancel blocks until both frames are written.
The server always sees a complete invoke before any cancel.
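The locking change can be sketched as follows. This is a minimal illustration, not the actual drpc API: the `Kind` constants, `writeFrame`, and the recorded `sent` slice are stand-ins for the real drpcwire plumbing.

```go
package main

import "sync"

// Illustrative frame kinds standing in for drpcwire's.
type Kind int

const (
	KindInvokeMetadata Kind = iota
	KindInvoke
	KindCancel
)

// Stream models only the write-side locking discussed above.
type Stream struct {
	writeMu sync.Mutex
	sent    []Kind // recorded wire order, for illustration
}

func (s *Stream) writeFrame(k Kind) { s.sent = append(s.sent, k) }

// WriteInvoke writes both frames under a single lock acquisition,
// so no other writer can interleave between them.
func (s *Stream) WriteInvoke() {
	s.writeMu.Lock()
	defer s.writeMu.Unlock()
	s.writeFrame(KindInvokeMetadata)
	s.writeFrame(KindInvoke)
}

// SendCancel contends for the same lock, so it lands strictly
// before or strictly after the complete invoke sequence.
func (s *Stream) SendCancel() {
	s.writeMu.Lock()
	defer s.writeMu.Unlock()
	s.writeFrame(KindCancel)
}
```

Because both writers go through `writeMu`, the only observable wire orders are cancel-then-invoke-sequence or invoke-sequence-then-cancel; the cancel can never split the pair.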

Extract the frame assembly logic from Stream.HandleFrame into a
reusable PacketAssembler type in drpcwire. Both the stream and the
manager now use their own PacketAssembler instance, keeping assembly
logic in one place.

The manager's assembler handles the invoke sequence (metadata +
invoke), which removes the restriction that only KindMessage packets
could be split across frames. This also simplifies NewServerStream
from a packet-at-a-time loop into a single receive.

Rename packet_builder.go to packet_assembler.go for clarity. Move
frame-to-packet assembly tests from drpcstream/stream_test.go to
drpcwire/packet_assembler_test.go, testing PacketAssembler directly
without Stream dependencies. Stream tests now focus on handlePacket
behavior (delivery, termination, invoke rejection).
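The shape of the extracted type can be sketched as below, assuming the drpcwire convention that a packet may be split across frames and the final frame carries a done bit. The `Frame` fields and `Assemble` signature here are illustrative, not the actual API.

```go
package main

// Frame is one wire frame; Done marks the last frame of a packet.
type Frame struct {
	Data []byte
	Done bool
}

// PacketAssembler accumulates frame payloads until the done bit,
// then yields the complete packet.
type PacketAssembler struct {
	buf []byte
}

// Assemble appends one frame's payload; it returns the completed
// packet and ok=true when the frame closes the packet.
func (a *PacketAssembler) Assemble(f Frame) (packet []byte, ok bool) {
	a.buf = append(a.buf, f.Data...)
	if !f.Done {
		return nil, false
	}
	packet = a.buf
	a.buf = nil
	return packet, true
}
```

Because the type has no stream dependencies, both the stream and the manager can own an instance, and it can be tested directly as described.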

This simplifies stream lifecycle management by spawning a goroutine per
stream. This is a prerequisite for stream multiplexing, where multiple
streams need concurrent lifecycle management.

Replace the shared sfin channel with stream.Finished(), giving each
stream its own completion signal. The shared channel worked for
single-stream-at-a-time. Per-stream signals are required for
multiplexing where multiple streams finish independently.
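A per-stream completion signal can be sketched with a channel closed exactly once, which is the idiomatic Go pattern for a broadcast "done" signal (the `markFinished` name is an assumption, not the real method):

```go
package main

import "sync"

// Stream owns its completion signal, replacing a shared sfin channel.
type Stream struct {
	once sync.Once
	fin  chan struct{}
}

func NewStream() *Stream { return &Stream{fin: make(chan struct{})} }

// markFinished closes the signal; safe to call multiple times.
func (s *Stream) markFinished() { s.once.Do(func() { close(s.fin) }) }

// Finished returns a channel that is closed when the stream is done.
// Any number of waiters can select on it independently.
func (s *Stream) Finished() <-chan struct{} { return s.fin }
```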

Decouple client stream ID assignment from sbuf, which tracks the latest
stream object. This is a step toward removing sbuf in favor of the stream
registry.

Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.
We are not using it and do not plan to use it anytime soon, so until
then it is just a maintenance burden.
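The registry can be sketched as a mutex-guarded map from stream ID to stream object; the method names here are assumptions chosen for illustration.

```go
package main

import "sync"

// Stream is a placeholder for the real stream object.
type Stream struct{ id uint64 }

// streamRegistry maps stream IDs to active streams.
type streamRegistry struct {
	mu      sync.Mutex
	streams map[uint64]*Stream
}

func newStreamRegistry() *streamRegistry {
	return &streamRegistry{streams: make(map[uint64]*Stream)}
}

// Put registers a stream under its ID.
func (r *streamRegistry) Put(id uint64, s *Stream) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.streams[id] = s
}

// Get looks up a stream by ID, as the frame dispatcher will.
func (r *streamRegistry) Get(id uint64) (*Stream, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.streams[id]
	return s, ok
}

// Remove drops a finished stream from the registry.
func (r *streamRegistry) Remove(id uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.streams, id)
}
```

With this shape, holding "at most one active stream (two briefly during handoff)" is just a usage pattern; the structure itself already supports arbitrarily many concurrent streams.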

Enable multiple concurrent streams over a single transport. This is the
foundational change that replaces the single-stream-at-a-time model with true
multiplexing, allowing clients and servers to run multiple RPCs concurrently on
a shared connection.

Background:

Previously, the Manager enforced single-stream semantics: a semaphore (sem)
allowed only one active stream, and each new stream had to wait for the previous
one to finish (waitForPreviousStream). Stream IDs were required to be
monotonically increasing (checkStreamMonotonicity), and a single PacketAssembler
was shared across all invoke sequences. This was simple and correct for
one-at-a-time RPCs but incompatible with multiplexing.

Structural changes:

Manager:
- Remove the semaphore (sem) and acquireSemaphore/waitForPreviousStream.
  Multiple streams can now be created concurrently without blocking on each
  other.
- Remove checkStreamMonotonicity. With multiplexing, frames from different
  streams arrive interleaved; monotonicity is not meaningful.
- Remove lastFrameID/lastFrameKind tracking fields (only used by the
  monotonicity check).
- Replace the single shared PacketAssembler with a per-stream invokesAssembler
  map (map[uint64]*invokeAssembler). Each stream's invoke/metadata frame
  sequence is assembled independently.
- Remove SoftCancel option (see error semantics below).
- Remove GetLatest from streamRegistry; manageReader now dispatches frames by
  looking up the stream ID in the registry directly.
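The per-stream assembler map can be sketched as follows, assuming each stream ID lazily gets its own assembler entry that is dropped once its invoke sequence completes (field and method names are illustrative):

```go
package main

// invokeAssembler accumulates one stream's invoke/metadata frames.
type invokeAssembler struct {
	metadata []byte
	invoke   []byte
}

// Manager keeps one assembler per in-flight invoke sequence.
type Manager struct {
	invokesAssembler map[uint64]*invokeAssembler
}

func NewManager() *Manager {
	return &Manager{invokesAssembler: make(map[uint64]*invokeAssembler)}
}

// assembler returns the stream's assembler, creating it on first use,
// so interleaved frames from different streams never mix.
func (m *Manager) assembler(streamID uint64) *invokeAssembler {
	a, ok := m.invokesAssembler[streamID]
	if !ok {
		a = &invokeAssembler{}
		m.invokesAssembler[streamID] = a
	}
	return a
}

// finish drops the entry once the invoke sequence is complete.
func (m *Manager) finish(streamID uint64) {
	delete(m.invokesAssembler, streamID)
}
```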

Server:
- ServeOne now spawns a goroutine per incoming RPC via sync.WaitGroup, rather
  than handling RPCs sequentially. Errors from individual RPC handlers are
  logged (via opts.Log) rather than terminating the connection.

Stream:
- NewWithOptions no longer calls wr.Reset() on the shared Writer. With
  multiplexing, multiple streams share the same Writer; resetting it would
  discard buffered frames from other streams.
- SendCancel no longer returns a (busy, error) tuple. It blocks on the stream's
  write lock instead of returning busy=true when another write is in progress.
  This guarantees the cancel frame is sent (or fails with an IO error), at the
  cost of waiting for any in-progress write to finish. A future writer queue
  will eliminate this blocking.

Error and cancellation semantics:

The central design principle is that the manageReader goroutine is the single
authority on transport health. It is the only goroutine that reads from the
transport, and if the transport fails, it will detect the failure and terminate
the connection. Write-side errors propagate to the caller but do not directly
terminate the connection; the reader will independently detect the broken
transport (since an IO write failure implies the transport is broken, and the
next read will also fail). This matches gRPC's approach: when loopyWriter
encounters an IO error, it does not close the connection. Instead, it relies on
the reader to detect the failure and clean up.

Error classification:

  Connection-scoped (terminates all streams):
  - Transport read error: manageReader fails to read a frame.
  - Frame assembly error: corrupted wire data that cannot be parsed.
  - Protocol error: e.g., receiving an invoke on an existing stream, or an
    unknown non-control packet kind.
  - Manager.Close(): explicit shutdown by the application.

  Stream-scoped (only affects that stream):
  - Application error: the RPC handler returns an error, which is sent via
    SendError (KindError) and terminates only that stream.
  - Remote close: receiving KindClose or KindCloseSend terminates or half-closes
    only that stream.
  - Remote cancel: receiving KindCancel terminates only that stream.
  - Remote error: receiving KindError terminates only that stream.
  - Write error (MsgSend, SendError, CloseSend, SendCancel): the error
    propagates to the caller. The stream is terminated locally. The manageReader
    goroutine will detect the transport failure on its next read and terminate
    the connection.
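The read-side half of this classification can be sketched as a dispatch on packet kind. The `Kind` and `Scope` names are illustrative; in particular, `KindInvoke` lands in the connection-scoped branch here only to model the "invoke on an existing stream" protocol error described above.

```go
package main

// Kind names mirror the classification above.
type Kind int

const (
	KindInvoke Kind = iota
	KindMessage
	KindError
	KindCancel
	KindClose
	KindCloseSend
)

// Scope is the blast radius of handling an incoming packet.
type Scope int

const (
	StreamScoped Scope = iota
	ConnScoped
)

// classify maps a packet kind received for an existing stream to its
// blast radius: known stream-directed kinds affect only that stream,
// while an invoke on an existing stream or an unknown kind is a
// protocol error that terminates the connection.
func classify(k Kind) Scope {
	switch k {
	case KindMessage, KindError, KindCancel, KindClose, KindCloseSend:
		return StreamScoped
	default:
		return ConnScoped
	}
}
```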

Context cancellation:

When a stream's context is cancelled, manageStream:
1. Attempts to send a KindCancel frame (SendCancel). This blocks until any
   in-progress write on that stream completes, then sends the cancel. If the
   send fails (IO error), the error is logged. The reader will catch the
   transport failure.
2. Cancels the stream locally (stream.Cancel), which terminates the stream and
   causes any blocked Send/Recv to return the context error.
3. Waits for the stream to finish (stream.Finished).

The SoftCancel option is removed. Previously, SoftCancel=false would terminate
the entire connection when a stream's context was cancelled (calling m.terminate
if the stream wasn't already finished). With multiplexing, cancelling one stream
must never kill the connection. SoftCancel=true behavior (send cancel, then
cancel locally) is now the only behavior, simplified to always block for the
write lock rather than returning "busy" and falling back to a hard cancel.

Manager termination:

When the manager terminates (from any connection-scoped error), it closes the
transport and the stream registry. Each active stream's manageStream goroutine
detects termination via m.sigs.term, cancels its stream, and waits for it to
finish. Manager.Close() then waits for all stream goroutines (m.wg.Wait) and the
reader goroutine before returning.

Known limitations:

- The shared drpcwire.Writer is protected by a mutex. All streams serialize
  their writes through this single writer.
- SendCancel blocks on the stream's write lock. If a stream has a large
  in-progress write, the cancel is delayed.
- packetBuffer is single-slot (Put blocks until Get+Done). A slow consumer
  stream blocks manageReader, stalling frame delivery to all streams. This needs
  to be addressed with per-stream buffering or async delivery.
- Conn.Invoke() holds a mutex for the entire unary RPC duration, serializing
  concurrent unary RPCs. Streaming RPCs (NewStream) are not affected.
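The single-slot packetBuffer limitation can be illustrated with a minimal handoff sketch (the real implementation may differ): Put does not return until the consumer has called both Get and Done, which is exactly why one slow consumer stalls manageReader for every stream.

```go
package main

// packetBuffer sketched as a single-slot handoff.
type packetBuffer struct {
	slot chan []byte
	done chan struct{}
}

func newPacketBuffer() *packetBuffer {
	return &packetBuffer{
		slot: make(chan []byte),
		done: make(chan struct{}),
	}
}

// Put hands one packet to the consumer and blocks until Done.
func (b *packetBuffer) Put(p []byte) {
	b.slot <- p
	<-b.done
}

// Get receives the packet the producer is currently offering.
func (b *packetBuffer) Get() []byte { return <-b.slot }

// Done releases the blocked Put.
func (b *packetBuffer) Done() { b.done <- struct{}{} }
```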
@shubhamdhama force-pushed the shubham/enable-stream-multiplexing branch 2 times, most recently from aebdc90 to d57f892 on April 12, 2026 at 11:12