Skip to content

Commit 73f75ba

Browse files
Frank Guoclaude
andcommitted
Add multi-agent support: Codex, Gemini, OpenCode adapters
Refactor session parsing into an adapter pattern so checkpoint discovers and parses sessions from Claude, Codex, Gemini CLI, and OpenCode. Each adapter implements Discover + Parse. Adds schema migration for the new `source` column, persistent Nomic model caching, and agent detection hints on `rekal init`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d776f83 commit 73f75ba

20 files changed

Lines changed: 1310 additions & 168 deletions

cmd/rekal/cli/checkpoint.go

Lines changed: 124 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -57,30 +57,18 @@ func runCheckpoint(cmd *cobra.Command, gitRoot string) error {
5757
// doCheckpoint captures the current session after a commit.
5858
// Extracted so sync can call it without a cobra.Command.
5959
func doCheckpoint(gitRoot string, w io.Writer) error {
60-
// Find session directory for this repo.
61-
sessionDir := session.FindSessionDir(gitRoot)
62-
if sessionDir == "" {
63-
return nil
64-
}
65-
66-
files, err := session.FindSessionFiles(sessionDir)
67-
if err != nil {
68-
if os.IsNotExist(err) {
69-
return nil
70-
}
71-
return fmt.Errorf("find session files: %w", err)
72-
}
73-
if len(files) == 0 {
74-
return nil
75-
}
76-
7760
// Open data DB.
7861
dataDB, err := db.OpenData(gitRoot)
7962
if err != nil {
8063
return fmt.Errorf("open data DB: %w", err)
8164
}
8265
defer dataDB.Close()
8366

67+
// Run forward-only migrations for existing DBs.
68+
if err := db.MigrateDataSchema(dataDB); err != nil {
69+
return fmt.Errorf("migrate data schema: %w", err)
70+
}
71+
8472
// Verify DB is healthy by running a simple query.
8573
if _, err := dataDB.Exec("SELECT 1"); err != nil {
8674
return fmt.Errorf("data DB is corrupt or unreadable: %w", err)
@@ -97,107 +85,144 @@ func doCheckpoint(gitRoot string, w io.Writer) error {
9785
// Collect unique relative file paths from file-modifying tool_calls across all sessions.
9886
toolCallPaths := make(map[string]struct{})
9987

100-
for _, f := range files {
101-
// Incremental: check checkpoint_state to skip unchanged files.
102-
info, statErr := os.Stat(f)
103-
if statErr != nil {
104-
continue
105-
}
106-
107-
data, err := os.ReadFile(f)
88+
// Iterate all adapters to discover sessions from all known agents.
89+
for _, adapter := range session.Adapters {
90+
refs, err := adapter.Discover(gitRoot)
10891
if err != nil {
10992
continue
11093
}
111-
if len(data) == 0 {
112-
continue
113-
}
114-
115-
hash := sha256Hex(data)
11694

117-
// Check cached state — skip if size + hash match.
118-
cachedSize, cachedHash, found, csErr := db.GetCheckpointState(dataDB, f)
119-
if csErr != nil {
120-
return fmt.Errorf("check checkpoint state: %w", csErr)
121-
}
122-
if found && cachedSize == info.Size() && cachedHash == hash {
123-
continue
124-
}
95+
for _, ref := range refs {
96+
// Determine cache key for deduplication.
97+
cacheKey := ref.Path
98+
if cacheKey == "" {
99+
cacheKey = adapter.Name() + ":" + ref.DBID
100+
}
125101

126-
exists, err := db.SessionExistsByHash(dataDB, hash)
127-
if err != nil {
128-
return fmt.Errorf("dedup check: %w", err)
129-
}
130-
if exists {
131-
// File changed but session already exists (re-parse produced same hash).
132-
// Update state cache and skip.
133-
_ = db.UpsertCheckpointState(dataDB, f, info.Size(), hash)
134-
continue
135-
}
102+
// For file-based refs, check size+hash cache.
103+
var data []byte
104+
var hash string
105+
if ref.Path != "" {
106+
info, statErr := os.Stat(ref.Path)
107+
if statErr != nil {
108+
continue
109+
}
110+
111+
fileData, err := os.ReadFile(ref.Path)
112+
if err != nil || len(fileData) == 0 {
113+
continue
114+
}
115+
data = fileData
116+
hash = sha256Hex(data)
117+
118+
cachedSize, cachedHash, found, csErr := db.GetCheckpointState(dataDB, cacheKey)
119+
if csErr != nil {
120+
return fmt.Errorf("check checkpoint state: %w", csErr)
121+
}
122+
if found && cachedSize == info.Size() && cachedHash == hash {
123+
continue
124+
}
125+
} else {
126+
// DB-based ref — use DBID as hash seed for dedup.
127+
hash = sha256Hex([]byte(cacheKey))
128+
129+
_, _, found, csErr := db.GetCheckpointState(dataDB, cacheKey)
130+
if csErr != nil {
131+
return fmt.Errorf("check checkpoint state: %w", csErr)
132+
}
133+
if found {
134+
continue
135+
}
136+
}
136137

137-
payload, err := session.ParseTranscript(data)
138-
if err != nil {
139-
continue
140-
}
138+
exists, err := db.SessionExistsByHash(dataDB, hash)
139+
if err != nil {
140+
return fmt.Errorf("dedup check: %w", err)
141+
}
142+
if exists {
143+
if ref.Path != "" {
144+
info, _ := os.Stat(ref.Path)
145+
if info != nil {
146+
_ = db.UpsertCheckpointState(dataDB, cacheKey, info.Size(), hash)
147+
}
148+
} else {
149+
_ = db.UpsertCheckpointState(dataDB, cacheKey, 0, hash)
150+
}
151+
continue
152+
}
141153

142-
// Redact secrets and anonymize paths before any DB insertion.
143-
scrub.Scrub(payload)
154+
payload, err := adapter.Parse(ref)
155+
if err != nil || payload == nil {
156+
continue
157+
}
144158

145-
if len(payload.Turns) == 0 && len(payload.ToolCalls) == 0 {
146-
continue
147-
}
159+
// Redact secrets and anonymize paths before any DB insertion.
160+
scrub.Scrub(payload)
148161

149-
sessionID := newID()
150-
capturedAt := time.Now().UTC()
162+
if len(payload.Turns) == 0 && len(payload.ToolCalls) == 0 {
163+
continue
164+
}
151165

152-
// Insert session into DuckDB.
153-
if err := db.InsertSession(
154-
dataDB, sessionID, "", hash,
155-
payload.ActorType, payload.AgentID, email, payload.Branch, capturedAt.Format(time.RFC3339),
156-
); err != nil {
157-
return fmt.Errorf("insert session: %w", err)
158-
}
166+
sessionID := newID()
167+
capturedAt := time.Now().UTC()
159168

160-
// Insert turns into DuckDB.
161-
for i, t := range payload.Turns {
162-
ts := ""
163-
if !t.Timestamp.IsZero() {
164-
ts = t.Timestamp.UTC().Format(time.RFC3339)
165-
}
166-
if err := db.InsertTurn(dataDB, newID(), sessionID, i, t.Role, t.Content, ts); err != nil {
167-
return fmt.Errorf("insert turn: %w", err)
169+
// Insert session into DuckDB.
170+
if err := db.InsertSession(
171+
dataDB, sessionID, "", hash,
172+
payload.ActorType, payload.AgentID, email, payload.Branch, capturedAt.Format(time.RFC3339),
173+
payload.Source,
174+
); err != nil {
175+
return fmt.Errorf("insert session: %w", err)
168176
}
169-
}
170177

171-
// Insert tool calls into DuckDB.
172-
for i, tc := range payload.ToolCalls {
173-
if err := db.InsertToolCall(dataDB, newID(), sessionID, i, tc.Tool, tc.Path, tc.CmdPrefix); err != nil {
174-
return fmt.Errorf("insert tool_call: %w", err)
178+
// Insert turns into DuckDB.
179+
for i, t := range payload.Turns {
180+
ts := ""
181+
if !t.Timestamp.IsZero() {
182+
ts = t.Timestamp.UTC().Format(time.RFC3339)
183+
}
184+
if err := db.InsertTurn(dataDB, newID(), sessionID, i, t.Role, t.Content, ts); err != nil {
185+
return fmt.Errorf("insert turn: %w", err)
186+
}
175187
}
176-
}
177188

178-
// Collect file-modifying tool_call paths for files_touched supplementation.
179-
for _, tc := range payload.ToolCalls {
180-
if tc.Path == "" {
181-
continue
182-
}
183-
switch tc.Tool {
184-
case "Write", "Edit", "NotebookEdit":
185-
default:
186-
continue
189+
// Insert tool calls into DuckDB.
190+
for i, tc := range payload.ToolCalls {
191+
if err := db.InsertToolCall(dataDB, newID(), sessionID, i, tc.Tool, tc.Path, tc.CmdPrefix); err != nil {
192+
return fmt.Errorf("insert tool_call: %w", err)
193+
}
187194
}
188-
rel := strings.TrimPrefix(tc.Path, gitRoot+"/")
189-
if rel == tc.Path {
190-
// Path is not under gitRoot — external file, skip.
191-
continue
195+
196+
// Collect file-modifying tool_call paths for files_touched supplementation.
197+
for _, tc := range payload.ToolCalls {
198+
if tc.Path == "" {
199+
continue
200+
}
201+
switch tc.Tool {
202+
case "Write", "Edit", "NotebookEdit":
203+
default:
204+
continue
205+
}
206+
rel := strings.TrimPrefix(tc.Path, gitRoot+"/")
207+
if rel == tc.Path {
208+
continue
209+
}
210+
toolCallPaths[rel] = struct{}{}
192211
}
193-
toolCallPaths[rel] = struct{}{}
194-
}
195212

196-
// Update checkpoint state cache.
197-
_ = db.UpsertCheckpointState(dataDB, f, info.Size(), hash)
213+
// Update checkpoint state cache.
214+
if ref.Path != "" {
215+
info, _ := os.Stat(ref.Path)
216+
if info != nil {
217+
_ = db.UpsertCheckpointState(dataDB, cacheKey, info.Size(), hash)
218+
}
219+
} else {
220+
_ = db.UpsertCheckpointState(dataDB, cacheKey, 0, hash)
221+
}
198222

199-
sessionIDs = append(sessionIDs, sessionID)
200-
inserted++
223+
sessionIDs = append(sessionIDs, sessionID)
224+
inserted++
225+
}
201226
}
202227

203228
if inserted == 0 {

cmd/rekal/cli/db/db.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,14 @@ func SessionExistsByHash(d *sql.DB, hash string) (bool, error) {
4444
}
4545

4646
// InsertSession inserts a new session row into the data DB.
47-
func InsertSession(d *sql.DB, id, parentSessionID, hash, actorType, agentID, userEmail, branch, capturedAt string) error {
47+
func InsertSession(d *sql.DB, id, parentSessionID, hash, actorType, agentID, userEmail, branch, capturedAt, source string) error {
48+
if source == "" {
49+
source = "claude"
50+
}
4851
_, err := d.Exec(
49-
`INSERT INTO sessions (id, parent_session_id, session_hash, captured_at, actor_type, agent_id, user_email, branch)
50-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
51-
id, nullIfEmpty(parentSessionID), hash, capturedAt, actorType, agentID, userEmail, branch,
52+
`INSERT INTO sessions (id, parent_session_id, session_hash, captured_at, actor_type, agent_id, user_email, branch, source)
53+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
54+
id, nullIfEmpty(parentSessionID), hash, capturedAt, actorType, agentID, userEmail, branch, source,
5255
)
5356
if err != nil {
5457
return fmt.Errorf("insert session: %w", err)

cmd/rekal/cli/db/schema.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,25 @@ import "database/sql"
55
// InitDataSchema creates the data DB tables if they do not exist.
66
// Data DB is the source of truth — append-only, never rebuilt.
77
func InitDataSchema(d *sql.DB) error {
8-
_, err := d.Exec(dataDDL)
9-
return err
8+
if _, err := d.Exec(dataDDL); err != nil {
9+
return err
10+
}
11+
return MigrateDataSchema(d)
12+
}
13+
14+
// MigrateDataSchema applies forward-only migrations to an existing data DB.
15+
// Safe to call multiple times — each migration checks before applying.
16+
func MigrateDataSchema(d *sql.DB) error {
17+
// Migration: add source column if missing (existing DBs pre-multi-agent).
18+
var count int
19+
err := d.QueryRow(`SELECT count(*) FROM information_schema.columns
20+
WHERE table_name = 'sessions' AND column_name = 'source'`).Scan(&count)
21+
if err == nil && count == 0 {
22+
if _, err := d.Exec(`ALTER TABLE sessions ADD COLUMN source VARCHAR NOT NULL DEFAULT 'claude'`); err != nil {
23+
return err
24+
}
25+
}
26+
return nil
1027
}
1128

1229
// InitIndexSchema creates the index DB tables if they do not exist.
@@ -28,7 +45,8 @@ CREATE TABLE IF NOT EXISTS sessions (
2845
actor_type VARCHAR NOT NULL DEFAULT 'human',
2946
agent_id VARCHAR,
3047
user_email VARCHAR,
31-
branch VARCHAR
48+
branch VARCHAR,
49+
source VARCHAR NOT NULL DEFAULT 'claude'
3250
);
3351
3452
CREATE TABLE IF NOT EXISTS turns (

cmd/rekal/cli/import.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func importBranch(gitRoot string, dataDB *sql.DB, branch string) (int, error) {
8989
sessionHash := "wire:" + sessionID
9090
capturedAt := sf.CapturedAt.UTC().Format(time.RFC3339)
9191

92-
if err := db.InsertSession(dataDB, sessionID, "", sessionHash, actorType, agentID, email, branch, capturedAt); err != nil {
92+
if err := db.InsertSession(dataDB, sessionID, "", sessionHash, actorType, agentID, email, branch, capturedAt, ""); err != nil {
9393
return imported, fmt.Errorf("insert session: %w", err)
9494
}
9595

0 commit comments

Comments
 (0)