Skip to content

Commit 847e2f3

Browse files
authored
fix: align process_data timestamps to UTC to match ingest_data cutoff filter (#927)
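process_data used a naive datetime.now() for creation_timestamp, while ingest_data uses datetime.now(timezone.utc) for the look_back_days cutoff. On containers whose local timezone is not UTC, the naive local timestamp differs from the UTC cutoff by the local UTC offset, so chunks could fall outside the look_back_days window.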
1 parent eb55be1 commit 847e2f3

1 file changed

Lines changed: 4 additions & 4 deletions

File tree

  • agent_starter_pack/agents/agentic_rag/data_ingestion/data_ingestion_pipeline/components

agent_starter_pack/agents/agentic_rag/data_ingestion/data_ingestion_pipeline/components/process_data.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def process_data(
6262
location: BigQuery location
6363
"""
6464
import logging
65-
from datetime import datetime, timedelta
65+
from datetime import datetime, timedelta, timezone
6666

6767
import bigframes.pandas as bpd
6868
import swifter
@@ -89,7 +89,7 @@ def process_data(
8989
logging.warning(
9090
"Pipeline schedule not set. Setting schedule_time to current date."
9191
)
92-
schedule_time_dt = datetime.now()
92+
schedule_time_dt = datetime.now(timezone.utc)
9393

9494
# Note: The following line sets the schedule time 5 years back to allow sample data to be present.
9595
# For your use case, please comment out the following line to use the actual schedule time.
@@ -215,7 +215,7 @@ def create_table_if_not_exist(
215215
# This allows create-before-delete ingestion: new chunks never collide
216216
# with old ones, so we can safely create first, then delete stale data.
217217
logging.info("Creating chunk IDs and exploding chunks into rows...")
218-
run_ts = datetime.now().strftime("%Y%m%d%H%M%S")
218+
run_ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
219219
chunk_ids = [
220220
str(idx) for text_chunk in df["text_chunk"] for idx in range(len(text_chunk))
221221
]
@@ -226,7 +226,7 @@ def create_table_if_not_exist(
226226
logging.info("Chunk IDs created and chunks exploded.")
227227

228228
# No embedding generation needed — Vector Search 2.0 auto-generates embeddings
229-
df = df.assign(creation_timestamp=datetime.now())
229+
df = df.assign(creation_timestamp=datetime.now(timezone.utc))
230230

231231
# Store results in BigQuery
232232
PARTITION_DATE_COLUMN = "creation_timestamp"

0 commit comments

Comments
 (0)