Skip to content

Commit cef2844

Browse files
committed
refactor: Move code to appropriate folders for CODEOWNERS
1 parent c4b56e5 commit cef2844

14 files changed

Lines changed: 964 additions & 746 deletions

File tree

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import logging
2+
3+
from sqlalchemy import orm
4+
5+
from .. import backend_types_sql as bts
6+
from .. import errors
7+
from ..search import filter_query_sql
8+
9+
_logger = logging.getLogger(__name__)
10+
11+
_SYSTEM_KEY_RESERVED_MSG = (
12+
"Annotation keys starting with "
13+
f"{filter_query_sql.SYSTEM_KEY_PREFIX!r} are reserved for system use."
14+
)
15+
16+
17+
def fail_if_changing_system_annotation(*, key: str) -> None:
18+
if key.startswith(filter_query_sql.SYSTEM_KEY_PREFIX):
19+
raise errors.ApiValidationError(_SYSTEM_KEY_RESERVED_MSG)
20+
21+
22+
def _truncate_for_annotation(
23+
*,
24+
value: str,
25+
field_name: str,
26+
pipeline_run_id: bts.IdType,
27+
) -> str:
28+
"""Truncate a string to fit the annotation VARCHAR column.
29+
30+
Returns the value unchanged if it fits within _STR_MAX_LENGTH,
31+
otherwise truncates and logs a warning with the run ID and
32+
original length.
33+
"""
34+
max_len = bts._STR_MAX_LENGTH
35+
if len(value) <= max_len:
36+
return value
37+
38+
_logger.warning(
39+
f"Truncating {field_name} annotation for run {pipeline_run_id}: "
40+
f"{len(value)} chars -> {max_len} chars"
41+
)
42+
return value[:max_len]
43+
44+
45+
def mirror_system_annotations(
46+
*,
47+
session: orm.Session,
48+
pipeline_run_id: bts.IdType,
49+
created_by: str | None,
50+
pipeline_name: str | None,
51+
) -> None:
52+
"""Mirror pipeline run fields as system annotations for filter_query search.
53+
54+
Always creates an annotation for every run, even when the source value is
55+
None or empty (stored as ""). This ensures data parity so every run has a
56+
row for each system key.
57+
"""
58+
59+
# TODO: The original pipeline_run.created_by and the pipeline name stored in
60+
# extra_data / task_spec are saved untruncated, while the annotation mirror
61+
# is truncated to VARCHAR(255). This creates a data parity mismatch between
62+
# the source columns and their annotation copies. Revisit this to either
63+
# widen the annotation column or enforce the same limit at the source.
64+
65+
created_by_value = created_by
66+
if created_by_value is None:
67+
created_by_value = ""
68+
_logger.warning(
69+
f"Pipeline run id {pipeline_run_id} `created_by` is None, "
70+
'setting it to empty string "" for data parity'
71+
)
72+
73+
created_by_value = _truncate_for_annotation(
74+
value=created_by_value,
75+
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
76+
pipeline_run_id=pipeline_run_id,
77+
)
78+
79+
session.add(
80+
bts.PipelineRunAnnotation(
81+
pipeline_run_id=pipeline_run_id,
82+
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
83+
value=created_by_value,
84+
)
85+
)
86+
87+
pipeline_name_value = pipeline_name
88+
if pipeline_name_value is None:
89+
pipeline_name_value = ""
90+
_logger.warning(
91+
f"Pipeline run id {pipeline_run_id} `pipeline_name` is None, "
92+
'setting it to empty string "" for data parity'
93+
)
94+
95+
pipeline_name_value = _truncate_for_annotation(
96+
value=pipeline_name_value,
97+
field_name=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
98+
pipeline_run_id=pipeline_run_id,
99+
)
100+
101+
session.add(
102+
bts.PipelineRunAnnotation(
103+
pipeline_run_id=pipeline_run_id,
104+
key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
105+
value=pipeline_name_value,
106+
)
107+
)

0 commit comments

Comments
 (0)