diff --git a/go.mod b/go.mod index 47f9da9..be8aad9 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/strrl/lapp go 1.25.7 require ( + github.com/bytedance/sonic v1.15.0 github.com/cloudwego/eino v0.8.0 - github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85 github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563 github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 github.com/duckdb/duckdb-go/v2 v2.5.5 @@ -24,10 +24,8 @@ require ( require ( github.com/apache/arrow-go/v18 v18.5.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect - github.com/bytedance/sonic v1.15.0 // indirect github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect diff --git a/go.sum b/go.sum index 4180aca..307965b 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJe github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bmatcuk/doublestar/v4 v4.10.0 h1:zU9WiOla1YA122oLM6i4EXvGW62DvKZVxIe6TYWexEs= -github.com/bmatcuk/doublestar/v4 v4.10.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= @@ -34,8 +32,6 @@ github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cloudwego/eino v0.8.0 h1:DLbrgEAloA+l7aR2qim7qQocQB48DjPrb8LzG3PYMHY= github.com/cloudwego/eino v0.8.0/go.mod h1:+2N4nsMPxA6kGBHpH+75JuTfEcGprAMTdsZESrShKpU= -github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85 h1:mD47o0GKdeqMdGI5xEqnlO8ZtArvhalIorRtrCmLRkA= -github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85/go.mod h1:LfFk+VqZk0JOxIyl5RaerYqlFVLyXOCoSaqqak8hNls= github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563 h1:DKTXDDw8ErC4RorZLfB2ZdHChjDKWIqOEO7VRSjjfbg= github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563/go.mod h1:lrNKITZR4QUaYl9Rdz9W6qGOolHRy6mPamEZYA8uz7s= github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 h1:zDFteouktUsGk4I/7m1b7yT4e9qawy45gWtLoyeHwxI= diff --git a/pkg/analyzer/acp_tool_model.go b/pkg/analyzer/acp_tool_model.go index fdd6033..52b725a 100644 --- a/pkg/analyzer/acp_tool_model.go +++ b/pkg/analyzer/acp_tool_model.go @@ -3,12 +3,16 @@ package analyzer import ( "context" + "github.com/cloudwego/eino/components" "github.com/cloudwego/eino/components/model" "github.com/cloudwego/eino/schema" einoacp "github.com/strrl/eino-acp" ) -var _ model.ToolCallingChatModel = (*acpToolCallingModel)(nil) +var ( + _ model.ToolCallingChatModel = (*acpToolCallingModel)(nil) + _ components.Checker = (*acpToolCallingModel)(nil) +) // acpToolCallingModel adapts eino-acp ChatModel to ToolCallingChatModel. // ACP agents manage tools in their own runtime, so WithTools is a no-op. @@ -20,6 +24,10 @@ func newACPToolCallingModel(base *einoacp.ChatModel) model.ToolCallingChatModel return &acpToolCallingModel{base: base} } +func (m *acpToolCallingModel) IsCallbacksEnabled() bool { + return true +} + func (m *acpToolCallingModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.Message, error) { return m.base.Generate(ctx, input, opts...) } diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index f974870..6775e6f 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -7,21 +7,21 @@ import ( "path/filepath" "strings" - "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" - fsmw "github.com/cloudwego/eino/adk/middlewares/filesystem" "github.com/go-errors/errors" + "github.com/google/uuid" einoacp "github.com/strrl/eino-acp" - "github.com/strrl/lapp/pkg/tape" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + + "github.com/strrl/lapp/pkg/tape" + "github.com/strrl/lapp/pkg/tracing" ) func buildSystemPrompt(workDir string) string { return fmt.Sprintf(`You are a log analysis expert helping developers troubleshoot issues. -IMPORTANT: All file operations (read_file, grep, ls, glob, execute) MUST use paths under %s. -Do NOT access files outside this workspace directory. +IMPORTANT: Stay within the workspace directory %s for any file or shell work (your runtime provides the tools). Your workspace contains pre-processed log data at %s: - %s/raw.log — the original log file @@ -29,8 +29,8 @@ Your workspace contains pre-processed log data at %s: - %s/errors.txt — error and warning patterns extracted from logs Start by reading %s/summary.txt and %s/errors.txt to understand the log patterns. -Then use grep and read_file on %s/raw.log to investigate specific patterns in detail. -You can also use the execute tool to run shell commands (e.g., awk, sort, wc) for deeper analysis. +Then search and read %s/raw.log for specifics (grep, read, or equivalents your environment exposes). +Use shell only when it helps (e.g. awk, sort, wc). Provide: 1. Key findings from the logs @@ -46,7 +46,7 @@ Be concise and actionable. Focus on what matters.`, type Config struct { Provider string Model string - // TapePath, when set, enables tape recording to this JSONL file. + // TapePath overrides the default workspace tape file. TapePath string } @@ -54,8 +54,7 @@ type Config struct { func BuildWorkspaceSystemPrompt(workDir string) string { return fmt.Sprintf(`You are a log analysis expert helping developers troubleshoot issues. -IMPORTANT: All file operations (read_file, grep, ls, glob, execute) MUST use paths under %s. -Do NOT access files outside this workspace directory. +IMPORTANT: Stay within the workspace directory %s for any file or shell work (your runtime provides the tools). Your workspace at %s contains structured log data: - %s/logs/ — original log files @@ -67,8 +66,8 @@ Your workspace at %s contains structured log data: Start by reading %s/notes/summary.md and %s/notes/errors.md to understand the log patterns. Then drill into specific patterns under %s/patterns/ for details. -Use grep on %s/logs/ to search for specific terms across all log files. -You can also use the execute tool to run shell commands (e.g., awk, sort, wc) for deeper analysis. +Search %s/logs/ for specific terms across log files. +Use shell only when it helps (e.g., awk, sort, wc). Provide: 1. Key findings from the logs @@ -117,30 +116,31 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s return "", errors.Errorf("create chat model: %w", err) } - backend, err := local.NewBackend(ctx, &local.Config{}) - if err != nil { - return "", errors.Errorf("create local backend: %w", err) + if systemPrompt == "" { + systemPrompt = buildSystemPrompt(absDir) } - backendAdapter := newLocalBackendAdapter(backend) - fsHandler, err := fsmw.New(ctx, &fsmw.MiddlewareConfig{ - Backend: backendAdapter, - StreamingShell: backendAdapter, - }) + tapePath := config.TapePath + if tapePath == "" { + tapePath = filepath.Join(absDir, tape.FileName) + } + tapeStore, err := tape.OpenJSONL(tapePath) if err != nil { - return "", errors.Errorf("create filesystem middleware: %w", err) + return "", errors.Errorf("open tape store: %w", err) } + defer func() { _ = tapeStore.Close() }() - if systemPrompt == "" { - systemPrompt = buildSystemPrompt(absDir) - } + tapeHandler := tape.NewEinoHandler(tapeStore, tape.RunMeta{ + RunID: uuid.NewString(), + Provider: provider, + Model: config.Model, + }) agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ Name: "log-analyzer", Description: "Analyzes log files to find root causes", Instruction: systemPrompt, Model: newACPToolCallingModel(chatModel), - Handlers: []adk.ChatModelAgentMiddleware{fsHandler}, MaxIterations: 15, }) if err != nil { @@ -153,13 +153,7 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s } runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent}) - var runOptions []adk.AgentRunOption - if config.TapePath != "" { - jsonlStore := tape.NewJSONLStore(config.TapePath) - runOptions = append(runOptions, adk.WithCallbacks(tape.NewHandler(jsonlStore))) - slog.Info("Tape recording enabled", "path", config.TapePath) - } - iter := runner.Query(ctx, userMessage, runOptions...) + iter := runner.Query(ctx, userMessage, adk.WithCallbacks(tracing.NewSlogEinoHandler(nil), tapeHandler)) var result strings.Builder for { diff --git a/pkg/analyzer/local_backend_adapter.go b/pkg/analyzer/local_backend_adapter.go deleted file mode 100644 index 22247fe..0000000 --- a/pkg/analyzer/local_backend_adapter.go +++ /dev/null @@ -1,52 +0,0 @@ -package analyzer - -import ( - "context" - - "github.com/cloudwego/eino-ext/adk/backend/local" - "github.com/cloudwego/eino/adk/filesystem" - "github.com/cloudwego/eino/schema" -) - -var _ filesystem.Backend = (*localBackendAdapter)(nil) -var _ filesystem.StreamingShell = (*localBackendAdapter)(nil) - -type localBackendAdapter struct { - base *local.Local -} - -func newLocalBackendAdapter(base *local.Local) *localBackendAdapter { - return &localBackendAdapter{base: base} -} - -func (a *localBackendAdapter) LsInfo(ctx context.Context, req *filesystem.LsInfoRequest) ([]filesystem.FileInfo, error) { - return a.base.LsInfo(ctx, req) -} - -func (a *localBackendAdapter) Read(ctx context.Context, req *filesystem.ReadRequest) (*filesystem.FileContent, error) { - content, err := a.base.Read(ctx, req) - if err != nil { - return nil, err - } - return &filesystem.FileContent{Content: content}, nil -} - -func (a *localBackendAdapter) GrepRaw(ctx context.Context, req *filesystem.GrepRequest) ([]filesystem.GrepMatch, error) { - return a.base.GrepRaw(ctx, req) -} - -func (a *localBackendAdapter) GlobInfo(ctx context.Context, req *filesystem.GlobInfoRequest) ([]filesystem.FileInfo, error) { - return a.base.GlobInfo(ctx, req) -} - -func (a *localBackendAdapter) Write(ctx context.Context, req *filesystem.WriteRequest) error { - return a.base.Write(ctx, req) -} - -func (a *localBackendAdapter) Edit(ctx context.Context, req *filesystem.EditRequest) error { - return a.base.Edit(ctx, req) -} - -func (a *localBackendAdapter) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { - return a.base.ExecuteStreaming(ctx, input) -} diff --git a/pkg/tape/eino_handler.go b/pkg/tape/eino_handler.go new file mode 100644 index 0000000..b2e408a --- /dev/null +++ b/pkg/tape/eino_handler.go @@ -0,0 +1,393 @@ +package tape + +import ( + "context" + "errors" + "io" + "log" + "runtime/debug" + + "github.com/bytedance/sonic" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/components" + "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/schema" + goerrors "github.com/go-errors/errors" +) + +type RunMeta struct { + RunID string + Provider string + Model string +} + +type toolCtxKey struct{} + +type pendingTool struct { + Name string + Args string +} + +type EinoHandler struct { + store *JSONLStore + meta RunMeta + recordedMsg int +} + +func NewEinoHandler(store *JSONLStore, meta RunMeta) *EinoHandler { + return &EinoHandler{store: store, meta: meta} +} + +func (h *EinoHandler) baseMeta() map[string]any { + m := map[string]any{"run_id": h.meta.RunID} + if h.meta.Provider != "" { + m["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + m["model"] = h.meta.Model + } + return m +} + +func (h *EinoHandler) write(e Entry) { + if err := h.store.Append(e); err != nil { + log.Printf("tape append: %v", err) + } +} + +func (h *EinoHandler) recordMessagesDelta(msgs []*schema.Message) { + if len(msgs) <= h.recordedMsg { + return + } + for i := h.recordedMsg; i < len(msgs); i++ { + m := msgs[i] + if m == nil { + continue + } + if m.Role == schema.Assistant { + continue + } + h.write(Message(messageToMap(m), h.baseMeta())) + } + h.recordedMsg = len(msgs) +} + +func (h *EinoHandler) Needed(ctx context.Context, info *callbacks.RunInfo, timing callbacks.CallbackTiming) bool { + _ = ctx + _ = info + switch timing { + case callbacks.TimingOnStart, callbacks.TimingOnEnd, callbacks.TimingOnError, + callbacks.TimingOnStartWithStreamInput, callbacks.TimingOnEndWithStreamOutput: + return true + default: + return false + } +} + +func (h *EinoHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + if info == nil { + return ctx + } + switch info.Component { + case components.ComponentOfChatModel: + mIn := model.ConvCallbackInput(input) + if mIn == nil { + return ctx + } + h.recordMessagesDelta(mIn.Messages) + case components.ComponentOfTool: + tIn := tool.ConvCallbackInput(input) + if tIn == nil { + return ctx + } + return context.WithValue(ctx, toolCtxKey{}, &pendingTool{Name: info.Name, Args: tIn.ArgumentsInJSON}) + default: + } + return ctx +} + +func (h *EinoHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + if info == nil { + return ctx + } + switch info.Component { + case components.ComponentOfChatModel: + mOut := model.ConvCallbackOutput(output) + if mOut == nil || mOut.Message == nil { + return ctx + } + h.write(Message(messageToMap(mOut.Message), h.baseMeta())) + runData := map[string]any{"status": "ok"} + if mOut.TokenUsage != nil { + runData["usage"] = map[string]any{ + "prompt_tokens": mOut.TokenUsage.PromptTokens, + "completion_tokens": mOut.TokenUsage.CompletionTokens, + "total_tokens": mOut.TokenUsage.TotalTokens, + } + } + if h.meta.Provider != "" { + runData["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + runData["model"] = h.meta.Model + } + h.write(Event("run", runData, h.baseMeta())) + case components.ComponentOfTool: + pend, _ := ctx.Value(toolCtxKey{}).(*pendingTool) + name := info.Name + args := "" + if pend != nil { + name = pend.Name + args = pend.Args + } + tOut := tool.ConvCallbackOutput(output) + res := "" + if tOut != nil { + if tOut.Response != "" { + res = tOut.Response + } else if tOut.ToolOutput != nil { + if b, err := sonic.MarshalString(tOut.ToolOutput); err == nil { + res = b + } + } + } + h.write(ToolCall([]map[string]any{{ + "name": name, + "arguments": args, + }}, h.baseMeta())) + h.write(ToolResult([]any{res}, h.baseMeta())) + default: + } + return ctx +} + +func (h *EinoHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + if info == nil || err == nil { + return ctx + } + ex := map[string]any{"component": string(info.Component), "node": info.Name} + h.write(ErrorPayload(err.Error(), ex, h.baseMeta())) + if info.Component == components.ComponentOfChatModel { + h.write(Event("run", map[string]any{ + "status": "error", + "provider": h.meta.Provider, + "model": h.meta.Model, + }, h.baseMeta())) + } + return ctx +} + +func (h *EinoHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + if info == nil || input == nil { + return ctx + } + if info.Component != components.ComponentOfChatModel { + return ctx + } + go func() { + defer func() { + if e := recover(); e != nil { + log.Printf("tape stream input panic: %v\n%s", e, string(debug.Stack())) + } + input.Close() + }() + var chunks []callbacks.CallbackInput + for { + ch, err := input.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + log.Printf("tape stream input recv: %v", err) + return + } + chunks = append(chunks, ch) + } + ins := convModelCallbackInputs(chunks) + _, msgs, _, err := extractModelInput(ins) + if err != nil { + log.Printf("tape extract model input: %v", err) + return + } + h.recordMessagesDelta(msgs) + }() + return ctx +} + +func (h *EinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { + if info == nil || output == nil { + return ctx + } + if info.Component != components.ComponentOfChatModel { + return ctx + } + go func() { + defer func() { + if e := recover(); e != nil { + log.Printf("tape stream output panic: %v\n%s", e, string(debug.Stack())) + } + output.Close() + }() + var chunks []callbacks.CallbackOutput + var streamErr error + for { + ch, err := output.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + log.Printf("tape stream output recv: %v", err) + streamErr = err + break + } + chunks = append(chunks, ch) + } + outs := convModelCallbackOutputs(chunks) + usage, msg, _, err := extractModelOutput(outs) + if err != nil { + log.Printf("tape extract model output: %v", err) + return + } + if msg != nil { + h.write(Message(messageToMap(msg), h.baseMeta())) + } + status := "ok" + if streamErr != nil { + status = "error" + } + runData := map[string]any{"status": status} + if usage != nil { + runData["usage"] = map[string]any{ + "prompt_tokens": usage.PromptTokens, + "completion_tokens": usage.CompletionTokens, + "total_tokens": usage.TotalTokens, + } + } + if h.meta.Provider != "" { + runData["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + runData["model"] = h.meta.Model + } + h.write(Event("run", runData, h.baseMeta())) + }() + return ctx +} + +func messageToMap(m *schema.Message) map[string]any { + if m == nil { + return map[string]any{} + } + raw, err := sonic.Marshal(m) + if err != nil { + return map[string]any{"role": string(m.Role), "content": m.Content} + } + var out map[string]any + if err := sonic.Unmarshal(raw, &out); err != nil { + return map[string]any{"role": string(m.Role), "content": m.Content} + } + return out +} + +func convModelCallbackInputs(in []callbacks.CallbackInput) []*model.CallbackInput { + ret := make([]*model.CallbackInput, len(in)) + for i, c := range in { + ret[i] = model.ConvCallbackInput(c) + } + return ret +} + +func convModelCallbackOutputs(out []callbacks.CallbackOutput) []*model.CallbackOutput { + ret := make([]*model.CallbackOutput, len(out)) + for i, c := range out { + ret[i] = model.ConvCallbackOutput(c) + } + return ret +} + +func extractModelInput(ins []*model.CallbackInput) (config *model.Config, messages []*schema.Message, extra map[string]any, err error) { + var mas [][]*schema.Message + for _, in := range ins { + if in == nil { + continue + } + if len(in.Messages) > 0 { + mas = append(mas, in.Messages) + } + if len(in.Extra) > 0 { + extra = in.Extra + } + if in.Config != nil { + config = in.Config + } + } + if len(mas) == 0 { + return config, []*schema.Message{}, extra, nil + } + messages, err = concatMessageArrays(mas) + if err != nil { + return nil, nil, nil, err + } + return config, messages, extra, nil +} + +func extractModelOutput(outs []*model.CallbackOutput) (usage *model.TokenUsage, message *schema.Message, extra map[string]any, err error) { + var mas []*schema.Message + for _, out := range outs { + if out == nil { + continue + } + if out.TokenUsage != nil { + usage = out.TokenUsage + } + if out.Message != nil { + mas = append(mas, out.Message) + } + if out.Extra != nil { + extra = out.Extra + } + } + if len(mas) == 0 { + return usage, &schema.Message{}, extra, nil + } + message, err = schema.ConcatMessages(mas) + if err != nil { + return nil, nil, nil, err + } + return usage, message, extra, nil +} + +func concatMessageArrays(mas [][]*schema.Message) ([]*schema.Message, error) { + if len(mas) == 0 { + return nil, nil + } + arrayLen := len(mas[0]) + ret := make([]*schema.Message, arrayLen) + slicesToConcat := make([][]*schema.Message, arrayLen) + for _, ma := range mas { + if len(ma) != arrayLen { + return nil, goerrors.Errorf("mismatch streamed message batch length: got %d want %d", len(ma), arrayLen) + } + for i := 0; i < arrayLen; i++ { + if ma[i] != nil { + slicesToConcat[i] = append(slicesToConcat[i], ma[i]) + } + } + } + for i, slice := range slicesToConcat { + switch len(slice) { + case 0: + ret[i] = nil + case 1: + ret[i] = slice[0] + default: + cm, err := schema.ConcatMessages(slice) + if err != nil { + return nil, err + } + ret[i] = cm + } + } + return ret, nil +} diff --git a/pkg/tape/eino_handler_test.go b/pkg/tape/eino_handler_test.go new file mode 100644 index 0000000..97c8d2a --- /dev/null +++ b/pkg/tape/eino_handler_test.go @@ -0,0 +1,11 @@ +package tape + +import ( + "testing" + + "github.com/cloudwego/eino/callbacks" +) + +func TestEinoHandlerImplementsCallbackHandler(t *testing.T) { + var _ callbacks.Handler = (*EinoHandler)(nil) +} diff --git a/pkg/tape/entry.go b/pkg/tape/entry.go index 7271b8d..5470048 100644 --- a/pkg/tape/entry.go +++ b/pkg/tape/entry.go @@ -1,27 +1,23 @@ +// Package tape implements append-only audit logs aligned with Republic's tape schema. package tape import "time" -// Entry is a single append-only entry in a tape, modeled after republic's TapeEntry. type Entry struct { - ID int `json:"id"` + ID int64 `json:"id"` Kind string `json:"kind"` Payload map[string]any `json:"payload"` Meta map[string]any `json:"meta,omitempty"` Date string `json:"date"` } -// Entry kinds. -const ( - KindMessage = "message" - KindSystem = "system" - KindToolCall = "tool_call" - KindToolResult = "tool_result" - KindError = "error" - KindEvent = "event" -) - func newEntry(kind string, payload, meta map[string]any) Entry { + if payload == nil { + payload = map[string]any{} + } + if meta == nil { + meta = map[string]any{} + } return Entry{ Kind: kind, Payload: payload, @@ -30,47 +26,46 @@ func newEntry(kind string, payload, meta map[string]any) Entry { } } -// MessageEntry creates a message entry. -func MessageEntry(role, content string, meta map[string]any) Entry { - return newEntry(KindMessage, map[string]any{ - "role": role, - "content": content, - }, meta) +func Message(msg, meta map[string]any) Entry { + p := map[string]any{} + for k, v := range msg { + p[k] = v + } + return newEntry("message", p, meta) } -// SystemEntry creates a system prompt entry. -func SystemEntry(content string, meta map[string]any) Entry { - return newEntry(KindSystem, map[string]any{ - "content": content, - }, meta) +func System(content string, meta map[string]any) Entry { + return newEntry("system", map[string]any{"content": content}, meta) } -// ToolCallEntry creates a tool call entry. -func ToolCallEntry(calls []map[string]any, meta map[string]any) Entry { - return newEntry(KindToolCall, map[string]any{ - "calls": calls, - }, meta) +func Anchor(name string, state, meta map[string]any) Entry { + p := map[string]any{"name": name} + if state != nil { + p["state"] = state + } + return newEntry("anchor", p, meta) } -// ToolResultEntry creates a tool result entry. -func ToolResultEntry(results []any, meta map[string]any) Entry { - return newEntry(KindToolResult, map[string]any{ - "results": results, - }, meta) +func ToolCall(calls []map[string]any, meta map[string]any) Entry { + return newEntry("tool_call", map[string]any{"calls": calls}, meta) } -// ErrorEntry creates an error entry. -func ErrorEntry(kind, message string, meta map[string]any) Entry { - return newEntry(KindError, map[string]any{ - "kind": kind, - "message": message, - }, meta) +func ToolResult(results []any, meta map[string]any) Entry { + return newEntry("tool_result", map[string]any{"results": results}, meta) } -// EventEntry creates a generic event entry. -func EventEntry(name string, data, meta map[string]any) Entry { - return newEntry(KindEvent, map[string]any{ - "name": name, - "data": data, - }, meta) +func ErrorPayload(message string, extra, meta map[string]any) Entry { + p := map[string]any{"message": message} + for k, v := range extra { + p[k] = v + } + return newEntry("error", p, meta) +} + +func Event(name string, data, meta map[string]any) Entry { + p := map[string]any{"name": name} + if data != nil { + p["data"] = data + } + return newEntry("event", p, meta) } diff --git a/pkg/tape/handler.go b/pkg/tape/handler.go deleted file mode 100644 index 7b8158b..0000000 --- a/pkg/tape/handler.go +++ /dev/null @@ -1,220 +0,0 @@ -package tape - -import ( - "context" - "log/slog" - - "github.com/cloudwego/eino/callbacks" - "github.com/cloudwego/eino/components" - "github.com/cloudwego/eino/components/model" - "github.com/cloudwego/eino/schema" -) - -// NewHandler creates an eino callbacks.Handler that records tape entries to the given store. -func NewHandler(store Recorder) callbacks.Handler { - return callbacks.NewHandlerBuilder(). - OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { - onStart(store, info, input) - return ctx - }). - OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { - onEnd(store, info, output) - return ctx - }). - OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { - onError(store, info, err) - return ctx - }). - OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { - onEndWithStream(store, info, output) - return ctx - }). - Build() -} - -func componentMeta(info *callbacks.RunInfo) map[string]any { - return map[string]any{ - "component": string(info.Component), - "name": info.Name, - "type": info.Type, - } -} - -func onStart(store Recorder, info *callbacks.RunInfo, input callbacks.CallbackInput) { - if info == nil { - return - } - meta := componentMeta(info) - - if info.Component == components.ComponentOfChatModel { - modelInput := model.ConvCallbackInput(input) - if modelInput != nil { - recordModelInput(store, modelInput, meta) - return - } - } - - if info.Component == components.ComponentOfTool { - entry := EventEntry("tool_start", map[string]any{ - "component": string(info.Component), - "name": info.Name, - }, meta) - appendSafe(store, entry) - return - } - - entry := EventEntry("start", map[string]any{ - "component": string(info.Component), - "name": info.Name, - }, meta) - appendSafe(store, entry) -} - -func onEnd(store Recorder, info *callbacks.RunInfo, output callbacks.CallbackOutput) { - if info == nil { - return - } - meta := componentMeta(info) - - if info.Component == components.ComponentOfChatModel { - modelOutput := model.ConvCallbackOutput(output) - if modelOutput != nil { - recordModelOutput(store, modelOutput, meta) - return - } - } - - if info.Component == components.ComponentOfTool { - recordToolResult(store, output, meta) - return - } - - entry := EventEntry("end", map[string]any{ - "component": string(info.Component), - "name": info.Name, - }, meta) - appendSafe(store, entry) -} - -func onError(store Recorder, info *callbacks.RunInfo, err error) { - if info == nil { - return - } - meta := componentMeta(info) - entry := ErrorEntry(string(info.Component), err.Error(), meta) - appendSafe(store, entry) -} - -func onEndWithStream(store Recorder, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) { - if info == nil { - return - } - meta := componentMeta(info) - - // Consume the stream in a goroutine to avoid blocking - go func() { - defer func() { - if r := recover(); r != nil { - slog.Warn("tape: panic consuming stream", "recover", r) - } - }() - - var lastOutput callbacks.CallbackOutput - for { - chunk, err := output.Recv() - if err != nil { - break - } - lastOutput = chunk - } - output.Close() - - if lastOutput == nil { - return - } - - if info.Component == components.ComponentOfChatModel { - modelOutput := model.ConvCallbackOutput(lastOutput) - if modelOutput != nil { - recordModelOutput(store, modelOutput, meta) - return - } - } - - entry := EventEntry("end", map[string]any{ - "component": string(info.Component), - "name": info.Name, - }, meta) - appendSafe(store, entry) - }() -} - -func recordModelInput(store Recorder, input *model.CallbackInput, meta map[string]any) { - // Record system message separately if present - for _, msg := range input.Messages { - if msg.Role == schema.System { - appendSafe(store, SystemEntry(msg.Content, meta)) - continue - } - appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta)) - } - - // Record tool calls from messages - if len(input.Tools) > 0 { - var tools []map[string]any - for _, t := range input.Tools { - tools = append(tools, map[string]any{ - "name": t.Name, - "desc": t.Desc, - }) - } - appendSafe(store, EventEntry("tools_available", map[string]any{ - "tools": tools, - }, meta)) - } -} - -func recordModelOutput(store Recorder, output *model.CallbackOutput, meta map[string]any) { - if output.TokenUsage != nil { - meta["token_usage"] = map[string]any{ - "prompt_tokens": output.TokenUsage.PromptTokens, - "completion_tokens": output.TokenUsage.CompletionTokens, - "total_tokens": output.TokenUsage.TotalTokens, - } - } - - if output.Message != nil { - // Record tool calls if present - if len(output.Message.ToolCalls) > 0 { - var calls []map[string]any - for _, tc := range output.Message.ToolCalls { - calls = append(calls, map[string]any{ - "id": tc.ID, - "function": tc.Function.Name, - "args": tc.Function.Arguments, - }) - } - appendSafe(store, ToolCallEntry(calls, meta)) - } - - // Record assistant message - if output.Message.Content != "" { - appendSafe(store, MessageEntry(string(output.Message.Role), output.Message.Content, meta)) - } - } -} - -func recordToolResult(store Recorder, output callbacks.CallbackOutput, meta map[string]any) { - // Tool output is typically a string - if s, ok := output.(string); ok { - appendSafe(store, ToolResultEntry([]any{s}, meta)) - return - } - appendSafe(store, ToolResultEntry([]any{output}, meta)) -} - -func appendSafe(store Recorder, entry Entry) { - if err := store.Append(entry); err != nil { - slog.Warn("tape: failed to append entry", "err", err, "kind", entry.Kind) - } -} diff --git a/pkg/tape/jsonl.go b/pkg/tape/jsonl.go new file mode 100644 index 0000000..2e8c135 --- /dev/null +++ b/pkg/tape/jsonl.go @@ -0,0 +1,96 @@ +package tape + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "sync" +) + +const FileName = ".tape.jsonl" + +type JSONLStore struct { + path string + file *os.File + mu sync.Mutex + nextID int64 +} + +func OpenJSONL(path string) (*JSONLStore, error) { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, err + } + nextID, err := scanMaxID(path) + if err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return nil, err + } + return &JSONLStore{path: path, file: f, nextID: nextID}, nil +} + +func scanMaxID(path string) (int64, error) { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return 1, nil + } + return 0, err + } + defer f.Close() + + var maxID int64 + sc := bufio.NewScanner(f) + // Allow entries up to 10 MB to handle large tool outputs in tape lines. + sc.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) + for sc.Scan() { + var row struct { + ID int64 `json:"id"` + } + if json.Unmarshal(sc.Bytes(), &row) == nil && row.ID > maxID { + maxID = row.ID + } + } + if err := sc.Err(); err != nil { + return 0, err + } + if maxID == 0 { + return 1, nil + } + return maxID + 1, nil +} + +func (s *JSONLStore) Append(e Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + + e.ID = s.nextID + s.nextID++ + + enc, err := json.Marshal(e) + if err != nil { + return err + } + if _, err := s.file.Write(append(enc, '\n')); err != nil { + return err + } + return s.file.Sync() +} + +func (s *JSONLStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.file == nil { + return nil + } + err := s.file.Close() + s.file = nil + return err +} + +func (s *JSONLStore) Path() string { + return s.path +} diff --git a/pkg/tape/jsonl_test.go b/pkg/tape/jsonl_test.go new file mode 100644 index 0000000..5d6f2e2 --- /dev/null +++ b/pkg/tape/jsonl_test.go @@ -0,0 +1,38 @@ +package tape + +import ( + "os" + "path/filepath" + "testing" +) + +func TestJSONLAppendSequentialID(t *testing.T) { + dir := t.TempDir() + p := filepath.Join(dir, FileName) + + s1, err := OpenJSONL(p) + if err != nil { + t.Fatal(err) + } + if err := s1.Append(Message(map[string]any{"role": "user", "content": "hi"}, map[string]any{"run_id": "a"})); err != nil { + t.Fatal(err) + } + if err := s1.Close(); err != nil { + t.Fatal(err) + } + + s2, err := OpenJSONL(p) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + if err := s2.Append(System("sys", map[string]any{"run_id": "b"})); err != nil { + t.Fatal(err) + } + + data, err := os.ReadFile(p) + if err != nil { + t.Fatal(err) + } + t.Logf("%s", data) +} diff --git a/pkg/tape/slog_handler.go b/pkg/tape/slog_handler.go deleted file mode 100644 index ddda737..0000000 --- a/pkg/tape/slog_handler.go +++ /dev/null @@ -1,149 +0,0 @@ -package tape - -import ( - "context" - "encoding/json" - "log/slog" - - "github.com/cloudwego/eino/callbacks" - "github.com/cloudwego/eino/components" - "github.com/cloudwego/eino/components/model" - "github.com/cloudwego/eino/schema" -) - -// NewSlogHandler creates an eino callbacks.Handler that logs all callback -// events directly via slog. Register it with callbacks.AppendGlobalHandlers. -func NewSlogHandler() callbacks.Handler { - return callbacks.NewHandlerBuilder(). - OnStartFn(slogOnStart). - OnEndFn(slogOnEnd). - OnErrorFn(slogOnError). - OnEndWithStreamOutputFn(slogOnEndWithStream). - Build() -} - -func slogOnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { - if info == nil { - return ctx - } - if info.Component == components.ComponentOfChatModel { - if mi := model.ConvCallbackInput(input); mi != nil { - for _, msg := range mi.Messages { - slog.Info("tape.start", - "component", string(info.Component), - "name", info.Name, - "role", string(msg.Role), - "content", truncate(msg.Content, 200), - "tool_calls", len(msg.ToolCalls), - ) - } - return ctx - } - } - - slog.Info("tape.start", - "component", string(info.Component), - "name", info.Name, - "type", info.Type, - "input", formatAny(input), - ) - return ctx -} - -func slogOnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { - if info == nil { - return ctx - } - if info.Component == components.ComponentOfChatModel { - if mo := model.ConvCallbackOutput(output); mo != nil { - attrs := []any{ - "component", string(info.Component), - "name", info.Name, - } - if mo.Message != nil { - attrs = append(attrs, - "role", string(mo.Message.Role), - "content", truncate(mo.Message.Content, 200), - "tool_calls", len(mo.Message.ToolCalls), - ) - } - if mo.TokenUsage != nil { - attrs = append(attrs, - "prompt_tokens", mo.TokenUsage.PromptTokens, - "completion_tokens", mo.TokenUsage.CompletionTokens, - "total_tokens", mo.TokenUsage.TotalTokens, - ) - } - slog.Info("tape.end", attrs...) - return ctx - } - } - - slog.Info("tape.end", - "component", string(info.Component), - "name", info.Name, - "type", info.Type, - "output", formatAny(output), - ) - return ctx -} - -func slogOnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { - if info == nil { - return ctx - } - slog.Error("tape.error", - "component", string(info.Component), - "name", info.Name, - "err", err, - ) - return ctx -} - -func slogOnEndWithStream(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { - if info == nil { - return ctx - } - go func() { - defer func() { - if r := recover(); r != nil { - slog.Warn("tape.stream: panic", "recover", r) - } - }() - - var last callbacks.CallbackOutput - for { - chunk, err := output.Recv() - if err != nil { - break - } - last = chunk - } - output.Close() - - if last == nil { - return - } - - slogOnEnd(context.Background(), info, last) - }() - return ctx -} - -func truncate(s string, limit int) string { - if len(s) <= limit { - return s - } - return s[:limit] + "..." -} - -func formatAny(v any) string { - if v == nil { - return "" - } - b, err := json.Marshal(v) - if err != nil { - return "" - } - return truncate(string(b), 300) -} diff --git a/pkg/tape/store.go b/pkg/tape/store.go deleted file mode 100644 index 18f52ce..0000000 --- a/pkg/tape/store.go +++ /dev/null @@ -1,83 +0,0 @@ -package tape - -import ( - "bufio" - "encoding/json" - "os" - "sync" -) - -// Recorder is the interface for appending tape entries. -type Recorder interface { - Append(entry Entry) error -} - -var _ Recorder = (*JSONLStore)(nil) - -// JSONLStore is an append-only tape store that writes entries as JSONL to a file. -type JSONLStore struct { - mu sync.Mutex - path string - nextID int -} - -// NewJSONLStore creates a new JSONL store writing to the given file path. -// It scans any existing file to resume IDs after the current maximum. -func NewJSONLStore(path string) *JSONLStore { - nextID := scanMaxID(path) - return &JSONLStore{ - path: path, - nextID: nextID, - } -} - -// scanMaxID reads an existing JSONL file and returns the next ID to use. -// Returns 1 if the file does not exist or contains no entries. -func scanMaxID(path string) int { - f, err := os.Open(path) - if err != nil { - return 1 - } - defer f.Close() - - var maxID int - sc := bufio.NewScanner(f) - sc.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) - for sc.Scan() { - var row struct { - ID int `json:"id"` - } - if json.Unmarshal(sc.Bytes(), &row) == nil && row.ID > maxID { - maxID = row.ID - } - } - if maxID == 0 { - return 1 - } - return maxID + 1 -} - -// Append adds an entry to the tape, assigns it an ID, and writes it as a JSON line. -func (s *JSONLStore) Append(entry Entry) error { - s.mu.Lock() - defer s.mu.Unlock() - - entry.ID = s.nextID - s.nextID++ - - line, err := json.Marshal(entry) - if err != nil { - return err - } - line = append(line, '\n') - - //nolint:gosec // tape file is not sensitive; path is controlled by the application - f, err := os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) - if err != nil { - return err - } - defer f.Close() - - _, err = f.Write(line) - return err -} diff --git a/pkg/tape/store_test.go b/pkg/tape/store_test.go deleted file mode 100644 index dd9246a..0000000 --- a/pkg/tape/store_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package tape - -import ( - "bufio" - "encoding/json" - "os" - "path/filepath" - "testing" -) - -func TestJSONLStoreAppend(t *testing.T) { - dir := t.TempDir() - path := filepath.Join(dir, ".tape.jsonl") - - store := NewJSONLStore(path) - - if err := store.Append(MessageEntry("user", "hello", nil)); err != nil { - t.Fatalf("append message: %v", err) - } - if err := store.Append(MessageEntry("assistant", "hi there", map[string]any{"model": "test"})); err != nil { - t.Fatalf("append response: %v", err) - } - if err := store.Append(ErrorEntry("test", "something failed", nil)); err != nil { - t.Fatalf("append error: %v", err) - } - - f, err := os.Open(path) - if err != nil { - t.Fatalf("open file: %v", err) - } - defer f.Close() - - var entries []Entry - scanner := bufio.NewScanner(f) - for scanner.Scan() { - var e Entry - if err := json.Unmarshal(scanner.Bytes(), &e); err != nil { - t.Fatalf("unmarshal: %v", err) - } - entries = append(entries, e) - } - - if len(entries) != 3 { - t.Fatalf("expected 3 entries, got %d", len(entries)) - } - - if entries[0].ID != 1 || entries[0].Kind != KindMessage { - t.Errorf("entry 0: id=%d kind=%s", entries[0].ID, entries[0].Kind) - } - if entries[1].ID != 2 || entries[1].Payload["role"] != "assistant" { - t.Errorf("entry 1: id=%d role=%v", entries[1].ID, entries[1].Payload["role"]) - } - if entries[2].Kind != KindError { - t.Errorf("entry 2: kind=%s", entries[2].Kind) - } -} diff --git a/pkg/tracing/slog_eino.go b/pkg/tracing/slog_eino.go new file mode 100644 index 0000000..cbbea72 --- /dev/null +++ b/pkg/tracing/slog_eino.go @@ -0,0 +1,110 @@ +package tracing + +import ( + "context" + "fmt" + "log/slog" + + "github.com/bytedance/sonic" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/schema" +) + +// SlogEinoHandler logs full callback payloads as JSON for side-by-side comparison with tape / Langfuse. +// Stream callbacks only log a placeholder (the StreamReader is not drained here). +type SlogEinoHandler struct { + Log *slog.Logger +} + +func NewSlogEinoHandler(log *slog.Logger) *SlogEinoHandler { + if log == nil { + log = slog.Default() + } + return &SlogEinoHandler{Log: log} +} + +func (h *SlogEinoHandler) Needed(context.Context, *callbacks.RunInfo, callbacks.CallbackTiming) bool { + return true +} + +func (h *SlogEinoHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + h.logPayload(ctx, "eino.callback.OnStart", info, "input", input) + return ctx +} + +func (h *SlogEinoHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + h.logPayload(ctx, "eino.callback.OnEnd", info, "output", output) + return ctx +} + +func (h *SlogEinoHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + if err == nil { + return ctx + } + attrs := []any{ + slog.String("err", err.Error()), + } + if info != nil { + attrs = append(attrs, + slog.String("node", info.Name), + slog.String("component", string(info.Component)), + slog.String("graph_type", info.Type), + ) + } + h.Log.WarnContext(ctx, "eino.callback.OnError", attrs...) + return ctx +} + +func (h *SlogEinoHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + h.logPayload(ctx, "eino.callback.OnStartWithStreamInput", info, "input_stream", streamInputNote(input)) + return ctx +} + +func (h *SlogEinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { + h.logPayload(ctx, "eino.callback.OnEndWithStreamOutput", info, "output_stream", streamOutputNote(output)) + return ctx +} + +func streamInputNote(s *schema.StreamReader[callbacks.CallbackInput]) any { + if s == nil { + return map[string]any{"kind": "StreamReader[CallbackInput]", "nil": true} + } + return map[string]any{ + "kind": "StreamReader[CallbackInput]", + "note": "not drained here; compare non-stream OnStart/OnEnd JSON or tape", + } +} + +func streamOutputNote(s *schema.StreamReader[callbacks.CallbackOutput]) any { + if s == nil { + return map[string]any{"kind": "StreamReader[CallbackOutput]", "nil": true} + } + return map[string]any{ + "kind": "StreamReader[CallbackOutput]", + "note": "not drained here; compare non-stream callbacks or tape", + } +} + +func (h *SlogEinoHandler) logPayload(ctx context.Context, msg string, info *callbacks.RunInfo, field string, v any) { + attrs := []any{} + if info != nil { + attrs = append(attrs, + slog.String("node", info.Name), + slog.String("component", string(info.Component)), + slog.String("graph_type", info.Type), + ) + } + attrs = append(attrs, slog.String(field, marshalCallbackPayload(v))) + h.Log.InfoContext(ctx, msg, attrs...) +} + +func marshalCallbackPayload(v any) string { + if v == nil { + return "null" + } + b, err := sonic.MarshalString(v) + if err != nil { + return fmt.Sprintf("%q", fmt.Sprintf("", err, v)) + } + return b +} diff --git a/pkg/tracing/slog_eino_test.go b/pkg/tracing/slog_eino_test.go new file mode 100644 index 0000000..e85414e --- /dev/null +++ b/pkg/tracing/slog_eino_test.go @@ -0,0 +1,12 @@ +package tracing + +import ( + "testing" + + "github.com/cloudwego/eino/callbacks" +) + +func TestSlogEinoHandlerImplementsCallbackHandler(t *testing.T) { + t.Helper() + var _ callbacks.Handler = (*SlogEinoHandler)(nil) +}