Skip to content

Commit ee07ef0

Browse files
committed
*: move frame assembly from reader to stream
Previously, Reader assembled wire frames into complete packets before handing them to the manager. This change makes Reader return individual frames (ReadFrame), and the stream handles frame assembly itself (HandleFrame). The manager now enforces global frame ID monotonicity and other validation that are beyond a stream's scope. This is groundwork for stream multiplexing, where frames from different streams will be interleaved on the wire and must be routed to the correct stream before assembly.
1 parent 1898402 commit ee07ef0

File tree

7 files changed

+641
-362
lines changed

7 files changed

+641
-362
lines changed

drpcconn/conn_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
4343
wr := drpcwire.NewWriter(ps, 64)
4444
rd := drpcwire.NewReader(ps)
4545

46-
_, _ = rd.ReadPacket() // Invoke
47-
_, _ = rd.ReadPacket() // Message
48-
pkt, _ := rd.ReadPacket() // CloseSend
46+
_, _ = rd.ReadFrame() // Invoke
47+
_, _ = rd.ReadFrame() // Message
48+
pkt, _ := rd.ReadFrame() // CloseSend
4949

5050
_ = wr.WritePacket(drpcwire.Packet{
5151
Data: []byte("qux"),
@@ -54,8 +54,8 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
5454
})
5555
_ = wr.Flush()
5656

57-
_, _ = rd.ReadPacket() // Close
58-
<-invokeDone // wait for invoke to return
57+
_, _ = rd.ReadFrame() // Close
58+
<-invokeDone // wait for invoke to return
5959

6060
// ensure that any later packets are dropped by writing one
6161
// before closing the transport.
@@ -98,7 +98,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
9898
wr := drpcwire.NewWriter(ps, 64)
9999
rd := drpcwire.NewReader(ps)
100100

101-
md, err := rd.ReadPacket() // Metadata
101+
md, err := rd.ReadFrame() // Metadata
102102
assert.NoError(t, err)
103103
assert.Equal(t, md.Kind, drpcwire.KindInvokeMetadata)
104104
metadata, err := drpcmetadata.Decode(md.Data)
@@ -110,9 +110,9 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
110110
"common-key": "common-value2",
111111
})
112112

113-
_, _ = rd.ReadPacket() // Invoke
114-
_, _ = rd.ReadPacket() // Message
115-
pkt, _ := rd.ReadPacket() // CloseSend
113+
_, _ = rd.ReadFrame() // Invoke
114+
_, _ = rd.ReadFrame() // Message
115+
pkt, _ := rd.ReadFrame() // CloseSend
116116

117117
_ = wr.WritePacket(drpcwire.Packet{
118118
Data: []byte("qux"),
@@ -121,7 +121,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
121121
})
122122
_ = wr.Flush()
123123

124-
_, _ = rd.ReadPacket() // Close
124+
_, _ = rd.ReadFrame() // Close
125125
})
126126

127127
conn := New(pc)
@@ -154,7 +154,7 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
154154
ctx.Run(func(ctx context.Context) {
155155
rd := drpcwire.NewReader(ps)
156156

157-
md, err := rd.ReadPacket() // Metadata
157+
md, err := rd.ReadFrame() // Metadata
158158
assert.NoError(t, err)
159159
assert.Equal(t, md.Kind, drpcwire.KindInvokeMetadata)
160160
metadata, err := drpcmetadata.Decode(md.Data)
@@ -164,8 +164,8 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
164164
"drpc-key": "drpc-value",
165165
})
166166

167-
_, _ = rd.ReadPacket() // Invoke
168-
_, _ = rd.ReadPacket() // CloseSend
167+
_, _ = rd.ReadFrame() // Invoke
168+
_, _ = rd.ReadFrame() // CloseSend
169169
})
170170

171171
conn := New(pc)

drpcmanager/manager.go

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Manager struct {
7272
rd *drpcwire.Reader
7373
opts Options
7474

75+
lastFrameID drpcwire.ID
76+
lastFrameKind drpcwire.Kind
77+
7578
sem drpcsignal.Chan // held by the active stream
7679
sbuf streamBuffer // largest stream id created
7780
pkts chan drpcwire.Packet // channel for invoke packets
@@ -213,27 +216,15 @@ func (m *Manager) terminate(err error) {
213216
// manage reader
214217
//
215218

216-
// manageReader is always reading a packet and dispatching it to the appropriate
217-
// stream or queue. It sets the read signal when it exits so that one can wait
218-
// to ensure that no one is reading on the reader. It sets the term signal if
219-
// there is any error reading packets.
219+
// manageReader reads the frame and dispatches them to the appropriate stream or
220+
// queue. It sets the read signal when it exits so that one can wait to ensure
221+
// that no one is reading on the reader. It sets the term signal if there is any
222+
// error reading frames.
220223
func (m *Manager) manageReader() {
221224
defer m.sigs.read.Set(nil)
222225

223-
var pkt drpcwire.Packet
224-
var err error
225-
var run int
226-
227226
for !m.sigs.term.IsSet() {
228-
// if we have a run of "small" packets, drop the buffer to release
229-
// memory so that a burst of large packets does not cause eternally
230-
// large heap usage.
231-
if run > 10 {
232-
pkt.Data = nil
233-
run = 0
234-
}
235-
236-
pkt, err = m.rd.ReadPacketUsing(pkt.Data[:0])
227+
incomingFrame, err := m.rd.ReadFrame()
237228
if err != nil {
238229
if isConnectionReset(err) {
239230
err = drpc.ClosedError.Wrap(err)
@@ -242,36 +233,36 @@ func (m *Manager) manageReader() {
242233
return
243234
}
244235

245-
if len(pkt.Data) < cap(pkt.Data)/4 {
246-
run++
247-
} else {
248-
run = 0
249-
}
236+
m.log("READ", incomingFrame.String)
250237

251-
m.log("READ", pkt.String)
238+
if incomingFrame.ID.Less(m.lastFrameID) {
239+
m.terminate(managerClosed.Wrap(drpc.ProtocolError.New("id monotonicity violation")))
240+
return
241+
}
252242

253243
switch curr := m.sbuf.Get(); {
254-
// if the packet is for the current stream, deliver it.
255-
case curr != nil && pkt.ID.Stream == curr.ID():
256-
if err := curr.HandlePacket(pkt); err != nil {
244+
// If the frame is for the current stream, deliver it.
245+
case curr != nil && incomingFrame.ID.Stream == curr.ID():
246+
if err := curr.HandleFrame(incomingFrame); err != nil {
257247
m.terminate(managerClosed.Wrap(err))
258248
return
259249
}
260250

261-
// if an old message has been sent, just ignore it.
262-
case curr != nil && pkt.ID.Stream < curr.ID():
251+
// If a frame arrives for an old stream, just ignore it.
252+
case curr != nil && incomingFrame.ID.Stream < curr.ID():
263253

264-
// if any invoke sequence is being sent, close any old unterminated
265-
// stream and forward it to be handled.
266-
case pkt.Kind == drpcwire.KindInvoke || pkt.Kind == drpcwire.KindInvokeMetadata:
254+
// If an invoke sequence is being sent for a new stream, close any
255+
// old unterminated stream and forward it to be handled.
256+
case incomingFrame.Kind == drpcwire.KindInvoke || incomingFrame.Kind == drpcwire.KindInvokeMetadata:
267257
if curr != nil && !curr.IsTerminated() {
268258
curr.Cancel(context.Canceled)
269259
}
270260

261+
pkt := drpcwire.Packet{ID: incomingFrame.ID, Kind: incomingFrame.Kind, Data: incomingFrame.Data}
271262
select {
272263
case m.pkts <- pkt:
273264
// Wait for NewServerStream to finish stream creation (including
274-
// sbuf.Set) before reading the next packet. This guarantees curr
265+
// sbuf.Set) before reading the next frame. This guarantees curr
275266
// is set for subsequent non-invoke packets.
276267
m.pdone.Recv()
277268

@@ -280,15 +271,21 @@ func (m *Manager) manageReader() {
280271
}
281272

282273
default:
283-
// A non-invoke packet arrived for a stream that doesn't exist yet
284-
// (curr is nil or pkt.ID.Stream > curr.ID). The first packet of a
285-
// new stream must be KindInvoke or KindInvokeMetadata.
286-
m.terminate(managerClosed.Wrap(drpc.ProtocolError.New(
287-
"first packet of a new stream must be Invoke, got %v (ID:%v)",
288-
pkt.Kind,
289-
pkt.ID)))
274+
// A non-invoke frame arrived for a stream that doesn't exist yet
275+
// (curr is nil or incomingFrame.ID.Stream > curr.ID). The first
276+
// frame of a new stream must be KindInvoke or KindInvokeMetadata.
277+
m.terminate(managerClosed.New(
278+
"first frame of a new stream must be Invoke, got %v (ID:%v)",
279+
incomingFrame.Kind,
280+
incomingFrame.ID))
290281
return
291282
}
283+
284+
m.lastFrameKind = incomingFrame.Kind
285+
m.lastFrameID = incomingFrame.ID
286+
if incomingFrame.Done {
287+
m.lastFrameID.Message += 1
288+
}
292289
}
293290
}
294291

drpcmanager/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func TestManageReader_OldStreamFramesIgnored(t *testing.T) {
366366

367367
// The first frame for a new stream must be KindInvoke or KindInvokeMetadata.
368368
// A non-invoke kind causes a protocol error.
369-
func Disabled_TestManageReader_FirstFrameMustBeInvoke(t *testing.T) {
369+
func TestManageReader_FirstFrameMustBeInvoke(t *testing.T) {
370370
for _, kind := range []drpcwire.Kind{
371371
drpcwire.KindMessage,
372372
drpcwire.KindCancel,

drpcstream/stream.go

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
// Options controls configuration settings for a stream.
2525
type Options struct {
26-
// SplitSize controls the default size we split packets into frames.
26+
// SplitSize controls the default size we split data packets into frames.
2727
SplitSize int
2828

2929
// ManualFlush controls if the stream will automatically flush after every
@@ -52,6 +52,11 @@ type Stream struct {
5252
read inspectMutex
5353
flush sync.Once
5454

55+
assembling bool
56+
pktBuf []byte
57+
pktKind drpcwire.Kind
58+
nextMessageID uint64
59+
5560
id drpcwire.ID
5661
wr *drpcwire.Writer
5762
pbuf packetBuffer
@@ -98,6 +103,9 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
98103
fin: drpcopts.GetStreamFin(&opts.Internal),
99104
task: task,
100105

106+
// TODO: add a test case.
107+
nextMessageID: 1,
108+
101109
id: drpcwire.ID{Stream: sid},
102110
wr: wr.Reset(),
103111
}
@@ -211,18 +219,58 @@ func (s *Stream) IsFinished() bool { return s.sigs.fin.IsSet() }
211219
func (s *Stream) SetManualFlush(mf bool) { s.opts.ManualFlush = mf }
212220

213221
//
214-
// packet handler
222+
// frame handler
215223
//
216224

217-
// HandlePacket advances the stream state machine by inspecting the packet. It
218-
// returns any major errors that should terminate the transport the stream is
219-
// operating on as well as a boolean indicating if the stream expects more
220-
// packets.
221-
func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error) {
222-
if pkt.ID.Stream != s.id.Stream {
225+
// HandleFrame processes an incoming frame, assembling multi-frame packets
226+
// and dispatching complete packets to the stream state machine.
227+
func (s *Stream) HandleFrame(fr drpcwire.Frame) (err error) {
228+
if fr.ID.Stream != s.ID() {
229+
return drpc.ProtocolError.New("frame doesn't belong to this stream (fr: %v)", fr.ID)
230+
}
231+
232+
if fr.ID.Message < s.nextMessageID {
233+
return drpc.ProtocolError.New(
234+
"id monotonicity violation: frame %v has message ID less than expected %v", fr.ID, s.nextMessageID)
235+
} else if fr.ID.Message > s.nextMessageID || !s.assembling {
236+
s.pktBuf = s.pktBuf[:0]
237+
s.assembling = true
238+
s.nextMessageID = fr.ID.Message
239+
} else if fr.Kind != s.pktKind {
240+
return drpc.ProtocolError.New("frame kind change within packet: got %v, expected %v", fr.Kind, s.pktKind)
241+
}
242+
243+
// TODO(shubham): add buf reuse
244+
s.pktBuf = append(s.pktBuf, fr.Data...)
245+
246+
s.pktKind = fr.Kind
247+
248+
if s.opts.MaximumBufferSize > 0 && len(s.pktBuf) > s.opts.MaximumBufferSize {
249+
return drpc.ProtocolError.New("data overflow (len:%d)", len(s.pktBuf))
250+
}
251+
252+
if !fr.Done {
223253
return nil
224254
}
225255

256+
s.assembling = false
257+
s.nextMessageID = fr.ID.Message + 1
258+
259+
err = s.handlePacket(drpcwire.Packet{
260+
ID: fr.ID,
261+
Kind: fr.Kind,
262+
Data: s.pktBuf,
263+
})
264+
265+
// TODO(shubham): add buf reuse
266+
s.pktBuf = nil
267+
return err
268+
}
269+
270+
// handlePacket advances the stream state machine by inspecting the packet. It
271+
// returns any major errors that should terminate the transport the stream is
272+
// operating on.
273+
func (s *Stream) handlePacket(pkt drpcwire.Packet) (err error) {
226274
drpcopts.GetStreamStats(&s.opts.Internal).AddRead(uint64(len(pkt.Data)))
227275

228276
if s.sigs.term.IsSet() {
@@ -240,7 +288,7 @@ func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error) {
240288
defer s.mu.Unlock()
241289

242290
switch pkt.Kind {
243-
case drpcwire.KindInvoke:
291+
case drpcwire.KindInvoke, drpcwire.KindInvokeMetadata:
244292
err := drpc.ProtocolError.New("invoke on existing stream")
245293
s.terminate(err)
246294
return err
@@ -375,9 +423,13 @@ func (s *Stream) RawWrite(kind drpcwire.Kind, data []byte) (err error) {
375423

376424
// rawWriteLocked does the body of RawWrite assuming the caller is holding the
377425
// appropriate locks.
426+
// TODO(shubham): can we merge this with sendPacketLocked?
378427
func (s *Stream) rawWriteLocked(kind drpcwire.Kind, data []byte) (err error) {
379428
fr := s.newFrameLocked(kind)
380-
n := s.opts.SplitSize
429+
n := 0
430+
if kind == drpcwire.KindMessage {
431+
n = s.opts.SplitSize
432+
}
381433

382434
for {
383435
switch {

0 commit comments

Comments
 (0)