Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fast-pianos-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/browser-db-sqlite-persistence": patch
"@tanstack/db-sqlite-persistence-core": patch
---

Fix SQLite expression-index matching for persisted ref filters by inlining JSON-path literals in runtime SQL compilation while keeping actual filter values bound.
13 changes: 8 additions & 5 deletions packages/db-sqlite-persistence-core/src/sqlite-core-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,18 +579,21 @@ function compileComparisonSql(
function compileRefExpressionSql(jsonPath: string): CompiledSqlFragment {
const typePath = `${jsonPath}.${PERSISTED_TYPE_TAG}`
const taggedValuePath = `${jsonPath}.${PERSISTED_VALUE_TAG}`
const typePathSql = toSqliteLiteral(typePath)
const taggedValuePathSql = toSqliteLiteral(taggedValuePath)
const jsonPathSql = toSqliteLiteral(jsonPath)

return {
supported: true,
sql: `(CASE json_extract(value, ?)
WHEN 'bigint' THEN CAST(json_extract(value, ?) AS NUMERIC)
WHEN 'date' THEN json_extract(value, ?)
sql: `(CASE json_extract(value, ${typePathSql})
WHEN 'bigint' THEN CAST(json_extract(value, ${taggedValuePathSql}) AS NUMERIC)
WHEN 'date' THEN json_extract(value, ${taggedValuePathSql})
WHEN 'nan' THEN NULL
WHEN 'infinity' THEN NULL
WHEN '-infinity' THEN NULL
ELSE json_extract(value, ?)
ELSE json_extract(value, ${jsonPathSql})
END)`,
params: [typePath, taggedValuePath, taggedValuePath, jsonPath],
params: [],
valueKind: `unknown`,
}
}
Expand Down
118 changes: 118 additions & 0 deletions packages/db-sqlite-persistence-core/tests/sqlite-core-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,36 @@ function registerHarness(
return harness
}

function createQueryObservingDriver(
inner: SQLiteDriver,
observeQuery: (
sql: string,
params: ReadonlyArray<unknown>,
) => void | Promise<void>,
): SQLiteDriver {
const wrap = (driver: SQLiteDriver): SQLiteDriver => {
const transactionWithDriver = driver.transactionWithDriver

return {
exec: async (sql) => driver.exec(sql),
query: async <T>(sql: string, params?: ReadonlyArray<unknown>) => {
await observeQuery(sql, params ?? [])
return driver.query<T>(sql, params)
},
run: async (sql, params) => driver.run(sql, params),
transaction: async <T>(
fn: (transactionDriver: SQLiteDriver) => Promise<T>,
) => driver.transaction((transactionDriver) => fn(wrap(transactionDriver))),
transactionWithDriver: transactionWithDriver
? async <T>(fn: (transactionDriver: SQLiteDriver) => Promise<T>) =>
transactionWithDriver((transactionDriver) => fn(wrap(transactionDriver)))
: undefined,
}
}

return wrap(inner)
}

export type SQLiteCoreAdapterHarnessFactory = (
options?: Omit<
ConstructorParameters<typeof SQLiteCorePersistenceAdapter>[0],
Expand Down Expand Up @@ -1609,6 +1639,94 @@ export function runSQLiteCoreAdapterContractSuite(
)
})

it(`inlines JSON-path refs in runtime subset filters while keeping values bound`, async () => {
const baseHarness = registerHarness()
const collectionId = `thread-messages`
let capturedSubsetSql: string | undefined
let capturedSubsetParams: ReadonlyArray<unknown> = []

const driver = createQueryObservingDriver(
baseHarness.driver,
(sql, params) => {
if (
sql.startsWith(`SELECT key, value, metadata, row_version FROM`) &&
sql.includes(`WHERE`)
) {
capturedSubsetSql = sql
capturedSubsetParams = params
}
},
)
const adapter = new SQLiteCorePersistenceAdapter({ driver })

await adapter.applyCommittedTx(collectionId, {
txId: `thread-messages-seed`,
term: 1,
seq: 1,
rowVersion: 1,
mutations: [
{
type: `insert`,
key: `1`,
value: {
id: `1`,
threadId: `thread-1`,
title: `First`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 1,
},
},
{
type: `insert`,
key: `2`,
value: {
id: `2`,
threadId: `thread-1`,
title: `Second`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 2,
},
},
{
type: `insert`,
key: `3`,
value: {
id: `3`,
threadId: `thread-2`,
title: `Other thread`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 3,
},
},
],
})

await adapter.ensureIndex(collectionId, `thread-id`, {
expressionSql: [
JSON.stringify({
type: `ref`,
path: [`threadId`],
}),
],
})

const rows = await adapter.loadSubset(collectionId, {
where: new IR.Func(`eq`, [
new IR.PropRef([`threadId`]),
new IR.Value(`thread-1`),
]),
})

expect(rows).toHaveLength(2)
expect(rows.map((row) => String(row.key)).sort()).toEqual([`1`, `2`])
expect(capturedSubsetSql).toContain(`WHERE`)
expect(capturedSubsetSql).toContain(
`json_extract(value, '$.threadId.__tanstack_db_persisted_type__')`,
)
expect(capturedSubsetSql).toContain(`json_extract(value, '$.threadId')`)
expect(capturedSubsetParams).toEqual([`thread-1`])
})

it(`rejects unsafe raw SQL fragments in index specs`, async () => {
const { adapter } = registerContractHarness()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { mkdtempSync, rmSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { describe, expect, it } from 'vitest'
import { IR } from '@tanstack/db'
import { runSQLiteCoreAdapterContractSuite } from '../../db-sqlite-persistence-core/tests/contracts/sqlite-core-adapter-contract'
import { BetterSqlite3SQLiteDriver } from '../src/node-driver'
import { SQLiteCorePersistenceAdapter } from '../../db-sqlite-persistence-core/src'
import {
SQLiteCorePersistenceAdapter,
createPersistedTableName,
} from '../../db-sqlite-persistence-core/src'
import type { SQLiteCoreAdapterHarnessFactory } from '../../db-sqlite-persistence-core/tests/contracts/sqlite-core-adapter-contract'
import type { SQLiteDriver } from '../../db-sqlite-persistence-core/src'

const createHarness: SQLiteCoreAdapterHarnessFactory = (options) => {
const tempDirectory = mkdtempSync(join(tmpdir(), `db-node-sqlite-core-`))
Expand Down Expand Up @@ -32,3 +38,155 @@ runSQLiteCoreAdapterContractSuite(
`SQLiteCorePersistenceAdapter (better-sqlite3 node driver)`,
createHarness,
)

function createQueryObservingDriver(
inner: SQLiteDriver,
observeQuery: (
sql: string,
params: ReadonlyArray<unknown>,
) => void | Promise<void>,
): SQLiteDriver {
const wrap = (driver: SQLiteDriver): SQLiteDriver => {
return {
exec: async function (sql) {
return driver.exec(sql)
},
query: async function <T>(
sql: string,
params?: ReadonlyArray<unknown>,
) {
await observeQuery(sql, params ?? [])
return driver.query<T>(sql, params)
},
run: async function (sql, params) {
return driver.run(sql, params)
},
transaction: async function <T>(
fn: (transactionDriver: SQLiteDriver) => Promise<T>,
) {
return driver.transaction((transactionDriver) =>
fn(wrap(transactionDriver)),
)
},
transactionWithDriver: driver.transactionWithDriver
? async function <T>(
fn: (transactionDriver: SQLiteDriver) => Promise<T>,
) {
return driver.transactionWithDriver!((
transactionDriver,
) =>
fn(wrap(transactionDriver)),
)
}
: undefined,
}
}

return wrap(inner)
}

describe(`SQLiteCorePersistenceAdapter planner behavior (better-sqlite3)`, () => {
it(`uses expression indexes for runtime ref filters`, async () => {
const tempDirectory = mkdtempSync(join(tmpdir(), `db-node-sqlite-plan-`))
const dbPath = join(tempDirectory, `state.sqlite`)
const baseDriver = new BetterSqlite3SQLiteDriver({ filename: dbPath })
const collectionId = `thread-messages`
const tableName = createPersistedTableName(collectionId, `c`)
let capturedSubsetSql: string | undefined
let capturedSubsetParams: ReadonlyArray<unknown> = []

const driver = createQueryObservingDriver(baseDriver, (sql, params) => {
if (
sql.startsWith(`SELECT key, value, metadata, row_version FROM`) &&
sql.includes(`WHERE`)
) {
capturedSubsetSql = sql
capturedSubsetParams = params
}
})
const adapter = new SQLiteCorePersistenceAdapter({ driver })

try {
await adapter.applyCommittedTx(collectionId, {
txId: `thread-messages-seed`,
term: 1,
seq: 1,
rowVersion: 1,
mutations: [
{
type: `insert`,
key: `1`,
value: {
id: `1`,
threadId: `thread-1`,
title: `First`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 1,
},
},
{
type: `insert`,
key: `2`,
value: {
id: `2`,
threadId: `thread-1`,
title: `Second`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 2,
},
},
{
type: `insert`,
key: `3`,
value: {
id: `3`,
threadId: `thread-2`,
title: `Other thread`,
createdAt: `2026-01-01T00:00:00.000Z`,
score: 3,
},
},
],
})

await adapter.ensureIndex(collectionId, `thread-id`, {
expressionSql: [
JSON.stringify({
type: `ref`,
path: [`threadId`],
}),
],
})

const rows = await adapter.loadSubset(collectionId, {
where: new IR.Func(`eq`, [
new IR.PropRef([`threadId`]),
new IR.Value(`thread-1`),
]),
})

expect(rows).toHaveLength(2)
expect(capturedSubsetSql).toContain(`WHERE`)
expect(capturedSubsetParams).toEqual([`thread-1`])

const planRows = baseDriver
.getDatabase()
.prepare(`EXPLAIN QUERY PLAN ${capturedSubsetSql}`)
.all(...capturedSubsetParams) as Array<{ detail: string }>
const indexedSearchPattern = new RegExp(
`\\bSEARCH ${tableName}\\b.*\\bUSING INDEX\\b`,
)

expect(planRows.map((row) => row.detail).length).toBeGreaterThan(0)
expect(
planRows.some((row) => indexedSearchPattern.test(row.detail)),
).toBe(true)
expect(
planRows.some((row) => row.detail.startsWith(`SCAN ${tableName}`)),
).toBe(false)
} finally {
baseDriver.close()
rmSync(tempDirectory, { recursive: true, force: true })
}
})
})