Skip to content

Commit 8330dc4

Browse files
Merge pull request #325 from Gentleman-Programming/fix/sync-legacy-orphan-import
fix(sync): recover legacy orphan imports
2 parents 76b2909 + b5f2eb9 commit 8330dc4

2 files changed

Lines changed: 170 additions & 22 deletions

File tree

internal/sync/sync.go

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package sync
2121

2222
import (
2323
"compress/gzip"
24+
"database/sql"
2425
"encoding/json"
2526
"errors"
2627
"fmt"
@@ -517,8 +518,10 @@ func (sy *Syncer) Import() (*ImportResult, error) {
517518
type importMode string
518519

519520
const (
520-
importModeLocal importMode = "local"
521-
importModeCloud importMode = "cloud"
521+
importModeLocal importMode = "local"
522+
importModeCloud importMode = "cloud"
523+
recoveredMissingSessionDirectory = "(recovered-missing-session)"
524+
recoveredMissingSessionStartedAt = "1970-01-01 00:00:00"
522525
)
523526

524527
func (sy *Syncer) importEntriesDependencySafe(entries []ChunkEntry, knownChunks map[string]bool, mode importMode) (*ImportResult, error) {
@@ -536,6 +539,14 @@ func (sy *Syncer) importEntriesDependencySafe(entries []ChunkEntry, knownChunks
536539
if len(pendingEntries) == 0 {
537540
return result, nil
538541
}
542+
availableSessionIDs := map[string]struct{}{}
543+
if mode == importModeLocal {
544+
var err error
545+
availableSessionIDs, err = sy.sessionIDsAvailableInChunks(entries, knownChunks)
546+
if err != nil {
547+
return nil, err
548+
}
549+
}
539550

540551
lastErrors := map[string]error{}
541552
for pass := 1; len(pendingEntries) > 0; pass++ {
@@ -562,11 +573,26 @@ func (sy *Syncer) importEntriesDependencySafe(entries []ChunkEntry, knownChunks
562573
}
563574

564575
if err := sy.importMutationChunk(entry.ID, chunk); err != nil {
576+
if mode == importModeLocal {
577+
recoveredChunk, recovered, recoveryErr := sy.recoverLocalMissingSessionDependencies(chunk, availableSessionIDs)
578+
if recoveryErr != nil {
579+
return nil, recoveryErr
580+
}
581+
if recovered {
582+
if retryErr := sy.importMutationChunk(entry.ID, recoveredChunk); retryErr == nil {
583+
chunk = recoveredChunk
584+
goto imported
585+
} else {
586+
err = retryErr
587+
}
588+
}
589+
}
565590
lastErrors[entry.ID] = importDependencyError(chunk, err)
566591
nextPending = append(nextPending, entry)
567592
continue
568593
}
569594

595+
imported:
570596
importResult := estimateMutationImportResult(chunk)
571597
knownChunks[entry.ID] = true
572598
delete(lastErrors, entry.ID)
@@ -612,6 +638,77 @@ func importDependencyError(chunk ChunkData, err error) error {
612638
return fmt.Errorf("%w; pending session dependencies: %s", err, strings.Join(sessions, ", "))
613639
}
614640

641+
func (sy *Syncer) sessionIDsAvailableInChunks(entries []ChunkEntry, knownChunks map[string]bool) (map[string]struct{}, error) {
642+
available := make(map[string]struct{})
643+
for _, entry := range entries {
644+
chunkJSON, err := sy.transport.ReadChunk(entry.ID)
645+
if err != nil {
646+
if errors.Is(err, ErrChunkNotFound) || knownChunks[entry.ID] {
647+
continue
648+
}
649+
return nil, fmt.Errorf("read chunk %s: %w", entry.ID, err)
650+
}
651+
652+
var chunk ChunkData
653+
if err := json.Unmarshal(chunkJSON, &chunk); err != nil {
654+
if knownChunks[entry.ID] {
655+
continue
656+
}
657+
return nil, fmt.Errorf("parse chunk %s: %w", entry.ID, err)
658+
}
659+
for _, mutation := range buildImportMutations(chunk) {
660+
if mutation.Entity != store.SyncEntitySession || mutation.Op != store.SyncOpUpsert {
661+
continue
662+
}
663+
sessionID := strings.TrimSpace(mutation.EntityKey)
664+
if sessionID != "" {
665+
available[sessionID] = struct{}{}
666+
}
667+
}
668+
}
669+
return available, nil
670+
}
671+
672+
func (sy *Syncer) recoverLocalMissingSessionDependencies(chunk ChunkData, availableSessionIDs map[string]struct{}) (ChunkData, bool, error) {
673+
mutations := buildImportMutations(chunk)
674+
projectsBySession := referencedSessionProjectsFromNonSessionUpserts(mutations)
675+
if len(projectsBySession) == 0 {
676+
return chunk, false, nil
677+
}
678+
679+
missingIDs := make([]string, 0, len(projectsBySession))
680+
for sessionID := range projectsBySession {
681+
if _, available := availableSessionIDs[sessionID]; available {
682+
continue
683+
}
684+
_, err := sy.store.GetSession(sessionID)
685+
if err == nil {
686+
continue
687+
}
688+
if !errors.Is(err, sql.ErrNoRows) {
689+
return chunk, false, fmt.Errorf("check recovered session dependency %s: %w", sessionID, err)
690+
}
691+
missingIDs = append(missingIDs, sessionID)
692+
}
693+
if len(missingIDs) == 0 {
694+
return chunk, false, nil
695+
}
696+
697+
sort.Strings(missingIDs)
698+
recovered := chunk
699+
stubSessions := make([]store.Session, 0, len(missingIDs))
700+
for _, sessionID := range missingIDs {
701+
stubSessions = append(stubSessions, store.Session{
702+
ID: sessionID,
703+
Project: projectsBySession[sessionID],
704+
Directory: recoveredMissingSessionDirectory,
705+
StartedAt: recoveredMissingSessionStartedAt,
706+
})
707+
}
708+
recovered.Sessions = append(stubSessions, recovered.Sessions...)
709+
return recovered, true, nil
710+
}
711+
615712
func buildImportMutations(chunk ChunkData) []store.SyncMutation {
616713
if len(chunk.Mutations) == 0 {
617714
return synthesizeMutationsFromChunk(chunk)
@@ -704,6 +801,39 @@ func referencedSessionIDsFromNonSessionUpserts(mutations []store.SyncMutation) m
704801
return required
705802
}
706803

804+
func referencedSessionProjectsFromNonSessionUpserts(mutations []store.SyncMutation) map[string]string {
805+
projects := make(map[string]string)
806+
for _, mutation := range mutations {
807+
if mutation.Op != store.SyncOpUpsert {
808+
continue
809+
}
810+
switch mutation.Entity {
811+
case store.SyncEntityObservation, store.SyncEntityPrompt:
812+
var payload struct {
813+
SessionID string `json:"session_id"`
814+
Project *string `json:"project"`
815+
}
816+
if err := decodeSyncPayloadForProject([]byte(mutation.Payload), &payload); err != nil {
817+
continue
818+
}
819+
sessionID := strings.TrimSpace(payload.SessionID)
820+
if sessionID == "" {
821+
continue
822+
}
823+
project, _ := store.NormalizeProject(strings.TrimSpace(mutation.Project))
824+
project = strings.TrimSpace(project)
825+
if project == "" && payload.Project != nil {
826+
project, _ = store.NormalizeProject(strings.TrimSpace(*payload.Project))
827+
project = strings.TrimSpace(project)
828+
}
829+
if existing := strings.TrimSpace(projects[sessionID]); existing == "" || (project != "" && project < existing) {
830+
projects[sessionID] = project
831+
}
832+
}
833+
}
834+
return projects
835+
}
836+
707837
func mutationIdentityKey(mutation store.SyncMutation) string {
708838
return fmt.Sprintf("%s:%s", mutation.Entity, strings.TrimSpace(mutation.EntityKey))
709839
}

internal/sync/sync_test.go

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -948,15 +948,11 @@ func TestImportBranches(t *testing.T) {
948948
})
949949

950950
chunk := ChunkData{
951-
Observations: []store.Observation{{
952-
ID: 1,
953-
SessionID: "missing-session",
954-
Type: "bugfix",
955-
Title: "broken",
956-
Content: "missing session should violate FK",
957-
Scope: "project",
958-
CreatedAt: "2025-01-01 00:00:01",
959-
UpdatedAt: "2025-01-01 00:00:01",
951+
Mutations: []store.SyncMutation{{
952+
Entity: "unknown",
953+
EntityKey: "broken-entity",
954+
Op: store.SyncOpUpsert,
955+
Payload: `{}`,
960956
}},
961957
}
962958
payload, err := json.Marshal(chunk)
@@ -973,7 +969,7 @@ func TestImportBranches(t *testing.T) {
973969
}
974970

975971
sy := New(s, syncDir)
976-
if _, err := sy.Import(); err == nil || !strings.Contains(err.Error(), "dependency-safe local import stalled") || !strings.Contains(err.Error(), "missing-session") {
972+
if _, err := sy.Import(); err == nil || !strings.Contains(err.Error(), "dependency-safe local import stalled") || !strings.Contains(err.Error(), "unknown sync entity") {
977973
t.Fatalf("expected dependency-safe local import error, got %v", err)
978974
}
979975
})
@@ -1052,9 +1048,13 @@ func TestLocalImportDependencySafeAcrossChunksRegardlessManifestOrder(t *testing
10521048
if res.ChunksImported != 2 || res.SessionsImported != 1 || res.ObservationsImported != 1 || res.PromptsImported != 1 {
10531049
t.Fatalf("unexpected import result: %+v", res)
10541050
}
1055-
if _, err := s.GetSession("sess-cross-chunk"); err != nil {
1051+
sess, err := s.GetSession("sess-cross-chunk")
1052+
if err != nil {
10561053
t.Fatalf("expected session imported: %v", err)
10571054
}
1055+
if sess.Directory != "/tmp/proj-a" {
1056+
t.Fatalf("expected real session chunk to win, got %+v", sess)
1057+
}
10581058
results, err := s.Search("cross chunk observation", store.SearchOptions{Project: project, Limit: 5})
10591059
if err != nil || len(results) != 1 {
10601060
t.Fatalf("expected imported observation, results=%d err=%v", len(results), err)
@@ -1099,20 +1099,38 @@ func TestLocalImportOrdersExplicitMutationsAndDirectArraysSafely(t *testing.T) {
10991099
}
11001100
}
11011101

1102-
func TestLocalImportMissingSessionStallsWithUsefulError(t *testing.T) {
1102+
func TestLocalImportRecoversLegacyChunkWithMissingSessionStub(t *testing.T) {
11031103
s := newTestStore(t)
11041104
syncDir := t.TempDir()
1105-
chunkID := "missing-session"
1105+
chunkID := "aaf7a13f"
1106+
project := "proj-a"
11061107
writeManifestFile(t, syncDir, &Manifest{Version: 1, Chunks: []ChunkEntry{{ID: chunkID, CreatedAt: "2025-01-01T00:00:00Z"}}})
1107-
writeLocalChunkFile(t, syncDir, chunkID, ChunkData{Observations: []store.Observation{{SyncID: "obs-missing-session", SessionID: "does-not-exist", Type: "note", Title: "missing", Content: "missing dependency", Scope: "project", CreatedAt: "2025-01-01 00:00:00", UpdatedAt: "2025-01-01 00:00:00"}}})
1108+
writeLocalChunkFile(t, syncDir, chunkID, ChunkData{
1109+
Observations: []store.Observation{{SyncID: "obs-missing-session", SessionID: "does-not-exist", Type: "note", Title: "missing", Content: "missing dependency", Project: &project, Scope: "project", CreatedAt: "2025-01-01 00:00:00", UpdatedAt: "2025-01-01 00:00:00"}},
1110+
Prompts: []store.Prompt{{SyncID: "prompt-missing-session", SessionID: "does-not-exist", Content: "prompt should be preserved", Project: project, CreatedAt: "2025-01-01 00:00:01"}},
1111+
})
11081112

1109-
_, err := New(s, syncDir).Import()
1110-
if err == nil {
1111-
t.Fatal("expected missing session dependency to stall")
1113+
res, err := New(s, syncDir).Import()
1114+
if err != nil {
1115+
t.Fatalf("local import should recover malformed legacy missing session chunk: %v", err)
11121116
}
1113-
msg := err.Error()
1114-
if !strings.Contains(msg, "dependency-safe local import stalled") || !strings.Contains(msg, chunkID) || !strings.Contains(msg, "does-not-exist") {
1115-
t.Fatalf("expected useful dependency-safe stall error, got %v", err)
1117+
if res.ChunksImported != 1 || res.SessionsImported != 1 || res.ObservationsImported != 1 || res.PromptsImported != 1 {
1118+
t.Fatalf("unexpected import result: %+v", res)
1119+
}
1120+
sess, err := s.GetSession("does-not-exist")
1121+
if err != nil {
1122+
t.Fatalf("expected recovered stub session: %v", err)
1123+
}
1124+
if sess.Project != project || sess.Directory != "(recovered-missing-session)" {
1125+
t.Fatalf("unexpected recovered session: %+v", sess)
1126+
}
1127+
results, err := s.Search("missing dependency", store.SearchOptions{Project: project, Limit: 5})
1128+
if err != nil || len(results) != 1 {
1129+
t.Fatalf("expected recovered observation, results=%d err=%v", len(results), err)
1130+
}
1131+
prompts, err := s.RecentPrompts(project, 5)
1132+
if err != nil || len(prompts) != 1 || prompts[0].SyncID != "prompt-missing-session" {
1133+
t.Fatalf("expected recovered prompt, prompts=%+v err=%v", prompts, err)
11161134
}
11171135
}
11181136

0 commit comments

Comments
 (0)