Skip to content

Commit 6e7eedb

Browse files
committed
Add maximum executions check
1 parent f6c9cc8 commit 6e7eedb

6 files changed

Lines changed: 126 additions & 3 deletions

File tree

functions-python/helpers/utils.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
import logging
1818
import os
1919
import ssl
20+
from datetime import date, datetime
21+
from logging import Logger
22+
from typing import Optional
2023

2124
import requests
2225
import urllib3
23-
from google.cloud import storage
2426
from requests.adapters import HTTPAdapter
2527
from urllib3.util.retry import Retry
2628
from urllib3.util.ssl_ import create_urllib3_context
@@ -32,6 +34,8 @@ def create_bucket(bucket_name):
3234
Creates GCP storage bucket if it doesn't exist
3335
:param bucket_name: name of the bucket to create
3436
"""
37+
from google.cloud import storage
38+
3539
storage_client = storage.Client()
3640
bucket = storage_client.lookup_bucket(bucket_name)
3741
if bucket is None:
@@ -53,6 +57,8 @@ def download_from_gcs(bucket_name: str, blob_path: str, local_path: str) -> str:
5357
Returns:
5458
The absolute path to the downloaded file.
5559
"""
60+
from google.cloud import storage
61+
5662
storage_client = storage.Client()
5763
bucket = storage_client.bucket(bucket_name)
5864
blob = bucket.blob(blob_path)
@@ -211,3 +217,53 @@ def create_http_task(
211217
task_time=proto_time,
212218
http_method=tasks_v2.HttpMethod.POST,
213219
)
220+
221+
222+
def get_execution_id(json_payload: dict, stable_id: Optional[str]) -> str:
223+
"""
224+
Extracts the execution_id from the JSON payload.
225+
If not present, defaults to today's date in YYYY-MM-DD format followed by a hyphen and the stable_id if provided.
226+
"""
227+
execution_id = json_payload.get("execution_id")
228+
if not execution_id:
229+
execution_id = f"{str(date.today())}"
230+
if stable_id:
231+
execution_id += f"-{stable_id}"
232+
else:
233+
# Even this should not happen, but just in case we are defaulting it to the current time
234+
execution_id += f"-{datetime.now().strftime('%H:%M:%S')}"
235+
return execution_id
236+
237+
238+
def check_maximum_executions(
239+
execution_id: str, stable_id: str, logger: Logger, maximum_executions: int = 1
240+
) -> str:
241+
"""
242+
Checks if the dataset has been executed more than the maximum allowed times.
243+
If it has, returns an error message; otherwise, returns None.
244+
:param execution_id: The ID of the execution.
245+
:param stable_id: The stable ID of the dataset.
246+
:param logger: Logger instance to log messages.
247+
:param maximum_executions: The maximum number of allowed executions.
248+
:return: Error message if the maximum executions are exceeded, otherwise None.
249+
"""
250+
from shared.dataset_service.main import DatasetTraceService
251+
252+
trace_service = DatasetTraceService()
253+
trace = trace_service.get_by_execution_and_stable_ids(execution_id, stable_id)
254+
logger.info(f"Dataset trace: {trace}")
255+
executions = len(trace) if trace else 0
256+
logger.info(
257+
f"Dataset executed times={executions}/{maximum_executions} "
258+
f"in execution=[{execution_id}] "
259+
)
260+
261+
if executions > 0:
262+
if executions >= maximum_executions:
263+
error_message = (
264+
f"Function already executed maximum times "
265+
f"in execution: [{execution_id}]"
266+
)
267+
logger.error(error_message)
268+
return error_message
269+
return None

functions-python/reverse_geolocation/function_config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"timeout": 540,
66
"available_memory": "4Gi",
77
"trigger_http": true,
8-
"include_folders": ["helpers"],
8+
"include_folders": ["helpers", "dataset_service"],
99
"include_api_folders": ["database_gen", "database", "common"],
1010
"environment_variables": [],
1111
"secret_environment_variables": [

functions-python/reverse_geolocation/src/parse_request.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,21 @@ def parse_request_parameters(
8080
else:
8181
use_cache = data_type == "gtfs"
8282
logging.info("No use_cache provided, using(%s): %s", data_type, use_cache)
83-
return df, stable_id, dataset_id, data_type, urls, public, strategy, use_cache
83+
if "maximum_executions" in request_json:
84+
maximum_executions = int(request_json["maximum_executions"])
85+
else:
86+
maximum_executions = 1
87+
return (
88+
df,
89+
stable_id,
90+
dataset_id,
91+
data_type,
92+
urls,
93+
public,
94+
strategy,
95+
use_cache,
96+
maximum_executions,
97+
)
8498

8599

86100
def parse_request_parameters_gtfs(

functions-python/reverse_geolocation/src/reverse_geolocation_processor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from shared.helpers.locations import ReverseGeocodingStrategy
3737
from shared.helpers.logger import get_logger
3838
from shared.helpers.runtime_metrics import track_metrics
39+
from shared.helpers.utils import check_maximum_executions, get_execution_id
3940
from strategy_extraction_per_point import extract_location_aggregates_per_point
4041
from strategy_extraction_per_polygon import extract_location_aggregates_per_polygon
4142

@@ -272,10 +273,21 @@ def reverse_geolocation_process(
272273
public,
273274
strategy,
274275
use_cache,
276+
maximum_executions,
275277
) = parse_request_parameters(request)
276278

277279
logger = get_logger(__name__, stable_id)
278280

281+
# Check for maximum executions to avoid repeated processing during the same day
282+
request_json = request.get_json(silent=True)
283+
execution_id = get_execution_id(request_json, stable_id)
284+
max_execution_error = check_maximum_executions(
285+
execution_id, stable_id, logger, maximum_executions
286+
)
287+
if max_execution_error:
288+
logger.warning(max_execution_error)
289+
return max_execution_error, ERROR_STATUS_CODE
290+
279291
# Remove duplicate lat/lon points
280292
stops_df["stop_lat"] = pd.to_numeric(stops_df["stop_lat"], errors="coerce")
281293
stops_df["stop_lon"] = pd.to_numeric(stops_df["stop_lon"], errors="coerce")

functions-python/reverse_geolocation/src/scripts/reverse_geolocation_process_verifier.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# This script is used to verify the reverse geolocation process
2+
# Before running this script, ensure you have the necessary environment set up:
3+
# 1. Google DataStore emulator running on localhost:8081 by running:
4+
# gcloud beta emulators datastore start --project=your-project-id
5+
16
import json
27
import logging
38
import os
@@ -234,12 +239,14 @@ def create_test_data(feed_stable_id: str, feed_dict: Dict, db_session: Session =
234239
"data_type": feed_dict["data_type"],
235240
"use_cache": False,
236241
"public": False,
242+
"maximum_executions": 1000,
237243
}
238244

239245
try:
240246
os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
241247
os.environ["DATASETS_BUCKET_NAME_GBFS"] = BUCKET_NAME
242248
os.environ["DATASETS_BUCKET_NAME_GTFS"] = BUCKET_NAME
249+
os.environ["DATASTORE_EMULATOR_HOST"] = "localhost:8081"
243250
server = create_server(
244251
host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
245252
)

functions-python/reverse_geolocation/tests/test_reverse_geolocation_processor.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def test_parse_request_parameters(self, requests_mock):
5959
public,
6060
strategy,
6161
use_cache,
62+
maximum_executions,
6263
) = parse_request_parameters(request)
6364
self.assertEqual("test_stable_id", stable_id)
6465
self.assertEqual("test_dataset_id", dataset_id)
@@ -67,6 +68,8 @@ def test_parse_request_parameters(self, requests_mock):
6768
self.assertEqual(["test_url"], urls)
6869
self.assertEqual(True, public)
6970
self.assertEqual("per-point", strategy)
71+
self.assertEqual(True, use_cache)
72+
self.assertEqual(1, maximum_executions)
7073

7174
# Exception should be raised
7275
requests_mock.get.return_value.content = None
@@ -106,6 +109,7 @@ def test_parse_request_parameters_gbfs_station_information(self, requests_mock):
106109
public,
107110
strategy,
108111
use_cache,
112+
maximum_executions,
109113
) = parse_request_parameters(request)
110114

111115
self.assertEqual("stable123", stable_id)
@@ -115,6 +119,9 @@ def test_parse_request_parameters_gbfs_station_information(self, requests_mock):
115119
self.assertEqual((2, 2), df.shape)
116120
self.assertEqual("per-polygon", strategy)
117121
self.assertEqual(True, public)
122+
# Cache is disabled for GBFS data by default
123+
self.assertEqual(False, use_cache)
124+
self.assertEqual(1, maximum_executions)
118125

119126
@patch("parse_request.requests")
120127
def test_parse_request_parameters_gbfs_vehicle_status(self, requests_mock):
@@ -136,6 +143,7 @@ def test_parse_request_parameters_gbfs_vehicle_status(self, requests_mock):
136143
"vehicle_status_url": "http://dummy.vehicle",
137144
"data_type": "gbfs",
138145
"public": "False",
146+
"maximum_executions": 10,
139147
}
140148

141149
(
@@ -147,6 +155,7 @@ def test_parse_request_parameters_gbfs_vehicle_status(self, requests_mock):
147155
public,
148156
strategy,
149157
use_cache,
158+
maximum_executions,
150159
) = parse_request_parameters(request)
151160

152161
self.assertEqual("stable456", stable_id)
@@ -156,6 +165,9 @@ def test_parse_request_parameters_gbfs_vehicle_status(self, requests_mock):
156165
self.assertEqual((2, 2), df.shape)
157166
self.assertEqual("per-polygon", strategy)
158167
self.assertEqual(False, public)
168+
# Cache is disabled for GBFS data by default
169+
self.assertEqual(False, use_cache)
170+
self.assertEqual(10, maximum_executions)
159171

160172
@patch("parse_request.requests")
161173
def test_parse_request_parameters_invalid_request(self, requests_mock):
@@ -406,15 +418,21 @@ def test_update_dataset_bounding_box_exception(self, db_session):
406418
@patch("reverse_geolocation_processor.update_dataset_bounding_box")
407419
@patch("reverse_geolocation_processor.reverse_geolocation")
408420
@patch("reverse_geolocation_processor.create_geojson_aggregate")
421+
@patch("reverse_geolocation_processor.check_maximum_executions")
422+
@patch("reverse_geolocation_processor.get_execution_id")
409423
def test_valid_request(
410424
self,
425+
mock_get_execution_id,
426+
mock_check_maximum_executions,
411427
mock_create_geojson_aggregate,
412428
mock_reverse_geolocation,
413429
mock_update_bounding_box,
414430
mock_parse_request_parameters,
415431
):
416432
from reverse_geolocation_processor import reverse_geolocation_process
417433

434+
mock_get_execution_id.return_value = "test_execution_id"
435+
mock_check_maximum_executions.return_value = None
418436
# Mocking the parsed request parameters
419437
mock_parse_request_parameters.return_value = (
420438
pd.DataFrame({"stop_lat": [1.0], "stop_lon": [1.0]}),
@@ -425,6 +443,7 @@ def test_valid_request(
425443
True,
426444
"per-point",
427445
False,
446+
1,
428447
)
429448
mock_update_bounding_box.return_value = MagicMock()
430449
mock_reverse_geolocation.return_value = {"group_id": MagicMock()}
@@ -464,14 +483,21 @@ def test_invalid_request(self, mock_parse_request_parameters):
464483
@patch("reverse_geolocation_processor.parse_request_parameters")
465484
@patch("reverse_geolocation_processor.update_dataset_bounding_box")
466485
@patch("reverse_geolocation_processor.reverse_geolocation")
486+
@patch("reverse_geolocation_processor.check_maximum_executions")
487+
@patch("reverse_geolocation_processor.get_execution_id")
467488
def test_exception_handling(
468489
self,
490+
mock_check_get_execution_id,
491+
mock_check_maximum_executions,
469492
mock_reverse_geolocation,
470493
mock_update_bounding_box,
471494
mock_parse_request_parameters,
472495
):
473496
from reverse_geolocation_processor import reverse_geolocation_process
474497

498+
mock_check_get_execution_id.return_value = "test_execution_id"
499+
mock_check_maximum_executions.return_value = None
500+
# mock_dataset_service.get_by_execution_and_stable_ids.return_value = 0
475501
# Mocking the parsed request parameters
476502
mock_parse_request_parameters.return_value = (
477503
pd.DataFrame({"stop_lat": [1.0], "stop_lon": [1.0]}),
@@ -482,6 +508,7 @@ def test_exception_handling(
482508
True,
483509
"per-point",
484510
False,
511+
1,
485512
)
486513
mock_update_bounding_box.side_effect = Exception("Unexpected error")
487514

@@ -499,12 +526,18 @@ def test_exception_handling(
499526
mock_reverse_geolocation.assert_not_called()
500527

501528
@patch("reverse_geolocation_processor.parse_request_parameters")
529+
@patch("reverse_geolocation_processor.check_maximum_executions")
530+
@patch("reverse_geolocation_processor.get_execution_id")
502531
def test_valid_request_empty_stops(
503532
self,
533+
mock_get_execution_id,
534+
mock_check_maximum_executions,
504535
mock_parse_request_parameters,
505536
):
506537
from reverse_geolocation_processor import reverse_geolocation_process
507538

539+
mock_get_execution_id.return_value = "test_execution_id"
540+
mock_check_maximum_executions.return_value = None
508541
# Mocking the parsed request parameters
509542
mock_parse_request_parameters.return_value = (
510543
pd.DataFrame({"stop_lat": [], "stop_lon": []}),
@@ -515,6 +548,7 @@ def test_valid_request_empty_stops(
515548
True,
516549
"per-point",
517550
False,
551+
1,
518552
)
519553

520554
# Mocking a Flask request

0 commit comments

Comments
 (0)