Skip to content

Commit 150b32a

Browse files
Merge pull request #322 from Gentleman-Programming/fix/cloud-chunks-observation-materialization
fix(sync): materialize mutation chunks for recovery
2 parents 402fd40 + 32afbf5 commit 150b32a

4 files changed

Lines changed: 697 additions & 3 deletions

File tree

DOCS.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ Inspect or replay the `sync_apply_deferred` queue.
393393
- `engram cloud upgrade bootstrap --project <project> [--resume]` — resumable checkpointed enroll/push/verify flow
394394
- `engram cloud upgrade status --project <project>` — show upgrade stage/class/reason
395395
- `engram cloud upgrade rollback --project <project>` — restore pre-upgrade local snapshot before `bootstrap_verified`; blocked afterwards
396+
- `engram cloud repair materialize-mutations --project <project> (--dry-run|--apply)` — explicit server-side Postgres repair that backfills existing `cloud_mutations` into compatible `cloud_chunks` without deleting remote data
396397

397398
Cloud auth token is provided at runtime via `ENGRAM_CLOUD_TOKEN` (not by a dedicated CLI subcommand).
398399
Cloud server startup fails closed when the token is missing unless `ENGRAM_CLOUD_INSECURE_NO_AUTH=1` is explicitly set for local insecure development.
@@ -443,6 +444,17 @@ engram sync --cloud --project <project>
443444

444445
Sync/autosync never auto-applies repairs; only the explicit `repair --apply` command mutates local repairable upgrade state.
445446

447+
For cloud servers that already accepted mutation pushes before mutation payloads were materialized into chunk history, run the server-side backfill against the Postgres DSN used by `engram cloud serve`:
448+
449+
```bash
450+
ENGRAM_DATABASE_URL='postgres://...' engram cloud repair materialize-mutations --project <project> --dry-run
451+
ENGRAM_DATABASE_URL='postgres://...' engram cloud repair materialize-mutations --project <project> --apply
452+
```
453+
454+
The backfill is project-scoped, non-destructive, and idempotent: it inserts missing compatible chunks and leaves existing `cloud_mutations` and chunks in place.
455+
456+
`engram cloud serve` also runs this materialization repair automatically for every configured `ENGRAM_CLOUD_ALLOWED_PROJECTS` entry at startup. The explicit repair command remains available for operator verification, dry-runs, and re-running a project after an upgrade.
457+
446458
### Local Cloud Bring-Up (Docker + Postgres)
447459

448460
```bash

cmd/engram/cloud.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ var newCloudRuntime = func(cfg cloud.Config) (cloudServerRuntime, error) {
8484
return nil, err
8585
}
8686
allowedProjects := normalizeAllowedProjects(cfg.AllowedProjects)
87+
if err := backfillAllowedProjectMutationChunks(context.Background(), cs, allowedProjects); err != nil {
88+
_ = cs.Close()
89+
return nil, err
90+
}
8791
projectAuth := auth.NewProjectScopeAuthorizer(allowedProjects)
8892
token := strings.TrimSpace(os.Getenv("ENGRAM_CLOUD_TOKEN"))
8993
cs.SetDashboardAllowedProjects(allowedProjects)
@@ -114,6 +118,22 @@ var newCloudRuntime = func(cfg cloud.Config) (cloudServerRuntime, error) {
114118
}, nil
115119
}
116120

121+
func backfillAllowedProjectMutationChunks(ctx context.Context, cs *cloudstore.CloudStore, projects []string) error {
122+
for _, project := range projects {
123+
report, err := cs.BackfillMutationChunks(ctx, project, true)
124+
if err != nil {
125+
return fmt.Errorf("cloud repair materialize-mutations for project %q: %w", project, err)
126+
}
127+
if report.CandidateMutations > 0 || report.ChunksInserted > 0 {
128+
fmt.Fprintf(os.Stderr,
129+
"engram cloud repair materialize-mutations: project=%s candidates=%d already_materialized=%d chunks_planned=%d chunks_inserted=%d\n",
130+
report.Project, report.CandidateMutations, report.AlreadyMaterialized, report.ChunksPlanned, report.ChunksInserted,
131+
)
132+
}
133+
}
134+
return nil
135+
}
136+
117137
var runUpgradeBootstrap = func(s *store.Store, project string, cc *cloudConfig) (*engramsync.UpgradeBootstrapResult, error) {
118138
transport, err := remote.NewRemoteTransport(cc.ServerURL, cc.Token, project)
119139
if err != nil {
@@ -130,12 +150,12 @@ type cloudConfig struct {
130150
func cmdCloud(cfg store.Config) {
131151
if len(os.Args) < 3 {
132152
fmt.Fprintln(os.Stderr, "usage: engram cloud <subcommand> [options]")
133-
fmt.Fprintln(os.Stderr, "supported subcommands: status, enroll, config, serve, upgrade")
153+
fmt.Fprintln(os.Stderr, "supported subcommands: status, enroll, config, serve, upgrade, repair")
134154
exitFunc(1)
135155
}
136156
if os.Args[2] == "--help" || os.Args[2] == "-h" || os.Args[2] == "help" {
137157
fmt.Println("usage: engram cloud <subcommand> [options]")
138-
fmt.Println("supported subcommands: status, enroll, config, serve, upgrade")
158+
fmt.Println("supported subcommands: status, enroll, config, serve, upgrade, repair")
139159
return
140160
}
141161

@@ -150,11 +170,61 @@ func cmdCloud(cfg store.Config) {
150170
cmdCloudServe()
151171
case "upgrade":
152172
cmdCloudUpgrade(cfg)
173+
case "repair":
174+
cmdCloudRepair()
153175
default:
154176
fmt.Fprintf(os.Stderr, "unknown cloud command: %s\n", os.Args[2])
155-
fmt.Fprintln(os.Stderr, "supported subcommands: status, enroll, config, serve, upgrade")
177+
fmt.Fprintln(os.Stderr, "supported subcommands: status, enroll, config, serve, upgrade, repair")
178+
exitFunc(1)
179+
}
180+
}
181+
182+
func cmdCloudRepair() {
183+
if len(os.Args) < 4 || os.Args[3] == "--help" || os.Args[3] == "-h" || os.Args[3] == "help" {
184+
fmt.Println("usage: engram cloud repair materialize-mutations --project <name> (--dry-run|--apply)")
185+
fmt.Println("repairs existing cloud_mutations into compatible cloud_chunks without deleting remote data")
186+
return
187+
}
188+
command := strings.TrimSpace(strings.ToLower(os.Args[3]))
189+
if command != "materialize-mutations" {
190+
fmt.Fprintf(os.Stderr, "unknown cloud repair command: %s\n", command)
191+
fmt.Fprintln(os.Stderr, "supported cloud repair commands: materialize-mutations")
192+
exitFunc(1)
193+
return
194+
}
195+
project := parseCloudUpgradeProjectArg(os.Args[4:])
196+
if project == "" {
197+
fmt.Fprintln(os.Stderr, "usage: engram cloud repair materialize-mutations --project <name> (--dry-run|--apply)")
198+
fmt.Fprintln(os.Stderr, "error: --project is required")
156199
exitFunc(1)
200+
return
201+
}
202+
dryRun := hasCloudUpgradeFlag(os.Args[4:], "--dry-run")
203+
apply := hasCloudUpgradeFlag(os.Args[4:], "--apply")
204+
if dryRun == apply {
205+
fmt.Fprintln(os.Stderr, "usage: engram cloud repair materialize-mutations --project <name> (--dry-run|--apply)")
206+
fmt.Fprintln(os.Stderr, "error: exactly one of --dry-run or --apply is required")
207+
exitFunc(1)
208+
return
209+
}
210+
211+
cs, err := cloudstore.New(cloud.ConfigFromEnv())
212+
if err != nil {
213+
fatal(err)
214+
return
215+
}
216+
defer cs.Close()
217+
report, err := cs.BackfillMutationChunks(context.Background(), project, apply)
218+
if err != nil {
219+
fatal(err)
220+
return
221+
}
222+
encoded, err := jsonMarshalIndent(report, "", " ")
223+
if err != nil {
224+
fatal(err)
225+
return
157226
}
227+
fmt.Println(string(encoded))
158228
}
159229

160230
func cmdCloudUpgrade(cfg store.Config) {

0 commit comments

Comments
 (0)