Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions rust/cocoindex/src/execution/db_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ use sqlx::PgPool;
use std::fmt;
use utils::{db::WriteAction, fingerprint::Fingerprint};

/// Short local name for `utils::str_sanitize::ZeroCodeEscapedJson<T>`, the wrapper this
/// file uses for JSON values bound to SQL queries and decoded from result rows.
type EscapedJson<T> = utils::str_sanitize::ZeroCodeEscapedJson<T>;

/// Wraps `value` in [`EscapedJson`] so call sites can write `.bind(escaped_json(x))`.
fn escaped_json<T>(value: T) -> EscapedJson<T> {
// NOTE(review): the escaping logic lives in `utils::str_sanitize` and is not visible here;
// presumably it deals with zero code points (NUL) inside JSON strings — confirm there.
utils::str_sanitize::ZeroCodeEscapedJson(value)
}

Comment on lines 12 to +19
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`EscapedJson`/`escaped_json` are duplicated here and in `setup/db_metadata.rs`. Consider centralizing this helper (or re-exporting a single helper from `utils::str_sanitize`) to avoid repeating the pattern and to make future adjustments (prefix/version changes, etc.) less error-prone.

Suggested change
use utils::{db::WriteAction, fingerprint::Fingerprint};
type EscapedJson<T> = utils::str_sanitize::ZeroCodeEscapedJson<T>;
fn escaped_json<T>(value: T) -> EscapedJson<T> {
utils::str_sanitize::ZeroCodeEscapedJson(value)
}
use utils::{
db::WriteAction,
fingerprint::Fingerprint,
str_sanitize::{EscapedJson, escaped_json},
};

Copilot uses AI. Check for mistakes.
////////////////////////////////////////////////////////////
// Access for the row tracking table
////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -86,7 +92,7 @@ pub type TrackedTargetKeyForSource = Vec<(i32, Vec<TrackedTargetKeyInfo>)>;

#[derive(sqlx::FromRow, Debug)]
pub struct SourceTrackingInfoForProcessing {
pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>,
pub memoization_info: Option<EscapedJson<Option<StoredMemoizationInfo>>>,

pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
Expand All @@ -113,7 +119,7 @@ pub async fn read_source_tracking_info_for_processing(
);
let tracking_info = sqlx::query_as(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.fetch_optional(pool)
.await?;

Expand All @@ -123,13 +129,13 @@ pub async fn read_source_tracking_info_for_processing(
/// Row type decoded via `sqlx::FromRow`; presumably what
/// `read_source_tracking_info_for_precommit` fetches — confirm against its signature.
// NOTE(review): this is a diff listing, so a changed field appears twice on consecutive
// lines (before, then after). The `sqlx::types::Json<..>` form appears to be the removed
// line and the `EscapedJson<..>` form its replacement; only one exists in the real struct.
#[derive(sqlx::FromRow, Debug)]
pub struct SourceTrackingInfoForPrecommit {
pub max_process_ordinal: i64,
// Before/after pair for `staging_target_keys` (see note above).
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
pub staging_target_keys: EscapedJson<TrackedTargetKeyForSource>,

pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub process_ordinal: Option<i64>,
// Before/after pair for `target_keys` (see note above).
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
pub target_keys: Option<EscapedJson<TrackedTargetKeyForSource>>,
}

pub async fn read_source_tracking_info_for_precommit(
Expand All @@ -150,7 +156,7 @@ pub async fn read_source_tracking_info_for_precommit(
);
let precommit_tracking_info = sqlx::query_as(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.fetch_optional(db_executor)
.await?;

Expand Down Expand Up @@ -181,10 +187,10 @@ pub async fn precommit_source_tracking_info(
};
sqlx::query(&query_str)
.bind(source_id) // $1
.bind(source_key_json) // $2
.bind(escaped_json(source_key_json)) // $2
.bind(max_process_ordinal) // $3
.bind(sqlx::types::Json(staging_target_keys)) // $4
.bind(memoization_info.map(sqlx::types::Json)) // $5
.bind(escaped_json(staging_target_keys)) // $4
.bind(memoization_info.map(escaped_json)) // $5
.execute(db_executor)
.await?;
Ok(())
Expand All @@ -207,17 +213,17 @@ pub async fn touch_max_process_ordinal(
);
sqlx::query(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.bind(process_ordinal)
.bind(sqlx::types::Json(TrackedTargetKeyForSource::default()))
.bind(escaped_json(TrackedTargetKeyForSource::default()))
.execute(db_executor)
.await?;
Ok(())
}

/// Row type decoded via `sqlx::FromRow`; presumably what
/// `read_source_tracking_info_for_commit` fetches — confirm against its signature.
// NOTE(review): this is a diff listing, so the changed field appears twice (before, then
// after). The `sqlx::types::Json<..>` form appears to be the removed line and the
// `EscapedJson<..>` form its replacement; only one exists in the real struct.
#[derive(sqlx::FromRow, Debug)]
pub struct SourceTrackingInfoForCommit {
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,
pub staging_target_keys: EscapedJson<TrackedTargetKeyForSource>,
pub process_ordinal: Option<i64>,
}

Expand All @@ -234,7 +240,7 @@ pub async fn read_source_tracking_info_for_commit(
);
let commit_tracking_info = sqlx::query_as(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.fetch_optional(db_executor)
.await?;
Ok(commit_tracking_info)
Expand Down Expand Up @@ -287,13 +293,13 @@ pub async fn commit_source_tracking_info(
};
let mut query = sqlx::query(&query_str)
.bind(source_id) // $1
.bind(source_key_json) // $2
.bind(sqlx::types::Json(staging_target_keys)) // $3
.bind(escaped_json(source_key_json)) // $2
.bind(escaped_json(staging_target_keys)) // $3
.bind(processed_source_ordinal) // $4
.bind(logic_fingerprint) // $5
.bind(process_ordinal) // $6
.bind(process_time_micros) // $7
.bind(sqlx::types::Json(target_keys)); // $8
.bind(escaped_json(target_keys)); // $8

if db_setup.has_fast_fingerprint_column {
query = query.bind(processed_source_fp); // $9
Expand All @@ -316,7 +322,7 @@ pub async fn delete_source_tracking_info(
);
sqlx::query(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.execute(db_executor)
.await?;
Ok(())
Expand Down Expand Up @@ -344,8 +350,8 @@ pub async fn read_tracking_entries_for_sources(

let rows: Vec<(
i32,
serde_json::Value,
Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
EscapedJson<serde_json::Value>,
Option<EscapedJson<TrackedTargetKeyForSource>>,
)> = sqlx::query_as(&query_str)
.bind(source_ids)
.fetch_all(pool)
Expand All @@ -356,8 +362,8 @@ pub async fn read_tracking_entries_for_sources(
.map(
|(source_id, source_key, target_keys_json)| SourceTrackingEntryForCleanup {
source_id,
source_key,
target_keys: target_keys_json.map(|j| j.0),
source_key: source_key.into_inner(),
target_keys: target_keys_json.map(EscapedJson::into_inner),
},
)
.collect())
Expand All @@ -378,8 +384,8 @@ pub fn read_tracking_entries_for_sources_stream(

let mut rows = sqlx::query_as::<_, (
i32,
serde_json::Value,
Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
EscapedJson<serde_json::Value>,
Option<EscapedJson<TrackedTargetKeyForSource>>,
)>(&query_str)
.bind(&source_ids)
.fetch(&pool);
Expand All @@ -388,8 +394,8 @@ pub fn read_tracking_entries_for_sources_stream(
let (source_id, source_key, target_keys_json) = row;
yield SourceTrackingEntryForCleanup {
source_id,
source_key,
target_keys: target_keys_json.map(|j| j.0),
source_key: source_key.into_inner(),
target_keys: target_keys_json.map(EscapedJson::into_inner),
};
}
}
Expand All @@ -412,7 +418,7 @@ pub async fn delete_tracking_entries_for_sources(

#[derive(sqlx::FromRow, Debug)]
pub struct TrackedSourceKeyMetadata {
pub source_key: serde_json::Value,
pub source_key: EscapedJson<serde_json::Value>,
pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
Expand Down Expand Up @@ -474,7 +480,7 @@ pub async fn read_source_last_processed_info(
);
let last_processed_info = sqlx::query_as(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.fetch_optional(pool)
.await?;
Ok(last_processed_info)
Expand All @@ -494,7 +500,7 @@ pub async fn update_source_tracking_ordinal(
);
sqlx::query(&query_str)
.bind(source_id) // $1
.bind(source_key_json) // $2
.bind(escaped_json(source_key_json)) // $2
.bind(processed_source_ordinal) // $3
.execute(db_executor)
.await?;
Expand All @@ -521,12 +527,12 @@ pub async fn read_source_state(
"SELECT value FROM {} WHERE source_id = $1 AND key = $2",
table_name
);
let state: Option<serde_json::Value> = sqlx::query_scalar(&query_str)
let state: Option<EscapedJson<serde_json::Value>> = sqlx::query_scalar(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(escaped_json(source_key_json))
.fetch_optional(db_executor)
.await?;
Ok(state)
Ok(state.map(EscapedJson::into_inner))
}

#[allow(dead_code)]
Expand All @@ -549,8 +555,8 @@ pub async fn upsert_source_state(
);
sqlx::query(&query_str)
.bind(source_id)
.bind(source_key_json)
.bind(sqlx::types::Json(state))
.bind(escaped_json(source_key_json))
.bind(escaped_json(state))
.execute(db_executor)
.await?;
Ok(())
Expand Down
7 changes: 4 additions & 3 deletions rust/cocoindex/src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ impl<'a> RowIndexer<'a> {

// Collect from existing tracking info.
if let Some(info) = tracking_info {
let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
let staging_target_keys = info.staging_target_keys.into_inner();
for (target_id, keys_info) in staging_target_keys.into_iter() {
let target_info = tracking_info_for_targets.entry(target_id).or_default();
for key_info in keys_info.into_iter() {
Expand All @@ -604,7 +604,8 @@ impl<'a> RowIndexer<'a> {
}
}

if let Some(sqlx::types::Json(target_keys)) = info.target_keys {
if let Some(target_keys) = info.target_keys.map(|target_keys| target_keys.into_inner())
{
for (target_id, keys_info) in target_keys.into_iter() {
let target_info = tracking_info_for_targets.entry(target_id).or_default();
for key_info in keys_info.into_iter() {
Expand Down Expand Up @@ -802,7 +803,7 @@ impl<'a> RowIndexer<'a> {

let cleaned_staging_target_keys = tracking_info
.map(|info| {
let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
let staging_target_keys = info.staging_target_keys.into_inner();
staging_target_keys
.into_iter()
.filter_map(|(target_id, target_keys)| {
Expand Down
2 changes: 1 addition & 1 deletion rust/cocoindex/src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl SourceIndexingContext {
while let Some(key_metadata) = key_metadata_stream.next().await {
let key_metadata = key_metadata?;
let source_pk = value::KeyValue::from_json(
key_metadata.source_key,
key_metadata.source_key.into_inner(),
&import_op.primary_key_schema,
)?;
if let Some(rows_to_retry) = &mut rows_to_retry {
Expand Down
78 changes: 63 additions & 15 deletions rust/cocoindex/src/setup/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ use utils::db::WriteAction;
const SETUP_METADATA_TABLE_NAME_UNQUALIFIED: &str = "cocoindex_setup_metadata";
pub const FLOW_VERSION_RESOURCE_TYPE: &str = "__FlowVersion";

/// Short local name for `utils::str_sanitize::ZeroCodeEscapedJson<T>`, the wrapper this
/// file uses for JSON values bound to SQL queries and decoded from result rows.
type EscapedJson<T> = utils::str_sanitize::ZeroCodeEscapedJson<T>;

/// Wraps `value` in [`EscapedJson`] so call sites can write `.bind(escaped_json(x))`.
fn escaped_json<T>(value: T) -> EscapedJson<T> {
// NOTE(review): the escaping logic lives in `utils::str_sanitize` and is not visible here;
// presumably it deals with zero code points (NUL) inside JSON strings — confirm there.
utils::str_sanitize::ZeroCodeEscapedJson(value)
}
Comment on lines +12 to +16
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`EscapedJson`/`escaped_json` are duplicated here and in `execution/db_tracking.rs`. To keep the escaping behavior consistent and avoid drift, consider centralizing this helper (e.g., a shared module in `utils::str_sanitize` or a small `db_json` helper) rather than duplicating it per file.

Copilot uses AI. Check for mistakes.

#[derive(sqlx::FromRow, Debug)]
pub struct SetupMetadataRecord {
pub flow_name: String,
Expand Down Expand Up @@ -44,9 +50,33 @@ pub async fn read_setup_metadata(pool: &PgPool) -> Result<Option<Vec<SetupMetada
let table_name = get_setup_metadata_table_name()?;
let query_str =
format!("SELECT flow_name, resource_type, key, state, staging_changes FROM {table_name}",);
let metadata = sqlx::query_as(&query_str).fetch_all(&mut *db_conn).await;
let metadata = sqlx::query_as::<
_,
(
String,
String,
EscapedJson<serde_json::Value>,
Option<EscapedJson<serde_json::Value>>,
EscapedJson<Vec<StateChange<serde_json::Value>>>,
),
>(&query_str)
.fetch_all(&mut *db_conn)
.await;
let result = match metadata {
Ok(metadata) => Some(metadata),
Ok(metadata) => Some(
metadata
.into_iter()
.map(|(flow_name, resource_type, key, state, staging_changes)| {
SetupMetadataRecord {
flow_name,
resource_type,
key: key.into_inner(),
state: state.map(EscapedJson::into_inner),
staging_changes: sqlx::types::Json(staging_changes.into_inner()),
}
})
.collect(),
),
Err(err) => {
let exists: Option<bool> = sqlx::query_scalar(&format!(
"SELECT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = '{table_name}')"
Expand Down Expand Up @@ -88,12 +118,30 @@ async fn read_metadata_records_for_flow(
let query_str = format!(
"SELECT flow_name, resource_type, key, state, staging_changes FROM {table_name} WHERE flow_name = $1",
);
let metadata: Vec<SetupMetadataRecord> = sqlx::query_as(&query_str)
.bind(flow_name)
.fetch_all(db_executor)
.await?;
let metadata = sqlx::query_as::<
_,
(
String,
String,
EscapedJson<serde_json::Value>,
Option<EscapedJson<serde_json::Value>>,
EscapedJson<Vec<StateChange<serde_json::Value>>>,
),
>(&query_str)
.bind(flow_name)
.fetch_all(db_executor)
.await?;
let result = metadata
.into_iter()
.map(
|(flow_name, resource_type, key, state, staging_changes)| SetupMetadataRecord {
flow_name,
resource_type,
key: key.into_inner(),
state: state.map(EscapedJson::into_inner),
staging_changes: sqlx::types::Json(staging_changes.into_inner()),
},
)
.map(|m| {
(
ResourceTypeKey {
Expand All @@ -116,13 +164,13 @@ async fn read_state(
let query_str = format!(
"SELECT state FROM {table_name} WHERE flow_name = $1 AND resource_type = $2 AND key = $3",
);
let state: Option<serde_json::Value> = sqlx::query_scalar(&query_str)
let state: Option<EscapedJson<serde_json::Value>> = sqlx::query_scalar(&query_str)
.bind(flow_name)
.bind(&type_id.resource_type)
.bind(&type_id.key)
.bind(escaped_json(&type_id.key))
.fetch_optional(db_executor)
.await?;
Ok(state)
Ok(state.map(EscapedJson::into_inner))
}

async fn upsert_staging_changes(
Expand All @@ -144,8 +192,8 @@ async fn upsert_staging_changes(
sqlx::query(&query_str)
.bind(flow_name)
.bind(&type_id.resource_type)
.bind(&type_id.key)
.bind(sqlx::types::Json(staging_changes))
.bind(escaped_json(&type_id.key))
.bind(escaped_json(staging_changes))
.execute(db_executor)
.await?;
Ok(())
Expand All @@ -170,9 +218,9 @@ async fn upsert_state(
sqlx::query(&query_str)
.bind(flow_name)
.bind(&type_id.resource_type)
.bind(&type_id.key)
.bind(sqlx::types::Json(state))
.bind(sqlx::types::Json(Vec::<serde_json::Value>::new()))
.bind(escaped_json(&type_id.key))
.bind(escaped_json(state))
.bind(escaped_json(Vec::<serde_json::Value>::new()))
.execute(db_executor)
.await?;
Ok(())
Expand All @@ -190,7 +238,7 @@ async fn delete_state(
sqlx::query(&query_str)
.bind(flow_name)
.bind(&type_id.resource_type)
.bind(&type_id.key)
.bind(escaped_json(&type_id.key))
.execute(db_executor)
.await?;
Ok(())
Expand Down
Loading
Loading