-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmain.py
More file actions
463 lines (404 loc) · 16.5 KB
/
main.py
File metadata and controls
463 lines (404 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
#
# MobilityData 2024
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import logging
from datetime import datetime
import requests
from sqlalchemy.orm import Session
from shared.database.database import with_db_session
from shared.helpers.timezone import (
extract_timezone_from_json_validation_report,
get_service_date_range_with_timezone_utc,
)
import functions_framework
from shared.database_gen.sqlacodegen_models import (
Validationreport,
Feature,
Notice,
Gtfsdataset,
)
from shared.helpers.logger import init_logger
from shared.helpers.transform import get_nested_value
from shared.helpers.feed_status import update_feed_statuses_query
from shared.helpers.task_execution.task_execution_tracker import TaskExecutionTracker
init_logger()
FILES_ENDPOINT = os.getenv("FILES_ENDPOINT")
def read_json_report(json_report_url, timeout=60):
    """
    Fetches and returns the JSON content from a given URL.

    :param json_report_url: URL to the JSON report
    :param timeout: Maximum number of seconds to wait for the HTTP response
        (default 60). Backward-compatible addition: without a timeout,
        ``requests.get`` blocks indefinitely on a stalled server.
    :return: Tuple of (dict representation of the JSON report, HTTP status code)
    """
    # requests.get has no default timeout; an unresponsive server would hang
    # this Cloud Function until its platform deadline without this argument.
    response = requests.get(json_report_url, timeout=timeout)
    return response.json(), response.status_code
def get_feature(feature_name, session):
    """
    Look up a Feature by name, creating a new (unpersisted) one when absent.

    :param feature_name: Name of the feature
    :param session: Database session instance
    :return: Feature instance — the existing row, or a new transient object
        that the caller is responsible for adding to the session
    """
    existing = session.query(Feature).filter(Feature.name == feature_name).first()
    if existing is not None:
        return existing
    # Not found: build a transient Feature; persisting it is the caller's job.
    return Feature(name=feature_name)
def get_dataset(dataset_stable_id, session):
    """
    Fetch the Gtfsdataset row matching the given stable ID.

    :param dataset_stable_id: Stable ID of the dataset
    :param session: Database session instance
    :return: Gtfsdataset instance, or None when no row matches
    """
    query = session.query(Gtfsdataset).filter(
        Gtfsdataset.stable_id == dataset_stable_id
    )
    # one_or_none() raises if multiple rows match, surfacing duplicate stable IDs.
    return query.one_or_none()
def validate_json_report(json_report_url):
    """
    Fetches the JSON report and verifies it was retrieved successfully.

    :param json_report_url: The URL of the JSON report
    :return: (parsed JSON report, 200) on success, otherwise
        (error message string, HTTP status or 500)
    """
    try:
        json_report, code = read_json_report(json_report_url)
    except Exception as error:  # JSONDecodeError or RequestException
        logging.error("Error reading JSON report: %s", str(error))
        return f"Error reading JSON report at url {json_report_url}: {error}", 500
    if code == 200:
        return json_report, 200
    # Non-200: report the upstream status so the caller can propagate it.
    logging.error("Error reading JSON report: %s", code)
    return f"Error reading JSON report at url {json_report_url}.", code
def parse_json_report(json_report):
    """
    Parses the JSON report and extracts the validatedAt and validatorVersion fields.

    :param json_report: The JSON report (dict with a "summary" section)
    :return: A tuple (validated_at datetime, validator version string or None)
    :raises Exception: when the report lacks a parsable summary/validatedAt
    """
    try:
        summary = json_report["summary"]
        # The validator emits a trailing "Z"; datetime.fromisoformat (pre-3.11)
        # only accepts the explicit "+00:00" UTC offset form.
        validated_at = datetime.fromisoformat(
            summary["validatedAt"].replace("Z", "+00:00")
        )
        # validatorVersion is optional in the report summary.
        version = summary.get("validatorVersion")
        logging.info(
            "Validation report validated_at: %s with version: %s.",
            validated_at,
            version,
        )
        return validated_at, version
    except Exception as error:
        logging.error("Error parsing JSON report: %s", error)
        # Chain the original cause so tracebacks keep the root failure.
        raise Exception(f"Error parsing JSON report: {error}") from error
def generate_report_entities(
    version, validated_at, json_report, dataset_stable_id, session, feed_stable_id
):
    """
    Creates validation report entities based on the JSON report.

    Builds the Validationreport row, links it to its dataset and features,
    creates a Notice row per report notice, and populates the report's
    severity counters. Nothing is committed here; the returned entities are
    added/committed by the caller.

    :param version: The version of the validator (used in the report ID and URLs)
    :param validated_at: The datetime the report was validated
    :param json_report: The JSON report object (dict)
    :param dataset_stable_id: Stable ID of the dataset
    :param session: The database session
    :param feed_stable_id: Stable ID of the feed
    :return: List of entities created; empty list when the report already exists
    """
    entities = []
    # Report IDs are deterministic: one report per (dataset, validator version).
    report_id = f"{dataset_stable_id}_{version}"
    logging.info("Creating validation report entities for: %s.", report_id)
    html_report_url = (
        f"{FILES_ENDPOINT}/{feed_stable_id}/{dataset_stable_id}/report_{version}.html"
    )
    json_report_url = (
        f"{FILES_ENDPOINT}/{feed_stable_id}/{dataset_stable_id}/report_{version}.json"
    )
    # Check if report already exists
    # If exists, the function should graceful finish avoiding retry mechanism to trigger again
    if get_validation_report(report_id, session):
        logging.warning("Validation report %s already exists. Terminating.", report_id)
        return []
    validation_report_entity = Validationreport(
        id=report_id,
        validator_version=version,
        validated_at=validated_at,
        html_report=html_report_url,
        json_report=json_report_url,
    )
    entities.append(validation_report_entity)
    dataset = get_dataset(dataset_stable_id, session)
    if not dataset:
        # A missing dataset is unrecoverable here; raising lets the caller's
        # retry mechanism handle it.
        raise Exception(f"Dataset {dataset_stable_id} not found.")
    dataset.validation_reports.append(validation_report_entity)
    # Timezone extracted from the report (when present) also drives the
    # UTC conversion of the service date range below.
    extracted_timezone = extract_timezone_from_json_validation_report(json_report)
    if extracted_timezone is not None:
        dataset.agency_timezone = extracted_timezone
    populate_service_date(dataset, json_report, extracted_timezone)
    # Link each GTFS feature named in the summary to this report; features are
    # created on the fly if not already in the database.
    for feature_name in get_nested_value(json_report, ["summary", "gtfsFeatures"], []):
        feature = get_feature(feature_name, session)
        feature.validations.append(validation_report_entity)
        entities.append(feature)
    for notice in json_report["notices"]:
        notice_entity = Notice(
            dataset_id=dataset.id,
            validation_report_id=report_id,
            notice_code=notice["code"],
            severity=notice["severity"],
            total_notices=notice["totalNotices"],
        )
        dataset.notices.append(notice_entity)
        entities.append(notice_entity)
    # Process notices and compute counters
    # NOTE(review): counters are computed over all of dataset.notices, which may
    # include notices attached by earlier reports for this dataset, not only the
    # ones created above — confirm this is the intended scope.
    populate_counters(dataset.notices, validation_report_entity)
    return entities
def populate_counters(notices, validation_report_entity):
    """
    Copies notice-derived severity counters onto the validation report entity.

    :param notices: Iterable of Notice entities
    :param validation_report_entity: Validationreport entity updated in place
    """
    stats = process_validation_report_notices(notices)
    # Mirror each computed counter onto the entity attribute of the same name.
    for counter_name in (
        "total_info",
        "total_warning",
        "total_error",
        "unique_info_count",
        "unique_warning_count",
        "unique_error_count",
    ):
        setattr(validation_report_entity, counter_name, stats[counter_name])
def populate_service_date(dataset, json_report, timezone=None):
    """
    Populates the dataset's service date range from the JSON report.

    Reads summary.feedInfo.feedServiceWindowStart/End and, when the helper can
    turn them into a UTC range, stores the pair on the dataset in place.
    Leaves the dataset untouched otherwise.
    """
    window_start = get_nested_value(
        json_report, ["summary", "feedInfo", "feedServiceWindowStart"]
    )
    window_end = get_nested_value(
        json_report, ["summary", "feedInfo", "feedServiceWindowEnd"]
    )
    utc_range = get_service_date_range_with_timezone_utc(
        window_start, window_end, timezone
    )
    if utc_range is not None:
        dataset.service_date_range_start, dataset.service_date_range_end = utc_range
@with_db_session
def create_validation_report_entities(
    feed_stable_id, dataset_stable_id, version, db_session
):
    """
    Creates and stores entities based on a validation report.

    This includes the validation report itself, related feature entities,
    and any notices found within the report. Also refreshes the feed status
    and marks the task execution tracker for monitoring.

    :param feed_stable_id: Stable ID of the feed
    :param dataset_stable_id: Stable ID of the dataset
    :param version: Version of the validator
    :param db_session: Database session (injected by @with_db_session)
    :return: Tuple of (result message, HTTP status code)
    """
    json_report_url = (
        f"{FILES_ENDPOINT}/{feed_stable_id}/{dataset_stable_id}/report_{version}.json"
    )
    # Lazy %s formatting for consistency with the rest of the module.
    logging.info("Accessing JSON report at %s.", json_report_url)
    json_report, code = validate_json_report(json_report_url)
    if code != 200:
        return json_report, code
    try:
        validated_at, version_from_json = parse_json_report(json_report)
    except Exception as error:
        return str(error), 500
    try:
        # Generate the database entities required for the report
        # If an error is thrown we should let the retry mechanism to do its work
        entities = generate_report_entities(
            # default to the version parameter
            version_from_json if version_from_json else version,
            validated_at,
            json_report,
            dataset_stable_id,
            db_session,
            feed_stable_id,
        )
        for entity in entities:
            db_session.add(entity)
        # In this case the report entities are already in the DB or cannot be saved for other reasons
        # In any case, this will fail in any retried event
        try:
            logging.info("Committing %s entities to the database.", len(entities))
            db_session.commit()
            logging.info("Entities committed successfully.")
        except Exception as error:
            logging.warning(
                "Could not commit %s entities to the database: %s", entities, error
            )
            # 200 on purpose: a retried event would hit the same failure.
            return str(error), 200
        update_feed_statuses_query(db_session, [feed_stable_id])
        # Update execution tracker regardless of bypass_db_update, so monitoring
        # works for both pre-release and post-release validation runs.
        # Tracker failures are non-fatal: the report itself is already committed.
        try:
            tracker = TaskExecutionTracker(
                task_name="gtfs_validation",
                run_id=version,
                db_session=db_session,
            )
            tracker.mark_completed(dataset_stable_id)
            db_session.commit()
        except Exception as tracker_error:
            logging.warning(
                "Could not update task execution tracker: %s", tracker_error
            )
        result = f"Created {len(entities)} entities."
        logging.info(result)
        return result, 200
    except Exception as error:
        # Fixed doubled colon in the log message.
        logging.error("Error creating validation report entities: %s", error)
        return f"Error creating validation report entities: {error}", 500
def get_validation_report(report_id, session):
    """
    Fetch a Validationreport row by its primary ID.

    :param report_id: The ID of the report
    :param session: The database session
    :return: Validationreport instance, or None when no row matches
    """
    query = session.query(Validationreport)
    return query.filter(Validationreport.id == report_id).first()
@functions_framework.http
def process_validation_report(request):
    """
    HTTP entry point that triggers validation-report processing for a dataset.

    Expects a JSON body containing 'dataset_id', 'feed_id' and
    'validator_version'; rejects anything else with a 400.

    :param request: Request object carrying the JSON body
    :return: HTTP response tuple indicating the result of the operation
    """
    request_json = request.get_json(silent=True)
    logging.info(
        "Processing validation report function called with request: %s", request_json
    )
    required_keys = ("dataset_id", "feed_id", "validator_version")
    if not request_json or any(key not in request_json for key in required_keys):
        return (
            f"Invalid request body: {request_json}. We expect 'dataset_id', 'feed_id' and 'validator_version' to be "
            f"present.",
            400,
        )
    dataset_id = request_json["dataset_id"]
    feed_id = request_json["feed_id"]
    validator_version = request_json["validator_version"]
    logging.info(
        "Processing validation report version: %s for dataset: %s in feed: %s.",
        validator_version,
        dataset_id,
        feed_id,
    )
    return create_validation_report_entities(feed_id, dataset_id, validator_version)
@functions_framework.http
@with_db_session
def compute_validation_report_counters(request, db_session: Session):
    """
    Compute the total number of errors, warnings, and info notices,
    as well as the number of distinct codes for each severity level
    across all validation reports in the database, and write the results to the database.

    Backfill job: only reports whose three unique counters are all zero (and
    that actually have notices) are selected, processed in batches, and
    committed batch by batch.

    :param request: HTTP request (unused beyond triggering the function)
    :param db_session: Database session (injected by @with_db_session)
    :return: Tuple of (JSON message dict, 200)
    """
    batch_size = 100  # Number of reports to process in each batch
    offset = 0
    # Correlated EXISTS subquery: restrict to reports that have at least one notice.
    notice_exists = (
        db_session.query(Notice)
        .filter(Notice.validation_report_id == Validationreport.id)
        .exists()
    )
    while True:
        # NOTE(review): offset is never incremented. Once a batch is updated and
        # committed, those rows no longer match the all-counters-zero filter, so
        # re-querying at offset 0 appears intentional — confirm. If a report with
        # notices could legitimately keep zero unique counts after the update,
        # this loop would not terminate.
        validation_reports = (
            db_session.query(Validationreport)
            .filter(
                (Validationreport.unique_info_count == 0)
                & (Validationreport.unique_warning_count == 0)
                & (Validationreport.unique_error_count == 0)
                & notice_exists
            )
            .order_by(Validationreport.validated_at.desc())
            .limit(batch_size)
            .offset(offset)
            .all()
        )
        logging.info(
            "Processing %s validation reports from offset: %s.",
            len(validation_reports),
            offset,
        )
        # Break the loop if no more reports are found
        if len(validation_reports) == 0:
            break
        for report in validation_reports:
            # Recompute and store the six counters on the report entity in place.
            populate_counters(report.notices, report)
            logging.info(
                "Updated ValidationReport %s with counters: "
                "INFO=%s, WARNING=%s, ERROR=%s, "
                "Unique INFO Code=%s, Unique WARNING Code=%s, "
                "Unique ERROR Code=%s",
                report.id,
                report.total_info,
                report.total_warning,
                report.total_error,
                report.unique_info_count,
                report.unique_warning_count,
                report.unique_error_count,
            )
        # Commit the changes for the current batch
        db_session.commit()
        # Last page
        if len(validation_reports) < batch_size:
            break
    return {"message": "Validation report counters computed successfully."}, 200
def process_validation_report_notices(notices):
"""
Processes the notices of a validation report and computes counters for different severities.
:param report: A Validationreport object containing associated notices.
:return: A dictionary with computed counters for total and unique counts of INFO, WARNING, and ERROR severities.
"""
# Initialize counters for the current report
total_info, total_warning, total_error = 0, 0, 0
info_codes, warning_codes, error_codes = set(), set(), set()
# Process associated notices
for notice in notices:
match notice.severity:
case "INFO":
total_info += notice.total_notices
info_codes.add(notice.notice_code)
case "WARNING":
total_warning += notice.total_notices
warning_codes.add(notice.notice_code)
case "ERROR":
total_error += notice.total_notices
error_codes.add(notice.notice_code)
case _:
logging.warning("Unknown severity: %s", notice.severity)
return {
"total_info": total_info,
"total_warning": total_warning,
"total_error": total_error,
"unique_info_count": len(info_codes),
"unique_warning_count": len(warning_codes),
"unique_error_count": len(error_codes),
}