Skip to content

drpcmanager: replace streamBuffer with a streams registry#47

Merged
shubhamdhama merged 1 commit intostream-multiplexingfrom
shubham/add-stream-registry
Apr 13, 2026
Merged

drpcmanager: replace streamBuffer with a streams registry#47
shubhamdhama merged 1 commit intostream-multiplexingfrom
shubham/add-stream-registry

Conversation

@shubhamdhama
Copy link
Copy Markdown

Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.

r.mu.RLock()
defer r.mu.RUnlock()

s, ok := r.streams[id]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you have to check if registry is closed?

Copy link
Copy Markdown
Author

@shubhamdhama shubhamdhama Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that now since Close means remove all the entries. I can do it in next PR.

func (r *streamRegistry) Unregister(id uint64) {
r.mu.Lock()
defer r.mu.Unlock()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you have to check if registry is closed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. If registry is closed the entry would have most likely removed and doing this is noop.

m.log("TERM", func() string { return fmt.Sprint(err) })
m.sigs.tport.Set(m.tr.Close())
m.sbuf.Close()
m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() })
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be s.Cancel() instead of s.Close()? s.Close() writes to the transport, and the current code already holds the lock and writes to the transport for every stream. I'm assuming this path isn't intended for mux streams?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just closing the registry. Anyways, Close will Cancel the streams now.

Comment on lines 363 to 373
case <-m.sigs.term.Signal():
err := m.sigs.term.Err()
if errors.Is(err, io.EOF) {
err = context.Canceled
if stream.Kind() == drpc.StreamKindClient {
err = drpc.ClosedError.New("connection closed")
}
}
stream.Cancel(err)
<-stream.Finished()
m.sem.Recv()
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pp] we can remove it and cancel all the streams in reg.Close().

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next PR.

@@ -217,7 +220,7 @@
if m.sigs.term.Set(err) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pp] Let's depend on .term in the registry.Register. And close the streams in reg.Close and remove the .term dependency from manageStreams

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do in follow-up.

@shubhamdhama shubhamdhama force-pushed the shubham/use-atomic-counter branch from 6abe605 to e3212b7 Compare April 7, 2026 03:55
Base automatically changed from shubham/use-atomic-counter to stream-multiplexing April 7, 2026 03:56
@shubhamdhama shubhamdhama force-pushed the shubham/add-stream-registry branch from a4092e3 to 9155272 Compare April 7, 2026 05:09
Copy link
Copy Markdown
Author

@shubhamdhama shubhamdhama left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to address some of the feedback in a separate PR.

r.mu.RLock()
defer r.mu.RUnlock()

s, ok := r.streams[id]
Copy link
Copy Markdown
Author

@shubhamdhama shubhamdhama Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that now since Close means remove all the entries. I can do it in next PR.

@@ -217,7 +220,7 @@ func (m *Manager) terminate(err error) {
if m.sigs.term.Set(err) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do in follow-up.

m.log("TERM", func() string { return fmt.Sprint(err) })
m.sigs.tport.Set(m.tr.Close())
m.sbuf.Close()
m.reg.ForEach(func(s *drpcstream.Stream) { s.Close() })
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just closing the registry. Anyways, Close will Cancel the streams now.

@shubhamdhama shubhamdhama changed the title drpcmanager: replace streamBuffer with streamRegistry drpcmanager: replace streamBuffer with a streams registry Apr 7, 2026
Replace the single-stream streamBuffer with a stream registry that maps
stream IDs to stream objects. The registry currently holds at most one
active stream (two briefly during handoff), but provides the foundation
for stream multiplexing where callers will look up streams by ID directly.
@shubhamdhama shubhamdhama force-pushed the shubham/add-stream-registry branch from 9155272 to e14c876 Compare April 8, 2026 07:27
Copy link
Copy Markdown

@cthumuluru-crdb cthumuluru-crdb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added one comment about a test that's removed. Otherwise changes look good to me.

r.mu.Lock()
defer r.mu.Unlock()

if r.closed {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(for future commits), if you make this an atomic boolean, you can do this check outside. You can also move this check outside once you rely on manager closed signal.

@shubhamdhama shubhamdhama merged commit 54db735 into stream-multiplexing Apr 13, 2026
@shubhamdhama shubhamdhama deleted the shubham/add-stream-registry branch April 13, 2026 11:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants