-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.go
More file actions
135 lines (116 loc) · 3.61 KB
/
stream.go
File metadata and controls
135 lines (116 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package tok
import (
"strings"
"sync"
"github.com/GrayCodeAI/tok/internal/core"
)
// StreamCompressor accumulates raw content segments and keeps the output of
// the most recent background compression alongside them. Compression runs in
// a separate goroutine started by Append, so readers only take the mutex
// briefly and never wait for a compression run itself.
//
// Create instances with NewStreamCompressor; Close relies on the done channel
// that the constructor allocates.
type StreamCompressor struct {
// mu guards raw, compressed, stats, dirty and compressing.
mu sync.RWMutex
raw []string // accumulated raw segments, in append order
compressed string // output of the last finished compression; "" until one completes
stats Stats // stats returned by the last finished compression
opts []Option // options forwarded to every Compress call
threshold int // Append starts a compression once the estimated raw token count is >= this
dirty bool // set on every Append; cleared when a background compression finishes
compressing bool // true while a background compression is in flight (at most one at a time)
// done is closed by Close to signal shutdown to compression goroutines.
done chan struct{}
wg sync.WaitGroup // tracks in-progress compression goroutines so Close can wait for them
}
// NewStreamCompressor returns a StreamCompressor that compresses accumulated
// content in the background using opts.
//
// threshold is the estimated token count at which Append starts a background
// compression. A threshold of zero or less selects the default of 2000.
func NewStreamCompressor(threshold int, opts ...Option) *StreamCompressor {
	const defaultThreshold = 2000

	sc := &StreamCompressor{
		threshold: defaultThreshold,
		opts:      opts,
		done:      make(chan struct{}),
	}
	if threshold > 0 {
		sc.threshold = threshold
	}
	return sc
}
// Append adds content to the accumulated raw segments. Empty content is
// ignored.
//
// A background compression is started when all of the following hold: the
// compressor has not been closed, no compression is already in flight, and the
// estimated token count of all raw content has reached the threshold. The
// compression works on a snapshot of the raw content taken under the lock, so
// segments appended afterwards are not part of that run.
func (sc *StreamCompressor) Append(content string) {
	if content == "" {
		return
	}

	sc.mu.Lock()
	sc.raw = append(sc.raw, content)
	sc.dirty = true

	// Non-blocking check of the shutdown signal: once done is closed there is
	// no point in snapshotting the content or starting a goroutine.
	closed := false
	select {
	case <-sc.done:
		closed = true
	default:
	}

	if closed || sc.compressing || sc.tokenCountLocked() < sc.threshold {
		sc.mu.Unlock()
		return
	}

	sc.compressing = true
	snapshot := sc.joinRawLocked()
	// Register the goroutine with the WaitGroup in the same critical section
	// that sets compressing, so the counter is never behind the flag once the
	// lock is released.
	sc.wg.Add(1)
	sc.mu.Unlock()

	go sc.backgroundCompress(snapshot)
}
// backgroundCompress compresses raw, a snapshot of the accumulated content
// taken by the caller, and publishes the result as the current compressed
// output. It must run in a goroutine that was registered with sc.wg; it calls
// sc.wg.Done when it returns.
//
// If the compressor was already closed when the goroutine starts, it only
// resets the compressing flag and returns without compressing.
func (sc *StreamCompressor) backgroundCompress(raw string) {
	defer sc.wg.Done()

	// Skip the work entirely if shutdown was signalled before we started.
	select {
	case <-sc.done:
		sc.mu.Lock()
		sc.compressing = false
		sc.mu.Unlock()
		return
	default:
	}

	// Compress without holding the mutex so Append and the read accessors are
	// not blocked for the duration of the compression.
	compressed, stats := Compress(raw, sc.opts...)

	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.compressed = compressed
	sc.stats = stats
	// Segments may have been appended while compression was running. The raw
	// content is append-only, so the published output covers everything only
	// if the joined content still equals the snapshot; otherwise stay dirty.
	sc.dirty = sc.joinRawLocked() != raw
	sc.compressing = false
}
// Snapshot returns the output and stats of the most recently finished
// compression. It does not wait for a compression that is in flight; it only
// takes the read lock.
//
// While no compressed output is stored (the compressed string is empty), it
// returns the raw segments joined together and a zero Stats value instead.
func (sc *StreamCompressor) Snapshot() (string, Stats) {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	if out := sc.compressed; out != "" {
		return out, sc.stats
	}
	// Nothing compressed yet: fall back to the uncompressed content.
	return sc.joinRawLocked(), Stats{}
}
// Raw returns every segment appended so far, joined into a single string.
func (sc *StreamCompressor) Raw() string {
	sc.mu.RLock()
	joined := sc.joinRawLocked()
	sc.mu.RUnlock()
	return joined
}
// TokenCount returns the estimated number of tokens in the accumulated raw
// content.
func (sc *StreamCompressor) TokenCount() int {
	sc.mu.RLock()
	n := sc.tokenCountLocked()
	sc.mu.RUnlock()
	return n
}
// Close signals shutdown to the background compressor and waits for any
// in-progress compression goroutine to finish. A compression that has already
// passed its shutdown check runs to completion before Close returns.
//
// Close may be called more than once, including concurrently; calls after the
// first only wait.
func (sc *StreamCompressor) Close() {
	// Closing an already-closed channel panics, so test and close under the
	// mutex; only the first caller performs the close.
	sc.mu.Lock()
	select {
	case <-sc.done:
		// Already closed by an earlier call.
	default:
		close(sc.done)
	}
	sc.mu.Unlock()

	// Wait without holding the mutex: the compression goroutine needs it to
	// publish its result.
	sc.wg.Wait()
}
// joinRawLocked concatenates the raw segments in append order, separated by
// newlines. The caller must hold sc.mu for reading or writing.
func (sc *StreamCompressor) joinRawLocked() string {
	const sep = "\n"
	return strings.Join(sc.raw, sep)
}
// tokenCountLocked returns core.EstimateTokens applied to the joined raw
// content. The caller must hold sc.mu for reading or writing.
func (sc *StreamCompressor) tokenCountLocked() int {
	joined := sc.joinRawLocked()
	return core.EstimateTokens(joined)
}