-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathstream.go
More file actions
90 lines (80 loc) · 2.09 KB
/
stream.go
File metadata and controls
90 lines (80 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package agentcore
import "sync"
// Collect drains events until the channel is closed and reports the outcome.
//
// The returned messages are the NewMessages of the last EventAgentEnd event
// received (nil if none arrived). The returned error is the Err of the last
// EventError event that carried a non-nil Err (nil if none arrived).
// Collect blocks until events is closed.
func Collect(events <-chan Event) ([]AgentMessage, error) {
	var msgs []AgentMessage
	var lastErr error

	for {
		ev, open := <-events
		if !open {
			// Channel closed: every event has been observed.
			return msgs, lastErr
		}
		if ev.Type == EventAgentEnd {
			msgs = ev.NewMessages
		}
		if ev.Type == EventError && ev.Err != nil {
			lastErr = ev.Err
		}
	}
}
// EventStream wraps an event channel to provide both real-time iteration
// and deferred result collection.
//
// Usage:
//
//	stream := agentcore.NewEventStream(AgentLoop(...))
//	for ev := range stream.Events() {
//		// handle real-time events
//	}
//	msgs, err := stream.Result()
type EventStream struct {
// events carries every event forwarded from the source; exposed via Events.
events chan Event
// done is closed once the source has been fully consumed.
done chan struct{}
// mu guards result and err.
mu sync.Mutex
// result holds NewMessages from the most recent EventAgentEnd event.
result []AgentMessage
// err holds Err from the most recent EventError event with a non-nil Err.
err error
}
// NewEventStream creates an EventStream that reads from the source channel.
//
// A background goroutine forwards every event from source, in order, to the
// channel returned by Events until source is closed. Along the way it records
// NewMessages from the most recent EventAgentEnd event and Err from the most
// recent EventError event that carries a non-nil Err; Result returns both once
// forwarding has finished.
//
// The internal channel buffers up to 128 events. When the buffer is full the
// goroutine blocks until a consumer receives from Events, so Events must be
// drained for the stream (and therefore Result and Done) to complete.
//
// A nil source is treated as an already-exhausted stream: Events and Done are
// closed immediately and Result returns (nil, nil). Ranging over a nil channel
// would otherwise block the forwarding goroutine forever.
func NewEventStream(source <-chan Event) *EventStream {
	s := &EventStream{
		events: make(chan Event, 128),
		done:   make(chan struct{}),
	}

	if source == nil {
		// Nothing will ever arrive; finish right away instead of leaking a
		// goroutine that can never make progress.
		close(s.done)
		close(s.events)
		return s
	}

	go func() {
		// Deferred calls run last-in-first-out: done is closed before events,
		// so a consumer that observes Events closing never waits in Result.
		defer close(s.events)
		defer close(s.done)

		for ev := range source {
			if ev.Type == EventAgentEnd {
				s.mu.Lock()
				s.result = ev.NewMessages
				s.mu.Unlock()
			}
			if ev.Type == EventError && ev.Err != nil {
				s.mu.Lock()
				s.err = ev.Err
				s.mu.Unlock()
			}
			// Blocking send: applies back-pressure once the buffer is full.
			s.events <- ev
		}
	}()

	return s
}
// Events returns the receive-only channel on which forwarded events are
// delivered, for real-time iteration. The channel is closed when the source
// is exhausted, so ranging over it terminates.
func (s *EventStream) Events() <-chan Event {
return s.events
}
// Result waits for the stream to finish and then returns the final messages
// together with the error from the last EventError, if any.
//
// It blocks until Done is closed. The forwarding goroutine blocks once its
// event buffer is full, so Events must be drained (before or concurrently)
// for Result to return.
func (s *EventStream) Result() ([]AgentMessage, error) {
	<-s.done

	// Snapshot both fields under the lock, then release it before returning.
	s.mu.Lock()
	msgs, err := s.result, s.err
	s.mu.Unlock()

	return msgs, err
}
// Done returns a channel that is closed when the stream finishes, that is,
// once the source has been closed and every event from it has been handed to
// the Events channel. It can be used in a select to wait without blocking in
// Result.
func (s *EventStream) Done() <-chan struct{} {
return s.done
}