Skip to content

Commit a153148

Browse files
authored
feat: updating geolocation trigger schedule for gbfs feeds + trigger feed processing on addition (#1631)
1 parent 2151e04 commit a153148

File tree

8 files changed

+279
-14
lines changed

8 files changed

+279
-14
lines changed

.github/workflows/catalog-update.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ on:
88
required: false
99
default: true
1010
type: boolean
11+
ENVIRONMENT:
12+
description: Target environment (leave as 'all' to run all environments)
13+
required: false
14+
default: all
15+
type: choice
16+
options:
17+
- all
18+
- qa
19+
- prod
20+
- dev
1121
repository_dispatch: # Update on mobility-database-catalog repo dispatch
1222
types: [ catalog-sources-updated, gbfs-systems-updated ]
1323

@@ -18,6 +28,7 @@ env:
1828
jobs:
1929
resolve-api-meta-qa:
2030
name: QA Resolve API commit/version
31+
if: github.event_name == 'repository_dispatch' || inputs.ENVIRONMENT == 'all' || inputs.ENVIRONMENT == 'qa'
2132
runs-on: ubuntu-latest
2233
outputs:
2334
CHECKOUT_REF: ${{ steps.resolve.outputs.COMMIT_SHA != '' && steps.resolve.outputs.COMMIT_SHA || 'main' }}
@@ -57,6 +68,7 @@ jobs:
5768

5869
resolve-api-meta-prod:
5970
name: PROD Resolve API commit/version
71+
if: github.event_name == 'repository_dispatch' || inputs.ENVIRONMENT == 'all' || inputs.ENVIRONMENT == 'prod'
6072
runs-on: ubuntu-latest
6173
outputs:
6274
CHECKOUT_REF: ${{ steps.resolve.outputs.COMMIT_SHA != '' && steps.resolve.outputs.COMMIT_SHA || 'main' }}
@@ -96,6 +108,7 @@ jobs:
96108

97109
resolve-api-meta-dev:
98110
name: DEV Resolve API commit/version
111+
if: github.event_name == 'repository_dispatch' || inputs.ENVIRONMENT == 'all' || inputs.ENVIRONMENT == 'dev'
99112
runs-on: ubuntu-latest
100113
outputs:
101114
CHECKOUT_REF: ${{ steps.resolve.outputs.COMMIT_SHA != '' && steps.resolve.outputs.COMMIT_SHA || 'main' }}

api/src/scripts/populate_db_gbfs.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
import json
2+
import os
3+
import uuid
4+
from concurrent import futures
15
from datetime import datetime
6+
from typing import List, Dict
27

38
import pandas as pd
49
import pycountry
510
import pytz
11+
from google.cloud import pubsub_v1
612

713
from scripts.gbfs_utils.comparison import generate_system_csv_from_db, compare_db_to_csv
814
from scripts.gbfs_utils.fetching import fetch_data, get_data_content
@@ -11,10 +17,13 @@
1117
from shared.database.database import generate_unique_id, configure_polymorphic_mappers
1218
from shared.database_gen.sqlacodegen_models import Gbfsfeed, Location, Externalid
1319

20+
GBFS_PUBSUB_TOPIC_NAME = "validate-gbfs-feed"
21+
1422

1523
class GBFSDatabasePopulateHelper(DatabasePopulateHelper):
1624
def __init__(self, filepaths, test_mode=False):
1725
super().__init__(filepaths, test_mode)
26+
self.added_feeds: List[Dict] = []
1827

1928
def filter_data(self):
2029
"""Filter out rows with Authentication Info and duplicate System IDs"""
@@ -73,6 +82,7 @@ def populate_db(self, session, fetch_url=True):
7382
else:
7483
fetched_data = dict()
7584
# If the feed already exists, update it. Otherwise, create a new feed.
85+
is_new_feed = gbfs_feed is None
7686
if gbfs_feed:
7787
self.logger.info(f"Updating feed {stable_id} - {row['Name']}")
7888
else:
@@ -116,6 +126,14 @@ def populate_db(self, session, fetch_url=True):
116126
gbfs_feed.locations = [location]
117127

118128
session.flush()
129+
if is_new_feed:
130+
self.added_feeds.append(
131+
{
132+
"feed_id": gbfs_feed.id,
133+
"stable_id": gbfs_feed.stable_id,
134+
"url": gbfs_feed.auto_discovery_url,
135+
}
136+
)
119137
self.logger.info(80 * "-")
120138

121139
# self.db.session.commit()
@@ -125,6 +143,42 @@ def populate_db(self, session, fetch_url=True):
125143
self.logger.error(f"Error populating the database: {e}")
126144
raise e
127145

146+
def trigger_downstream_tasks(self):
147+
"""Trigger GBFS version extraction, validation, and geolocation for newly added feeds."""
148+
self.logger.info(
149+
f"Triggering downstream tasks for {len(self.added_feeds)} newly added feed(s): "
150+
f"{', '.join(f['stable_id'] for f in self.added_feeds)}"
151+
)
152+
if os.getenv("ENV", "local") == "local":
153+
self.logger.info("Skipping downstream tasks in local environment.")
154+
return
155+
if not self.added_feeds:
156+
self.logger.info("No feeds to trigger downstream tasks for.")
157+
return
158+
159+
env = os.getenv("ENV", "dev")
160+
project_id = f"mobility-feeds-{env}"
161+
publisher = pubsub_v1.PublisherClient()
162+
topic_path = publisher.topic_path(project_id, GBFS_PUBSUB_TOPIC_NAME)
163+
execution_id = str(uuid.uuid4())
164+
165+
publish_futures = []
166+
for feed_data in self.added_feeds:
167+
message = {
168+
"execution_id": execution_id,
169+
"stable_id": feed_data["stable_id"],
170+
"feed_id": feed_data["feed_id"],
171+
"url": feed_data["url"],
172+
"extract_geolocation": True,
173+
}
174+
data = json.dumps(message).encode("utf-8")
175+
self.logger.info(f"Publishing feed {feed_data['stable_id']} to {topic_path}")
176+
future = publisher.publish(topic_path, data)
177+
publish_futures.append(future)
178+
179+
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
180+
self.logger.info(f"Published {len(self.added_feeds)} feed(s) to {topic_path}.")
181+
128182

129183
if __name__ == "__main__":
130-
GBFSDatabasePopulateHelper(set_up_configs()).initialize(trigger_downstream_tasks=False)
184+
GBFSDatabasePopulateHelper(set_up_configs()).initialize(trigger_downstream_tasks=True)

functions-python/gbfs_validator/src/gbfs_data_processor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ def __init__(self, stable_id: str, feed_id: str):
5353
self.validation_reports: Dict[str, Dict[str, Any]] = {}
5454
self.logger = get_logger(GBFSDataProcessor.__name__, stable_id)
5555

56-
def process_gbfs_data(self, autodiscovery_url: str) -> None:
56+
def process_gbfs_data(
57+
self, autodiscovery_url: str, extract_geolocation: bool = True
58+
) -> None:
5759
"""Process the GBFS data from the autodiscovery URL."""
5860
# Record the request to the autodiscovery URL
5961
self.record_autodiscovery_request(autodiscovery_url)
@@ -70,7 +72,8 @@ def process_gbfs_data(self, autodiscovery_url: str) -> None:
7072
# Update database entities
7173
self.update_database_entities()
7274

73-
self.trigger_location_extraction()
75+
if extract_geolocation:
76+
self.trigger_location_extraction()
7477

7578
@with_db_session()
7679
def record_autodiscovery_request(

functions-python/gbfs_validator/src/main.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232

3333
def fetch_all_gbfs_feeds(db_session: Session) -> List[Gbfsfeed]:
3434
try:
35-
gbfs_feeds = (
36-
db_session.query(Gbfsfeed).filter(Gbfsfeed.status != "deprecated").all()
37-
)
35+
query = db_session.query(Gbfsfeed).filter(Gbfsfeed.status != "deprecated")
36+
limit = os.getenv("FEEDS_LIMIT")
37+
if limit is not None:
38+
logging.info("Limiting batch to %s feeds (FEEDS_LIMIT is set).", limit)
39+
query = query.limit(int(limit))
40+
gbfs_feeds = query.all()
3841
db_session.expunge_all()
3942
return gbfs_feeds
4043
except Exception as e:
@@ -60,6 +63,7 @@ def gbfs_validator_pubsub(cloud_event: CloudEvent):
6063
except KeyError as e:
6164
logging.error("Missing required field: %s", e)
6265
return f"Invalid Pub/Sub message data. Missing {e}."
66+
extract_geolocation = message_json.get("extract_geolocation", True)
6367

6468
# get logger with stable_id
6569
logger = get_logger(__name__, stable_id)
@@ -86,7 +90,7 @@ def gbfs_validator_pubsub(cloud_event: CloudEvent):
8690
# Process GBFS data
8791
try:
8892
processor = GBFSDataProcessor(stable_id, feed_id)
89-
processor.process_gbfs_data(url)
93+
processor.process_gbfs_data(url, extract_geolocation=extract_geolocation)
9094
except Exception as e:
9195
error_message = f"Error processing GBFS data: {e}"
9296
logger.error(error_message)
@@ -117,11 +121,15 @@ def gbfs_validator_batch(request, db_session: Session):
117121

118122
try:
119123
feed_stable_ids = None
124+
extract_geolocation = True
120125
if request and request.method == "POST" and request.is_json:
121126
request_json = request.get_json()
122127
feed_stable_ids = (
123128
request_json.get("feed_stable_ids") if request_json else None
124129
)
130+
extract_geolocation = (
131+
request_json.get("extract_geolocation", True) if request_json else True
132+
)
125133
else:
126134
logging.info("Request body not provided or not a valid JSON.")
127135
except Exception as e:
@@ -146,6 +154,7 @@ def gbfs_validator_batch(request, db_session: Session):
146154
"stable_id": gbfs_feed.stable_id,
147155
"feed_id": gbfs_feed.id,
148156
"url": gbfs_feed.auto_discovery_url,
157+
"extract_geolocation": extract_geolocation,
149158
}
150159
feeds_data.append(feed_data)
151160
logging.info("Feed %s added to the batch.", gbfs_feed.stable_id)

functions-python/gbfs_validator/tests/test_gbfs_data_processor.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ def test_fetch_gbfs_files(
233233
},
234234
],
235235
}
236-
self.processor.process_gbfs_data(autodiscovery_url)
236+
self.processor.process_gbfs_data(
237+
autodiscovery_url, extract_geolocation=True
238+
)
237239
gbfs_feed = (
238240
session.query(Gbfsfeed)
239241
.filter_by(stable_id=self.stable_id)
@@ -250,3 +252,64 @@ def test_fetch_gbfs_files(
250252
)
251253
self.assertIn("2.2", versions)
252254
self.assertIn("2.1", versions)
255+
256+
@with_db_session(db_url=default_db_url)
257+
@patch("gbfs_data_processor.create_http_task")
258+
@patch("gbfs_data_processor.tasks_v2")
259+
@patch(
260+
"gbfs_data_processor.GBFSEndpoint.get_request_metadata",
261+
side_effect=mock_get_request_metadata,
262+
)
263+
@patch("google.cloud.storage.Client")
264+
@patch("gbfs_data_processor.fetch_gbfs_data", side_effect=mock_fetch_gbfs_data)
265+
@patch("requests.post")
266+
@patch("requests.get")
267+
@patch.dict(
268+
os.environ,
269+
{
270+
"FEEDS_DATABASE_URL": default_db_url,
271+
"GOOGLE_APPLICATION_CREDENTIALS": "test",
272+
},
273+
)
274+
def test_process_gbfs_data_skip_geolocation(
275+
self,
276+
_,
277+
mock_post,
278+
__,
279+
mock_cloud_storage_client,
280+
___,
281+
mock_tasks,
282+
mock_create_http_task,
283+
db_session,
284+
):
285+
"""Test that trigger_location_extraction is not called when extract_geolocation=False."""
286+
autodiscovery_url = "http://example.com/gbfs.json"
287+
gbfs_feed = Gbfsfeed(
288+
id=self.feed_id,
289+
operator=self.faker.company(),
290+
operator_url=self.faker.url(),
291+
stable_id=self.stable_id,
292+
auto_discovery_url=autodiscovery_url,
293+
status="active",
294+
operational_status="published",
295+
)
296+
session = db_session
297+
session.add(gbfs_feed)
298+
session.commit()
299+
(
300+
mock_cloud_storage_client.return_value.bucket.return_value.blob.return_value
301+
).public_url = self.faker.url()
302+
with patch("logging.info"), patch("logging.error"), patch("logging.warning"):
303+
mock_post.return_value.json.return_value = {
304+
"summary": {
305+
"validatorVersion": "1.0.13",
306+
"version": {"detected": "2.2", "validated": "2.2"},
307+
"hasErrors": False,
308+
"errorsCount": 0,
309+
},
310+
"filesSummary": [],
311+
}
312+
self.processor.process_gbfs_data(
313+
autodiscovery_url, extract_geolocation=False
314+
)
315+
mock_create_http_task.assert_not_called()

0 commit comments

Comments (0)