Skip to content

Commit c651cde

Browse files
committed
Merge branch 'dev' into main
2 parents 0138b29 + 677396c commit c651cde

11 files changed

Lines changed: 717 additions & 25 deletions

docs/08-scheduling-cron.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ Cancel commands for Telegram and other channels.
122122
- **Empty outbound**: On cancel, an empty outbound message is published to trigger cleanup (stop typing indicator, clear reactions)
123123
- **Trace finalization**: When `ctx.Err() != nil`, trace finalization falls back to `context.Background()` for the final DB write. Status is set to `"cancelled"`
124124
- **Context survival**: Context values (traceID, collector) survive cancellation -- only the Done channel fires
125+
- **Background workers (ticker/cron) — tenant ctx injection required**: Jobs started from `context.Background()` carry no tenant. Before calling any tenant-scoped store method (e.g. `GetTeam`, `GetTask`, `GetByID`), the worker MUST inject `store.WithTenantID(ctx, tenantID)` derived from the row-level `tenant_id` (e.g. `RecoveredTaskInfo.TenantID`, `TeamTaskData.TenantID`). Callers must also nil-check returned entities — some stores (e.g. `PGTeamStore.GetTeam`) return `(nil, nil)` when tenant is missing rather than an error. See `internal/tasks/task_ticker.go` for the reference pattern
125126
- **Generation counter**: Each `SessionQueue` tracks a generation counter. When reset (e.g., during SIGUSR1 in-process restart), old generations are ignored, preventing stale completions from interfering with new requests
126127

127128
---

internal/agent/loop_pipeline_tool_callbacks.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,22 @@ type toolRawResult struct {
6363
// makeExecuteToolRaw wraps tool I/O only (parallel-safe, no state mutation).
6464
// Returns tool message + toolRawResult (with timing + spanID) as opaque raw data for ProcessToolResult.
6565
func (l *Loop) makeExecuteToolRaw(req *RunRequest) func(ctx context.Context, tc providers.ToolCall) (providers.Message, any, error) {
66+
emitRun := makeToolEmitRun(l, req)
6667
return func(ctx context.Context, tc providers.ToolCall) (providers.Message, any, error) {
6768
registryName := l.resolveToolCallName(tc.Name)
6869
argsJSON, _ := json.Marshal(tc.Arguments)
70+
slog.Info("tool call", "agent", l.id, "tool", tc.Name, "args_len", len(argsJSON))
71+
72+
// Emit tool.call event at I/O start — parity with sequential path (makeExecuteToolCall).
73+
// Without this, parallel tool execution (2+ concurrent tools) never notifies UI of
74+
// tool invocation, so `tool.result` arrives with no matching `tool.call` to update.
75+
// Bus.Broadcast is RWMutex-guarded; safe to call from parallel goroutines.
76+
emitRun(AgentEvent{
77+
Type: protocol.AgentEventToolCall,
78+
AgentID: l.id,
79+
RunID: req.RunID,
80+
Payload: map[string]any{"name": tc.Name, "id": tc.ID, "arguments": tc.Arguments},
81+
})
6982

7083
// Emit tool span start (goroutine-safe: channel send only).
7184
start := time.Now().UTC()
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
8+
"github.com/nextlevelbuilder/goclaw/internal/pipeline"
9+
"github.com/nextlevelbuilder/goclaw/internal/providers"
10+
"github.com/nextlevelbuilder/goclaw/internal/tools"
11+
"github.com/nextlevelbuilder/goclaw/pkg/protocol"
12+
)
13+
14+
// stubExecutor implements tools.ToolExecutor with a canned successful Result.
15+
// Used to isolate the tool-callback wrappers from real tool registry wiring.
16+
type stubExecutor struct{}
17+
18+
func (s *stubExecutor) ExecuteWithContext(_ context.Context, _ string, _ map[string]any, _, _, _, _ string, _ tools.AsyncCallback) *tools.Result {
19+
return &tools.Result{ForLLM: "ok", IsError: false}
20+
}
21+
func (s *stubExecutor) TryActivateDeferred(string) bool { return false }
22+
func (s *stubExecutor) ProviderDefs() []providers.ToolDefinition { return nil }
23+
func (s *stubExecutor) Get(string) (tools.Tool, bool) { return nil, false }
24+
func (s *stubExecutor) List() []string { return nil }
25+
func (s *stubExecutor) Aliases() map[string]string { return nil }
26+
27+
// eventCollector buffers AgentEvents for inspection in tests.
28+
// Safe for concurrent appends from parallel goroutines.
29+
type eventCollector struct {
30+
mu sync.Mutex
31+
events []AgentEvent
32+
}
33+
34+
func (c *eventCollector) onEvent(e AgentEvent) {
35+
c.mu.Lock()
36+
c.events = append(c.events, e)
37+
c.mu.Unlock()
38+
}
39+
40+
func (c *eventCollector) filter(typ string) []AgentEvent {
41+
c.mu.Lock()
42+
defer c.mu.Unlock()
43+
var out []AgentEvent
44+
for _, e := range c.events {
45+
if e.Type == typ {
46+
out = append(out, e)
47+
}
48+
}
49+
return out
50+
}
51+
52+
// newTestLoopForToolCallbacks builds a minimal Loop instance sufficient to
53+
// exercise makeExecuteToolCall / makeExecuteToolRaw. All optional subsystems
54+
// (tracing, metrics, input guard) are left nil and hit early-return paths.
55+
func newTestLoopForToolCallbacks(onEvent func(AgentEvent)) *Loop {
56+
return &Loop{
57+
id: "test-agent",
58+
tools: &stubExecutor{},
59+
onEvent: onEvent,
60+
}
61+
}
62+
63+
// TestMakeExecuteToolCall_EmitsToolCallEvent verifies the sequential wrapper
64+
// emits a tool.call event before running tool I/O.
65+
func TestMakeExecuteToolCall_EmitsToolCallEvent(t *testing.T) {
66+
t.Parallel()
67+
col := &eventCollector{}
68+
l := newTestLoopForToolCallbacks(col.onEvent)
69+
70+
req := &RunRequest{
71+
RunID: "run-1",
72+
SessionKey: "sess-A",
73+
UserID: "u-1",
74+
Channel: "ws",
75+
RunKind: "",
76+
}
77+
state := &pipeline.RunState{RunID: "run-1"}
78+
tc := providers.ToolCall{ID: "tc-1", Name: "read_file", Arguments: map[string]any{"path": "/tmp/x"}}
79+
80+
_, err := l.makeExecuteToolCall(req, &runState{})(context.Background(), state, tc)
81+
if err != nil {
82+
t.Fatalf("makeExecuteToolCall returned error: %v", err)
83+
}
84+
85+
calls := col.filter(protocol.AgentEventToolCall)
86+
if len(calls) != 1 {
87+
t.Fatalf("expected 1 tool.call event, got %d (all events: %+v)", len(calls), col.events)
88+
}
89+
assertToolCallPayload(t, calls[0], tc, req)
90+
}
91+
92+
// TestMakeExecuteToolRaw_EmitsToolCallEvent is the PRIMARY regression guard.
93+
// The original bug: parallel path (makeExecuteToolRaw) did not emit tool.call,
94+
// so web UI and desktop UI silently dropped tool info during real-time streaming.
95+
// Mutation-verify: remove emitRun(...) from makeExecuteToolRaw — this test must fail.
96+
func TestMakeExecuteToolRaw_EmitsToolCallEvent(t *testing.T) {
97+
t.Parallel()
98+
col := &eventCollector{}
99+
l := newTestLoopForToolCallbacks(col.onEvent)
100+
101+
req := &RunRequest{
102+
RunID: "run-2",
103+
SessionKey: "sess-B",
104+
UserID: "u-2",
105+
Channel: "ws",
106+
RunKind: "",
107+
}
108+
tc := providers.ToolCall{ID: "tc-2", Name: "write_file", Arguments: map[string]any{"path": "/tmp/y"}}
109+
110+
msg, raw, err := l.makeExecuteToolRaw(req)(context.Background(), tc)
111+
if err != nil {
112+
t.Fatalf("makeExecuteToolRaw returned error: %v", err)
113+
}
114+
if msg.Role != "tool" || msg.ToolCallID != tc.ID {
115+
t.Errorf("unexpected tool message: %+v", msg)
116+
}
117+
if raw == nil {
118+
t.Error("expected non-nil raw data (toolRawResult)")
119+
}
120+
121+
calls := col.filter(protocol.AgentEventToolCall)
122+
if len(calls) != 1 {
123+
t.Fatalf("expected 1 tool.call event, got %d (all events: %+v)", len(calls), col.events)
124+
}
125+
assertToolCallPayload(t, calls[0], tc, req)
126+
}
127+
128+
// TestMakeExecuteToolRaw_ConcurrentCallsEmitAllEvents confirms the parallel
129+
// wrapper is safe to invoke from multiple goroutines — mirrors the real
130+
// executeParallel dispatch in pipeline/tool_stage.go.
131+
func TestMakeExecuteToolRaw_ConcurrentCallsEmitAllEvents(t *testing.T) {
132+
t.Parallel()
133+
col := &eventCollector{}
134+
l := newTestLoopForToolCallbacks(col.onEvent)
135+
136+
req := &RunRequest{RunID: "run-3", SessionKey: "sess-C", UserID: "u-3", Channel: "ws"}
137+
exec := l.makeExecuteToolRaw(req)
138+
139+
const n = 5
140+
var wg sync.WaitGroup
141+
for i := range n {
142+
wg.Add(1)
143+
go func(idx int) {
144+
defer wg.Done()
145+
tc := providers.ToolCall{ID: "tc-" + string(rune('a'+idx)), Name: "t", Arguments: nil}
146+
if _, _, err := exec(context.Background(), tc); err != nil {
147+
t.Errorf("goroutine %d: %v", idx, err)
148+
}
149+
}(i)
150+
}
151+
wg.Wait()
152+
153+
calls := col.filter(protocol.AgentEventToolCall)
154+
if len(calls) != n {
155+
t.Fatalf("expected %d tool.call events, got %d", n, len(calls))
156+
}
157+
}
158+
159+
// assertToolCallPayload verifies the event carries the expected tc identity
160+
// and routing context from RunRequest.
161+
func assertToolCallPayload(t *testing.T, ev AgentEvent, tc providers.ToolCall, req *RunRequest) {
162+
t.Helper()
163+
if ev.AgentID != "test-agent" {
164+
t.Errorf("AgentID: got %q, want test-agent", ev.AgentID)
165+
}
166+
if ev.RunID != req.RunID {
167+
t.Errorf("RunID: got %q, want %q", ev.RunID, req.RunID)
168+
}
169+
if ev.SessionKey != req.SessionKey {
170+
t.Errorf("SessionKey: got %q, want %q", ev.SessionKey, req.SessionKey)
171+
}
172+
if ev.Channel != req.Channel {
173+
t.Errorf("Channel: got %q, want %q", ev.Channel, req.Channel)
174+
}
175+
if ev.UserID != req.UserID {
176+
t.Errorf("UserID: got %q, want %q", ev.UserID, req.UserID)
177+
}
178+
payload, ok := ev.Payload.(map[string]any)
179+
if !ok {
180+
t.Fatalf("Payload is not map[string]any: %T", ev.Payload)
181+
}
182+
if payload["id"] != tc.ID {
183+
t.Errorf("payload.id: got %v, want %q", payload["id"], tc.ID)
184+
}
185+
if payload["name"] != tc.Name {
186+
t.Errorf("payload.name: got %v, want %q", payload["name"], tc.Name)
187+
}
188+
}

internal/pipeline/stages_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88
"strings"
9+
"sync"
910
"testing"
1011

1112
"github.com/nextlevelbuilder/goclaw/internal/bootstrap"
@@ -789,6 +790,66 @@ func TestToolStage_MultipleTools_Sequential_MessagesInOrder(t *testing.T) {
789790
}
790791
}
791792

793+
// Regression: v3 parallel path invokes ExecuteToolRaw + ProcessToolResult
794+
// for every tool call. If this breaks, the `tool.call` WS event emitted
795+
// inside makeExecuteToolRaw (loop_pipeline_tool_callbacks.go) stops firing
796+
// and UIs go silent during real-time tool execution.
797+
func TestToolStage_MultipleTools_ParallelPath_InvokesRawAndProcessForEach(t *testing.T) {
798+
t.Parallel()
799+
var rawMu, procMu sync.Mutex
800+
rawCalls := []string{}
801+
procCalls := []string{}
802+
deps := &PipelineDeps{
803+
// Parallel path requires all three callbacks. ExecuteToolCall is required
804+
// upfront (nil-guard) even though the parallel branch won't invoke it.
805+
ExecuteToolCall: func(_ context.Context, _ *RunState, _ providers.ToolCall) ([]providers.Message, error) {
806+
t.Fatal("ExecuteToolCall must NOT be called when parallel path is active")
807+
return nil, nil
808+
},
809+
ExecuteToolRaw: func(_ context.Context, tc providers.ToolCall) (providers.Message, any, error) {
810+
rawMu.Lock()
811+
rawCalls = append(rawCalls, tc.ID)
812+
rawMu.Unlock()
813+
return providers.Message{Role: "tool", Content: "raw:" + tc.Name, ToolCallID: tc.ID}, nil, nil
814+
},
815+
ProcessToolResult: func(_ context.Context, _ *RunState, tc providers.ToolCall, rawMsg providers.Message, _ any) []providers.Message {
816+
procMu.Lock()
817+
procCalls = append(procCalls, tc.ID)
818+
procMu.Unlock()
819+
return []providers.Message{rawMsg}
820+
},
821+
}
822+
stage := NewToolStage(deps)
823+
state := defaultState()
824+
state.Think.LastResponse = &providers.ChatResponse{
825+
ToolCalls: []providers.ToolCall{
826+
{ID: "1", Name: "tool_a"},
827+
{ID: "2", Name: "tool_b"},
828+
{ID: "3", Name: "tool_c"},
829+
},
830+
}
831+
832+
if err := stage.Execute(context.Background(), state); err != nil {
833+
t.Fatalf("Execute() error: %v", err)
834+
}
835+
836+
// Each tool must be passed through BOTH ExecuteToolRaw (where tool.call emits)
837+
// AND ProcessToolResult (where tool.result emits). Missing either breaks UIs.
838+
if len(rawCalls) != 3 {
839+
t.Errorf("ExecuteToolRaw called %d times, want 3 (one per tool)", len(rawCalls))
840+
}
841+
if len(procCalls) != 3 {
842+
t.Errorf("ProcessToolResult called %d times, want 3", len(procCalls))
843+
}
844+
// ProcessToolResult must run sequentially in original order (deterministic state mutation)
845+
if len(procCalls) == 3 && (procCalls[0] != "1" || procCalls[1] != "2" || procCalls[2] != "3") {
846+
t.Errorf("ProcessToolResult order = %v, want [1 2 3]", procCalls)
847+
}
848+
if state.Tool.TotalToolCalls != 3 {
849+
t.Errorf("TotalToolCalls = %d, want 3", state.Tool.TotalToolCalls)
850+
}
851+
}
852+
792853
func TestToolStage_LoopKilled_ReturnsBreakLoop(t *testing.T) {
793854
t.Parallel()
794855
deps := &PipelineDeps{

internal/store/pg/teams_tasks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
const taskLockDuration = 60 * time.Minute
2323

2424
// taskSelectCols is the shared SELECT column list for task queries (must match scanTaskRowsJoined).
25-
const taskSelectCols = `t.id, t.team_id, t.subject, t.description, t.status, t.owner_agent_id, t.blocked_by, t.priority, t.result, t.user_id, t.channel,
25+
const taskSelectCols = `t.id, t.team_id, t.tenant_id, t.subject, t.description, t.status, t.owner_agent_id, t.blocked_by, t.priority, t.result, t.user_id, t.channel,
2626
t.task_type, t.task_number, COALESCE(t.identifier,''), t.created_by_agent_id, COALESCE(t.assignee_user_id,''), t.parent_id,
2727
COALESCE(t.chat_id,''), t.metadata, t.locked_at, t.lock_expires_at, COALESCE(t.progress_percent,0), COALESCE(t.progress_step,''),
2828
t.followup_at, COALESCE(t.followup_count,0), COALESCE(t.followup_max,0), COALESCE(t.followup_message,''), COALESCE(t.followup_channel,''), COALESCE(t.followup_chat_id,''),
@@ -560,7 +560,7 @@ func scanTaskRowsJoined(rows *sql.Rows) ([]store.TeamTaskData, error) {
560560
var followupCount, followupMax int
561561
var followupMessage, followupChannel, followupChatID string
562562
if err := rows.Scan(
563-
&d.ID, &d.TeamID, &d.Subject, &desc, &d.Status,
563+
&d.ID, &d.TeamID, &d.TenantID, &d.Subject, &desc, &d.Status,
564564
&ownerID, pq.Array(&blockedBy), &d.Priority, &result,
565565
&userID, &channel,
566566
&d.TaskType, &d.TaskNumber, &identifier, &createdByAgentID, &assigneeUserID, &parentID,

internal/store/sqlitestore/teams_tasks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const taskLockDuration = 30 * time.Minute
2424
const maxListTasksRows = 30
2525

2626
// taskSelectCols is the shared SELECT column list for task queries.
27-
const taskSelectCols = `t.id, t.team_id, t.subject, t.description, t.status, t.owner_agent_id, t.blocked_by, t.priority, t.result, t.user_id, t.channel,
27+
const taskSelectCols = `t.id, t.team_id, t.tenant_id, t.subject, t.description, t.status, t.owner_agent_id, t.blocked_by, t.priority, t.result, t.user_id, t.channel,
2828
t.task_type, t.task_number, COALESCE(t.identifier,''), t.created_by_agent_id, COALESCE(t.assignee_user_id,''), t.parent_id,
2929
COALESCE(t.chat_id,''), t.metadata, t.locked_at, t.lock_expires_at, COALESCE(t.progress_percent,0), COALESCE(t.progress_step,''),
3030
t.followup_at, COALESCE(t.followup_count,0), COALESCE(t.followup_max,0), COALESCE(t.followup_message,''), COALESCE(t.followup_channel,''), COALESCE(t.followup_chat_id,''),
@@ -510,7 +510,7 @@ func scanTaskRowsJoined(rows *sql.Rows) ([]store.TeamTaskData, error) {
510510
var followupMessage, followupChannel, followupChatID string
511511
createdAt, updatedAt := scanTimePair()
512512
if err := rows.Scan(
513-
&d.ID, &d.TeamID, &d.Subject, &desc, &d.Status,
513+
&d.ID, &d.TeamID, &d.TenantID, &d.Subject, &desc, &d.Status,
514514
&ownerID, &blockedByJSON, &d.Priority, &result,
515515
&userID, &channel,
516516
&d.TaskType, &d.TaskNumber, &identifier, &createdByAgentID, &assigneeUserID, &parentID,

internal/store/team_store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type TeamMemberData struct {
9494
type TeamTaskData struct {
9595
BaseModel
9696
TeamID uuid.UUID `json:"team_id" db:"team_id"`
97+
TenantID uuid.UUID `json:"tenant_id" db:"tenant_id"`
9798
Subject string `json:"subject" db:"subject"`
9899
Description string `json:"description,omitempty" db:"description"`
99100
Status string `json:"status" db:"status"`

0 commit comments

Comments
 (0)