Skip to content

Commit cfe66b3

Browse files
fix(mcp): serialize write tools
1 parent 3687c2f commit cfe66b3

5 files changed

Lines changed: 481 additions & 12 deletions

File tree

internal/mcp/mcp.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ func shouldRegister(name string, allowlist map[string]bool) bool {
237237
}
238238

239239
func registerTools(srv *server.MCPServer, s *store.Store, cfg MCPConfig, allowlist map[string]bool, activity *SessionActivity) {
240+
writeQueue := newWriteQueue(defaultMCPWriteQueueSize)
241+
240242
// ─── mem_search (profile: agent, core — always in context) ─────────
241243
if shouldRegister("mem_search", allowlist) {
242244
srv.AddTool(
@@ -324,7 +326,7 @@ Examples:
324326
mcp.Description("Optional topic identifier for upserts (e.g. architecture/auth-model). Reuses and updates the latest observation in same project+scope."),
325327
),
326328
),
327-
handleSave(s, cfg, activity),
329+
queuedWriteHandler(writeQueue, handleSave(s, cfg, activity)),
328330
)
329331
}
330332

@@ -359,7 +361,7 @@ Examples:
359361
mcp.Description("New topic key (normalized internally)"),
360362
),
361363
),
362-
handleUpdate(s),
364+
queuedWriteHandler(writeQueue, handleUpdate(s)),
363365
)
364366
}
365367

@@ -407,7 +409,7 @@ Examples:
407409
mcp.Description("If true, permanently deletes the observation"),
408410
),
409411
),
410-
handleDelete(s),
412+
queuedWriteHandler(writeQueue, handleDelete(s)),
411413
)
412414
}
413415

@@ -429,7 +431,7 @@ Examples:
429431
mcp.Description("Session ID to associate with (default: manual-save-{project})"),
430432
),
431433
),
432-
handleSavePrompt(s, cfg),
434+
queuedWriteHandler(writeQueue, handleSavePrompt(s, cfg)),
433435
)
434436
}
435437

@@ -570,7 +572,7 @@ GUIDELINES:
570572
),
571573
// project field intentionally omitted — auto-detect only (REQ-308 write-tool contract)
572574
),
573-
handleSessionSummary(s, cfg, activity),
575+
queuedWriteHandler(writeQueue, handleSessionSummary(s, cfg, activity)),
574576
)
575577
}
576578

@@ -593,7 +595,7 @@ GUIDELINES:
593595
mcp.Description("Working directory"),
594596
),
595597
),
596-
handleSessionStart(s, cfg, activity),
598+
queuedWriteHandler(writeQueue, handleSessionStart(s, cfg, activity)),
597599
)
598600
}
599601

@@ -616,7 +618,7 @@ GUIDELINES:
616618
mcp.Description("Summary of what was accomplished"),
617619
),
618620
),
619-
handleSessionEnd(s, cfg, activity),
621+
queuedWriteHandler(writeQueue, handleSessionEnd(s, cfg, activity)),
620622
)
621623
}
622624

@@ -646,7 +648,7 @@ Duplicates are automatically detected and skipped — safe to call multiple time
646648
mcp.Description("Source identifier (e.g. 'subagent-stop', 'session-end')"),
647649
),
648650
),
649-
handleCapturePassive(s, cfg, activity),
651+
queuedWriteHandler(writeQueue, handleCapturePassive(s, cfg, activity)),
650652
)
651653
}
652654

@@ -670,7 +672,7 @@ Duplicates are automatically detected and skipped — safe to call multiple time
670672
mcp.Description("The canonical project name to merge INTO (e.g. 'engram')"),
671673
),
672674
),
673-
handleMergeProjects(s),
675+
queuedWriteHandler(writeQueue, handleMergeProjects(s)),
674676
)
675677
}
676678

@@ -739,7 +741,7 @@ Re-judging an already-judged ID overwrites the verdict (deliberate revision).`),
739741
mcp.Description("Session ID for provenance (default: auto)"),
740742
),
741743
),
742-
handleJudge(s, activity),
744+
queuedWriteHandler(writeQueue, handleJudge(s, activity)),
743745
)
744746
}
745747
}

internal/mcp/write_queue.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
mcppkg "github.com/mark3labs/mcp-go/mcp"
9+
"github.com/mark3labs/mcp-go/server"
10+
)
11+
12+
const defaultMCPWriteQueueSize = 32
13+
14+
var errMCPWriteQueueFull = errors.New("engram: mcp write queue is full; retry shortly")
15+
16+
type writeQueue struct {
17+
jobs chan writeJob
18+
}
19+
20+
type writeJob struct {
21+
ctx context.Context
22+
run func(context.Context) (*mcppkg.CallToolResult, error)
23+
result chan writeResult
24+
}
25+
26+
type writeResult struct {
27+
result *mcppkg.CallToolResult
28+
err error
29+
}
30+
31+
func newWriteQueue(size int) *writeQueue {
32+
if size <= 0 {
33+
size = defaultMCPWriteQueueSize
34+
}
35+
q := &writeQueue{jobs: make(chan writeJob, size)}
36+
go q.run()
37+
return q
38+
}
39+
40+
func (q *writeQueue) run() {
41+
for job := range q.jobs {
42+
if err := job.ctx.Err(); err != nil {
43+
job.result <- writeResult{err: err}
44+
continue
45+
}
46+
47+
result, err := runWriteJob(job)
48+
job.result <- writeResult{result: result, err: err}
49+
}
50+
}
51+
52+
func runWriteJob(job writeJob) (result *mcppkg.CallToolResult, err error) {
53+
defer func() {
54+
if recovered := recover(); recovered != nil {
55+
result = nil
56+
err = fmt.Errorf("mcp write handler panic: %v", recovered)
57+
}
58+
}()
59+
60+
return job.run(job.ctx)
61+
}
62+
63+
func (q *writeQueue) Do(ctx context.Context, run func(context.Context) (*mcppkg.CallToolResult, error)) (*mcppkg.CallToolResult, error) {
64+
if err := ctx.Err(); err != nil {
65+
return nil, err
66+
}
67+
68+
job := writeJob{
69+
ctx: ctx,
70+
run: run,
71+
result: make(chan writeResult, 1),
72+
}
73+
74+
select {
75+
case q.jobs <- job:
76+
// Enqueued.
77+
default:
78+
return nil, errMCPWriteQueueFull
79+
}
80+
81+
// The worker owns the post-enqueue cancellation decision. Returning directly
82+
// on ctx.Done() here can race with the worker starting the job: the caller may
83+
// see cancellation while the handler is about to mutate SQLite. Waiting for the
84+
// worker's result makes the outcome deterministic: queued canceled jobs are
85+
// skipped by the worker before start, while started jobs finish and return the
86+
// handler result.
87+
res := <-job.result
88+
return res.result, res.err
89+
}
90+
91+
func queuedWriteHandler(q *writeQueue, h server.ToolHandlerFunc) server.ToolHandlerFunc {
92+
return func(ctx context.Context, req mcppkg.CallToolRequest) (*mcppkg.CallToolResult, error) {
93+
result, err := q.Do(ctx, func(runCtx context.Context) (*mcppkg.CallToolResult, error) {
94+
return h(runCtx, req)
95+
})
96+
if err == nil {
97+
return result, nil
98+
}
99+
if errors.Is(err, errMCPWriteQueueFull) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
100+
return mcppkg.NewToolResultError(fmt.Sprintf("MCP write queue error: %s", err)), nil
101+
}
102+
return nil, err
103+
}
104+
}

0 commit comments

Comments
 (0)