Skip to content

Commit 618d895

Browse files
var-ggclaude
andcommitted
fix(timeline): re-key the keyset cursor + transactional migration
Acts on the two foundation-level findings from the GPT Pro review. (1) The keyset cursor was (sort_ts, repo_id, hash) — neither guaranteed-unique (discover_repos wrote commits before the repos row, so repo_id stayed 0) nor stable (repo_id is mutable: ON CONFLICT updates it, a repo re-add changes the rowid). Two repos sharing a commit (fork / clone) could collide on (-ts, 0, hash) and make keyset pagination skip or duplicate a row at a page boundary. The cursor + total-order index are now (sort_ts, repo_path, hash): (repo_path, hash) is the table's primary key, so the tuple is structurally unique and immutable. discover_repos also upserts the repos row before its commits so the denormalised repo_id resolves. Schema v5. The frontend now rebuilds a cursor straight from a CommitSummary, dropping the repoIdByPath plumbing. (2) The v5 migration runs inside a BEGIN IMMEDIATE transaction, so concurrent open() calls (watcher / orchestrator / IPC) serialize instead of racing the DROP / CREATE on the first launch after an upgrade. Also folds in two query-layer optimizations the review flagged: - list_commits_at_rank derives base_index as rank - newer_count, skipping the second O(rank) scan count_newer_than would otherwise do. - the time filter is sort_ts <= -since (an index range on idx_commits_timeline) instead of the residual timestamp >= since. 47 cargo tests green; tsc + vite build clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6fcfe9f commit 618d895

6 files changed

Lines changed: 98 additions & 78 deletions

File tree

src-tauri/src/cache.rs

Lines changed: 85 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub const DIFF_CACHE_MAX_BYTES: i64 = 500 * 1024 * 1024;
4949
/// never rewritten by a later conflict-update. A reader pinned to generation
5050
/// N filters `first_seen_generation <= N` for an MVCC-lite snapshot the
5151
/// background scanner's subsequent inserts cannot disturb.
52-
const COMMITS_SCHEMA_V4: &str = r#"
52+
const COMMITS_SCHEMA_V5: &str = r#"
5353
CREATE TABLE commits (
5454
repo_path TEXT NOT NULL,
5555
hash TEXT NOT NULL,
@@ -72,9 +72,10 @@ CREATE TABLE commits (
7272
PRIMARY KEY (repo_path, hash)
7373
);
7474
-- Total-order index for keyset window queries. Newest-first = ascending on
75-
-- sort_ts (= -timestamp); repo_id + hash break ties into a deterministic
76-
-- total order, stable under concurrent scanner inserts.
77-
CREATE INDEX idx_commits_timeline ON commits(sort_ts, repo_id, hash);
75+
-- sort_ts (= -timestamp). (repo_path, hash) is the table's primary key, so
76+
-- (sort_ts, repo_path, hash) is a genuinely unique + stable total order —
77+
-- repo_id is denormalised and mutable, so it must not key the cursor.
78+
CREATE INDEX idx_commits_timeline ON commits(sort_ts, repo_path, hash);
7879
-- Retained for the legacy since-filtered list_recent_commits.
7980
CREATE INDEX idx_commits_ts ON commits(timestamp);
8081
"#;
@@ -90,7 +91,8 @@ fn db_path(app: &AppHandle) -> Result<PathBuf> {
9091

9192
pub fn open(app: &AppHandle) -> Result<Connection> {
9293
let path = db_path(app)?;
93-
let conn = Connection::open(&path).with_context(|| format!("open {}", path.display()))?;
94+
let mut conn =
95+
Connection::open(&path).with_context(|| format!("open {}", path.display()))?;
9496
// The cache is opened on separate connections from several threads — the
9597
// file watcher, command spawn_blocking tasks, the discovery orchestrator.
9698
// Without a busy timeout a concurrent writer fails immediately with
@@ -258,20 +260,34 @@ pub fn open(app: &AppHandle) -> Result<Connection> {
258260
"#,
259261
)?;
260262

261-
// Schema v3 gave `commits` `repo_id` + `sort_ts` for the windowed
262-
// timeline; schema v4 adds `first_seen_generation` for the
263-
// generation/MVCC-lite snapshot model. `commits` is a pure cache the
264-
// scanner refills, so each step drops & recreates it rather than
265-
// ALTERing — `repos` (tombstones, user state) and `meta` (which holds
266-
// the generation counter itself) are untouched.
267-
let schema_ver: i64 = conn
268-
.query_row("PRAGMA user_version", [], |r| r.get(0))
269-
.unwrap_or(0);
270-
if schema_ver < 4 {
271-
conn.execute_batch("DROP TABLE IF EXISTS commits;")?;
272-
conn.execute_batch(COMMITS_SCHEMA_V4)?;
263+
// Schema v3 gave `commits` `repo_id` + `sort_ts`; v4 added
264+
// `first_seen_generation`; v5 re-keys the timeline index to
265+
// `(sort_ts, repo_path, hash)` — `(repo_path, hash)` is the table's
266+
// primary key, a genuinely unique + stable total order (`repo_id` can
267+
// be 0 mid-discovery and is mutable, so it must not key the cursor).
268+
// `commits` is a pure cache the scanner refills, so each step drops &
269+
// recreates it; `repos` / `meta` (user state, the generation counter)
270+
// are untouched.
271+
//
272+
// The whole migration runs inside an IMMEDIATE transaction so that
273+
// concurrent `open()` calls (file watcher, orchestrator, IPC workers)
274+
// serialize: the first takes the write lock and migrates, the rest
275+
// block then re-read `user_version` inside their own transaction and
276+
// skip. Without it two opens could both see `user_version < 5` and
277+
// race the DROP / CREATE.
278+
{
279+
let tx = conn
280+
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
281+
let schema_ver: i64 = tx
282+
.query_row("PRAGMA user_version", [], |r| r.get(0))
283+
.unwrap_or(0);
284+
if schema_ver < 5 {
285+
tx.execute_batch("DROP TABLE IF EXISTS commits;")?;
286+
tx.execute_batch(COMMITS_SCHEMA_V5)?;
287+
tx.pragma_update(None, "user_version", 5_i64)?;
288+
}
289+
tx.commit()?;
273290
}
274-
let _ = conn.pragma_update(None, "user_version", 4_i64);
275291

276292
// v0.1.2 cleanup: the v0.1.1 orchestrator used `fs::canonicalize`
277293
// and stored its `\\?\…` Windows extended-length output as
@@ -645,14 +661,15 @@ pub fn list_recent_commits(
645661

646662
// ----- timeline window queries (Phase 1: windowed-pull) -----
647663

648-
/// A keyset cursor into the timeline's total order. `(sort_ts, repo_id,
649-
/// hash)` is unique across `commits`, so a cursor points unambiguously
650-
/// between two rows and stays valid under concurrent scanner inserts.
664+
/// A keyset cursor into the timeline's total order. `(sort_ts, repo_path,
665+
/// hash)` is unique across `commits` — `(repo_path, hash)` is the table's
666+
/// primary key — so a cursor points unambiguously between two rows and
667+
/// stays valid + stable under concurrent scanner inserts.
651668
#[derive(Debug, Clone, Serialize, Deserialize)]
652669
#[serde(rename_all = "camelCase")]
653670
pub struct Cursor {
654671
pub sort_ts: i64,
655-
pub repo_id: i64,
672+
pub repo_path: String,
656673
pub hash: String,
657674
}
658675

@@ -695,20 +712,21 @@ pub struct CommitWindow {
695712
pub has_older: bool,
696713
}
697714

698-
/// 16 columns: the 15 CommitSummary fields plus `repo_id` for the cursor.
715+
/// The 15 CommitSummary fields. The cursor's `repo_path` + `hash` are reused
716+
/// from here (col 0 + col 2); `sort_ts` is derived (= -timestamp).
699717
const TIMELINE_COLS: &str = "repo_path, repo_name, hash, short_hash, summary, \
700718
author, email, timestamp, branch_label, is_merge, is_tagged, parents, \
701-
message, remote_tip_label, remote_tip_extra_count, repo_id";
719+
message, remote_tip_label, remote_tip_extra_count";
702720

703721
/// Map a row selecting `TIMELINE_COLS` to a CommitSummary + its cursor.
704722
fn row_to_window_item(row: &rusqlite::Row) -> rusqlite::Result<(CommitSummary, Cursor)> {
705723
let parents_json: String = row.get(11)?;
706724
let parents: Vec<String> = serde_json::from_str(&parents_json).unwrap_or_default();
707725
let timestamp: i64 = row.get(7)?;
708-
let repo_id: i64 = row.get(15)?;
726+
let repo_path: String = row.get(0)?;
709727
let hash: String = row.get(2)?;
710728
let summary = CommitSummary {
711-
repo_path: row.get(0)?,
729+
repo_path: repo_path.clone(),
712730
repo_name: row.get(1)?,
713731
hash: hash.clone(),
714732
short_hash: row.get(3)?,
@@ -726,7 +744,7 @@ fn row_to_window_item(row: &rusqlite::Row) -> rusqlite::Result<(CommitSummary, C
726744
};
727745
let cursor = Cursor {
728746
sort_ts: -timestamp,
729-
repo_id,
747+
repo_path,
730748
hash,
731749
};
732750
Ok((summary, cursor))
@@ -744,8 +762,11 @@ fn build_filter_sql(filters: &TimelineFilters) -> (String, Vec<Box<dyn rusqlite:
744762

745763
if let Some(since) = filters.since {
746764
if since > 0 {
747-
sql.push_str(" AND timestamp >= ?");
748-
params.push(Box::new(since));
765+
// sort_ts = -timestamp and leads idx_commits_timeline, so a
766+
// sort_ts bound is an index range — `timestamp >= ?` would
767+
// only ever be a residual filter.
768+
sql.push_str(" AND sort_ts <= ?");
769+
params.push(Box::new(-since));
749770
}
750771
}
751772
if let Some(repo_ids) = &filters.repo_ids {
@@ -806,15 +827,17 @@ pub fn list_commits_window(
806827

807828
let mut sql = format!("SELECT {TIMELINE_COLS} FROM commits WHERE 1=1{filter_sql}");
808829
if cursor.is_some() {
809-
sql.push_str(&format!(" AND (sort_ts, repo_id, hash) {keyset_op} (?, ?, ?)"));
830+
sql.push_str(&format!(
831+
" AND (sort_ts, repo_path, hash) {keyset_op} (?, ?, ?)"
832+
));
810833
}
811834
sql.push_str(&format!(
812-
" ORDER BY sort_ts {order}, repo_id {order}, hash {order} LIMIT ?"
835+
" ORDER BY sort_ts {order}, repo_path {order}, hash {order} LIMIT ?"
813836
));
814837

815838
if let Some(c) = cursor {
816839
bind.push(Box::new(c.sort_ts));
817-
bind.push(Box::new(c.repo_id));
840+
bind.push(Box::new(c.repo_path.clone()));
818841
bind.push(Box::new(c.hash.clone()));
819842
}
820843
bind.push(Box::new(keep as i64));
@@ -878,10 +901,10 @@ fn count_newer_than(
878901
let (filter_sql, mut bind) = build_filter_sql(filters);
879902
let sql = format!(
880903
"SELECT COUNT(*) FROM commits WHERE 1=1{filter_sql} \
881-
AND (sort_ts, repo_id, hash) < (?, ?, ?)"
904+
AND (sort_ts, repo_path, hash) < (?, ?, ?)"
882905
);
883906
bind.push(Box::new(cursor.sort_ts));
884-
bind.push(Box::new(cursor.repo_id));
907+
bind.push(Box::new(cursor.repo_path.clone()));
885908
bind.push(Box::new(cursor.hash.clone()));
886909
let n: i64 =
887910
conn.query_row(&sql, rusqlite::params_from_iter(bind.iter()), |r| r.get(0))?;
@@ -986,8 +1009,10 @@ pub fn list_commits_around_anchor(
9861009
anchor: &Cursor,
9871010
before: usize,
9881011
after: usize,
1012+
anchor_rank: Option<i64>,
9891013
) -> Result<CommitAround> {
9901014
let newer = list_commits_window(conn, filters, Some(anchor), WindowDirection::Newer, before)?;
1015+
let newer_count = newer.rows.len() as i64;
9911016
let older = list_commits_window(conn, filters, Some(anchor), WindowDirection::Older, after)?;
9921017

9931018
// The window queries are strict (`<` / `>`), so the anchor row itself
@@ -996,11 +1021,11 @@ pub fn list_commits_around_anchor(
9961021
let (filter_sql, filter_bind) = build_filter_sql(filters);
9971022
let anchor_sql = format!(
9981023
"SELECT {TIMELINE_COLS} FROM commits \
999-
WHERE sort_ts = ? AND repo_id = ? AND hash = ?{filter_sql}"
1024+
WHERE sort_ts = ? AND repo_path = ? AND hash = ?{filter_sql}"
10001025
);
10011026
let mut anchor_bind: Vec<Box<dyn rusqlite::ToSql>> = vec![
10021027
Box::new(anchor.sort_ts),
1003-
Box::new(anchor.repo_id),
1028+
Box::new(anchor.repo_path.clone()),
10041029
Box::new(anchor.hash.clone()),
10051030
];
10061031
anchor_bind.extend(filter_bind);
@@ -1032,11 +1057,15 @@ pub fn list_commits_around_anchor(
10321057
.or(if anchor_found { Some(anchor.clone()) } else { None })
10331058
.or(newer.end_cursor);
10341059

1035-
// `rows[0]`'s rank = the count of commits ahead of it in the total
1036-
// order. Lets the UI drop this window into a `count`-tall scroll space.
1037-
let base_index = match &start_cursor {
1038-
Some(c) => count_newer_than(conn, filters, c)?,
1039-
None => 0,
1060+
// `rows[0]`'s global rank. When the caller resolved the anchor by rank
1061+
// (`list_commits_at_rank`), rows[0] sits exactly `newer_count` rows
1062+
// above it — no scan needed. Otherwise count the commits ahead of it.
1063+
let base_index = match anchor_rank {
1064+
Some(rank) => (rank - newer_count).max(0),
1065+
None => match &start_cursor {
1066+
Some(c) => count_newer_than(conn, filters, c)?,
1067+
None => 0,
1068+
},
10401069
};
10411070

10421071
Ok(CommitAround {
@@ -1062,14 +1091,14 @@ pub fn cursor_at_rank(
10621091
) -> Result<Option<Cursor>> {
10631092
let (filter_sql, mut bind) = build_filter_sql(filters);
10641093
let sql = format!(
1065-
"SELECT sort_ts, repo_id, hash FROM commits WHERE 1=1{filter_sql} \
1066-
ORDER BY sort_ts ASC, repo_id ASC, hash ASC LIMIT 1 OFFSET ?"
1094+
"SELECT sort_ts, repo_path, hash FROM commits WHERE 1=1{filter_sql} \
1095+
ORDER BY sort_ts ASC, repo_path ASC, hash ASC LIMIT 1 OFFSET ?"
10671096
);
10681097
bind.push(Box::new(rank.max(0)));
10691098
match conn.query_row(&sql, rusqlite::params_from_iter(bind.iter()), |r| {
10701099
Ok(Cursor {
10711100
sort_ts: r.get(0)?,
1072-
repo_id: r.get(1)?,
1101+
repo_path: r.get(1)?,
10731102
hash: r.get(2)?,
10741103
})
10751104
}) {
@@ -1091,7 +1120,9 @@ pub fn list_commits_at_rank(
10911120
after: usize,
10921121
) -> Result<CommitAround> {
10931122
match cursor_at_rank(conn, filters, rank)? {
1094-
Some(anchor) => list_commits_around_anchor(conn, filters, &anchor, before, after),
1123+
Some(anchor) => {
1124+
list_commits_around_anchor(conn, filters, &anchor, before, after, Some(rank))
1125+
}
10951126
None => Ok(CommitAround {
10961127
rows: Vec::new(),
10971128
anchor_found: false,
@@ -1211,7 +1242,7 @@ mod tests {
12111242
/// A fresh in-memory DB carrying just the v4 `commits` schema.
12121243
fn test_db() -> Connection {
12131244
let conn = Connection::open_in_memory().unwrap();
1214-
conn.execute_batch(COMMITS_SCHEMA_V4).unwrap();
1245+
conn.execute_batch(COMMITS_SCHEMA_V5).unwrap();
12151246
conn
12161247
}
12171248

@@ -1220,7 +1251,7 @@ mod tests {
12201251
/// lookup and the `meta` generation counter respectively.
12211252
fn upsert_test_db() -> Connection {
12221253
let conn = Connection::open_in_memory().unwrap();
1223-
conn.execute_batch(COMMITS_SCHEMA_V4).unwrap();
1254+
conn.execute_batch(COMMITS_SCHEMA_V5).unwrap();
12241255
conn.execute_batch(
12251256
"CREATE TABLE repos (path TEXT PRIMARY KEY, name TEXT NOT NULL);\
12261257
CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT NOT NULL, \
@@ -1324,7 +1355,7 @@ mod tests {
13241355
fn identical_timestamps_keep_a_stable_total_order() {
13251356
let conn = test_db();
13261357
// 30 commits sharing one timestamp across 3 repos — the
1327-
// (sort_ts, repo_id, hash) total order must still paginate cleanly.
1358+
// (sort_ts, repo_path, hash) total order must still paginate cleanly.
13281359
for repo in 1..=3 {
13291360
for i in 0..10 {
13301361
insert(&conn, repo, &format!("r{repo}c{i}"), 5_000, "bob");
@@ -1381,11 +1412,11 @@ mod tests {
13811412
}
13821413
let anchor = Cursor {
13831414
sort_ts: -2_020,
1384-
repo_id: 1,
1415+
repo_path: "/repo/1".to_string(),
13851416
hash: "c20".to_string(),
13861417
};
13871418
let around =
1388-
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 5, 5).unwrap();
1419+
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 5, 5, None).unwrap();
13891420
assert!(around.anchor_found);
13901421
assert_eq!(around.rows.len(), 11, "5 newer + anchor + 5 older");
13911422
assert_eq!(around.rows[0].hash.as_str(), "c25");
@@ -1403,11 +1434,11 @@ mod tests {
14031434
// d10) is still a valid position in the total order.
14041435
let anchor = Cursor {
14051436
sort_ts: -3_095,
1406-
repo_id: 1,
1437+
repo_path: "/repo/1".to_string(),
14071438
hash: "zzz".to_string(),
14081439
};
14091440
let around =
1410-
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 3, 3).unwrap();
1441+
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 3, 3, None).unwrap();
14111442
assert!(!around.anchor_found);
14121443
assert_eq!(around.rows.len(), 6, "3 newer + 3 older, no anchor row");
14131444
}
@@ -1455,11 +1486,11 @@ mod tests {
14551486
// g20 sits at rank 19 (g39 = rank 0); window rows[0] = g25 at rank 14.
14561487
let anchor = Cursor {
14571488
sort_ts: -6_020,
1458-
repo_id: 1,
1489+
repo_path: "/repo/1".to_string(),
14591490
hash: "g20".to_string(),
14601491
};
14611492
let around =
1462-
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 5, 5).unwrap();
1493+
list_commits_around_anchor(&conn, &TimelineFilters::default(), &anchor, 5, 5, None).unwrap();
14631494
assert_eq!(around.base_index, 14);
14641495
assert_eq!(around.rows[0].hash, "g25");
14651496
}

src-tauri/src/commands.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ pub async fn list_commits_around_anchor(
156156
) -> Result<cache::CommitAround, String> {
157157
tauri::async_runtime::spawn_blocking(move || -> Result<cache::CommitAround, String> {
158158
let conn = cache::open(&app).map_err(|e| e.to_string())?;
159-
cache::list_commits_around_anchor(&conn, &filters, &anchor, before, after)
159+
cache::list_commits_around_anchor(&conn, &filters, &anchor, before, after, None)
160160
.map_err(|e| e.to_string())
161161
})
162162
.await
@@ -848,9 +848,13 @@ pub async fn discover_repos(app: AppHandle) -> Result<usize, String> {
848848
.collect::<Vec<_>>();
849849

850850
if !commits.is_empty() {
851-
let outcome = cache::open(&app)
852-
.ok()
853-
.and_then(|mut conn| cache::upsert_commits(&mut conn, &commits).ok());
851+
let outcome = cache::open(&app).ok().and_then(|mut conn| {
852+
// Upsert the repo row FIRST so upsert_commits can
853+
// resolve a real repo_id — its COALESCE falls back
854+
// to 0 when the repos row does not exist yet.
855+
cache::upsert_repos(&mut conn, std::slice::from_ref(&repo)).ok()?;
856+
cache::upsert_commits(&mut conn, &commits).ok()
857+
});
854858
// Lightweight windowed-pull signal — the frontend
855859
// re-pulls the affected windows from the cache.
856860
if let Some(o) = outcome {

src/App.tsx

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -650,24 +650,18 @@ function App() {
650650
return commits.filter((c) => set.has(c.author));
651651
}, [commits, selectedAuthors]);
652652

653-
// repoPath → backend repo id (`Repo.id`). Shared by the multi-repo filter
654-
// resolution and the windowed timeline's filter-change cursor recovery.
655-
const repoIdByPath = useMemo(
656-
() => new Map(allRepos.map((r) => [r.path, r.id])),
657-
[allRepos],
658-
);
659-
660653
// Resolve the multi-repo path filter to backend repo ids for the
661654
// windowed timeline. id 0 (a just-discovered repo not yet refreshed via
662655
// listRepos) is dropped; a selection that resolves to no usable ids
663656
// falls back to "all repos" rather than showing nothing.
664657
const repoIds = useMemo<number[] | null>(() => {
665658
if (selectedRepoPaths === "all") return null;
659+
const byPath = new Map(allRepos.map((r) => [r.path, r.id]));
666660
const ids = selectedRepoPaths
667-
.map((p) => repoIdByPath.get(p))
661+
.map((p) => byPath.get(p))
668662
.filter((id): id is number => id != null && id > 0);
669663
return ids.length > 0 ? ids : null;
670-
}, [selectedRepoPaths, repoIdByPath]);
664+
}, [selectedRepoPaths, allRepos]);
671665

672666
function togglePin(path: string) {
673667
setPinnedRepos((prev) => {
@@ -789,7 +783,6 @@ function App() {
789783
authors={selectedAuthors === "all" ? null : selectedAuthors}
790784
windowDays={toWindowParam(windowDays)}
791785
refreshNonce={refreshNonce}
792-
repoIdByPath={repoIdByPath}
793786
onSelectRepo={setSelectedRepoPath}
794787
/>
795788
)}

0 commit comments

Comments
 (0)