Skip to content

Commit 3a92f50

Browse files
fix(cloud): materialize chunk uploads as mutations
1 parent 6d2138c commit 3a92f50

3 files changed

Lines changed: 338 additions & 3 deletions

File tree

internal/cloud/cloudserver/mutations_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"time"
1313

1414
"github.com/Gentleman-Programming/engram/internal/cloud/cloudstore"
15+
"github.com/Gentleman-Programming/engram/internal/store"
16+
engramsync "github.com/Gentleman-Programming/engram/internal/sync"
1517
)
1618

1719
// ─── Fakes for mutation tests ─────────────────────────────────────────────────
@@ -50,6 +52,125 @@ func (s *fakeMutationStore) IsProjectSyncEnabled(project string) (bool, error) {
5052
return true, nil // default: enabled
5153
}
5254

55+
func (s *fakeMutationStore) WriteChunk(ctx context.Context, project string, chunkID, createdBy, clientCreatedAt string, payload []byte) error {
56+
if _, exists := s.chunks[chunkID]; exists {
57+
return s.fakeStore.WriteChunk(ctx, project, chunkID, createdBy, clientCreatedAt, payload)
58+
}
59+
if err := s.fakeStore.WriteChunk(ctx, project, chunkID, createdBy, clientCreatedAt, payload); err != nil {
60+
return err
61+
}
62+
var chunk engramsync.ChunkData
63+
if err := json.Unmarshal(payload, &chunk); err != nil {
64+
return err
65+
}
66+
batch := make([]MutationEntry, 0, len(chunk.Sessions)+len(chunk.Observations)+len(chunk.Prompts))
67+
for _, session := range chunk.Sessions {
68+
body, _ := json.Marshal(session)
69+
batch = append(batch, MutationEntry{Project: project, Entity: store.SyncEntitySession, EntityKey: strings.TrimSpace(session.ID), Op: store.SyncOpUpsert, Payload: body})
70+
}
71+
for _, observation := range chunk.Observations {
72+
body, _ := json.Marshal(observation)
73+
batch = append(batch, MutationEntry{Project: project, Entity: store.SyncEntityObservation, EntityKey: strings.TrimSpace(observation.SyncID), Op: store.SyncOpUpsert, Payload: body})
74+
}
75+
for _, prompt := range chunk.Prompts {
76+
body, _ := json.Marshal(prompt)
77+
batch = append(batch, MutationEntry{Project: project, Entity: store.SyncEntityPrompt, EntityKey: strings.TrimSpace(prompt.SyncID), Op: store.SyncOpUpsert, Payload: body})
78+
}
79+
_, err := s.InsertMutationBatch(ctx, batch)
80+
return err
81+
}
82+
83+
func TestChunkPushMaterializesMutationsForAutosyncPull(t *testing.T) {
84+
ms := newFakeMutationStore()
85+
srv := newMutationTestServer(ms, "secret", []string{"proj-a"})
86+
body := strings.NewReader(`{
87+
"project":"proj-a",
88+
"created_by":"tester",
89+
"data":{
90+
"sessions":[{"id":"s-1","directory":"/tmp/s-1","started_at":"2026-04-29T10:00:00Z"}],
91+
"observations":[{"sync_id":"obs-1","session_id":"s-1","type":"decision","title":"Decision","content":"Content","scope":"project","created_at":"2026-04-29T10:01:00Z","updated_at":"2026-04-29T10:01:00Z"}],
92+
"prompts":[{"sync_id":"prompt-1","session_id":"s-1","content":"Prompt","created_at":"2026-04-29T10:02:00Z"}]
93+
}
94+
}`)
95+
96+
pushRec := httptest.NewRecorder()
97+
pushReq := httptest.NewRequest(http.MethodPost, "/sync/push", body)
98+
pushReq.Header.Set("Authorization", "Bearer secret")
99+
pushReq.Header.Set("Content-Type", "application/json")
100+
srv.Handler().ServeHTTP(pushRec, pushReq)
101+
if pushRec.Code != http.StatusOK {
102+
t.Fatalf("expected chunk push 200, got %d body=%q", pushRec.Code, pushRec.Body.String())
103+
}
104+
if len(ms.chunks) != 1 {
105+
t.Fatalf("expected one stored chunk, got %d", len(ms.chunks))
106+
}
107+
if len(ms.mutations) != 3 {
108+
t.Fatalf("expected 3 materialized mutations, got %d: %+v", len(ms.mutations), ms.mutations)
109+
}
110+
if ms.mutations[0].Entity != store.SyncEntitySession || ms.mutations[1].Entity != store.SyncEntityObservation || ms.mutations[2].Entity != store.SyncEntityPrompt {
111+
t.Fatalf("expected session/observation/prompt order, got %+v", ms.mutations)
112+
}
113+
if ms.mutations[1].Project != "proj-a" || ms.mutations[1].EntityKey != "obs-1" || ms.mutations[1].Op != store.SyncOpUpsert {
114+
t.Fatalf("unexpected materialized observation mutation: %+v", ms.mutations[1])
115+
}
116+
117+
pullRec := httptest.NewRecorder()
118+
pullReq := httptest.NewRequest(http.MethodGet, "/sync/mutations/pull?since_seq=0&limit=100", nil)
119+
pullReq.Header.Set("Authorization", "Bearer secret")
120+
srv.Handler().ServeHTTP(pullRec, pullReq)
121+
if pullRec.Code != http.StatusOK {
122+
t.Fatalf("expected mutation pull 200, got %d body=%q", pullRec.Code, pullRec.Body.String())
123+
}
124+
var pulled struct {
125+
Mutations []StoredMutation `json:"mutations"`
126+
}
127+
if err := json.NewDecoder(pullRec.Body).Decode(&pulled); err != nil {
128+
t.Fatalf("decode pull response: %v", err)
129+
}
130+
if len(pulled.Mutations) != 3 || pulled.Mutations[1].Entity != store.SyncEntityObservation || pulled.Mutations[1].EntityKey != "obs-1" {
131+
t.Fatalf("expected pulled observation mutation after chunk push, got %+v", pulled.Mutations)
132+
}
133+
}
134+
135+
func TestChunkPushReplayDoesNotDuplicateMaterializedMutations(t *testing.T) {
136+
ms := newFakeMutationStore()
137+
srv := newMutationTestServer(ms, "secret", []string{"proj-a"})
138+
body := `{"project":"proj-a","created_by":"tester","data":{"sessions":[{"id":"s-1","directory":"/tmp/s-1"}],"observations":[{"sync_id":"obs-1","session_id":"s-1","type":"decision","title":"Decision","content":"Content","scope":"project"}]}}`
139+
140+
for i := 0; i < 2; i++ {
141+
rec := httptest.NewRecorder()
142+
req := httptest.NewRequest(http.MethodPost, "/sync/push", strings.NewReader(body))
143+
req.Header.Set("Authorization", "Bearer secret")
144+
req.Header.Set("Content-Type", "application/json")
145+
srv.Handler().ServeHTTP(rec, req)
146+
if rec.Code != http.StatusOK {
147+
t.Fatalf("push %d expected 200, got %d body=%q", i+1, rec.Code, rec.Body.String())
148+
}
149+
}
150+
if len(ms.chunks) != 1 {
151+
t.Fatalf("expected one stored chunk after replay, got %d", len(ms.chunks))
152+
}
153+
if len(ms.mutations) != 2 {
154+
t.Fatalf("expected replay to keep 2 materialized mutations, got %d: %+v", len(ms.mutations), ms.mutations)
155+
}
156+
}
157+
158+
func TestMalformedChunkRejectsBeforeMaterialization(t *testing.T) {
159+
ms := newFakeMutationStore()
160+
srv := newMutationTestServer(ms, "secret", []string{"proj-a"})
161+
rec := httptest.NewRecorder()
162+
req := httptest.NewRequest(http.MethodPost, "/sync/push", strings.NewReader(`{"project":"proj-a","created_by":"tester","data":{"observations":[{"sync_id":"obs-1","session_id":"missing","type":"decision","title":"Decision","content":"Content","scope":"project"}]}}`))
163+
req.Header.Set("Authorization", "Bearer secret")
164+
req.Header.Set("Content-Type", "application/json")
165+
srv.Handler().ServeHTTP(rec, req)
166+
if rec.Code != http.StatusBadRequest {
167+
t.Fatalf("expected malformed chunk 400, got %d body=%q", rec.Code, rec.Body.String())
168+
}
169+
if len(ms.chunks) != 0 || len(ms.mutations) != 0 {
170+
t.Fatalf("expected no chunk or mutation after malformed push, chunks=%d mutations=%d", len(ms.chunks), len(ms.mutations))
171+
}
172+
}
173+
53174
func (s *fakeMutationStore) InsertMutationBatch(ctx context.Context, batch []MutationEntry) ([]int64, error) {
54175
if s.errInsert != nil {
55176
return nil, s.errInsert

internal/cloud/cloudstore/cloudstore.go

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/Gentleman-Programming/engram/internal/cloud"
1515
"github.com/Gentleman-Programming/engram/internal/cloud/chunkcodec"
16+
"github.com/Gentleman-Programming/engram/internal/store"
1617
engramsync "github.com/Gentleman-Programming/engram/internal/sync"
1718
"github.com/jackc/pgx/v5/pgconn"
1819
_ "github.com/jackc/pgx/v5/stdlib"
@@ -230,8 +231,27 @@ func (cs *CloudStore) WriteChunk(ctx context.Context, project, chunkID, createdB
230231
return fmt.Errorf("cloudstore: read existing chunk: %w", err)
231232
}
232233

234+
chunk, err := parseChunkData(payload)
235+
if err != nil {
236+
return fmt.Errorf("cloudstore: parse chunk for materialization: %w", err)
237+
}
238+
mutations, err := materializedChunkMutations(project, chunk)
239+
if err != nil {
240+
return err
241+
}
242+
243+
tx, err := cs.db.BeginTx(ctx, nil)
244+
if err != nil {
245+
return fmt.Errorf("cloudstore: begin write chunk tx: %w", err)
246+
}
247+
defer func() {
248+
if tx != nil {
249+
_ = tx.Rollback()
250+
}
251+
}()
252+
233253
counts := summarizeChunk(payload)
234-
_, err = cs.db.ExecContext(ctx, `
254+
_, err = tx.ExecContext(ctx, `
235255
INSERT INTO cloud_chunks (project_name, chunk_id, created_by, client_created_at, payload, sessions_count, observations_count, prompts_count)
236256
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
237257
project, strings.TrimSpace(chunkID), strings.TrimSpace(createdBy), originCreatedAt, payload, counts.sessions, counts.observations, counts.prompts)
@@ -246,9 +266,16 @@ func (cs *CloudStore) WriteChunk(ctx context.Context, project, chunkID, createdB
246266
}
247267
return fmt.Errorf("cloudstore: write chunk: %w", err)
248268
}
249-
if err := cs.indexChunkSessions(ctx, project, payload); err != nil {
269+
if err := cs.indexChunkSessionsWith(ctx, tx, project, payload); err != nil {
270+
return err
271+
}
272+
if err := insertMaterializedMutations(ctx, tx, mutations); err != nil {
250273
return err
251274
}
275+
if err := tx.Commit(); err != nil {
276+
return fmt.Errorf("cloudstore: commit write chunk: %w", err)
277+
}
278+
tx = nil
252279
cs.invalidateDashboardReadModel()
253280
return nil
254281
}
@@ -296,12 +323,20 @@ func (cs *CloudStore) KnownSessionIDs(ctx context.Context, project string) (map[
296323
}
297324

298325
func (cs *CloudStore) indexChunkSessions(ctx context.Context, project string, payload []byte) error {
326+
return cs.indexChunkSessionsWith(ctx, cs.db, project, payload)
327+
}
328+
329+
type chunkSessionIndexer interface {
330+
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
331+
}
332+
333+
func (cs *CloudStore) indexChunkSessionsWith(ctx context.Context, execer chunkSessionIndexer, project string, payload []byte) error {
299334
sessionIDs := collectSessionIDsFromPayload(payload)
300335
if len(sessionIDs) == 0 {
301336
return nil
302337
}
303338
for sessionID := range sessionIDs {
304-
if _, err := cs.db.ExecContext(ctx,
339+
if _, err := execer.ExecContext(ctx,
305340
`INSERT INTO cloud_project_sessions (project_name, session_id) VALUES ($1, $2) ON CONFLICT (project_name, session_id) DO NOTHING`,
306341
project, sessionID,
307342
); err != nil {
@@ -311,6 +346,67 @@ func (cs *CloudStore) indexChunkSessions(ctx context.Context, project string, pa
311346
return nil
312347
}
313348

349+
func materializedChunkMutations(project string, chunk engramsync.ChunkData) ([]MutationEntry, error) {
350+
project = strings.TrimSpace(project)
351+
entries := make([]MutationEntry, 0, len(chunk.Sessions)+len(chunk.Observations)+len(chunk.Prompts))
352+
353+
for i, session := range chunk.Sessions {
354+
entityKey := strings.TrimSpace(session.ID)
355+
if entityKey == "" {
356+
return nil, fmt.Errorf("cloudstore: materialize chunk: sessions[%d].id is required", i)
357+
}
358+
payload, err := json.Marshal(session)
359+
if err != nil {
360+
return nil, fmt.Errorf("cloudstore: materialize chunk session %q: %w", entityKey, err)
361+
}
362+
entries = append(entries, MutationEntry{Project: project, Entity: store.SyncEntitySession, EntityKey: entityKey, Op: store.SyncOpUpsert, Payload: payload})
363+
}
364+
365+
for i, observation := range chunk.Observations {
366+
entityKey := strings.TrimSpace(observation.SyncID)
367+
if entityKey == "" {
368+
return nil, fmt.Errorf("cloudstore: materialize chunk: observations[%d].sync_id is required", i)
369+
}
370+
payload, err := json.Marshal(observation)
371+
if err != nil {
372+
return nil, fmt.Errorf("cloudstore: materialize chunk observation %q: %w", entityKey, err)
373+
}
374+
entries = append(entries, MutationEntry{Project: project, Entity: store.SyncEntityObservation, EntityKey: entityKey, Op: store.SyncOpUpsert, Payload: payload})
375+
}
376+
377+
for i, prompt := range chunk.Prompts {
378+
entityKey := strings.TrimSpace(prompt.SyncID)
379+
if entityKey == "" {
380+
return nil, fmt.Errorf("cloudstore: materialize chunk: prompts[%d].sync_id is required", i)
381+
}
382+
payload, err := json.Marshal(prompt)
383+
if err != nil {
384+
return nil, fmt.Errorf("cloudstore: materialize chunk prompt %q: %w", entityKey, err)
385+
}
386+
entries = append(entries, MutationEntry{Project: project, Entity: store.SyncEntityPrompt, EntityKey: entityKey, Op: store.SyncOpUpsert, Payload: payload})
387+
}
388+
389+
return entries, nil
390+
}
391+
392+
func insertMaterializedMutations(ctx context.Context, tx *sql.Tx, entries []MutationEntry) error {
393+
for _, entry := range entries {
394+
payload := entry.Payload
395+
if len(payload) == 0 {
396+
payload = json.RawMessage("{}")
397+
}
398+
_, err := tx.ExecContext(ctx, `
399+
INSERT INTO cloud_mutations (project, entity, entity_key, op, payload)
400+
VALUES ($1, $2, $3, $4, $5)`,
401+
strings.TrimSpace(entry.Project), strings.TrimSpace(entry.Entity), strings.TrimSpace(entry.EntityKey), strings.TrimSpace(entry.Op), payload,
402+
)
403+
if err != nil {
404+
return fmt.Errorf("cloudstore: insert materialized chunk mutation %s/%s/%s: %w", entry.Project, entry.Entity, entry.EntityKey, err)
405+
}
406+
}
407+
return nil
408+
}
409+
314410
func (cs *CloudStore) backfillProjectSessionsFromChunks(ctx context.Context) error {
315411
rows, err := cs.db.QueryContext(ctx, `SELECT project_name, payload FROM cloud_chunks`)
316412
if err != nil {

0 commit comments

Comments
 (0)