Skip to content

Commit 4f6b30e

Browse files
fix(sync): start autosync in mcp mode
1 parent 2bbd68d commit 4f6b30e

4 files changed

Lines changed: 276 additions & 2 deletions

File tree

DOCS.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,9 +1144,15 @@ ENGRAM_CLOUD_AUTOSYNC=1 \
11441144
ENGRAM_CLOUD_TOKEN=your-token \
11451145
ENGRAM_CLOUD_SERVER=https://cloud.engram.example.com \
11461146
engram serve
1147+
1148+
# Or, for stdio MCP agents:
1149+
ENGRAM_CLOUD_AUTOSYNC=1 \
1150+
ENGRAM_CLOUD_TOKEN=your-token \
1151+
ENGRAM_CLOUD_SERVER=https://cloud.engram.example.com \
1152+
engram mcp
11471153
```
11481154

1149-
Missing `ENGRAM_CLOUD_TOKEN` or `ENGRAM_CLOUD_SERVER` logs an `ERROR` and disables autosync gracefully — the server still starts.
1155+
Missing `ENGRAM_CLOUD_TOKEN` or `ENGRAM_CLOUD_SERVER` logs an `ERROR` and disables autosync gracefully — `engram serve` or `engram mcp` still starts.
11501156

11511157
### Autosync Phase Table
11521158

cmd/engram/main.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,11 +847,30 @@ func cmdMCP(cfg store.Config) {
847847
}
848848
defer s.Close()
849849

850+
// Match `engram serve` autosync startup semantics for stdio MCP agents.
851+
// Autosync remains opt-in via ENGRAM_CLOUD_AUTOSYNC=1 and never makes MCP
852+
// startup fatal when cloud config is missing or invalid.
853+
ctx, cancel := context.WithCancel(context.Background())
854+
_, mgrStop := tryStartAutosync(ctx, s, cfg)
855+
autosyncStopped := false
856+
stopAutosync := func() {
857+
if autosyncStopped {
858+
return
859+
}
860+
autosyncStopped = true
861+
cancel()
862+
if mgrStop != nil {
863+
mgrStop()
864+
}
865+
}
866+
defer stopAutosync()
867+
850868
mcpCfg := mcp.MCPConfig{}
851869
allowlist := resolveMCPTools(toolsFilter)
852870
mcpSrv := newMCPServerWithConfig(s, mcpCfg, allowlist)
853871

854872
if err := serveMCP(mcpSrv); err != nil {
873+
stopAutosync()
855874
fatal(err)
856875
}
857876
}

cmd/engram/main_extra_test.go

Lines changed: 249 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os"
1313
"path/filepath"
1414
"strings"
15+
"sync"
1516
"testing"
1617
"time"
1718

@@ -495,10 +496,15 @@ func TestTryStartAutosyncReturnsStopFn(t *testing.T) {
495496
// Run exits immediately (no goroutines); Stop is synchronous and calls stopFn.
496497
// BR2-3: Used to stub newAutosyncManager in tests.
497498
type fakeStartableManager struct {
499+
runFn func(context.Context)
498500
stopFn func()
499501
}
500502

501-
func (f *fakeStartableManager) Run(_ context.Context) {} // exits immediately — no goroutine spawned
503+
func (f *fakeStartableManager) Run(ctx context.Context) {
504+
if f.runFn != nil {
505+
f.runFn(ctx)
506+
}
507+
} // exits immediately — no goroutine spawned unless runFn blocks
502508
func (f *fakeStartableManager) Stop() {
503509
if f.stopFn != nil {
504510
f.stopFn()
@@ -3933,6 +3939,45 @@ func TestCmdMCP(t *testing.T) {
39333939
}
39343940
})
39353941

3942+
t.Run("cloud autosync env with token and server starts and stops manager", func(t *testing.T) {
3943+
t.Setenv("ENGRAM_CLOUD_AUTOSYNC", "1")
3944+
t.Setenv("ENGRAM_CLOUD_TOKEN", "tok")
3945+
t.Setenv("ENGRAM_CLOUD_SERVER", "http://localhost:9999")
3946+
3947+
runStarted := make(chan struct{}, 1)
3948+
stopCalled := make(chan struct{}, 1)
3949+
oldNewAutosyncManager := newAutosyncManager
3950+
newAutosyncManager = func(_ *store.Store, _ autosync.CloudTransport, _ autosync.Config) startableAutosyncManager {
3951+
return &fakeStartableManager{
3952+
runFn: func(context.Context) { runStarted <- struct{}{} },
3953+
stopFn: func() { stopCalled <- struct{}{} },
3954+
}
3955+
}
3956+
t.Cleanup(func() { newAutosyncManager = oldNewAutosyncManager })
3957+
3958+
serveMCP = func(_ *mcpserver.MCPServer, _ ...mcpserver.StdioOption) error {
3959+
select {
3960+
case <-runStarted:
3961+
return nil
3962+
case <-time.After(time.Second):
3963+
t.Fatal("expected MCP autosync manager to start before serving returned")
3964+
return nil
3965+
}
3966+
}
3967+
3968+
withArgs(t, "engram", "mcp")
3969+
_, _, recovered := captureOutputAndRecover(t, func() { cmdMCP(cfg) })
3970+
if recovered != nil {
3971+
t.Fatalf("expected clean run, got panic=%v", recovered)
3972+
}
3973+
select {
3974+
case <-stopCalled:
3975+
// expected
3976+
default:
3977+
t.Fatal("expected MCP autosync manager to stop after stdio server exits")
3978+
}
3979+
})
3980+
39363981
t.Run("storeNew failure calls fatal", func(t *testing.T) {
39373982
storeNew = func(cfg store.Config) (*store.Store, error) {
39383983
return nil, errors.New("db open failed")
@@ -3952,3 +3997,206 @@ func TestCmdMCP(t *testing.T) {
39523997
assertFatal(t, stderr, recovered, "stdio failed")
39533998
})
39543999
}
4000+
4001+
func TestCmdMCPAutosyncPushesWriteDuringServe(t *testing.T) {
4002+
cfg := testConfig(t)
4003+
stubExitWithPanic(t)
4004+
4005+
var mu sync.Mutex
4006+
var pushed []autosync.MutationEntry
4007+
observationPushed := make(chan struct{})
4008+
var closeObservationPushed sync.Once
4009+
4010+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
4011+
if got := r.Header.Get("Authorization"); got != "Bearer test-token" {
4012+
http.Error(w, "unauthorized", http.StatusUnauthorized)
4013+
return
4014+
}
4015+
4016+
switch r.URL.Path {
4017+
case "/sync/mutations/push":
4018+
var req struct {
4019+
Entries []autosync.MutationEntry `json:"entries"`
4020+
}
4021+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
4022+
http.Error(w, err.Error(), http.StatusBadRequest)
4023+
return
4024+
}
4025+
4026+
mu.Lock()
4027+
pushed = append(pushed, req.Entries...)
4028+
for _, entry := range req.Entries {
4029+
if entry.Project == "engram" && entry.Entity == store.SyncEntityObservation && strings.Contains(string(entry.Payload), "mcp autosync proof") {
4030+
closeObservationPushed.Do(func() { close(observationPushed) })
4031+
}
4032+
}
4033+
mu.Unlock()
4034+
4035+
seqs := make([]int64, len(req.Entries))
4036+
for i := range seqs {
4037+
seqs[i] = int64(i + 1)
4038+
}
4039+
w.Header().Set("Content-Type", "application/json")
4040+
_ = json.NewEncoder(w).Encode(map[string]any{"accepted_seqs": seqs})
4041+
4042+
case "/sync/mutations/pull":
4043+
w.Header().Set("Content-Type", "application/json")
4044+
_ = json.NewEncoder(w).Encode(map[string]any{
4045+
"mutations": []any{},
4046+
"has_more": false,
4047+
"latest_seq": 0,
4048+
})
4049+
4050+
default:
4051+
http.Error(w, "not found", http.StatusNotFound)
4052+
}
4053+
}))
4054+
defer srv.Close()
4055+
4056+
t.Setenv("ENGRAM_CLOUD_AUTOSYNC", "1")
4057+
t.Setenv("ENGRAM_CLOUD_TOKEN", "test-token")
4058+
t.Setenv("ENGRAM_CLOUD_SERVER", srv.URL)
4059+
4060+
oldStoreNew := storeNew
4061+
oldNewMCPServerWithConfig := newMCPServerWithConfig
4062+
oldServeMCP := serveMCP
4063+
oldNewAutosyncManager := newAutosyncManager
4064+
storeNew = store.New
4065+
t.Cleanup(func() {
4066+
storeNew = oldStoreNew
4067+
newMCPServerWithConfig = oldNewMCPServerWithConfig
4068+
serveMCP = oldServeMCP
4069+
newAutosyncManager = oldNewAutosyncManager
4070+
})
4071+
4072+
var mcpStore *store.Store
4073+
newMCPServerWithConfig = func(s *store.Store, _ mcp.MCPConfig, _ map[string]bool) *mcpserver.MCPServer {
4074+
mcpStore = s
4075+
return mcpserver.NewMCPServer("test", "0")
4076+
}
4077+
newAutosyncManager = func(s *store.Store, transport autosync.CloudTransport, cfg autosync.Config) startableAutosyncManager {
4078+
cfg.DebounceDuration = 5 * time.Millisecond
4079+
cfg.PollInterval = 10 * time.Millisecond
4080+
cfg.BaseBackoff = 20 * time.Millisecond
4081+
cfg.MaxBackoff = 50 * time.Millisecond
4082+
return autosync.New(s, transport, cfg)
4083+
}
4084+
serveMCP = func(_ *mcpserver.MCPServer, _ ...mcpserver.StdioOption) error {
4085+
if mcpStore == nil {
4086+
return errors.New("MCP store was not wired into server construction")
4087+
}
4088+
if err := mcpStore.EnrollProject("engram"); err != nil {
4089+
return fmt.Errorf("enroll project: %w", err)
4090+
}
4091+
if err := mcpStore.CreateSession("mcp-autosync-session", "engram", t.TempDir()); err != nil {
4092+
return fmt.Errorf("create session: %w", err)
4093+
}
4094+
if _, err := mcpStore.AddObservation(store.AddObservationParams{
4095+
SessionID: "mcp-autosync-session",
4096+
Type: "bugfix",
4097+
Title: "mcp autosync proof",
4098+
Content: "mcp autosync proof mutation created during stdio serving",
4099+
Project: "engram",
4100+
Scope: "project",
4101+
}); err != nil {
4102+
return fmt.Errorf("add observation: %w", err)
4103+
}
4104+
4105+
select {
4106+
case <-observationPushed:
4107+
return nil
4108+
case <-time.After(2 * time.Second):
4109+
return errors.New("timed out waiting for autosync to push MCP write")
4110+
}
4111+
}
4112+
4113+
withArgs(t, "engram", "mcp")
4114+
_, stderr, recovered := captureOutputAndRecover(t, func() { cmdMCP(cfg) })
4115+
if recovered != nil || stderr != "" {
4116+
t.Fatalf("expected MCP autosync proof to complete cleanly, panic=%v stderr=%q", recovered, stderr)
4117+
}
4118+
4119+
mu.Lock()
4120+
defer mu.Unlock()
4121+
if len(pushed) == 0 {
4122+
t.Fatal("expected remote mutation endpoint to receive at least one pushed mutation")
4123+
}
4124+
}
4125+
4126+
func TestCmdMCPAutosyncPollTickerPullsDuringServe(t *testing.T) {
4127+
cfg := testConfig(t)
4128+
stubExitWithPanic(t)
4129+
4130+
pullCalled := make(chan struct{})
4131+
var closePullCalled sync.Once
4132+
4133+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
4134+
if got := r.Header.Get("Authorization"); got != "Bearer test-token" {
4135+
http.Error(w, "unauthorized", http.StatusUnauthorized)
4136+
return
4137+
}
4138+
4139+
switch r.URL.Path {
4140+
case "/sync/mutations/pull":
4141+
if r.Method != http.MethodGet {
4142+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
4143+
return
4144+
}
4145+
closePullCalled.Do(func() { close(pullCalled) })
4146+
w.Header().Set("Content-Type", "application/json")
4147+
_ = json.NewEncoder(w).Encode(map[string]any{
4148+
"mutations": []any{},
4149+
"has_more": false,
4150+
"latest_seq": 0,
4151+
})
4152+
4153+
case "/sync/mutations/push":
4154+
http.Error(w, "unexpected push without MCP write", http.StatusInternalServerError)
4155+
4156+
default:
4157+
http.Error(w, "not found", http.StatusNotFound)
4158+
}
4159+
}))
4160+
defer srv.Close()
4161+
4162+
t.Setenv("ENGRAM_CLOUD_AUTOSYNC", "1")
4163+
t.Setenv("ENGRAM_CLOUD_TOKEN", "test-token")
4164+
t.Setenv("ENGRAM_CLOUD_SERVER", srv.URL)
4165+
4166+
oldStoreNew := storeNew
4167+
oldNewMCPServerWithConfig := newMCPServerWithConfig
4168+
oldServeMCP := serveMCP
4169+
oldNewAutosyncManager := newAutosyncManager
4170+
storeNew = store.New
4171+
t.Cleanup(func() {
4172+
storeNew = oldStoreNew
4173+
newMCPServerWithConfig = oldNewMCPServerWithConfig
4174+
serveMCP = oldServeMCP
4175+
newAutosyncManager = oldNewAutosyncManager
4176+
})
4177+
4178+
newMCPServerWithConfig = func(s *store.Store, _ mcp.MCPConfig, _ map[string]bool) *mcpserver.MCPServer {
4179+
return mcpserver.NewMCPServer("test", "0")
4180+
}
4181+
newAutosyncManager = func(s *store.Store, transport autosync.CloudTransport, cfg autosync.Config) startableAutosyncManager {
4182+
cfg.DebounceDuration = time.Hour
4183+
cfg.PollInterval = 10 * time.Millisecond
4184+
cfg.BaseBackoff = 20 * time.Millisecond
4185+
cfg.MaxBackoff = 50 * time.Millisecond
4186+
return autosync.New(s, transport, cfg)
4187+
}
4188+
serveMCP = func(_ *mcpserver.MCPServer, _ ...mcpserver.StdioOption) error {
4189+
select {
4190+
case <-pullCalled:
4191+
return nil
4192+
case <-time.After(2 * time.Second):
4193+
return errors.New("timed out waiting for autosync poll ticker pull during MCP serve")
4194+
}
4195+
}
4196+
4197+
withArgs(t, "engram", "mcp")
4198+
_, stderr, recovered := captureOutputAndRecover(t, func() { cmdMCP(cfg) })
4199+
if recovered != nil || stderr != "" {
4200+
t.Fatalf("expected MCP autosync poll ticker proof to complete cleanly, panic=%v stderr=%q", recovered, stderr)
4201+
}
4202+
}

docs/AGENT-SETUP.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ engram mcp
586586
```
587587

588588
The process logs `[autosync] started (server=...)` on success. Missing token or server URL logs `[autosync] ERROR: ...` and the process starts normally without autosync.
589+
For `engram mcp`, autosync runs for the lifetime of the stdio MCP process and is stopped when that process exits.
589590

590591
---
591592

0 commit comments

Comments
 (0)