Skip to content

Commit 9677c91

Browse files
authored
fix: JBDA import bugs (#1423)
1 parent 6b3ffa9 commit 9677c91

2 files changed

Lines changed: 16 additions & 15 deletions

File tree

functions-python/tasks_executor/src/tasks/data_import/import_jbda_feeds.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def _process_feed(
447447
}, None
448448

449449
# Upsert/lookup schedule feed
450 (old) -  stable_id = f"jbda-{feed_id}"
450 (new) +  stable_id = f"jbda-{org_id}-{feed_id}"
451451
gtfs_feed, is_new_gtfs = _get_or_create_feed(
452452
db_session, Gtfsfeed, stable_id, "gtfs"
453453
)
@@ -503,7 +503,6 @@ def _process_feed(
503503
location=location,
504504
)
505505

506 (old) -  feed_to_publish = gtfs_feed if is_new_gtfs else None
507506
return (
508507
{
509508
"created_gtfs": created_gtfs,
@@ -512,7 +511,7 @@ def _process_feed(
512511
"linked_refs": linked_refs,
513512
"processed": 1,
514513
},
515 (old) -  feed_to_publish,
514 (new) +  gtfs_feed if is_new_gtfs else None,
516515
)
517516

518517

@@ -521,6 +520,7 @@ def _build_api_schedule_fingerprint(
521520
) -> dict:
522521
"""Collect only fields we actually persist on schedule feeds."""
523522
return {
523 (new) +  "stable_id": f"jbda-{list_item.get('organization_id')}-{list_item.get('feed_id')}",
524524
"feed_name": detail.get("feed_name"),
525525
"provider": list_item.get("organization_name"),
526526
"producer_url": producer_url,
@@ -536,6 +536,7 @@ def _build_api_schedule_fingerprint(
536536

537537
def _build_db_schedule_fingerprint(feed: Gtfsfeed) -> dict:
538538
return {
539 (new) +  "stable_id": getattr(feed, "stable_id", None),
539540
"feed_name": getattr(feed, "feed_name", None),
540541
"provider": getattr(feed, "provider", None),
541542
"producer_url": getattr(feed, "producer_url", None),
@@ -571,9 +572,9 @@ def _import_jbda(db_session: Session, dry_run: bool = True) -> dict:
571572

572573
logger.info(
573574
"Commit batch size (env COMMIT_BATCH_SIZE)=%s",
574 (old) -  os.getenv("COMMIT_BATCH_SIZE", "20"),
575 (new) +  os.getenv("COMMIT_BATCH_SIZE", "5"),
575576
)
576 (old) -  commit_batch_size = int(os.getenv("COMMIT_BATCH_SIZE", 20))
577 (new) +  commit_batch_size = int(os.getenv("COMMIT_BATCH_SIZE", 5))
577578

578579
# Aggregates
579580
created_gtfs = updated_gtfs = created_rt = linked_refs = total_processed = 0
@@ -602,9 +603,11 @@ def _import_jbda(db_session: Session, dry_run: bool = True) -> dict:
602603
if not dry_run and (total_processed % commit_batch_size == 0):
603604
logger.info("Committing batch at total_processed=%d", total_processed)
604605
try:
605 (old) -  db_session.commit()
606 (new) +  commit_changes(db_session, feeds_to_publish, total_processed)
607 (new) +  feeds_to_publish = [] # reset after commit
606608
except IntegrityError:
607609
db_session.rollback()
610 (new) +  feeds_to_publish = [] # reset even on failure
608611
logger.exception(
609612
"DB IntegrityError during batch commit at processed=%d",
610613
total_processed,
@@ -639,17 +642,15 @@ def commit_changes(
639642
db_session: Session, feeds_to_publish: list[Feed], total_processed: int
640643
):
641644
"""
642 (old) -  Final commit + downstream triggers after main loop.
645 (new) +  Commit DB changes and trigger dataset downloads for new feeds.
643646
"""
644647
try:
645 (old) -  logger.info(
646 (old) -      "Final commit after processing all items (count=%d)", total_processed
647 (old) -  )
648 (new) +  logger.info("Commit after processing items (count=%d)", total_processed)
648649
db_session.commit()
649650
execution_id = str(uuid.uuid4())
650651
publisher = pubsub_v1.PublisherClient()
651652
for feed in feeds_to_publish:
652653
trigger_dataset_download(feed, execution_id, publisher)
653654
except IntegrityError:
654655
db_session.rollback()
655 (old) -  logger.exception("Final commit failed with IntegrityError; rolled back")
656 (new) +  logger.exception("Commit failed with IntegrityError; rolled back")

functions-python/tasks_executor/tests/tasks/data_import/test_jbda_import.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ def _head_side_effect(url, allow_redirects=True, timeout=15):
258258
# DB checks for GTFS feed
259259
sched = (
260260
db_session.query(Gtfsfeed)
261 (old) -  .filter(Gtfsfeed.stable_id == "jbda-feed1")
261 (new) +  .filter(Gtfsfeed.stable_id == "jbda-org1-feed1")
262262
.first()
263263
)
264264
self.assertIsNotNone(sched)
@@ -276,12 +276,12 @@ def _head_side_effect(url, allow_redirects=True, timeout=15):
276276
# RT feeds + entity types + back-links
277277
tu = (
278278
db_session.query(Gtfsrealtimefeed)
279 (old) -  .filter(Gtfsrealtimefeed.stable_id == "jbda-feed1-tu")
279 (new) +  .filter(Gtfsrealtimefeed.stable_id == "jbda-org1-feed1-tu")
280280
.first()
281281
)
282282
vp = (
283283
db_session.query(Gtfsrealtimefeed)
284 (old) -  .filter(Gtfsrealtimefeed.stable_id == "jbda-feed1-vp")
284 (new) +  .filter(Gtfsrealtimefeed.stable_id == "jbda-org1-feed1-vp")
285285
.first()
286286
)
287287
self.assertIsNotNone(tu)
@@ -301,7 +301,7 @@ def _head_side_effect(url, allow_redirects=True, timeout=15):
301301
self.assertEqual(topic_path, "projects/test-project/topics/dataset-batch")
302302

303303
payload = json.loads(data_bytes.decode("utf-8"))
304 (old) -  self.assertEqual(payload["feed_stable_id"], "jbda-feed1")
304 (new) +  self.assertEqual(payload["feed_stable_id"], "jbda-org1-feed1")
305305
self.assertEqual(payload["producer_url"], url_current)
306306
self.assertIsNone(payload["dataset_id"])
307307
self.assertIsNone(payload["dataset_hash"])

0 commit comments

Comments (0)