Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f442d41
Preliminary code.
jcpitre Jun 8, 2026
fdafd7b
Read pre-extracted GTFS files from GCS instead of downloading the dat…
jcpitre Jun 8, 2026
2d5a130
Added terraform config
jcpitre Jun 8, 2026
03969b0
Mounted buckets on file system in terraform
jcpitre Jun 9, 2026
806dc29
Mount GCS bucket on gtfs_change_tracker via terraform_data workaround
jcpitre Jun 9, 2026
4489e24
Allowed ALL ingress
jcpitre Jun 10, 2026
5b1177f
Change ids to stable_ids
jcpitre Jun 10, 2026
55b9942
Use stable IDs in payload and return plain UUIDs from _resolve_datasets
jcpitre Jun 10, 2026
dcdf833
Idempotency check, memory limiting, and retry suppression
jcpitre Jun 11, 2026
ffe92d2
Added dry_run and allow_overwrite payload parameters.
jcpitre Jun 11, 2026
8e35bd6
Flipped allow_overwrite to disallow_overwrite
jcpitre Jun 11, 2026
8d8f164
Added README.md
jcpitre Jun 11, 2026
efc538e
Merge branch 'main' into gtfs_change_tracker-1634
jcpitre Jun 11, 2026
c0320e6
Corrected some tests
jcpitre Jun 11, 2026
c9ccd42
Gated the call to the change tracker based on the presence in the gtf…
jcpitre Jun 11, 2026
9f2b47e
Adjusted the memory allocation
jcpitre Jun 11, 2026
6ecedd0
Made sure the base dataset is from the right feed.
jcpitre Jun 11, 2026
fcec71a
Remove CHANGELOG_DB_WRITE_ENABLED flag (covered by dry_run)
jcpitre Jun 11, 2026
a2254ec
Made the uploaded report public
jcpitre Jun 11, 2026
8420941
improve error logging, add payload to error response, and fix upsert …
jcpitre Jun 12, 2026
a1fa432
Changed the name to gtfs-datasets-comparer
jcpitre Jun 15, 2026
358f6ad
Changed the folder name to gtfs-datasets-comparer
jcpitre Jun 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions functions-python/batch_process_dataset/src/pipeline_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
from sqlalchemy.orm import Session

from shared.database.database import with_db_session
from shared.database_gen.sqlacodegen_models import Gtfsdataset
from shared.helpers.utils import create_http_task, create_http_pmtiles_builder_task
from shared.database_gen.sqlacodegen_models import Gtfsdataset, GtfsDatasetChangelog
from shared.helpers.utils import (
create_http_task,
create_http_pmtiles_builder_task,
create_http_gtfs_datasets_comparer_task,
)


def create_http_reverse_geolocation_processor_task(
Expand Down Expand Up @@ -136,3 +140,45 @@ def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None:
f"routes.txt file size : {routes_file.file_size_bytes} bytes"
f" and changed files: {changed_files}"
)

# Create GTFS change tracker task when a previous dataset exists
previous_dataset = (
db_session.query(Gtfsdataset)
.filter(
Gtfsdataset.feed_id == dataset.feed_id,
Gtfsdataset.id != dataset.id,
)
.order_by(Gtfsdataset.downloaded_at.desc())
.first()
)
if previous_dataset:
# Check the DB for an existing changelog record rather than the GCS blob presence.
# The unique constraint on (previous_dataset_id, current_dataset_id) makes this the
# authoritative idempotency check. GCS blob presence could be used instead, but that
# would require an extra API call and could miss cases where the blob exists but the
# DB record does not (or vice versa).
changelog_exists = (
db_session.query(GtfsDatasetChangelog)
.filter_by(
previous_dataset_id=previous_dataset.id,
current_dataset_id=dataset.id,
)
.first()
is not None
)
if changelog_exists:
logging.info(
"Skipping change tracker task for dataset %s: changelog already exists.",
dataset_stable_id,
)
else:
create_http_gtfs_datasets_comparer_task(
feed_stable_id=stable_id,
base_dataset_stable_id=previous_dataset.stable_id,
new_dataset_stable_id=dataset_stable_id,
)
else:
logging.info(
"Skipping change tracker task for dataset %s: no previous dataset found.",
dataset_stable_id,
)
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,9 @@ def mock_remove_side_effect(path):
@patch.dict(
os.environ, {"FEEDS_CREDENTIALS": '{"test_stable_id": "test_credentials"}'}
)
@patch("pipeline_tasks.create_http_gtfs_datasets_comparer_task")
@with_db_session(db_url=default_db_url)
def test_process(self, db_session):
def test_process(self, mock_gtfs_datasets_comparer_task, db_session):
feeds = db_session.query(Gtfsfeed).all()
feed_id = feeds[0].id

Expand Down
81 changes: 81 additions & 0 deletions functions-python/gtfs_datasets_comparer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# GTFS Datasets Comparer

This function computes a structured diff between two consecutive GTFS datasets and stores the resulting changelog in GCS and the database.

The function reads pre-extracted GTFS files from a GCS-mounted bucket (uploaded by `batch_process_dataset`), runs the diff engine, uploads the changelog JSON to GCS, and upserts a row in `gtfs_dataset_changelog`.

## Usage

The function expects an HTTP POST request whose JSON body has the following fields:
```
{
"feed_stable_id": str, – stable_id of the GTFS feed
"base_dataset_stable_id": str, – stable_id of the base (older) dataset
"new_dataset_stable_id": str, – stable_id of the new (recent) dataset
"disallow_overwrite": bool (optional), – skip if changelog already exists (default: false)
"dry_run": bool (optional) – compute diff but skip GCS upload and DB write (default: false)
}
```

Example:
```json
{
"feed_stable_id": "mdb-2142",
"base_dataset_stable_id": "mdb-2142-202502251658",
"new_dataset_stable_id": "mdb-2142-202507081652"
}
```

Example curl call:
```bash
curl -X POST https://<function-url> \
-H "Authorization: Bearer $(gcloud auth print-identity-token)" \
-H "Content-Type: application/json" \
-d '{
"feed_stable_id": "mdb-2142",
"base_dataset_stable_id": "mdb-2142-202502251658",
"new_dataset_stable_id": "mdb-2142-202507081652"
}'
```

### `disallow_overwrite`
By default the function will overwrite an existing changelog for the same dataset pair. Set `disallow_overwrite: true` to skip execution if a changelog already exists in GCS.

### `dry_run`
When `dry_run: true`, the diff is computed and a summary is returned in the response, but nothing is written to GCS or the database. This is useful for validating that the extracted files are present and that the diff engine runs correctly.

## Response

Success:
```json
{
"status": "success",
"message": "Changelog generated successfully.",
"changelog_url": "https://storage.googleapis.com/<bucket>/<feed>/<dataset>/..."
}
```

Dry run:
```json
{
"status": "success",
"message": "Dry run completed. Diff computed but not persisted.",
"summary": {
"total_changes": 42,
"files_added_count": 0,
"files_deleted_count": 0,
"files_modified_count": 3,
...
}
}
```

The function always returns HTTP 200, including on errors. Errors are reported in the response body under `"status": "error"`. This prevents GCP from retrying failures where re-running with the same parameters would produce the same result.

## GCP environment variables

- `DATASETS_BUCKET_NAME`: The GCS bucket where datasets are stored (required). Must include the environment suffix, e.g. `mobilitydata-datasets-dev`.
- `DATASETS_BUCKET_MOUNT`: Mount path for the GCS bucket (default: `/mobilitydata-datasets`).
- `GTFS_DIFF_DUCKDB_TMPDIR`: Mount path for the in-memory tmpfs used by the diff engine (default: `/tmp/in-memory`). Used by `limit_gcp_memory` to compute the available process memory and set `RLIMIT_AS`, preventing silent OOM kills.
- `MEMORY_MARGIN_MB`: Safety margin in MiB subtracted from the memory limit before setting `RLIMIT_AS` (default: `200`).
- `LOGGING_LEVEL`: Log level (default: `INFO`).
31 changes: 31 additions & 0 deletions functions-python/gtfs_datasets_comparer/function_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name": "gtfs-datasets-comparer",
"description": "Tracks changes between two GTFS datasets and stores a structured changelog in GCS and the database",
"entry_point": "gtfs_datasets_comparer",
"timeout": 540,
"memory": "8Gi",
"trigger_http": true,
"include_folders": ["helpers"],
"include_api_folders": ["database_gen", "database", "common"],
"environment_variables": [
{
"key": "DATASETS_BUCKET_NAME"
},
{
"key": "LOGGING_LEVEL"
},
{
"key": "GTFS_DIFF_DUCKDB_TMPDIR"
}
],
"secret_environment_variables": [
{
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_ALL",
"max_instance_request_concurrency": 1,
"max_instance_count": 10,
"min_instance_count": 0,
"available_cpu": 2
}
22 changes: 22 additions & 0 deletions functions-python/gtfs_datasets_comparer/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Common packages
functions-framework==3.*
google-cloud-logging
psycopg2-binary==2.9.6
urllib3~=2.6.3
attrs~=23.1.0
certifi~=2025.8.3

# SQL Alchemy and Geo Alchemy
SQLAlchemy==2.0.23
geoalchemy2==0.14.7

# Google specific packages for this function
flask
google-cloud-storage
google-cloud-tasks

# GTFS diff engine
gtfs-diff-engine

# Configuration
python-dotenv==1.2.2
2 changes: 2 additions & 0 deletions functions-python/gtfs_datasets_comparer/requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest~=7.4.3
requests-mock
Loading
Loading