-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathfeed_status.py
More file actions
87 lines (76 loc) · 2.97 KB
/
feed_status.py
File metadata and controls
87 lines (76 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import logging
from datetime import datetime, timezone
from sqlalchemy import text
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
from shared.database.database import refresh_materialized_view
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sqlalchemy.orm import Session
# query to update the status of the feeds based on the service date range of the latest dataset
def update_feed_statuses_query(session: "Session", stable_feed_ids: list[str]):
today_utc = datetime.now(timezone.utc).date()
latest_dataset_subq = (
session.query(
Gtfsdataset.feed_id,
Gtfsdataset.service_date_range_start,
Gtfsdataset.service_date_range_end,
)
.filter(
Gtfsdataset.latest.is_(True),
Gtfsdataset.service_date_range_start.isnot(None),
Gtfsdataset.service_date_range_end.isnot(None),
)
.subquery()
)
status_conditions = [
(
latest_dataset_subq.c.service_date_range_end < today_utc,
"inactive",
),
(
latest_dataset_subq.c.service_date_range_start > today_utc,
"future",
),
(
(latest_dataset_subq.c.service_date_range_start <= today_utc)
& (latest_dataset_subq.c.service_date_range_end >= today_utc),
"active",
),
]
try:
diff_counts: dict[str, int] = {}
def get_filters(status: str):
filters = [
Feed.id == latest_dataset_subq.c.feed_id,
Feed.status != text("'deprecated'::status"),
Feed.status != text("'development'::status"),
# We filter out feeds that already have the status so that the
# update count reflects the number of feeds that actually
# changed status.
Feed.status != text("'%s'::status" % status),
service_date_conditions,
]
if len(stable_feed_ids) > 0:
filters.insert(0, Feed.stable_id.in_(stable_feed_ids))
return filters
for service_date_conditions, status in status_conditions:
diff_counts[status] = (
session.query(Feed)
.filter(*get_filters(status))
.update({Feed.status: status}, synchronize_session=False)
)
except Exception as e:
logging.error("Error updating feed statuses: %s", e)
raise Exception(f"Error updating feed statuses: {e}")
try:
session.commit()
refresh_materialized_view(session, t_feedsearch.name)
logging.info("Feed Database changes for status committed.")
logging.info("Status Changes: %s", diff_counts)
session.close()
return diff_counts
except Exception as e:
logging.error("Error committing changes:", e)
session.rollback()
session.close()
raise Exception(f"Error creating dataset: {e}")