Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions crates/storage-postgres/data_migrations/002_gsi_pending.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Copyright 2026 ExtendDB contributors
-- SPDX-License-Identifier: Apache-2.0
-- Persistent queue for async GSI propagation.
--
-- Rows are inserted atomically within the base write transaction and consumed
-- by background workers in a single claim+apply+delete transaction, so an
-- uncommitted claim rolls back and the row is reprocessed after a crash.
--
-- Each row is self-describing: `index_context` carries the base key schema,
-- attribute definitions, and the target index definitions captured at enqueue
-- time, so workers apply updates with zero catalog reads. A GSI's key schema
-- and projection are immutable after creation, so the snapshot can never be
-- stale relative to the live index definition.
--
-- `worker_partition` is a stable hash of the base table key. All updates to a
-- given base item share a partition and are consumed by a single worker in `id`
-- order, preserving per-key FIFO ordering (so successive updates to the same
-- item never race in the index and converge to the latest state).

CREATE TABLE IF NOT EXISTS gsi_pending (

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall where it's buried, but somewhere there is a catalog version identifier that should be updated when the metadata/system table schema is updated, so that extenddb migrate will know that there is a migration to perform. I believe it's in storage-postgres somewhere.

id BIGSERIAL PRIMARY KEY,
table_id TEXT NOT NULL,
worker_partition INTEGER NOT NULL,
old_item JSONB,
new_item JSONB,
index_context JSONB NOT NULL,
ready_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Claim index for the background workers. The claim query filters with
--   WHERE worker_partition = $1 AND ready_at <= NOW()
-- and orders by id. The leading partition column also keeps each worker's scan
-- confined to its own rows.
-- NOTE(review): `ready_at` is matched with a range predicate, so the B-tree
-- yields qualifying rows in (ready_at, id) order; ORDER BY id presumably still
-- needs a sort step within the partition -- confirm with EXPLAIN.
CREATE INDEX IF NOT EXISTS idx_gsi_pending_claim
ON gsi_pending (worker_partition, ready_at, id);
65 changes: 36 additions & 29 deletions crates/storage-postgres/src/data/delete_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use extenddb_storage::StreamCapture;
use extenddb_storage::error::StorageError;
use extenddb_storage::util::{SortKeyValue, parse_sk, pk_to_text, sk_column, sk_info};

use super::index::{enqueue_async_indexes, fetch_indexes_for_table, pk_hash, sync_indexes};
use super::index::{async_index_context, fetch_indexes_for_table, has_async_indexes, sync_indexes};
use super::query::check_condition;
use super::tx_helpers::write_stream_record_in_tx;
use super::{data_table_name, json_to_item};
use crate::PostgresEngine;
use crate::gsi_queue::enqueue_gsi_pending;

impl PostgresEngine {
/// Implementation of `DataEngine::delete_item`.
Expand Down Expand Up @@ -126,7 +127,6 @@ impl PostgresEngine {
.transpose()?;
sync_indexes(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
Expand Down Expand Up @@ -155,26 +155,30 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_ref()),
&key_info.account_id,
&key_info.table_name,
&key_info.table_id,
if has_async_indexes(&indexes, sys_delay) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably be called once at the beginning (right after indexes is populated).

let gsi_ctx = async_index_context(
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
sys_delay,
);
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
old_item_for_idx.as_ref(),
None,
sys_delay,
&gsi_ctx,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down Expand Up @@ -264,7 +268,6 @@ impl PostgresEngine {
.transpose()?;
sync_indexes(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
Expand Down Expand Up @@ -293,26 +296,30 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_ref()),
&key_info.account_id,
&key_info.table_name,
&key_info.table_id,
if has_async_indexes(&indexes, sys_delay) {
let gsi_ctx = async_index_context(
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
sys_delay,
);
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
old_item_for_idx.as_ref(),
None,
sys_delay,
&gsi_ctx,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down
113 changes: 49 additions & 64 deletions crates/storage-postgres/src/data/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@ use extenddb_storage::util::SortKeyValue;
use extenddb_storage::util::{composite_pk_to_text, parse_sk, sk_column, sk_column_n};

use super::{all_sort_key_info, index_table_name};
use crate::gsi_queue::{GsiApplyContext, GsiIndexDef};

/// Convert a `sqlx` error into `StorageError::Internal`, keeping the SQLSTATE
/// code in the message whenever the database reported one.
///
/// `sqlx`'s `Display` output only carries the human-readable text, so the code
/// is prepended explicitly. The async GSI worker relies on it (e.g. `42P01`,
/// undefined_table) to distinguish a dropped-index race from a real failure.
fn db_error(e: sqlx::Error) -> StorageError {
    let sqlstate = e.as_database_error().and_then(|d| d.code());
    let message = if let Some(code) = sqlstate {
        format!("SQLSTATE {code}: {e}")
    } else {
        // Not a database-level error (or no code available): plain text only.
        e.to_string()
    };
    StorageError::Internal(message)
}

/// Metadata for a single index, used during write-path GSI/LSI sync.
pub(crate) struct IndexMeta {
pub(super) index_name: String,
pub(super) index_id: String,
pub(super) index_type: String,
pub(super) key_schema: Vec<KeySchemaElement>,
Expand All @@ -33,22 +44,21 @@ pub(crate) async fn fetch_indexes_for_table(
table_id: &str,
pool: &sqlx::PgPool,
) -> Result<Vec<IndexMeta>, StorageError> {
let rows: Vec<(String, String, String, serde_json::Value, serde_json::Value, Option<i32>)> = sqlx::query_as(
"SELECT index_name, index_id, index_type, key_schema, projection, propagation_delay_ms FROM indexes WHERE table_id = $1",
let rows: Vec<(String, String, serde_json::Value, serde_json::Value, Option<i32>)> = sqlx::query_as(
"SELECT index_id, index_type, key_schema, projection, propagation_delay_ms FROM indexes WHERE table_id = $1",
)
.bind(table_id)
.fetch_all(pool)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

rows.into_iter()
.map(|(name, id, idx_type, ks_json, proj_json, delay)| {
.map(|(id, idx_type, ks_json, proj_json, delay)| {
let key_schema: Vec<KeySchemaElement> = serde_json::from_value(ks_json)
.map_err(|e| StorageError::Internal(e.to_string()))?;
let projection: Projection = serde_json::from_value(proj_json)
.map_err(|e| StorageError::Internal(e.to_string()))?;
Ok(IndexMeta {
index_name: name,
index_id: id,
index_type: idx_type,
key_schema,
Expand Down Expand Up @@ -124,11 +134,10 @@ pub(super) fn effective_delay(idx: &IndexMeta, system_default: u64) -> u64 {
///
/// Called within the same PG transaction as the base table write.
/// Only processes indexes where `effective_delay == 0`. Async indexes are
/// handled by `enqueue_async_indexes` after the transaction commits.
/// enqueued by the caller into the persistent `gsi_pending` queue within the
/// same transaction, and applied by the queue workers after it commits.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn sync_indexes(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
_table_id: &str,
base_key_schema: &[KeySchemaElement],
attr_defs: &[AttributeDefinition],
indexes: &[IndexMeta],
Expand All @@ -148,8 +157,7 @@ pub(crate) async fn sync_indexes(
if let Some(old) = old_item
&& item_has_index_keys(old, &idx.key_schema)
{
delete_index_row_multi(tx, &idx_table, old, base_key_schema, attr_defs, &base_sks)
.await?;
delete_index_row_multi(tx, &idx_table, old, base_key_schema, &base_sks).await?;
}

// Insert new index row if the new item has index keys
Expand All @@ -165,7 +173,6 @@ pub(crate) async fn sync_indexes(
&projected,
&idx.key_schema,
base_key_schema,
attr_defs,
&idx_sks,
&base_sks,
)
Expand All @@ -175,65 +182,50 @@ pub(crate) async fn sync_indexes(
Ok(())
}

/// Enqueue async GSI updates for indexes with non-zero propagation delay.
/// Check whether any indexes require async propagation.
///
/// Called after the base table transaction commits. The queue workers apply
/// the index updates after a random delay within the configured range.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn enqueue_async_indexes(
gsi_queue: &crate::gsi_queue::GsiQueue,
pk_hash: u64,
account_id: &str,
table_name: &str,
table_id: &str,
/// Returns `true` when at least one non-LSI index has a non-zero effective
/// delay, in which case the write should insert a row into `gsi_pending`.
pub(crate) fn has_async_indexes(indexes: &[IndexMeta], system_default_delay: u64) -> bool {
    // LSIs are excluded first; only then is the effective delay consulted.
    let propagates_async = |idx: &IndexMeta| {
        idx.index_type != "LSI" && effective_delay(idx, system_default_delay) != 0
    };
    indexes.iter().any(propagates_async)
}

/// Build the self-describing apply context for a write's async GSIs.
///
/// Captures the base key schema, attribute definitions, and the definition of
/// each GSI that propagates asynchronously, so the queue worker can apply the
/// update without reading the catalog. Call only when [`has_async_indexes`]
/// returned `true`; the returned context then has at least one index.
pub(crate) fn async_index_context(
base_key_schema: &[KeySchemaElement],
attr_defs: &[AttributeDefinition],
indexes: &[IndexMeta],
old_item: Option<&Item>,
new_item: Option<&Item>,
system_default_delay: u64,
) {
for idx in indexes {
let delay = effective_delay(idx, system_default_delay);
if delay == 0 {
continue; // Sync — already handled in transaction.
}
if idx.index_type == "LSI" {
continue; // LSIs are always synchronous — already handled in transaction.
}
gsi_queue
.enqueue(
pk_hash,
account_id,
table_name,
table_id,
base_key_schema,
attr_defs,
&idx.index_name,
&idx.index_id,
&idx.key_schema,
&idx.projection,
old_item,
new_item,
delay,
)
.await;
) -> GsiApplyContext {
let index_defs = indexes
.iter()
.filter(|idx| idx.index_type != "LSI" && effective_delay(idx, system_default_delay) != 0)
.map(|idx| GsiIndexDef {
index_id: idx.index_id.clone(),
key_schema: idx.key_schema.clone(),
projection: idx.projection.clone(),
})
.collect();
GsiApplyContext {
base_key_schema: base_key_schema.to_vec(),
attribute_definitions: attr_defs.to_vec(),
indexes: index_defs,
}
}

/// Derive the queue-partitioning hash from the partition key text.
/// crc32 is used because it is stable across Rust versions (`DefaultHasher` is not).
pub(crate) fn pk_hash(pk_text: &str) -> u64 {
    let checksum: u32 = crc32fast::hash(pk_text.as_bytes());
    // Lossless widening from the 32-bit checksum.
    checksum.into()
}

/// Delete a row from an index table using base table key columns.
pub(crate) async fn delete_index_row_multi(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
idx_table: &str,
item: &Item,
base_ks: &[KeySchemaElement],
_attr_defs: &[AttributeDefinition],
base_sks: &[(&str, ScalarAttributeType)],
) -> Result<(), StorageError> {
let base_pk_text = composite_pk_to_text(item, base_ks)?;
Expand Down Expand Up @@ -266,10 +258,7 @@ pub(crate) async fn delete_index_row_multi(
}
}

query
.execute(&mut **tx)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;
query.execute(&mut **tx).await.map_err(db_error)?;
Ok(())
}

Expand All @@ -282,7 +271,6 @@ pub(crate) async fn insert_index_row_multi(
projected: &Item,
index_ks: &[KeySchemaElement],
base_ks: &[KeySchemaElement],
_attr_defs: &[AttributeDefinition],
idx_sks: &[(&str, ScalarAttributeType)],
base_sks: &[(&str, ScalarAttributeType)],
) -> Result<(), StorageError> {
Expand Down Expand Up @@ -347,9 +335,6 @@ pub(crate) async fn insert_index_row_multi(
// Bind item_data
query = query.bind(item_json);

query
.execute(&mut **tx)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;
query.execute(&mut **tx).await.map_err(db_error)?;
Ok(())
}
Loading
Loading