Skip to content

fix: persist GSI queue to PostgreSQL for crash safety#128

Open
LeeroyHannigan wants to merge 5 commits into
mainfrom
fix/persistent-gsi-queue
Open

fix: persist GSI queue to PostgreSQL for crash safety#128
LeeroyHannigan wants to merge 5 commits into
mainfrom
fix/persistent-gsi-queue

Conversation

@LeeroyHannigan

@LeeroyHannigan LeeroyHannigan commented May 25, 2026

Copy link
Copy Markdown
Collaborator

What

Replaces the in-memory VecDeque GSI propagation queue with a PostgreSQL-backed persistent, self-describing queue (gsi_pending). Pending GSI updates are enqueued in the same transaction as the base write and applied by background workers, surviving process crash/restart with no lost updates.

Key design:

  • New gsi_pending table (data migration 002_gsi_pending.sql).
  • Self-describing rows: each row carries an index_context snapshot (base key schema, attribute definitions, target index defs) captured at enqueue, so workers apply with zero catalog reads, no metadata cache. A GSI's key schema/projection are immutable after creation, so the snapshot is never stale.
  • Crash-safe apply: a worker claims, applies, and deletes each row in a single transaction (SELECT … FOR UPDATE SKIP LOCKED → index writes → DELETECOMMIT). A crash or error before commit rolls the unit back and the row is reprocessed.
  • Per-key FIFO ordering: rows are partitioned by a stable hash of the base key; each worker owns one partition and drains it in id order, so successive updates to one item apply in order (matching the old in-memory queue) while distinct keys propagate concurrently.
  • Propagation delay enforced by ready_at; the worker sleeps until the next due row rather than polling.
  • Dropped-index race (table deleted mid-flight) handled per index via a SAVEPOINT (SQLSTATE 42P01 → skip, row consumed).
  • Data migrations are tracked in the data DB's schema_history (registry), with safe adoption of pre-tracking deployments.

Why

Closes #125. The in-memory queue lost all pending GSI updates on crash/restart, causing permanent GSI inconsistency with no recovery path.

Note on this branch

Merges latest main. Beyond the original change it addresses review feedback (removes the index-metadata cache in favour of self-describing rows; removes the dead index_name field) and hardens crash safety and per-key ordering, with integration-test coverage.

Testing done

  • cargo fmt --all -- --check, cargo clippy --all-targets -- -D warnings — clean
  • cargo test --workspace — pass
  • New PG integration tests (tests/test_gsi_async_queue.py): crash recovery
    (SIGKILL → recovered after restart), per-key FIFO ordering, dropped-index
    consume; plus data-migration tracking (test_cli_lifecycle.py)
  • Python integration suites — pass;

Checklist

  • I have read CONTRIBUTING.md
  • All tests pass (cargo test --workspace)
  • Code is formatted (cargo fmt --check)
  • Clippy is clean
  • I have added or updated tests for new functionality
  • I have updated documentation if behavior changed
  • Breaking changes are noted below (if any)

Breaking changes

None to the API. 002_gsi_pending.sql adds the gsi_pending table (with a worker_partition column); it is applied via the tracked data migration on init/migrate. Existing deployments gain crash-safe GSI propagation
transparently.

SELECT id FROM gsi_pending \
WHERE ready_at <= NOW() \
ORDER BY id \
LIMIT $1 \

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is LIMIT applied before or after ORDER BY?

@jcshepherd jcshepherd May 27, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a bit of reading ... It looks like Postgres will sort the entire resultset first, and then apply the limit. So if "ready_at" is far enough in the past on a busy table, the sort could be expensive, though the index on ready_at should help mitigate that. My guess is that BATCH_SIZE is small enough to discourage the planner from believing that a full table scan would be cheaper than an index scan.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After

pk_hash(pk_text.as_ref()),
&key_info.account_id,
&key_info.table_name,
if has_async_indexes(&indexes, sys_delay) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably be called once at the beginning (right after indexes is populated).

-- Inserted atomically within the base write transaction, consumed by
-- background workers. Survives process crash/restart.

CREATE TABLE IF NOT EXISTS gsi_pending (

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall where it's buried, but somewhere there is a catalog version identifier that should be updated when the metadata/system table schema is updated, so that extenddb migrate will know that there is a migration to perform. I believe it's in storage-postgres somewhere.

@jcshepherd jcshepherd left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple initial questions/comments. Probably the one I'm most concerned with is the evaluation order of LIMIT and ORDER BY. If I were a gambler, I'd wager results are LIMITed before ORDER BY, which may not given you the ordering guarantees you want.

@amrith

amrith commented Jun 8, 2026

Copy link
Copy Markdown
Collaborator

I'm uncomfortable with the caching of index information. In my steering rules, I have a hard veto on anything other than auth data in a cache. this PR violates that and that's something I want to think through very closely. There are also many minor (lesser) nits.

Why are we still populating index_name? Per steering: "#[allow(dead_code)] is a review blocker unless accompanied by a TODO(phase-N) comment." The underscore prefix is the same thing — a quieter way to suppress the warning.

Shouldn't there be migration version tracking?

…ion tracking

Cache (Amrith): replace the index-metadata cache with self-describing
gsi_pending rows. Each row carries an index_context snapshot captured at
enqueue, so workers apply with zero catalog reads. Drops catalog_pool, the
30s TTL cache, and the staleness window it created.

Crash safety: claim + apply + delete now happen in a single transaction
(savepoint per index for the dropped-table race), so a crash or transient
apply error no longer loses a queued GSI update. Also fix is_undefined_table,
which matched the SQLSTATE string the error never actually carried.

Migrations: track data migrations in schema_history via a DATA_MIGRATIONS
registry, with legacy adoption so 001 is never re-run (avoids resetting
stream_seq).

Dead code: remove the _index_name field and unused _attr_defs/_table_id
params (no underscore-suppressed dead code).

Tests: add PG-backed crash-safety (durability + dropped-index consume) and
migration tracking/adoption tests, gated on EXTENDDB_TEST_PG_CONNECTION_STRING.
# Conflicts:
#	crates/storage-postgres/src/data/index.rs
#	crates/storage-postgres/src/gsi_queue.rs
Integration with origin/main plus review-hardening on top of the merge:

- Per-key FIFO ordering: partition gsi_pending by a stable hash of the base
  table key (worker_partition). Each worker owns one partition and drains it in
  id order, so successive updates to one item apply in order — restoring the
  guarantee the old in-memory partitioned queue had and that the persistent
  rewrite had dropped.
- Worker latency: sleep until the next due row in the worker's partition rather
  than a fixed 1s poll.
- Detect the undefined_table race by the full "SQLSTATE 42P01" marker (the
  index helpers embed the SQLSTATE via db_error), avoiding false positives.
- Post-merge cleanups: adopt main's let-chain style in the index/apply/
  transaction paths (clippy::collapsible_if), and drop the database-creating
  unit tests in favour of integration coverage.

Integration tests (PostgreSQL, lifecycle-gated, excluded from --pytest):
- tests/test_gsi_async_queue.py: crash recovery (SIGKILL), per-key ordering,
  and dropped-index savepoint skip.
- test_cli_lifecycle.py: data-migration tracking in schema_history.

Validated: cargo fmt/clippy -D warnings clean, cargo test --workspace,
integration tests, pytest (664 + comprehensive 326), conformance (647/3/35,
the 3 remaining being upstream extenddb-core expression issues).
Move the CLI-lifecycle test helpers and the cli_env fixture out of
test_cli_lifecycle.py into tests/lifecycle_helpers.py, and re-export cli_env via
conftest.py for auto-discovery. The GSI-queue integration tests now reuse the
helpers and the fixture without importing a fixture from a test module.

No behavioral change: cli_lifecycle (13 passed, 1 skipped) and the GSI-queue
integration tests pass unchanged.
@LeeroyHannigan

Copy link
Copy Markdown
Collaborator Author

I'm uncomfortable with the caching of index information. In my steering rules, I have a hard veto on anything other than auth data in a cache. this PR violates that and that's something I want to think through very closely. There are also many minor (lesser) nits.

Why are we still populating index_name? Per steering: "#[allow(dead_code)] is a review blocker unless accompanied by a TODO(phase-N) comment." The underscore prefix is the same thing — a quieter way to suppress the warning.

Shouldn't there be migration version tracking?

Cache removed, there's no metadata cache, and the queue no longer holds a catalog_pool at all. Each gsi_pending row is now self-describing: it carries an index_context snapshot (base key schema, attribute definitions, and the target index definitions) captured at enqueue, so workers apply with zero catalog reads.

That snapshot is the work item's payload, written transactionally with the base write, not a lookup-aside cache, so it doesn't fall under the veto. It's also safe by construction: a GSI's key schema and projection are immutable after creation, so the snapshot can't go stale, and it removes the 30s staleness window the cache had.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Data Loss / Inconsistency on process exit

3 participants