Skip to content

Commit c77b000

Browse files
committed
fixup! enable stream multiplexing
1 parent e1f642b commit c77b000

8 files changed

Lines changed: 30 additions & 35 deletions

File tree

drpcconn/conn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
4141

4242
ctx.Run(func(ctx context.Context) {
4343
wr := drpcwire.NewMuxWriter(ps, nil)
44-
defer wr.StopWait()
44+
defer func() { wr.Stop(nil); <-wr.Done() }()
4545
rd := drpcwire.NewReader(ps)
4646

4747
_, _ = rd.ReadFrame() // Invoke
@@ -97,7 +97,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
9797

9898
ctx.Run(func(ctx context.Context) {
9999
wr := drpcwire.NewMuxWriter(ps, nil)
100-
defer wr.StopWait()
100+
defer func() { wr.Stop(nil); <-wr.Done() }()
101101
rd := drpcwire.NewReader(ps)
102102

103103
md, err := rd.ReadFrame() // Metadata

drpcmanager/active_streams.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import (
1212
// activeStreams is a thread-safe map of stream IDs to stream objects.
1313
// It is used by the Manager to track active streams for lifecycle management.
1414
type activeStreams struct {
15-
mu sync.RWMutex
16-
streams map[uint64]*drpcstream.Stream
17-
closed bool
15+
mu sync.RWMutex
16+
streams map[uint64]*drpcstream.Stream
17+
closed bool
18+
closeErr error
1819
}
1920

2021
func newActiveStreams() *activeStreams {
@@ -34,7 +35,7 @@ func (r *activeStreams) Add(id uint64, stream *drpcstream.Stream) error {
3435
defer r.mu.Unlock()
3536

3637
if r.closed {
37-
return managerClosed.New("add to closed collection")
38+
return r.closeErr
3839
}
3940
if _, ok := r.streams[id]; ok {
4041
return managerClosed.New("duplicate stream id")
@@ -73,6 +74,7 @@ func (r *activeStreams) Close(err error) {
7374
defer r.mu.Unlock()
7475

7576
r.closed = true
77+
r.closeErr = err
7678
for id, s := range r.streams {
7779
s.Cancel(err)
7880
delete(r.streams, id)

drpcmanager/active_streams_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
1919
mw := drpcwire.NewMuxWriter(io.Discard, func(error) {})
20-
t.Cleanup(func() { mw.Stop(); <-mw.Done() })
20+
t.Cleanup(func() { mw.Stop(nil); <-mw.Done() })
2121
return mw
2222
}
2323

drpcmanager/manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ func (m *Manager) log(what string, cb func() string) {
152152
func (m *Manager) terminate(err error) {
153153
if m.sigs.term.Set(err) {
154154
m.log("TERM", func() string { return fmt.Sprint(err) })
155-
m.wr.Stop()
156-
m.sigs.tport.Set(m.tr.Close())
157155
if errors.Is(err, io.EOF) {
158156
err = context.Canceled
159157
if m.kind == Client {
160158
err = drpc.ClosedError.New("connection closed")
161159
}
162160
}
161+
m.wr.Stop(err)
162+
m.sigs.tport.Set(m.tr.Close())
163163
m.streams.Close(err)
164164
}
165165
}
@@ -191,7 +191,7 @@ func (m *Manager) manageReader() {
191191

192192
switch {
193193
// if the packet is for an active stream, deliver it.
194-
case ok && stream != nil:
194+
case ok:
195195
if err := stream.HandleFrame(incomingFrame); err != nil {
196196
m.terminate(managerClosed.Wrap(err))
197197
return

drpcmanager/random_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func runRandomized(t *testing.T, prog []byte, r runner) {
201201
defer func() { _ = ps.Close() }()
202202

203203
wr := drpcwire.NewMuxWriter(pc, func(error) {})
204-
defer func() { wr.Stop(); <-wr.Done() }()
204+
defer func() { wr.Stop(nil); <-wr.Done() }()
205205

206206
man := New(ps, Server)
207207
defer func() { _ = man.Close() }()

drpcstream/stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
2424
t.Helper()
2525
mw := drpcwire.NewMuxWriter(io.Discard, func(error) {})
26-
t.Cleanup(mw.StopWait)
26+
t.Cleanup(func() { mw.Stop(nil); <-mw.Done() })
2727
return mw
2828
}
2929

drpcwire/mux_writer.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ package drpcwire
66
import (
77
"io"
88
"sync"
9-
10-
"storj.io/drpc"
119
)
1210

1311
type MuxWriter struct {
1412
w io.Writer
1513
buf []byte
1614
mu sync.Mutex
1715
cond *sync.Cond
18-
closed bool
16+
closed bool
17+
closeErr error
1918
onError func(error)
2019
done chan struct{}
2120
}
@@ -59,6 +58,7 @@ func (mw *MuxWriter) run() {
5958
return
6059
}
6160
mw.closed = true
61+
mw.closeErr = err
6262
mw.mu.Unlock()
6363
if mw.onError != nil {
6464
mw.onError(err)
@@ -74,18 +74,18 @@ func (mw *MuxWriter) WriteFrame(fr Frame) (err error) {
7474
mw.mu.Lock()
7575
defer mw.mu.Unlock()
7676
if mw.closed {
77-
return drpc.ClosedError.New("mux writer closed")
77+
return mw.closeErr
7878
}
7979
mw.buf = AppendFrame(mw.buf, fr)
8080
mw.cond.Signal()
8181
return nil
8282
}
8383

84-
func (mw *MuxWriter) Stop() {
84+
func (mw *MuxWriter) Stop(err error) {
8585
mw.mu.Lock()
86-
if mw.closed {
87-
} else {
86+
if !mw.closed {
8887
mw.closed = true
88+
mw.closeErr = err
8989
mw.cond.Broadcast()
9090
}
9191
mw.mu.Unlock()
@@ -94,8 +94,3 @@ func (mw *MuxWriter) Stop() {
9494
func (mw *MuxWriter) Done() <-chan struct{} {
9595
return mw.done
9696
}
97-
98-
func (mw *MuxWriter) StopWait() {
99-
mw.Stop()
100-
<-mw.Done()
101-
}

drpcwire/mux_writer_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/zeebo/assert"
15-
16-
"storj.io/drpc"
1715
)
1816

1917
// blockingWriter blocks in Write until unblock is closed, then returns err.
@@ -80,7 +78,7 @@ func TestMuxWriter(t *testing.T) {
8078
assert.NoError(t, err)
8179

8280
// Now stop the writer and close the pipe.
83-
mw.Stop()
81+
mw.Stop(errors.New("stopped"))
8482
<-mw.Done()
8583
pw.Close()
8684
pr.Close()
@@ -90,12 +88,12 @@ func TestMuxWriter(t *testing.T) {
9088

9189
func TestMuxWriter_WriteFrameAfterStop(t *testing.T) {
9290
mw := NewMuxWriter(io.Discard, func(error) {})
93-
mw.Stop()
91+
mw.Stop(errors.New("stopped"))
9492
<-mw.Done()
9593

9694
err := mw.WriteFrame(RandFrame())
9795
assert.Error(t, err)
98-
assert.That(t, drpc.ClosedError.Has(err))
96+
assert.Equal(t, err.Error(), "stopped")
9997
}
10098

10199
func TestMuxWriter_ConcurrentWriteFrame(t *testing.T) {
@@ -141,7 +139,7 @@ func TestMuxWriter_ConcurrentWriteFrame(t *testing.T) {
141139
got := make([]byte, expSize)
142140
_, err := io.ReadFull(pr, got)
143141
assert.NoError(t, err)
144-
mw.Stop()
142+
mw.Stop(errors.New("stopped"))
145143
<-mw.Done()
146144
pw.Close()
147145
pr.Close()
@@ -190,7 +188,7 @@ func TestMuxWriter_OnErrorCallingStopDoesNotDeadlock(t *testing.T) {
190188
var mw *MuxWriter
191189
mw = NewMuxWriter(fw, func(err error) {
192190
// Simulate manager.terminate calling Stop.
193-
mw.Stop()
191+
mw.Stop(errors.New("stopped"))
194192
})
195193

196194
assert.NoError(t, mw.WriteFrame(RandFrame()))
@@ -218,7 +216,7 @@ func TestMuxWriter_BlockedWriteUnblockedByClose(t *testing.T) {
218216
}
219217

220218
// Simulate terminate: Stop, then unblock the writer (like tr.Close()).
221-
mw.Stop()
219+
mw.Stop(errors.New("stopped"))
222220
bw.err = errors.New("closed")
223221
close(bw.unblock)
224222

@@ -241,7 +239,7 @@ func TestMuxWriter_ConcurrentStop(t *testing.T) {
241239
for range n {
242240
go func() {
243241
defer wg.Done()
244-
mw.Stop()
242+
mw.Stop(errors.New("stopped"))
245243
}()
246244
}
247245
wg.Wait()
@@ -276,7 +274,7 @@ func TestMuxWriter_StopDiscardsBufferedData(t *testing.T) {
276274
}
277275

278276
// Stop without letting the blocked Write complete.
279-
mw.Stop()
277+
mw.Stop(errors.New("stopped"))
280278
bw.err = errors.New("closed")
281279
close(bw.unblock)
282280

@@ -322,7 +320,7 @@ func TestMuxWriter_WriteFrameDuringActiveDrain(t *testing.T) {
322320
close(g2.ch)
323321

324322
// Both batches were written. Stop and verify.
325-
mw.Stop()
323+
mw.Stop(errors.New("stopped"))
326324
<-mw.Done()
327325
}
328326

0 commit comments

Comments
 (0)