Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions pkg/moqt/session/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,40 +207,40 @@ func rejectStreamWithError(stream Stream, code moqt.RequestErrorCode, reason str
// On any error before the stream is established and the first message
// written, the stream (if any) is reset and the error is returned.
func (s *Session) OpenRequest(first message.Message) (Stream, error) {
return s.openStreamWith(first, nil)
stream, err := s.conn.OpenStream()
if err != nil {
return nil, err
}
return writeFirst(stream, first)
}

// openStreamWith opens a new outbound bidirectional stream, runs prepare (if
// non-nil) now that the open has succeeded, then writes first as the stream's
// initial message. prepare is the hook where a fresh Request ID is assigned, so
// a failed open (e.g. ErrNoStreamCredit) consumes no ID — the §10.1 sequence
// stays untouched. On a write failure the stream is reset and the error
// returned.
func (s *Session) openStreamWith(first message.Message, prepare func()) (Stream, error) {
// openAllocRequest opens a request stream for m and assigns m a freshly
// allocated Request ID (§10.1) only after the open succeeds — so a failed open
// (e.g. ErrNoStreamCredit) consumes no ID and the §10.1 sequence stays
// untouched — then writes it as the stream's first message. It does NOT await
// the peer's response — the caller owns the read side. It is the single
// primitive beneath every typed request opener (Publish, Subscribe, Fetch,
// TrackStatus, the namespace requests) and the non-blocking
// [Session.OpenPublish] used for relay fan-out.
func (s *Session) openAllocRequest(m message.WithRequestID) (Stream, error) {
stream, err := s.conn.OpenStream()
if err != nil {
return nil, err
}
if prepare != nil {
prepare()
}
m.SetRequestID(s.AllocRequestID())
return writeFirst(stream, m)
}

// writeFirst marshals first as the initial message of a freshly opened request
// stream. On a write failure the stream is reset and the error is returned.
func writeFirst(stream Stream, first message.Message) (Stream, error) {
if err := message.Marshal(stream, first); err != nil {
resetStream(stream)
return nil, fmt.Errorf("moqt/session: write request first message: %w", err)
}
return stream, nil
}

// openAllocRequest opens a request stream for m and assigns m a freshly
// allocated Request ID (§10.1) only after the open succeeds, then writes it as
// the stream's first message. It does NOT await the peer's response — the
// caller owns the read side. It is the single primitive beneath every typed
// request opener (Publish, Subscribe, Fetch, TrackStatus, the namespace
// requests) and the non-blocking [Session.OpenPublish] used for relay fan-out.
func (s *Session) openAllocRequest(m message.WithRequestID) (Stream, error) {
return s.openStreamWith(m, func() { m.SetRequestID(s.AllocRequestID()) })
}

// readResponse parses one message from stream, honoring ctx. message.Parse
// reads from a context-free io.Reader, so cancellation is bridged by resetting
// the stream's read side with StreamResetCancelled (§3.3.3), which unblocks the
Expand Down