Skip to content

Commit 3219dcf

Browse files
committed
*: use per-stream Finished signal instead of shared sfin channel
Replace the shared sfin channel with stream.Finished(), giving each stream its own completion signal. The shared channel worked for single-stream-at-a-time. Per-stream signals are required for multiplexing where multiple streams finish independently.
1 parent 7221ab1 commit 3219dcf

3 files changed

Lines changed: 4 additions & 19 deletions

File tree

drpcmanager/manager.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ type Manager struct {
8080

8181
sem drpcsignal.Chan // held by the active stream
8282
sbuf streamBuffer // largest stream id created
83-
sfin chan struct{} // shared signal for stream finished
8483

8584
pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
8685
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
@@ -119,8 +118,6 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
119118
opts: opts,
120119

121120
invokes: make(chan invokeInfo),
122-
123-
sfin: make(chan struct{}, 1),
124121
}
125122

126123
// initialize the stream buffer
@@ -137,7 +134,6 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
137134

138135
// set the internal stream options
139136
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
140-
drpcopts.SetStreamFin(&m.opts.Stream.Internal, m.sfin)
141137

142138
go m.manageReader()
143139

@@ -370,10 +366,10 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) {
370366
}
371367
}
372368
stream.Cancel(err)
373-
<-m.sfin
369+
<-stream.Finished()
374370
m.sem.Recv()
375371

376-
case <-m.sfin:
372+
case <-stream.Finished():
377373
m.sem.Recv()
378374

379375
case <-ctx.Done():
@@ -394,7 +390,7 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) {
394390
stream.Cancel(ctx.Err())
395391

396392
// wait for the stream to signal that it is finished.
397-
<-m.sfin
393+
<-stream.Finished()
398394
} else {
399395
// If the stream isn't already finished, we have to terminate the
400396
// transport to do an active cancel. If it is already finished,
@@ -407,7 +403,7 @@ func (m *Manager) manageStream(ctx context.Context, stream *drpcstream.Stream) {
407403
}
408404

409405
// wait for the stream to signal that it is finished.
410-
<-m.sfin
406+
<-stream.Finished()
411407

412408
// allow a new stream to begin.
413409
m.sem.Recv()

drpcstream/stream.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Options struct {
4545
type Stream struct {
4646
ctx streamCtx
4747
opts Options
48-
fin chan<- struct{}
4948
task *trace.Task
5049

5150
write inspectMutex
@@ -100,7 +99,6 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
10099
tr: drpcopts.GetStreamTransport(&opts.Internal),
101100
},
102101
opts: opts,
103-
fin: drpcopts.GetStreamFin(&opts.Internal),
104102
task: task,
105103

106104
pa: pa,
@@ -309,9 +307,6 @@ func (s *Stream) checkFinished() {
309307
if s.sigs.fin.Set(nil) {
310308
s.log("FIN", func() string { return "" })
311309
s.ctx.sig.Set(context.Canceled)
312-
if s.fin != nil {
313-
s.fin <- struct{}{}
314-
}
315310
if s.task != nil {
316311
s.task.End()
317312
}

internal/drpcopts/stream.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,6 @@ func GetStreamTransport(opts *Stream) drpc.Transport { return opts.transport }
2323
// SetStreamTransport sets the drpc.Transport stored in the options.
2424
func SetStreamTransport(opts *Stream, tr drpc.Transport) { opts.transport = tr }
2525

26-
// GetStreamFin returns the chan<- struct{} stored in the options.
27-
func GetStreamFin(opts *Stream) chan<- struct{} { return opts.fin }
28-
29-
// SetStreamFin sets the chan<- struct{} stored in the options.
30-
func SetStreamFin(opts *Stream, fin chan<- struct{}) { opts.fin = fin }
31-
3226
// GetStreamKind returns the StreamKind stored in the options.
3327
func GetStreamKind(opts *Stream) drpc.StreamKind { return opts.kind }
3428

0 commit comments

Comments
 (0)