Skip to content

Commit a5c05c2

Browse files
committed
fixup! enable stream multiplexing
1 parent 48d83cb commit a5c05c2

10 files changed

Lines changed: 42 additions & 49 deletions

File tree

drpcconn/conn_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
4141

4242
ctx.Run(func(ctx context.Context) {
4343
wr := drpcwire.NewMuxWriter(ps, nil)
44-
defer wr.StopWait()
44+
defer func() { wr.Stop(nil); <-wr.Done() }()
4545
rd := drpcwire.NewReader(ps)
4646

4747
_, _ = rd.ReadFrame() // Invoke
@@ -97,7 +97,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
9797

9898
ctx.Run(func(ctx context.Context) {
9999
wr := drpcwire.NewMuxWriter(ps, nil)
100-
defer wr.StopWait()
100+
defer func() { wr.Stop(nil); <-wr.Done() }()
101101
rd := drpcwire.NewReader(ps)
102102

103103
md, err := rd.ReadFrame() // Metadata

drpcmanager/active_streams.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import (
1212
// activeStreams is a thread-safe map of stream IDs to stream objects.
1313
// It is used by the Manager to track active streams for lifecycle management.
1414
type activeStreams struct {
15-
mu sync.RWMutex
16-
streams map[uint64]*drpcstream.Stream
17-
closed bool
15+
mu sync.RWMutex
16+
streams map[uint64]*drpcstream.Stream
17+
closed bool
18+
closeErr error
1819
}
1920

2021
func newActiveStreams() *activeStreams {
@@ -34,7 +35,7 @@ func (r *activeStreams) Add(id uint64, stream *drpcstream.Stream) error {
3435
defer r.mu.Unlock()
3536

3637
if r.closed {
37-
return managerClosed.New("add to closed collection")
38+
return r.closeErr
3839
}
3940
if _, ok := r.streams[id]; ok {
4041
return managerClosed.New("duplicate stream id")
@@ -73,6 +74,7 @@ func (r *activeStreams) Close(err error) {
7374
defer r.mu.Unlock()
7475

7576
r.closed = true
77+
r.closeErr = err
7678
for id, s := range r.streams {
7779
s.Cancel(err)
7880
delete(r.streams, id)

drpcmanager/active_streams_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
1919
mw := drpcwire.NewMuxWriter(io.Discard, func(error) {})
20-
t.Cleanup(func() { mw.Stop(); <-mw.Done() })
20+
t.Cleanup(func() { mw.Stop(nil); <-mw.Done() })
2121
return mw
2222
}
2323

drpcmanager/manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ func (m *Manager) log(what string, cb func() string) {
152152
func (m *Manager) terminate(err error) {
153153
if m.sigs.term.Set(err) {
154154
m.log("TERM", func() string { return fmt.Sprint(err) })
155-
m.wr.Stop()
156-
m.sigs.tport.Set(m.tr.Close())
157155
if errors.Is(err, io.EOF) {
158156
err = context.Canceled
159157
if m.kind == Client {
160158
err = drpc.ClosedError.New("connection closed")
161159
}
162160
}
161+
m.wr.Stop(err)
162+
m.sigs.tport.Set(m.tr.Close())
163163
m.streams.Close(err)
164164
}
165165
}
@@ -191,7 +191,7 @@ func (m *Manager) manageReader() {
191191

192192
switch {
193193
// if the packet is for an active stream, deliver it.
194-
case ok && stream != nil:
194+
case ok:
195195
if err := stream.HandleFrame(incomingFrame); err != nil {
196196
m.terminate(managerClosed.Wrap(err))
197197
return

drpcmanager/manager_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ func TestManageReader_GlobalMonotonicity_SameStream(t *testing.T) {
201201
waitForClosed(t, man)
202202
}
203203

204-
205204
// Invoke replay: after [s1,m1,invoke,done=true], lastFrameID is bumped to
206205
// {1,2}. A replayed [s1,m1,invoke] is caught by the monotonicity check.
207206
func TestManageReader_InvokeReplayBlocked(t *testing.T) {
@@ -531,4 +530,3 @@ func TestManageReader_WaitsForStreamCreation(t *testing.T) {
531530

532531
assert.DeepEqual(t, <-recv, []byte("data"))
533532
}
534-

drpcmanager/random_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func runRandomized(t *testing.T, prog []byte, r runner) {
201201
defer func() { _ = ps.Close() }()
202202

203203
wr := drpcwire.NewMuxWriter(pc, func(error) {})
204-
defer func() { wr.Stop(); <-wr.Done() }()
204+
defer func() { wr.Stop(nil); <-wr.Done() }()
205205

206206
man := New(ps, Server)
207207
defer func() { _ = man.Close() }()

drpcstream/packet_queue.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ type packetQueue struct {
2929
mu sync.Mutex
3030
cond sync.Cond
3131

32-
buf [][]byte // ring buffer of byte slices
33-
head int // next write position (producer)
34-
tail int // next read position (consumer)
35-
count int // number of occupied slots
32+
buf [][]byte // ring buffer of byte slices
33+
head int // next write position (producer)
34+
tail int // next read position (consumer)
35+
count int // number of occupied slots
3636

3737
held bool // true between Get and Done
3838
err error // terminal error, set by Close

drpcstream/stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
func testMuxWriter(t *testing.T) *drpcwire.MuxWriter {
2424
t.Helper()
2525
mw := drpcwire.NewMuxWriter(io.Discard, func(error) {})
26-
t.Cleanup(mw.StopWait)
26+
t.Cleanup(func() { mw.Stop(nil); <-mw.Done() })
2727
return mw
2828
}
2929

drpcwire/mux_writer.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ package drpcwire
66
import (
77
"io"
88
"sync"
9-
10-
"storj.io/drpc"
119
)
1210

1311
type MuxWriter struct {
14-
w io.Writer
15-
buf []byte
16-
mu sync.Mutex
17-
cond *sync.Cond
18-
closed bool
19-
onError func(error)
20-
done chan struct{}
12+
w io.Writer
13+
buf []byte
14+
mu sync.Mutex
15+
cond *sync.Cond
16+
closed bool
17+
closeErr error
18+
onError func(error)
19+
done chan struct{}
2120
}
2221

2322
var defaultBufferCapacity = 4096
@@ -57,6 +56,7 @@ func (mw *MuxWriter) run() {
5756
return
5857
}
5958
mw.closed = true
59+
mw.closeErr = err
6060
mw.mu.Unlock()
6161
if mw.onError != nil {
6262
mw.onError(err)
@@ -72,18 +72,18 @@ func (mw *MuxWriter) WriteFrame(fr Frame) (err error) {
7272
mw.mu.Lock()
7373
defer mw.mu.Unlock()
7474
if mw.closed {
75-
return drpc.ClosedError.New("mux writer closed")
75+
return mw.closeErr
7676
}
7777
mw.buf = AppendFrame(mw.buf, fr)
7878
mw.cond.Signal()
7979
return nil
8080
}
8181

82-
func (mw *MuxWriter) Stop() {
82+
func (mw *MuxWriter) Stop(err error) {
8383
mw.mu.Lock()
84-
if mw.closed {
85-
} else {
84+
if !mw.closed {
8685
mw.closed = true
86+
mw.closeErr = err
8787
mw.cond.Broadcast()
8888
}
8989
mw.mu.Unlock()
@@ -92,8 +92,3 @@ func (mw *MuxWriter) Stop() {
9292
func (mw *MuxWriter) Done() <-chan struct{} {
9393
return mw.done
9494
}
95-
96-
func (mw *MuxWriter) StopWait() {
97-
mw.Stop()
98-
<-mw.Done()
99-
}

drpcwire/mux_writer_test.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@ import (
1212
"time"
1313

1414
"github.com/zeebo/assert"
15-
16-
"storj.io/drpc"
1715
)
1816

1917
// blockingWriter blocks in Write until unblock is closed, then returns err.
2018
type blockingWriter struct {
2119
unblock chan struct{}
22-
err error // error to return once unblocked
23-
wrote chan []byte // sends a copy of data on each Write entry
20+
err error // error to return once unblocked
21+
wrote chan []byte // sends a copy of data on each Write entry
2422
}
2523

2624
func newBlockingWriter() *blockingWriter {
@@ -80,7 +78,7 @@ func TestMuxWriter(t *testing.T) {
8078
assert.NoError(t, err)
8179

8280
// Now stop the writer and close the pipe.
83-
mw.Stop()
81+
mw.Stop(errors.New("stopped"))
8482
<-mw.Done()
8583
pw.Close()
8684
pr.Close()
@@ -90,12 +88,12 @@ func TestMuxWriter(t *testing.T) {
9088

9189
func TestMuxWriter_WriteFrameAfterStop(t *testing.T) {
9290
mw := NewMuxWriter(io.Discard, func(error) {})
93-
mw.Stop()
91+
mw.Stop(errors.New("stopped"))
9492
<-mw.Done()
9593

9694
err := mw.WriteFrame(RandFrame())
9795
assert.Error(t, err)
98-
assert.That(t, drpc.ClosedError.Has(err))
96+
assert.Equal(t, err.Error(), "stopped")
9997
}
10098

10199
func TestMuxWriter_ConcurrentWriteFrame(t *testing.T) {
@@ -141,7 +139,7 @@ func TestMuxWriter_ConcurrentWriteFrame(t *testing.T) {
141139
got := make([]byte, expSize)
142140
_, err := io.ReadFull(pr, got)
143141
assert.NoError(t, err)
144-
mw.Stop()
142+
mw.Stop(errors.New("stopped"))
145143
<-mw.Done()
146144
pw.Close()
147145
pr.Close()
@@ -190,7 +188,7 @@ func TestMuxWriter_OnErrorCallingStopDoesNotDeadlock(t *testing.T) {
190188
var mw *MuxWriter
191189
mw = NewMuxWriter(fw, func(err error) {
192190
// Simulate manager.terminate calling Stop.
193-
mw.Stop()
191+
mw.Stop(errors.New("stopped"))
194192
})
195193

196194
assert.NoError(t, mw.WriteFrame(RandFrame()))
@@ -218,7 +216,7 @@ func TestMuxWriter_BlockedWriteUnblockedByClose(t *testing.T) {
218216
}
219217

220218
// Simulate terminate: Stop, then unblock the writer (like tr.Close()).
221-
mw.Stop()
219+
mw.Stop(errors.New("stopped"))
222220
bw.err = errors.New("closed")
223221
close(bw.unblock)
224222

@@ -241,7 +239,7 @@ func TestMuxWriter_ConcurrentStop(t *testing.T) {
241239
for range n {
242240
go func() {
243241
defer wg.Done()
244-
mw.Stop()
242+
mw.Stop(errors.New("stopped"))
245243
}()
246244
}
247245
wg.Wait()
@@ -276,7 +274,7 @@ func TestMuxWriter_StopDiscardsBufferedData(t *testing.T) {
276274
}
277275

278276
// Stop without letting the blocked Write complete.
279-
mw.Stop()
277+
mw.Stop(errors.New("stopped"))
280278
bw.err = errors.New("closed")
281279
close(bw.unblock)
282280

@@ -322,7 +320,7 @@ func TestMuxWriter_WriteFrameDuringActiveDrain(t *testing.T) {
322320
close(g2.ch)
323321

324322
// Both batches were written. Stop and verify.
325-
mw.Stop()
323+
mw.Stop(errors.New("stopped"))
326324
<-mw.Done()
327325
}
328326

0 commit comments

Comments
 (0)