Skip to content

Commit 6d2138c

Browse files
fix(cloud): stop autosync false acknowledgements
1 parent d44d37b commit 6d2138c

9 files changed

Lines changed: 354 additions & 28 deletions

File tree

cmd/engram/autosync_e2e_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ func (s *autosyncFakeStore) ListPendingSyncMutations(_ string, limit int) ([]sto
302302
return result, nil
303303
}
304304

305+
func (s *autosyncFakeStore) CountPendingNonEnrolledSyncMutations(_ string) ([]store.PendingSyncMutationProjectCount, error) {
306+
return nil, nil
307+
}
308+
305309
// AckSyncMutations is a no-op stub: it ignores the target key and the
// acknowledged sequence number and always reports success.
func (s *autosyncFakeStore) AckSyncMutations(_ string, _ int64) error {
	return nil
}
306310

307311
func (s *autosyncFakeStore) AckSyncMutationSeqs(_ string, seqs []int64) error {
@@ -334,6 +338,8 @@ func (s *autosyncFakeStore) ApplyPulledMutation(_ string, _ store.SyncMutation)
334338

335339
// MarkSyncFailure is a no-op stub: it ignores the target key, the message and
// the backoff deadline and always reports success.
func (s *autosyncFakeStore) MarkSyncFailure(_, _ string, _ time.Time) error {
	return nil
}
336340

341+
// MarkSyncBlocked is a no-op stub: it ignores the target key, the reason code
// and the message and always reports success.
func (s *autosyncFakeStore) MarkSyncBlocked(_, _, _ string) error {
	return nil
}
342+
337343
// MarkSyncHealthy is a no-op stub: it ignores the target key and always
// reports success.
func (s *autosyncFakeStore) MarkSyncHealthy(_ string) error {
	return nil
}
338344

339345
// httpPushMutations is a helper to push mutations directly to a test server.

cmd/engram/autosync_status.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ func (a *autosyncStatusAdapter) mapPhase(st autosync.Status) server.SyncStatus {
7979
} else {
8080
base.ReasonCode = "transport_failed"
8181
}
82-
base.ReasonMessage = st.LastError
82+
if st.ReasonMessage != "" {
83+
base.ReasonMessage = st.ReasonMessage
84+
} else {
85+
base.ReasonMessage = st.LastError
86+
}
8387

8488
case autosync.PhaseDisabled:
8589
base.Phase = "degraded"

cmd/engram/cloud.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,15 +552,46 @@ func cmdCloudStatus(cfg store.Config) {
552552
fmt.Println("Auth status: ready (insecure local-dev mode: ENGRAM_CLOUD_INSECURE_NO_AUTH=1)")
553553
fmt.Println("Sync readiness: ready for explicit --project sync (project must be enrolled)")
554554
fmt.Println("Warning: bearer auth is disabled in insecure mode; do not use in production")
555+
printCloudStatusSyncDiagnostic(cfg)
555556
return
556557
}
557558
fmt.Println("Auth status: token not configured (client token is optional at preflight)")
558559
fmt.Println("Sync readiness: ready to attempt explicit --project sync (project must be enrolled)")
559560
fmt.Println("Hint: if the remote server enforces bearer auth, set ENGRAM_CLOUD_TOKEN")
561+
printCloudStatusSyncDiagnostic(cfg)
560562
return
561563
}
562564
fmt.Println("Auth status: ready (token provided via runtime cloud config)")
563565
fmt.Println("Sync readiness: ready for explicit --project sync (project must be enrolled)")
566+
printCloudStatusSyncDiagnostic(cfg)
567+
}
568+
569+
func printCloudStatusSyncDiagnostic(cfg store.Config) {
570+
if _, err := os.Stat(filepath.Join(cfg.DataDir, "engram.db")); err != nil {
571+
return
572+
}
573+
s, err := storeNew(cfg)
574+
if err != nil {
575+
fmt.Printf("Sync diagnostic: unavailable (%v)\n", err)
576+
return
577+
}
578+
defer s.Close()
579+
state, err := s.GetSyncState(constants.TargetKeyCloud)
580+
if err != nil || state == nil {
581+
return
582+
}
583+
code := strings.TrimSpace(derefString(state.ReasonCode))
584+
message := strings.TrimSpace(derefString(state.ReasonMessage))
585+
if code == "" && message == "" {
586+
return
587+
}
588+
fmt.Printf("Sync diagnostic: %s\n", state.Lifecycle)
589+
if code != "" {
590+
fmt.Printf("reason_code: %s\n", code)
591+
}
592+
if message != "" {
593+
fmt.Printf("reason_message: %s\n", message)
594+
}
564595
}
565596

566597
func cmdCloudEnroll(cfg store.Config) {

cmd/engram/main_extra_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,6 +1336,35 @@ func TestCmdCloudStatusHonorsEnvServerOverride(t *testing.T) {
13361336
}
13371337
}
13381338

1339+
func TestCmdCloudStatusSurfacesPersistedNonEnrolledPendingDiagnostic(t *testing.T) {
1340+
stubExitWithPanic(t)
1341+
stubRuntimeHooks(t)
1342+
1343+
cfg := testConfig(t)
1344+
t.Setenv("ENGRAM_CLOUD_SERVER", "https://env-cloud.example.test")
1345+
t.Setenv("ENGRAM_CLOUD_TOKEN", "env-token")
1346+
s, err := store.New(cfg)
1347+
if err != nil {
1348+
t.Fatalf("open store: %v", err)
1349+
}
1350+
if err := s.MarkSyncBlocked(store.DefaultSyncTargetKey, constants.ReasonNonEnrolledPendingMutations, "pending cloud sync mutations are blocked because project(s) are not enrolled: alpha=2. Run `engram cloud enroll <project>` for each intended project or review enrollment."); err != nil {
1351+
_ = s.Close()
1352+
t.Fatalf("mark blocked: %v", err)
1353+
}
1354+
_ = s.Close()
1355+
1356+
withArgs(t, "engram", "cloud", "status")
1357+
stdout, stderr, recovered := captureOutputAndRecover(t, func() { cmdCloud(cfg) })
1358+
if recovered != nil || stderr != "" {
1359+
t.Fatalf("cloud status should succeed, panic=%v stderr=%q", recovered, stderr)
1360+
}
1361+
for _, want := range []string{"Sync diagnostic: degraded", "reason_code: non_enrolled_pending_mutations", "engram cloud enroll <project>"} {
1362+
if !strings.Contains(stdout, want) {
1363+
t.Fatalf("expected cloud status output to contain %q, got %q", want, stdout)
1364+
}
1365+
}
1366+
}
1367+
13391368
func TestCmdCloudStatusRejectsInvalidEffectiveRuntimeServerURL(t *testing.T) {
13401369
stubExitWithPanic(t)
13411370
stubRuntimeHooks(t)

internal/cloud/autosync/manager.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,25 @@ type PullMutationsResponse struct {
7979
// LocalStore is the set of local store operations the autosync manager calls:
// reading sync state, listing, counting and acknowledging pending mutations,
// acquiring and releasing the sync lease, applying pulled mutations, and
// recording the failure, blocked or healthy outcome of a cycle.
type LocalStore interface {
8080
GetSyncState(targetKey string) (*store.SyncState, error)
8181
ListPendingSyncMutations(targetKey string, limit int) ([]store.SyncMutation, error)
82+
CountPendingNonEnrolledSyncMutations(targetKey string) ([]store.PendingSyncMutationProjectCount, error)
8283
AckSyncMutations(targetKey string, lastAckedSeq int64) error
8384
AckSyncMutationSeqs(targetKey string, seqs []int64) error
84-
SkipAckNonEnrolledMutations(targetKey string) (int64, error)
8585
AcquireSyncLease(targetKey, owner string, ttl time.Duration, now time.Time) (bool, error)
8686
ReleaseSyncLease(targetKey, owner string) error
8787
ApplyPulledMutation(targetKey string, mutation store.SyncMutation) error
8888
MarkSyncFailure(targetKey, message string, backoffUntil time.Time) error
89+
MarkSyncBlocked(targetKey, reasonCode, message string) error
8990
MarkSyncHealthy(targetKey string) error
9091
}
9192

93+
type nonEnrolledPendingError struct {
94+
counts []store.PendingSyncMutationProjectCount
95+
}
96+
97+
func (e *nonEnrolledPendingError) Error() string {
98+
return nonEnrolledPendingMessage(e.counts)
99+
}
100+
92101
// CloudTransport is the subset of remote.MutationTransport methods the manager needs.
93102
type CloudTransport interface {
94103
PushMutations(mutations []MutationEntry) (*PushMutationsResult, error)
@@ -388,6 +397,11 @@ func (m *Manager) cycle(ctx context.Context) {
388397

389398
// Push, then pull.
390399
if err := m.push(ctx); err != nil {
400+
var blocked *nonEnrolledPendingError
401+
if errors.As(err, &blocked) {
402+
m.recordBlocked(err.Error(), constants.ReasonNonEnrolledPendingMutations)
403+
return
404+
}
391405
reasonCode := classifyTransportError(err)
392406
m.recordFailureWithReason(autosyncFailureMessage(m.cfg.TargetKey, fmt.Sprintf("push: %v", err), err), reasonCode)
393407
return
@@ -453,16 +467,18 @@ func (m *Manager) push(ctx context.Context) error {
453467

454468
m.setPhase(PhasePushing)
455469

456-
// Skip-ack mutations for non-enrolled projects.
457-
if _, err := m.store.SkipAckNonEnrolledMutations(m.cfg.TargetKey); err != nil {
458-
return fmt.Errorf("skip-ack non-enrolled: %w", err)
459-
}
460-
461470
pending, err := m.store.ListPendingSyncMutations(m.cfg.TargetKey, m.cfg.PushBatchSize)
462471
if err != nil {
463472
return fmt.Errorf("list pending: %w", err)
464473
}
465474
if len(pending) == 0 {
475+
counts, err := m.store.CountPendingNonEnrolledSyncMutations(m.cfg.TargetKey)
476+
if err != nil {
477+
return fmt.Errorf("count pending non-enrolled mutations: %w", err)
478+
}
479+
if len(counts) > 0 {
480+
return &nonEnrolledPendingError{counts: counts}
481+
}
466482
return nil
467483
}
468484

@@ -589,6 +605,18 @@ func (m *Manager) recordFailureWithReason(msg, reasonCode string) {
589605
_ = m.store.MarkSyncFailure(m.cfg.TargetKey, msg, bu)
590606
}
591607

608+
func (m *Manager) recordBlocked(msg, reasonCode string) {
609+
m.mu.Lock()
610+
m.status.Phase = PhasePushFailed
611+
m.status.LastError = msg
612+
m.status.ReasonCode = reasonCode
613+
m.status.ReasonMessage = msg
614+
m.status.BackoffUntil = nil
615+
m.mu.Unlock()
616+
617+
_ = m.store.MarkSyncBlocked(m.cfg.TargetKey, reasonCode, msg)
618+
}
619+
592620
func (m *Manager) recordSuccess() {
593621
now := time.Now()
594622
m.mu.Lock()
@@ -604,6 +632,14 @@ func (m *Manager) recordSuccess() {
604632
_ = m.store.MarkSyncHealthy(m.cfg.TargetKey)
605633
}
606634

635+
func nonEnrolledPendingMessage(counts []store.PendingSyncMutationProjectCount) string {
636+
parts := make([]string, 0, len(counts))
637+
for _, count := range counts {
638+
parts = append(parts, fmt.Sprintf("%s=%d", count.Project, count.Count))
639+
}
640+
return fmt.Sprintf("pending cloud sync mutations are blocked because project(s) are not enrolled: %s. Run `engram cloud enroll <project>` for each intended project or review enrollment.", strings.Join(parts, ", "))
641+
}
642+
607643
// computeBackoff returns exponential backoff with ±25% jitter.
608644
// Formula: min(base * 2^(failures-1), maxBackoff) ± jitter where jitter ∈ [-base*0.25, +base*0.25]
609645
// BW1: ±25% means jitter can be negative, so result ∈ [base*0.75, base*1.25].

0 commit comments

Comments
 (0)