-
Notifications
You must be signed in to change notification settings - Fork 791
fix(postgres): preserve NUL bytes in internal JSONB tracking state #1798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,12 @@ use utils::db::WriteAction; | |
| const SETUP_METADATA_TABLE_NAME_UNQUALIFIED: &str = "cocoindex_setup_metadata"; | ||
| pub const FLOW_VERSION_RESOURCE_TYPE: &str = "__FlowVersion"; | ||
|
|
||
| type EscapedJson<T> = utils::str_sanitize::ZeroCodeEscapedJson<T>; | ||
|
|
||
| fn escaped_json<T>(value: T) -> EscapedJson<T> { | ||
| utils::str_sanitize::ZeroCodeEscapedJson(value) | ||
| } | ||
|
Comment on lines +12 to +16
|
||
|
|
||
| #[derive(sqlx::FromRow, Debug)] | ||
| pub struct SetupMetadataRecord { | ||
| pub flow_name: String, | ||
|
|
@@ -44,9 +50,33 @@ pub async fn read_setup_metadata(pool: &PgPool) -> Result<Option<Vec<SetupMetada | |
| let table_name = get_setup_metadata_table_name()?; | ||
| let query_str = | ||
| format!("SELECT flow_name, resource_type, key, state, staging_changes FROM {table_name}",); | ||
| let metadata = sqlx::query_as(&query_str).fetch_all(&mut *db_conn).await; | ||
| let metadata = sqlx::query_as::< | ||
| _, | ||
| ( | ||
| String, | ||
| String, | ||
| EscapedJson<serde_json::Value>, | ||
| Option<EscapedJson<serde_json::Value>>, | ||
| EscapedJson<Vec<StateChange<serde_json::Value>>>, | ||
| ), | ||
| >(&query_str) | ||
| .fetch_all(&mut *db_conn) | ||
| .await; | ||
| let result = match metadata { | ||
| Ok(metadata) => Some(metadata), | ||
| Ok(metadata) => Some( | ||
| metadata | ||
| .into_iter() | ||
| .map(|(flow_name, resource_type, key, state, staging_changes)| { | ||
| SetupMetadataRecord { | ||
| flow_name, | ||
| resource_type, | ||
| key: key.into_inner(), | ||
| state: state.map(EscapedJson::into_inner), | ||
| staging_changes: sqlx::types::Json(staging_changes.into_inner()), | ||
| } | ||
| }) | ||
| .collect(), | ||
| ), | ||
| Err(err) => { | ||
| let exists: Option<bool> = sqlx::query_scalar(&format!( | ||
| "SELECT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = '{table_name}')" | ||
|
|
@@ -88,12 +118,30 @@ async fn read_metadata_records_for_flow( | |
| let query_str = format!( | ||
| "SELECT flow_name, resource_type, key, state, staging_changes FROM {table_name} WHERE flow_name = $1", | ||
| ); | ||
| let metadata: Vec<SetupMetadataRecord> = sqlx::query_as(&query_str) | ||
| .bind(flow_name) | ||
| .fetch_all(db_executor) | ||
| .await?; | ||
| let metadata = sqlx::query_as::< | ||
| _, | ||
| ( | ||
| String, | ||
| String, | ||
| EscapedJson<serde_json::Value>, | ||
| Option<EscapedJson<serde_json::Value>>, | ||
| EscapedJson<Vec<StateChange<serde_json::Value>>>, | ||
| ), | ||
| >(&query_str) | ||
| .bind(flow_name) | ||
| .fetch_all(db_executor) | ||
| .await?; | ||
| let result = metadata | ||
| .into_iter() | ||
| .map( | ||
| |(flow_name, resource_type, key, state, staging_changes)| SetupMetadataRecord { | ||
| flow_name, | ||
| resource_type, | ||
| key: key.into_inner(), | ||
| state: state.map(EscapedJson::into_inner), | ||
| staging_changes: sqlx::types::Json(staging_changes.into_inner()), | ||
| }, | ||
| ) | ||
| .map(|m| { | ||
| ( | ||
| ResourceTypeKey { | ||
|
|
@@ -116,13 +164,13 @@ async fn read_state( | |
| let query_str = format!( | ||
| "SELECT state FROM {table_name} WHERE flow_name = $1 AND resource_type = $2 AND key = $3", | ||
| ); | ||
| let state: Option<serde_json::Value> = sqlx::query_scalar(&query_str) | ||
| let state: Option<EscapedJson<serde_json::Value>> = sqlx::query_scalar(&query_str) | ||
| .bind(flow_name) | ||
| .bind(&type_id.resource_type) | ||
| .bind(&type_id.key) | ||
| .bind(escaped_json(&type_id.key)) | ||
| .fetch_optional(db_executor) | ||
| .await?; | ||
| Ok(state) | ||
| Ok(state.map(EscapedJson::into_inner)) | ||
| } | ||
|
|
||
| async fn upsert_staging_changes( | ||
|
|
@@ -144,8 +192,8 @@ async fn upsert_staging_changes( | |
| sqlx::query(&query_str) | ||
| .bind(flow_name) | ||
| .bind(&type_id.resource_type) | ||
| .bind(&type_id.key) | ||
| .bind(sqlx::types::Json(staging_changes)) | ||
| .bind(escaped_json(&type_id.key)) | ||
| .bind(escaped_json(staging_changes)) | ||
| .execute(db_executor) | ||
| .await?; | ||
| Ok(()) | ||
|
|
@@ -170,9 +218,9 @@ async fn upsert_state( | |
| sqlx::query(&query_str) | ||
| .bind(flow_name) | ||
| .bind(&type_id.resource_type) | ||
| .bind(&type_id.key) | ||
| .bind(sqlx::types::Json(state)) | ||
| .bind(sqlx::types::Json(Vec::<serde_json::Value>::new())) | ||
| .bind(escaped_json(&type_id.key)) | ||
| .bind(escaped_json(state)) | ||
| .bind(escaped_json(Vec::<serde_json::Value>::new())) | ||
| .execute(db_executor) | ||
| .await?; | ||
| Ok(()) | ||
|
|
@@ -190,7 +238,7 @@ async fn delete_state( | |
| sqlx::query(&query_str) | ||
| .bind(flow_name) | ||
| .bind(&type_id.resource_type) | ||
| .bind(&type_id.key) | ||
| .bind(escaped_json(&type_id.key)) | ||
| .execute(db_executor) | ||
| .await?; | ||
| Ok(()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`EscapedJson`/`escaped_json` are duplicated here and in `setup/db_metadata.rs`. Consider centralizing this helper (or re-exporting a single helper from `utils::str_sanitize`) to avoid repeating the pattern and to make future adjustments (prefix/version changes, etc.) less error-prone.