Skip to content

Commit c7a2d1a

Browse files
committed
rename invokesAssembler to pendingStreams
1 parent a5c05c2 commit c7a2d1a

1 file changed

Lines changed: 22 additions & 15 deletions

File tree

drpcmanager/manager.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ type Manager struct {
6666
pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
6767
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
6868

69-
// invokesAssembler is owned by the manageReader goroutine, used in
70-
// handleInvokeFrame.
71-
invokesAssembler map[uint64]*invokeAssembler
69+
// pendingStreams is owned by the manageReader goroutine, used in
70+
// handleInvokeFrame. It tracks streams that are being assembled from
71+
// invoke/metadata frames but haven't been fully created yet.
72+
pendingStreams map[uint64]*pendingStream
7273

7374
sigs struct {
7475
term drpcsignal.Signal // set when the manager should start terminating
@@ -87,7 +88,10 @@ const (
8788
Server
8889
)
8990

90-
type invokeAssembler struct {
91+
// pendingStream accumulates invoke and metadata frames for a stream that is
92+
// being set up but hasn't been fully created yet. Once the invoke packet
93+
// arrives, the pending stream is forwarded to NewServerStream.
94+
type pendingStream struct {
9195
metadata map[string]string // accumulated invoke metadata
9296
pa drpcwire.PacketAssembler // assembles invoke/metadata frames into packets
9397
}
@@ -123,7 +127,7 @@ func NewWithOptions(tr drpc.Transport, kind ManagerKind, opts Options) *Manager
123127
// new server stream without having to coordinate with manageReader.
124128
m.pdone.Make(1)
125129

126-
m.invokesAssembler = make(map[uint64]*invokeAssembler)
130+
m.pendingStreams = make(map[uint64]*pendingStream)
127131

128132
m.streams = newActiveStreams()
129133

@@ -211,15 +215,15 @@ func (m *Manager) manageReader() {
211215
}
212216

213217
// handleInvokeFrame assembles invoke/metadata frames into complete packets and
214-
// forwards the finished invoke info to NewServerStream via m.newServerStreamInfo.
215-
// Metadata packets are accumulated; the invoke packet triggers the send.
218+
// forwards the finished invoke info to NewServerStream. Metadata packets are
219+
// accumulated; the invoke packet triggers the send.
216220
func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
217-
ia, ok := m.invokesAssembler[fr.ID.Stream]
221+
ps, ok := m.pendingStreams[fr.ID.Stream]
218222
if !ok {
219-
ia = &invokeAssembler{pa: drpcwire.NewPacketAssembler()}
220-
m.invokesAssembler[fr.ID.Stream] = ia
223+
ps = &pendingStream{pa: drpcwire.NewPacketAssembler()}
224+
m.pendingStreams[fr.ID.Stream] = ps
221225
}
222-
pkt, packetReady, err := ia.pa.AppendFrame(fr)
226+
pkt, packetReady, err := ps.pa.AppendFrame(fr)
223227
if err != nil {
224228
return err
225229
}
@@ -233,19 +237,19 @@ func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
233237
if err != nil {
234238
return err
235239
}
236-
ia.metadata = meta
240+
ps.metadata = meta
237241
return nil
238242
}
239243

240244
// Invoke packet completes the sequence. Send to NewServerStream.
241245
select {
242-
case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: pkt.Data, metadata: ia.metadata}:
246+
case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: pkt.Data, metadata: ps.metadata}:
243247
// Wait for NewServerStream to finish stream creation before reading the
244248
// next frame. This guarantees curr is set for subsequent non-invoke
245249
// packets.
246250
m.pdone.Recv()
247-
// TODO: reuse invoke assembler
248-
delete(m.invokesAssembler, fr.ID.Stream)
251+
// TODO: reuse pending stream
252+
delete(m.pendingStreams, fr.ID.Stream)
249253
case <-m.sigs.term.Signal():
250254
}
251255
return nil
@@ -310,6 +314,9 @@ func (m *Manager) Closed() <-chan struct{} {
310314
// Unblocked returns a channel that is closed when the manager is no longer
311315
// blocked. With multiplexing, multiple streams run concurrently and this
312316
// channel is always closed immediately.
317+
//
318+
// TODO(shubham): audit whether this is still useful in a multiplexing world.
319+
// The only meaningful caller is Pool.Take.
313320
func (m *Manager) Unblocked() <-chan struct{} {
314321
return closedCh
315322
}

0 commit comments

Comments
 (0)