Skip to content

Commit de47e64

Browse files
authored
Merge pull request #30 from STRRL/tape-opus-4.6
feat: add tape recording system for workspace analyze sessions
2 parents 8190080 + 49ca47c commit de47e64

7 files changed

Lines changed: 596 additions & 1 deletion

File tree

cmd/lapp/workspace.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,9 +392,11 @@ func runWorkspaceAnalyze(cmd *cobra.Command, args []string) error {
392392
return errors.Errorf("resolve workspace dir: %w", err)
393393
}
394394

395+
tapePath := filepath.Join(absDir, ".tape.jsonl")
395396
config := analyzer.Config{
396397
Provider: analyzeWsACP,
397398
Model: analyzeWsModel,
399+
TapePath: tapePath,
398400
}
399401

400402
prompt := analyzer.BuildWorkspaceSystemPrompt(absDir)

pkg/analyzer/analyzer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
fsmw "github.com/cloudwego/eino/adk/middlewares/filesystem"
1313
"github.com/go-errors/errors"
1414
einoacp "github.com/strrl/eino-acp"
15+
"github.com/strrl/lapp/pkg/tape"
1516
"go.opentelemetry.io/otel"
1617
"go.opentelemetry.io/otel/attribute"
1718
)
@@ -45,6 +46,8 @@ Be concise and actionable. Focus on what matters.`,
4546
type Config struct {
4647
Provider string
4748
Model string
49+
// TapePath, when set, enables tape recording to this JSONL file.
50+
TapePath string
4851
}
4952

5053
// BuildWorkspaceSystemPrompt builds a system prompt for the structured workspace layout.
@@ -150,7 +153,13 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s
150153
}
151154

152155
runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent})
153-
iter := runner.Query(ctx, userMessage)
156+
var runOptions []adk.AgentRunOption
157+
if config.TapePath != "" {
158+
jsonlStore := tape.NewJSONLStore(config.TapePath)
159+
runOptions = append(runOptions, adk.WithCallbacks(tape.NewHandler(jsonlStore)))
160+
slog.Info("Tape recording enabled", "path", config.TapePath)
161+
}
162+
iter := runner.Query(ctx, userMessage, runOptions...)
154163

155164
var result strings.Builder
156165
for {

pkg/tape/entry.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package tape
2+
3+
import "time"
4+
5+
// Entry is a single append-only entry in a tape, modeled after republic's TapeEntry.
6+
type Entry struct {
7+
ID int `json:"id"`
8+
Kind string `json:"kind"`
9+
Payload map[string]any `json:"payload"`
10+
Meta map[string]any `json:"meta,omitempty"`
11+
Date string `json:"date"`
12+
}
13+
14+
// Entry kinds.
15+
const (
16+
KindMessage = "message"
17+
KindSystem = "system"
18+
KindToolCall = "tool_call"
19+
KindToolResult = "tool_result"
20+
KindError = "error"
21+
KindEvent = "event"
22+
)
23+
24+
func newEntry(kind string, payload, meta map[string]any) Entry {
25+
return Entry{
26+
Kind: kind,
27+
Payload: payload,
28+
Meta: meta,
29+
Date: time.Now().UTC().Format(time.RFC3339Nano),
30+
}
31+
}
32+
33+
// MessageEntry creates a message entry.
34+
func MessageEntry(role, content string, meta map[string]any) Entry {
35+
return newEntry(KindMessage, map[string]any{
36+
"role": role,
37+
"content": content,
38+
}, meta)
39+
}
40+
41+
// SystemEntry creates a system prompt entry.
42+
func SystemEntry(content string, meta map[string]any) Entry {
43+
return newEntry(KindSystem, map[string]any{
44+
"content": content,
45+
}, meta)
46+
}
47+
48+
// ToolCallEntry creates a tool call entry.
49+
func ToolCallEntry(calls []map[string]any, meta map[string]any) Entry {
50+
return newEntry(KindToolCall, map[string]any{
51+
"calls": calls,
52+
}, meta)
53+
}
54+
55+
// ToolResultEntry creates a tool result entry.
56+
func ToolResultEntry(results []any, meta map[string]any) Entry {
57+
return newEntry(KindToolResult, map[string]any{
58+
"results": results,
59+
}, meta)
60+
}
61+
62+
// ErrorEntry creates an error entry.
63+
func ErrorEntry(kind, message string, meta map[string]any) Entry {
64+
return newEntry(KindError, map[string]any{
65+
"kind": kind,
66+
"message": message,
67+
}, meta)
68+
}
69+
70+
// EventEntry creates a generic event entry.
71+
func EventEntry(name string, data, meta map[string]any) Entry {
72+
return newEntry(KindEvent, map[string]any{
73+
"name": name,
74+
"data": data,
75+
}, meta)
76+
}

pkg/tape/handler.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package tape
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"github.com/cloudwego/eino/callbacks"
8+
"github.com/cloudwego/eino/components"
9+
"github.com/cloudwego/eino/components/model"
10+
"github.com/cloudwego/eino/schema"
11+
)
12+
13+
// NewHandler creates an eino callbacks.Handler that records tape entries to the given store.
14+
func NewHandler(store Recorder) callbacks.Handler {
15+
return callbacks.NewHandlerBuilder().
16+
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
17+
onStart(store, info, input)
18+
return ctx
19+
}).
20+
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
21+
onEnd(store, info, output)
22+
return ctx
23+
}).
24+
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
25+
onError(store, info, err)
26+
return ctx
27+
}).
28+
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
29+
onEndWithStream(store, info, output)
30+
return ctx
31+
}).
32+
Build()
33+
}
34+
35+
func componentMeta(info *callbacks.RunInfo) map[string]any {
36+
return map[string]any{
37+
"component": string(info.Component),
38+
"name": info.Name,
39+
"type": info.Type,
40+
}
41+
}
42+
43+
func onStart(store Recorder, info *callbacks.RunInfo, input callbacks.CallbackInput) {
44+
if info == nil {
45+
return
46+
}
47+
meta := componentMeta(info)
48+
49+
if info.Component == components.ComponentOfChatModel {
50+
modelInput := model.ConvCallbackInput(input)
51+
if modelInput != nil {
52+
recordModelInput(store, modelInput, meta)
53+
return
54+
}
55+
}
56+
57+
if info.Component == components.ComponentOfTool {
58+
entry := EventEntry("tool_start", map[string]any{
59+
"component": string(info.Component),
60+
"name": info.Name,
61+
}, meta)
62+
appendSafe(store, entry)
63+
return
64+
}
65+
66+
entry := EventEntry("start", map[string]any{
67+
"component": string(info.Component),
68+
"name": info.Name,
69+
}, meta)
70+
appendSafe(store, entry)
71+
}
72+
73+
func onEnd(store Recorder, info *callbacks.RunInfo, output callbacks.CallbackOutput) {
74+
if info == nil {
75+
return
76+
}
77+
meta := componentMeta(info)
78+
79+
if info.Component == components.ComponentOfChatModel {
80+
modelOutput := model.ConvCallbackOutput(output)
81+
if modelOutput != nil {
82+
recordModelOutput(store, modelOutput, meta)
83+
return
84+
}
85+
}
86+
87+
if info.Component == components.ComponentOfTool {
88+
recordToolResult(store, output, meta)
89+
return
90+
}
91+
92+
entry := EventEntry("end", map[string]any{
93+
"component": string(info.Component),
94+
"name": info.Name,
95+
}, meta)
96+
appendSafe(store, entry)
97+
}
98+
99+
func onError(store Recorder, info *callbacks.RunInfo, err error) {
100+
if info == nil {
101+
return
102+
}
103+
meta := componentMeta(info)
104+
entry := ErrorEntry(string(info.Component), err.Error(), meta)
105+
appendSafe(store, entry)
106+
}
107+
108+
func onEndWithStream(store Recorder, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) {
109+
if info == nil {
110+
return
111+
}
112+
meta := componentMeta(info)
113+
114+
// Consume the stream in a goroutine to avoid blocking
115+
go func() {
116+
defer func() {
117+
if r := recover(); r != nil {
118+
slog.Warn("tape: panic consuming stream", "recover", r)
119+
}
120+
}()
121+
122+
var lastOutput callbacks.CallbackOutput
123+
for {
124+
chunk, err := output.Recv()
125+
if err != nil {
126+
break
127+
}
128+
lastOutput = chunk
129+
}
130+
output.Close()
131+
132+
if lastOutput == nil {
133+
return
134+
}
135+
136+
if info.Component == components.ComponentOfChatModel {
137+
modelOutput := model.ConvCallbackOutput(lastOutput)
138+
if modelOutput != nil {
139+
recordModelOutput(store, modelOutput, meta)
140+
return
141+
}
142+
}
143+
144+
entry := EventEntry("end", map[string]any{
145+
"component": string(info.Component),
146+
"name": info.Name,
147+
}, meta)
148+
appendSafe(store, entry)
149+
}()
150+
}
151+
152+
func recordModelInput(store Recorder, input *model.CallbackInput, meta map[string]any) {
153+
// Record system message separately if present
154+
for _, msg := range input.Messages {
155+
if msg.Role == schema.System {
156+
appendSafe(store, SystemEntry(msg.Content, meta))
157+
continue
158+
}
159+
appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta))
160+
}
161+
162+
// Record tool calls from messages
163+
if len(input.Tools) > 0 {
164+
var tools []map[string]any
165+
for _, t := range input.Tools {
166+
tools = append(tools, map[string]any{
167+
"name": t.Name,
168+
"desc": t.Desc,
169+
})
170+
}
171+
appendSafe(store, EventEntry("tools_available", map[string]any{
172+
"tools": tools,
173+
}, meta))
174+
}
175+
}
176+
177+
func recordModelOutput(store Recorder, output *model.CallbackOutput, meta map[string]any) {
178+
if output.TokenUsage != nil {
179+
meta["token_usage"] = map[string]any{
180+
"prompt_tokens": output.TokenUsage.PromptTokens,
181+
"completion_tokens": output.TokenUsage.CompletionTokens,
182+
"total_tokens": output.TokenUsage.TotalTokens,
183+
}
184+
}
185+
186+
if output.Message != nil {
187+
// Record tool calls if present
188+
if len(output.Message.ToolCalls) > 0 {
189+
var calls []map[string]any
190+
for _, tc := range output.Message.ToolCalls {
191+
calls = append(calls, map[string]any{
192+
"id": tc.ID,
193+
"function": tc.Function.Name,
194+
"args": tc.Function.Arguments,
195+
})
196+
}
197+
appendSafe(store, ToolCallEntry(calls, meta))
198+
}
199+
200+
// Record assistant message
201+
if output.Message.Content != "" {
202+
appendSafe(store, MessageEntry(string(output.Message.Role), output.Message.Content, meta))
203+
}
204+
}
205+
}
206+
207+
func recordToolResult(store Recorder, output callbacks.CallbackOutput, meta map[string]any) {
208+
// Tool output is typically a string
209+
if s, ok := output.(string); ok {
210+
appendSafe(store, ToolResultEntry([]any{s}, meta))
211+
return
212+
}
213+
appendSafe(store, ToolResultEntry([]any{output}, meta))
214+
}
215+
216+
func appendSafe(store Recorder, entry Entry) {
217+
if err := store.Append(entry); err != nil {
218+
slog.Warn("tape: failed to append entry", "err", err, "kind", entry.Kind)
219+
}
220+
}

0 commit comments

Comments
 (0)