Skip to content

Commit 6ed3c0d

Browse files
authored
fix: Robust and Idempotent Backfill for Search Pipeline Run API (#166)
### TL;DR Refactored database migration logic to ensure data parity by creating system annotations for all pipeline runs, even when source values are null or empty, and moved backfill functions to a dedicated module with comprehensive error handling. ### What changed? - **System annotation mirroring now ensures data parity**: The `_mirror_system_annotations` function always creates annotation rows for `created_by` and `pipeline_name`, storing empty string `""` when source values are null/empty, with warning logs for null cases - **New dedicated migration module**: Created `database_migrate.py` containing three idempotent backfill functions: - `backfill_created_by_annotations`: Uses `COALESCE` to handle null values - `backfill_pipeline_names_from_extra_data`: Extracts from JSON with null filtering - `backfill_pipeline_names_from_component_spec`: Extracts from nested JSON path with anti-join logic - **Robust orchestration**: `run_all_annotation_backfills` wraps all backfills in try-catch with configurable skip guards and single transaction commit - **Enhanced test coverage**: Added 1600+ lines of comprehensive tests covering idempotency, order independence, data parity, error handling, and edge cases - **Updated existing tests**: Modified assertions to expect empty string annotations instead of missing keys ### How to test? Run the existing test suite - the new `test_database_migrate.py` provides extensive coverage including: - Basic backfill functionality for both sources - Idempotency verification (safe to run multiple times) - Order independence between pipeline name sources - Data parity validation (every run gets annotations) - Error handling and transaction rollback scenarios - Truncation behavior for long values ### Why make this change? - **Data consistency**: Ensures every pipeline run has system annotations for reliable filtering/querying, eliminating gaps where some runs lack annotation rows - **Improved maintainability**: Separates migration logic from general database operations with better organization and comprehensive documentation - **Production reliability**: Adds proper error handling so migration failures don't block application startup, with detailed logging for debugging - **Database portability**: Uses SQLAlchemy abstractions for cross-database compatibility (SQLite, MySQL, PostgreSQL)
1 parent ce2c295 commit 6ed3c0d

7 files changed

Lines changed: 2762 additions & 1394 deletions

File tree

cloud_pipelines_backend/api_router.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ def setup_routes(
3939
user_details_getter: typing.Callable[..., UserDetails],
4040
container_launcher_for_log_streaming: "launcher_interfaces.ContainerTaskLauncher[launcher_interfaces.LaunchedContainer] | None" = None,
4141
default_component_library_owner_username: str = "admin",
42+
do_skip_backfill: bool = False,
4243
):
4344
def get_session():
4445
with orm.Session(autocommit=False, autoflush=False, bind=db_engine) as session:
4546
yield session
4647

4748
def create_db_and_tables():
48-
database_ops.initialize_and_migrate_db(db_engine=db_engine)
49+
database_ops.initialize_and_migrate_db(
50+
db_engine=db_engine, do_skip_backfill=do_skip_backfill
51+
)
4952

5053
# The default library must be initialized here, not when adding the Component Library routes.
5154
# Otherwise the tables won't yet exist when initialization is performed.

cloud_pipelines_backend/api_server_sql.py

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,40 +1331,62 @@ def _mirror_system_annotations(
13311331
created_by: str | None,
13321332
pipeline_name: str | None,
13331333
) -> None:
1334-
"""Mirror pipeline run fields as system annotations for filter_query search"""
1334+
"""Mirror pipeline run fields as system annotations for filter_query search.
1335+
1336+
Always creates an annotation for every run, even when the source value is
1337+
None or empty (stored as ""). This ensures data parity so every run has a
1338+
row for each system key.
1339+
"""
13351340

13361341
# TODO: The original pipeline_run.created_by and the pipeline name stored in
13371342
# extra_data / task_spec are saved untruncated, while the annotation mirror
13381343
# is truncated to VARCHAR(255). This creates a data parity mismatch between
13391344
# the source columns and their annotation copies. Revisit this to either
13401345
# widen the annotation column or enforce the same limit at the source.
13411346

1342-
if created_by:
1343-
created_by = _truncate_for_annotation(
1344-
value=created_by,
1345-
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
1347+
created_by_value = created_by
1348+
if created_by_value is None:
1349+
created_by_value = ""
1350+
_logger.warning(
1351+
f"Pipeline run id {pipeline_run_id} `created_by` is None, "
1352+
'setting it to empty string "" for data parity'
1353+
)
1354+
1355+
created_by_value = _truncate_for_annotation(
1356+
value=created_by_value,
1357+
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
1358+
pipeline_run_id=pipeline_run_id,
1359+
)
1360+
1361+
session.add(
1362+
bts.PipelineRunAnnotation(
13461363
pipeline_run_id=pipeline_run_id,
1364+
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
1365+
value=created_by_value,
13471366
)
1348-
session.add(
1349-
bts.PipelineRunAnnotation(
1350-
pipeline_run_id=pipeline_run_id,
1351-
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
1352-
value=created_by,
1353-
)
1367+
)
1368+
1369+
pipeline_name_value = pipeline_name
1370+
if pipeline_name_value is None:
1371+
pipeline_name_value = ""
1372+
_logger.warning(
1373+
f"Pipeline run id {pipeline_run_id} `pipeline_name` is None, "
1374+
'setting it to empty string "" for data parity'
13541375
)
1355-
if pipeline_name:
1356-
pipeline_name = _truncate_for_annotation(
1357-
value=pipeline_name,
1358-
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
1376+
1377+
pipeline_name_value = _truncate_for_annotation(
1378+
value=pipeline_name_value,
1379+
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
1380+
pipeline_run_id=pipeline_run_id,
1381+
)
1382+
1383+
session.add(
1384+
bts.PipelineRunAnnotation(
13591385
pipeline_run_id=pipeline_run_id,
1386+
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
1387+
value=pipeline_name_value,
13601388
)
1361-
session.add(
1362-
bts.PipelineRunAnnotation(
1363-
pipeline_run_id=pipeline_run_id,
1364-
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
1365-
value=pipeline_name,
1366-
)
1367-
)
1389+
)
13681390

13691391

13701392
def _recursively_create_all_executions_and_artifacts_root(

0 commit comments

Comments
 (0)