Skip to content

Commit 4098580

Browse files
committed
feat(execd): extract stream buffer (sbuf) package for ring buffer queue
1 parent 97b6d8c commit 4098580

14 files changed

Lines changed: 1072 additions & 47 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sbuf
16+
17+
// Config controls per-stream bounds and append policy.
18+
type Config struct {
19+
// MaxEvents is the maximum number of events retained per stream. Oldest events are dropped when exceeded.
20+
// Zero defaults to DefaultMaxEvents.
21+
MaxEvents int
22+
// MaxBytes is the approximate upper bound on total payload bytes per stream (sum of len(Payload)).
23+
// Oldest events are dropped until under the limit. Zero means no byte limit.
24+
MaxBytes int64
25+
// StrictMonotonic rejects Append when eid <= last eid for that stream. Recommended for execd SSE eids.
26+
StrictMonotonic bool
27+
}
28+
29+
const DefaultMaxEvents = 1024
30+
31+
func (c *Config) normalized() Config {
32+
out := *c
33+
if out.MaxEvents <= 0 {
34+
out.MaxEvents = DefaultMaxEvents
35+
}
36+
return out
37+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sbuf
16+
17+
import "errors"
18+
19+
var (
20+
// ErrOutOfOrder is returned when StrictMonotonic is enabled and eid is not greater than the last appended eid.
21+
ErrOutOfOrder = errors.New("sbuf: eid out of order for stream")
22+
ErrEmptyStreamID = errors.New("sbuf: empty stream id")
23+
)

components/execd/pkg/sbuf/event.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sbuf
16+
17+
// Event is one stored record (e.g. a single SSE JSON line body). Payload is owned by the buffer after Append.
18+
type Event struct {
19+
EID int64
20+
Payload []byte
21+
}

components/execd/pkg/sbuf/ring.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package sbuf
16+
17+
// ring is a FIFO queue with a fixed max length; push drops oldest when full.
18+
type ring struct {
19+
maxLen int
20+
slots []eventSlot
21+
head int
22+
n int
23+
bytes int64
24+
}
25+
26+
type eventSlot struct {
27+
eid int64
28+
payload []byte
29+
}
30+
31+
func newRing(maxLen int) *ring {
32+
if maxLen < 1 {
33+
maxLen = 1
34+
}
35+
return &ring{
36+
maxLen: maxLen,
37+
slots: make([]eventSlot, maxLen),
38+
}
39+
}
40+
41+
func (r *ring) push(eid int64, payload []byte, maxBytes int64) {
42+
pld := append([]byte(nil), payload...)
43+
size := int64(len(pld))
44+
45+
if r.n == r.maxLen {
46+
r.evictHead()
47+
}
48+
idx := (r.head + r.n) % r.maxLen
49+
r.slots[idx] = eventSlot{eid: eid, payload: pld}
50+
r.n++
51+
r.bytes += size
52+
53+
if maxBytes > 0 {
54+
for r.bytes > maxBytes && r.n > 0 {
55+
r.evictHead()
56+
}
57+
}
58+
}
59+
60+
func (r *ring) evictHead() {
61+
if r.n == 0 {
62+
return
63+
}
64+
old := r.slots[r.head]
65+
r.bytes -= int64(len(old.payload))
66+
r.slots[r.head] = eventSlot{}
67+
r.head = (r.head + 1) % r.maxLen
68+
r.n--
69+
}
70+
71+
func (r *ring) iterAfter(afterEid int64, fn func(eid int64, payload []byte)) {
72+
for i := range r.n {
73+
idx := (r.head + i) % r.maxLen
74+
s := r.slots[idx]
75+
if s.eid > afterEid {
76+
fn(s.eid, s.payload)
77+
}
78+
}
79+
}
80+
81+
// snapshotAfter returns a copy slice for safe iteration outside the ring lock.
82+
func (r *ring) snapshotAfter(afterEid int64) []Event {
83+
var out []Event
84+
r.iterAfter(afterEid, func(eid int64, payload []byte) {
85+
out = append(out, Event{
86+
EID: eid,
87+
Payload: append([]byte(nil), payload...),
88+
})
89+
})
90+
return out
91+
}

components/execd/pkg/sbuf/store.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright 2026 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package sbuf provides bounded, per-stream FIFO buffers for SSE (or similar) events keyed by eid,
16+
// used to serve disconnect resume (catch-up by event id).
17+
// It is storage-only: callers assign eids and decide when to delete a stream.
18+
package sbuf
19+
20+
import (
21+
"sync"
22+
)
23+
24+
// Store holds bounded event rings keyed by caller-defined stream IDs (e.g. command execution id).
25+
type Store struct {
26+
cfg Config
27+
mu sync.Mutex
28+
streams map[string]*streamBuf
29+
}
30+
31+
type streamBuf struct {
32+
mu sync.Mutex
33+
lastEid int64
34+
ring *ring
35+
maxBytes int64
36+
}
37+
38+
// NewStore creates an empty store. cfg is copied after normalization.
39+
func NewStore(cfg Config) *Store {
40+
cfg = cfg.normalized()
41+
return &Store{
42+
cfg: cfg,
43+
streams: make(map[string]*streamBuf),
44+
}
45+
}
46+
47+
// Append adds one event to the stream's ring. Payload is copied.
48+
// With StrictMonotonic, returns ErrOutOfOrder if eid <= previous eid for this stream.
49+
func (s *Store) Append(streamID string, eid int64, payload []byte) error {
50+
if streamID == "" {
51+
return ErrEmptyStreamID
52+
}
53+
sb := s.getOrCreate(streamID)
54+
sb.mu.Lock()
55+
defer sb.mu.Unlock()
56+
57+
if s.cfg.StrictMonotonic {
58+
if eid <= sb.lastEid {
59+
return ErrOutOfOrder
60+
}
61+
}
62+
sb.lastEid = eid
63+
sb.ring.push(eid, payload, sb.maxBytes)
64+
return nil
65+
}
66+
67+
func (s *Store) getOrCreate(streamID string) *streamBuf {
68+
s.mu.Lock()
69+
defer s.mu.Unlock()
70+
if sb, ok := s.streams[streamID]; ok {
71+
return sb
72+
}
73+
sb := &streamBuf{
74+
ring: newRing(s.cfg.MaxEvents),
75+
maxBytes: s.cfg.MaxBytes,
76+
}
77+
s.streams[streamID] = sb
78+
return sb
79+
}
80+
81+
// EventsAfter returns a snapshot of events with EID > afterEid in order.
82+
// If the stream does not exist, ok is false and events is nil.
83+
func (s *Store) EventsAfter(streamID string, afterEid int64) (events []Event, ok bool) {
84+
s.mu.Lock()
85+
sb, found := s.streams[streamID]
86+
s.mu.Unlock()
87+
if !found {
88+
return nil, false
89+
}
90+
sb.mu.Lock()
91+
defer sb.mu.Unlock()
92+
return sb.ring.snapshotAfter(afterEid), true
93+
}
94+
95+
// Delete removes a stream buffer. No-op if missing.
96+
func (s *Store) Delete(streamID string) {
97+
s.mu.Lock()
98+
defer s.mu.Unlock()
99+
delete(s.streams, streamID)
100+
}
101+
102+
// Has reports whether a stream currently exists.
103+
func (s *Store) Has(streamID string) bool {
104+
s.mu.Lock()
105+
defer s.mu.Unlock()
106+
_, ok := s.streams[streamID]
107+
return ok
108+
}

0 commit comments

Comments
 (0)