Skip to content

Commit 497f984

Browse files
authored
perf(db): migrate build pipeline writes to NativeDatabase (6.15) (#669)
* perf(db): add NativeDatabase napi-rs class for rusqlite connection lifecycle (6.13) Foundation for moving all DB operations to rusqlite on the native engine path. Creates a persistent rusqlite::Connection holder exposed to JS, handling schema migrations and build metadata KV — eliminating redundant per-call connection open/close in the native build pipeline. * perf(db): migrate build pipeline writes to NativeDatabase persistent connection (6.15) Consolidate all build-pipeline write operations into NativeDatabase methods that reuse a single persistent rusqlite connection, eliminating the per-call connection open/close overhead from the standalone functions. Rust: Refactor standalone modules (insert_nodes, edges_db, ast_db, roles_db) to expose pub(crate) functions accepting &Connection, then add wrapper methods on NativeDatabase: bulkInsertNodes, bulkInsertEdges, bulkInsertAstNodes, classifyRolesFull, classifyRolesIncremental, and purgeFilesData. Standalone #[napi] functions preserved for backward compatibility. TypeScript: Wire all build stages (insert-nodes, build-edges, build-structure, detect-changes, ast) to prefer ctx.nativeDb methods when available, falling back to standalone native functions then JS. Thread nativeDb through EngineOpts for analysis-phase AST insertion. * fix: rename do_insert to do_insert_nodes and add error logging before .is_ok() (#669)
1 parent 8974514 commit 497f984

13 files changed

Lines changed: 316 additions & 65 deletions

File tree

crates/codegraph-core/src/ast_db.rs

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -74,35 +74,42 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec<FileAstBatch>) -> u32
7474
}
7575

7676
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
77-
let mut conn = match Connection::open_with_flags(&db_path, flags) {
77+
let conn = match Connection::open_with_flags(&db_path, flags) {
7878
Ok(c) => c,
7979
Err(_) => return 0,
8080
};
8181

8282
// Match the JS-side performance pragmas (including busy_timeout for WAL contention)
83-
let _ = conn.execute_batch(
84-
"PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000",
85-
);
83+
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
84+
85+
do_insert_ast_nodes(&conn, &batches).unwrap_or(0)
86+
}
87+
88+
/// Internal implementation: insert AST nodes using an existing connection.
89+
/// Used by both the standalone `bulk_insert_ast_nodes` function and `NativeDatabase`.
90+
pub(crate) fn do_insert_ast_nodes(
91+
conn: &Connection,
92+
batches: &[FileAstBatch],
93+
) -> rusqlite::Result<u32> {
94+
if batches.is_empty() {
95+
return Ok(0);
96+
}
8697

8798
// Bail out if the ast_nodes table doesn't exist (schema too old)
8899
let has_table: bool = conn
89100
.prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name='ast_nodes'")
90101
.and_then(|mut s| s.query_row([], |_| Ok(true)))
91102
.unwrap_or(false);
92103
if !has_table {
93-
return 0;
104+
return Ok(0);
94105
}
95106

96107
// ── Phase 1: Pre-fetch node definitions for parent resolution ────────
97108
let mut file_defs: HashMap<String, Vec<NodeDef>> = HashMap::new();
98109
{
99-
let Ok(mut stmt) =
100-
conn.prepare("SELECT id, line, end_line FROM nodes WHERE file = ?1")
101-
else {
102-
return 0;
103-
};
110+
let mut stmt = conn.prepare("SELECT id, line, end_line FROM nodes WHERE file = ?1")?;
104111

105-
for batch in &batches {
112+
for batch in batches {
106113
if batch.nodes.is_empty() || file_defs.contains_key(&batch.file) {
107114
continue;
108115
}
@@ -118,48 +125,39 @@ pub fn bulk_insert_ast_nodes(db_path: String, batches: Vec<FileAstBatch>) -> u32
118125
.unwrap_or_default();
119126
file_defs.insert(batch.file.clone(), defs);
120127
}
121-
} // `stmt` dropped — releases the immutable borrow on `conn`
128+
}
122129

123130
// ── Phase 2: Bulk insert in a single transaction ─────────────────────
124-
let Ok(tx) = conn.transaction() else {
125-
return 0;
126-
};
131+
let tx = conn.unchecked_transaction()?;
127132

128133
let mut total = 0u32;
129134
{
130-
let Ok(mut insert_stmt) = tx.prepare(
135+
let mut insert_stmt = tx.prepare(
131136
"INSERT INTO ast_nodes (file, line, kind, name, text, receiver, parent_node_id) \
132137
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
133-
) else {
134-
return 0;
135-
};
138+
)?;
136139

137-
for batch in &batches {
140+
for batch in batches {
138141
let empty = Vec::new();
139142
let defs = file_defs.get(&batch.file).unwrap_or(&empty);
140143

141144
for node in &batch.nodes {
142145
let parent_id = find_parent_id(defs, node.line);
143146

144-
match insert_stmt.execute(params![
147+
insert_stmt.execute(params![
145148
&batch.file,
146149
node.line,
147150
&node.kind,
148151
&node.name,
149152
&node.text,
150153
&node.receiver,
151154
parent_id,
152-
]) {
153-
Ok(_) => total += 1,
154-
Err(_) => return 0, // abort; tx rolls back on drop
155-
}
155+
])?;
156+
total += 1;
156157
}
157158
}
158-
} // `insert_stmt` dropped
159-
160-
if tx.commit().is_err() {
161-
return 0;
162159
}
163160

164-
total
161+
tx.commit()?;
162+
Ok(total)
165163
}

crates/codegraph-core/src/edges_db.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,20 @@ pub fn bulk_insert_edges(db_path: String, edges: Vec<EdgeRow>) -> bool {
3232
return true;
3333
}
3434
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
35-
let mut conn = match Connection::open_with_flags(&db_path, flags) {
35+
let conn = match Connection::open_with_flags(&db_path, flags) {
3636
Ok(c) => c,
3737
Err(_) => return false,
3838
};
3939
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
40-
do_insert(&mut conn, &edges).is_ok()
40+
do_insert_edges(&conn, &edges).is_ok()
4141
}
4242

4343
/// 199 rows × 5 params = 995 bind parameters per statement, safely under
4444
/// the legacy `SQLITE_MAX_VARIABLE_NUMBER` default of 999.
4545
const CHUNK: usize = 199;
4646

47-
fn do_insert(conn: &mut Connection, edges: &[EdgeRow]) -> rusqlite::Result<()> {
48-
let tx = conn.transaction()?;
47+
pub(crate) fn do_insert_edges(conn: &Connection, edges: &[EdgeRow]) -> rusqlite::Result<()> {
48+
let tx = conn.unchecked_transaction()?;
4949

5050
for chunk in edges.chunks(CHUNK) {
5151
let placeholders: Vec<String> = (0..chunk.len())

crates/codegraph-core/src/insert_nodes.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,14 @@ pub fn bulk_insert_nodes(
7676
removed_files: Vec<String>,
7777
) -> bool {
7878
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
79-
let mut conn = match Connection::open_with_flags(&db_path, flags) {
79+
let conn = match Connection::open_with_flags(&db_path, flags) {
8080
Ok(c) => c,
8181
Err(_) => return false,
8282
};
8383

8484
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
8585

86-
do_insert(&mut conn, &batches, &file_hashes, &removed_files).is_ok()
86+
do_insert_nodes(&conn, &batches, &file_hashes, &removed_files).is_ok()
8787
}
8888

8989
// ── Internal implementation ─────────────────────────────────────────
@@ -108,13 +108,13 @@ fn query_node_ids(
108108
Ok(map)
109109
}
110110

111-
fn do_insert(
112-
conn: &mut Connection,
111+
pub(crate) fn do_insert_nodes(
112+
conn: &Connection,
113113
batches: &[InsertNodesBatch],
114114
file_hashes: &[FileHashEntry],
115115
removed_files: &[String],
116116
) -> rusqlite::Result<()> {
117-
let tx = conn.transaction()?;
117+
let tx = conn.unchecked_transaction()?;
118118

119119
// ── Phase 1: Insert file nodes + definitions + export nodes ──────
120120
{

crates/codegraph-core/src/native_db.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ use napi_derive::napi;
1111
use rusqlite::{params, Connection, OpenFlags};
1212
use send_wrapper::SendWrapper;
1313

14+
use crate::ast_db::{self, FileAstBatch};
15+
use crate::edges_db::{self, EdgeRow};
16+
use crate::insert_nodes::{self, FileHashEntry, InsertNodesBatch};
17+
use crate::roles_db::{self, RoleSummary};
18+
1419
// ── Migration DDL (mirrored from src/db/migrations.ts) ──────────────────
1520

1621
struct Migration {
@@ -543,6 +548,120 @@ impl NativeDatabase {
543548
.map_err(|e| napi::Error::from_reason(format!("commit setBuildMeta failed: {e}")))?;
544549
Ok(())
545550
}
551+
552+
// ── Phase 6.15: Build pipeline write operations ─────────────────────
553+
554+
/// Bulk-insert nodes, children, containment edges, exports, and file hashes.
555+
/// Reuses the persistent connection instead of opening a new one.
556+
/// Returns `true` on success, `false` on failure.
557+
#[napi]
558+
pub fn bulk_insert_nodes(
559+
&self,
560+
batches: Vec<InsertNodesBatch>,
561+
file_hashes: Vec<FileHashEntry>,
562+
removed_files: Vec<String>,
563+
) -> napi::Result<bool> {
564+
let conn = self.conn()?;
565+
Ok(insert_nodes::do_insert_nodes(conn, &batches, &file_hashes, &removed_files)
566+
.inspect_err(|e| eprintln!("[NativeDatabase] bulk_insert_nodes failed: {e}"))
567+
.is_ok())
568+
}
569+
570+
/// Bulk-insert edge rows using chunked multi-value INSERT statements.
571+
/// Returns `true` on success, `false` on failure.
572+
#[napi]
573+
pub fn bulk_insert_edges(&self, edges: Vec<EdgeRow>) -> napi::Result<bool> {
574+
if edges.is_empty() {
575+
return Ok(true);
576+
}
577+
let conn = self.conn()?;
578+
Ok(edges_db::do_insert_edges(conn, &edges)
579+
.inspect_err(|e| eprintln!("[NativeDatabase] bulk_insert_edges failed: {e}"))
580+
.is_ok())
581+
}
582+
583+
/// Bulk-insert AST nodes, resolving parent_node_id from the nodes table.
584+
/// Returns the number of rows inserted (0 on failure).
585+
#[napi]
586+
pub fn bulk_insert_ast_nodes(&self, batches: Vec<FileAstBatch>) -> napi::Result<u32> {
587+
let conn = self.conn()?;
588+
Ok(ast_db::do_insert_ast_nodes(conn, &batches).unwrap_or(0))
589+
}
590+
591+
/// Full role classification: queries all nodes, computes fan-in/fan-out,
592+
/// classifies roles, and batch-updates the `role` column.
593+
#[napi]
594+
pub fn classify_roles_full(&self) -> napi::Result<Option<RoleSummary>> {
595+
let conn = self.conn()?;
596+
Ok(roles_db::do_classify_full(conn).ok())
597+
}
598+
599+
/// Incremental role classification: only reclassifies nodes from changed
600+
/// files plus their immediate edge neighbours.
601+
#[napi]
602+
pub fn classify_roles_incremental(
603+
&self,
604+
changed_files: Vec<String>,
605+
) -> napi::Result<Option<RoleSummary>> {
606+
let conn = self.conn()?;
607+
Ok(roles_db::do_classify_incremental(conn, &changed_files).ok())
608+
}
609+
610+
/// Cascade-delete all graph data for the specified files across all tables.
611+
/// Order: dependent tables first (embeddings, cfg, dataflow, complexity,
612+
/// metrics, ast_nodes), then edges, then nodes, then optionally file_hashes.
613+
#[napi]
614+
pub fn purge_files_data(
615+
&self,
616+
files: Vec<String>,
617+
purge_hashes: Option<bool>,
618+
) -> napi::Result<()> {
619+
if files.is_empty() {
620+
return Ok(());
621+
}
622+
let conn = self.conn()?;
623+
let purge_hashes = purge_hashes.unwrap_or(true);
624+
625+
let tx = conn
626+
.unchecked_transaction()
627+
.map_err(|e| napi::Error::from_reason(format!("purge transaction failed: {e}")))?;
628+
629+
// Purge each file across all tables. Optional tables are silently
630+
// skipped if they don't exist. Order: dependents → edges → nodes → hashes.
631+
let purge_sql: &[(&str, bool)] = &[
632+
("DELETE FROM embeddings WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
633+
("DELETE FROM cfg_edges WHERE function_node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
634+
("DELETE FROM cfg_blocks WHERE function_node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
635+
("DELETE FROM dataflow WHERE source_id IN (SELECT id FROM nodes WHERE file = ?1) OR target_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
636+
("DELETE FROM function_complexity WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
637+
("DELETE FROM node_metrics WHERE node_id IN (SELECT id FROM nodes WHERE file = ?1)", false),
638+
("DELETE FROM ast_nodes WHERE file = ?1", false),
639+
// Core tables — errors propagated
640+
("DELETE FROM edges WHERE source_id IN (SELECT id FROM nodes WHERE file = ?1) OR target_id IN (SELECT id FROM nodes WHERE file = ?1)", true),
641+
("DELETE FROM nodes WHERE file = ?1", true),
642+
];
643+
644+
for file in &files {
645+
for &(sql, required) in purge_sql {
646+
match tx.execute(sql, params![file]) {
647+
Ok(_) => {}
648+
Err(e) if required => {
649+
return Err(napi::Error::from_reason(format!(
650+
"purge failed for \"{file}\": {e}"
651+
)));
652+
}
653+
Err(_) => {} // optional table missing — skip
654+
}
655+
}
656+
if purge_hashes {
657+
let _ = tx.execute("DELETE FROM file_hashes WHERE file = ?1", params![file]);
658+
}
659+
}
660+
661+
tx.commit()
662+
.map_err(|e| napi::Error::from_reason(format!("purge commit failed: {e}")))?;
663+
Ok(())
664+
}
546665
}
547666

548667
// ── Private helpers ─────────────────────────────────────────────────────

crates/codegraph-core/src/roles_db.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ pub struct RoleSummary {
7474
#[napi]
7575
pub fn classify_roles_full(db_path: String) -> Option<RoleSummary> {
7676
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
77-
let mut conn = Connection::open_with_flags(&db_path, flags).ok()?;
77+
let conn = Connection::open_with_flags(&db_path, flags).ok()?;
7878
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
79-
do_classify_full(&mut conn).ok()
79+
do_classify_full(&conn).ok()
8080
}
8181

8282
/// Incremental role classification: only reclassifies nodes from changed files
@@ -88,9 +88,9 @@ pub fn classify_roles_incremental(
8888
changed_files: Vec<String>,
8989
) -> Option<RoleSummary> {
9090
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
91-
let mut conn = Connection::open_with_flags(&db_path, flags).ok()?;
91+
let conn = Connection::open_with_flags(&db_path, flags).ok()?;
9292
let _ = conn.execute_batch("PRAGMA synchronous = NORMAL; PRAGMA busy_timeout = 5000");
93-
do_classify_incremental(&mut conn, &changed_files).ok()
93+
do_classify_incremental(&conn, &changed_files).ok()
9494
}
9595

9696
// ── Shared helpers ───────────────────────────────────────────────────
@@ -228,8 +228,8 @@ fn batch_update_roles(
228228

229229
// ── Full classification ──────────────────────────────────────────────
230230

231-
fn do_classify_full(conn: &mut Connection) -> rusqlite::Result<RoleSummary> {
232-
let tx = conn.transaction()?;
231+
pub(crate) fn do_classify_full(conn: &Connection) -> rusqlite::Result<RoleSummary> {
232+
let tx = conn.unchecked_transaction()?;
233233
let mut summary = RoleSummary::default();
234234

235235
// 1. Leaf kinds → dead-leaf (skip expensive fan-in/fan-out JOINs)
@@ -351,11 +351,11 @@ fn do_classify_full(conn: &mut Connection) -> rusqlite::Result<RoleSummary> {
351351

352352
// ── Incremental classification ───────────────────────────────────────
353353

354-
fn do_classify_incremental(
355-
conn: &mut Connection,
354+
pub(crate) fn do_classify_incremental(
355+
conn: &Connection,
356356
changed_files: &[String],
357357
) -> rusqlite::Result<RoleSummary> {
358-
let tx = conn.transaction()?;
358+
let tx = conn.unchecked_transaction()?;
359359
let mut summary = RoleSummary::default();
360360

361361
// Build placeholders for changed files

src/domain/graph/builder/pipeline.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ function initializeEngine(ctx: PipelineContext): void {
3434
engine: ctx.opts.engine || 'auto',
3535
dataflow: ctx.opts.dataflow !== false,
3636
ast: ctx.opts.ast !== false,
37+
nativeDb: ctx.nativeDb,
3738
};
3839
const { name: engineName, version: engineVersion } = getActiveEngine(ctx.engineOpts);
3940
ctx.engineName = engineName as 'native' | 'wasm';

src/domain/graph/builder/stages/build-edges.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -673,23 +673,30 @@ export async function buildEdges(ctx: PipelineContext): Promise<void> {
673673

674674
// When using native edge insert, skip JS insert here — do it after tx commits.
675675
// Otherwise insert edges within this transaction for atomicity.
676-
if (!native?.bulkInsertEdges) {
676+
const useNativeEdgeInsert = !!(ctx.nativeDb?.bulkInsertEdges || native?.bulkInsertEdges);
677+
if (!useNativeEdgeInsert) {
677678
batchInsertEdges(db, allEdgeRows);
678679
}
679680
});
680681
computeEdgesTx();
681682

682683
// Phase 2: Native rusqlite bulk insert (outside better-sqlite3 transaction
683-
// since rusqlite opens its own connection — avoids SQLITE_BUSY contention)
684-
if (native?.bulkInsertEdges && allEdgeRows.length > 0) {
684+
// to avoid SQLITE_BUSY contention). Prefer NativeDatabase persistent
685+
// connection (6.15), fall back to standalone function (6.12).
686+
if ((ctx.nativeDb?.bulkInsertEdges || native?.bulkInsertEdges) && allEdgeRows.length > 0) {
685687
const nativeEdges = allEdgeRows.map((r) => ({
686688
sourceId: r[0],
687689
targetId: r[1],
688690
kind: r[2],
689691
confidence: r[3],
690692
dynamic: r[4],
691693
}));
692-
const ok = native.bulkInsertEdges(db.name, nativeEdges);
694+
let ok: boolean;
695+
if (ctx.nativeDb?.bulkInsertEdges) {
696+
ok = ctx.nativeDb.bulkInsertEdges(nativeEdges);
697+
} else {
698+
ok = native!.bulkInsertEdges(db.name, nativeEdges);
699+
}
693700
if (!ok) {
694701
debug('Native bulkInsertEdges failed — falling back to JS batchInsertEdges');
695702
batchInsertEdges(db, allEdgeRows);

0 commit comments

Comments
 (0)