-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathquery_helper.py
More file actions
268 lines (230 loc) · 8.14 KB
/
query_helper.py
File metadata and controls
268 lines (230 loc) · 8.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import logging
import re
from datetime import datetime
from typing import Type
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.orm.query import Query
from shared.database_gen.sqlacodegen_models import (
Feed,
Gtfsrealtimefeed,
Gtfsfeed,
Gbfsfeed,
Gtfsdataset,
Validationreport,
)
feed_mapping = {"gtfs_rt": Gtfsrealtimefeed, "gtfs": Gtfsfeed, "gbfs": Gbfsfeed}
def get_model(data_type: str | None) -> Type[Feed]:
"""
Get the model based on the data type
"""
return feed_mapping.get(data_type, Feed)
def query_feed_by_stable_id(
session: Session, stable_id: str, data_type: str | None
) -> Gtfsrealtimefeed | Gtfsfeed | Gbfsfeed:
"""
Query the feed by stable id
"""
model = get_model(data_type)
return session.query(model).filter(model.stable_id == stable_id).first()
def get_eager_loading_options(model: Type[Feed]):
"""
Get the appropriate eager loading options based on the model type.
Args:
model: The SQLAlchemy model class
Returns:
List of joinedload options for the query
"""
if model == Gtfsrealtimefeed:
logging.info("Adding GTFS-RT specific eager loading")
return [
joinedload(Gtfsrealtimefeed.locations),
joinedload(Gtfsrealtimefeed.entitytypes),
joinedload(Gtfsrealtimefeed.gtfs_feeds),
joinedload(Gtfsrealtimefeed.externalids),
joinedload(Gtfsrealtimefeed.redirectingids),
]
elif model == Gtfsfeed:
logging.info("Adding GTFS specific eager loading")
return [
joinedload(Gtfsfeed.locations),
joinedload(Gtfsfeed.externalids),
joinedload(Gtfsfeed.redirectingids),
]
else:
logging.info("Adding base Feed eager loading")
return [
joinedload(Feed.locations),
joinedload(Feed.externalids),
joinedload(Feed.redirectingids),
joinedload(Gtfsrealtimefeed.entitytypes),
joinedload(Gtfsrealtimefeed.gtfs_feeds),
]
def get_feeds_query(
db_session: Session,
search_query: str | None = None,
operation_status: str | None = None,
data_type: str | None = None,
limit: int | None = None,
offset: int | None = None,
model: Type[Feed | Gtfsfeed | Gtfsrealtimefeed] | None = Feed,
) -> Query:
"""
Build a consolidated query for feeds with filtering options.
Args:
db_session: SQLAlchemy session
search_query: Optional general search query
operation_status: Optional filter for operational status (wip or published)
data_type: Optional filter for feed type (gtfs or gtfs_rt)
limit: Maximum number of items to return
offset: Number of items to skip
Returns:
Query: SQLAlchemy query object
"""
try:
logging.info(
"Building query with params: data_type=%s, operation_status=%s and model=%s",
data_type,
operation_status,
model.__name__,
)
conditions = []
if data_type is None or len(data_type.strip()) == 0:
conditions.append(model.data_type.in_(["gtfs", "gtfs_rt"]))
logging.info("Added filter to exclude gbfs feeds")
else:
conditions.append(model.data_type == data_type)
logging.info("Added data_type filter: %s", data_type)
if operation_status and operation_status.strip():
conditions.append(model.operational_status == operation_status)
logging.info("Added operational_status filter: %s", operation_status)
if search_query and search_query.strip():
search_pattern = f"%{search_query.strip()}%"
conditions.append(
or_(
model.stable_id.ilike(search_pattern),
model.feed_name.ilike(search_pattern),
model.provider.ilike(search_pattern),
)
)
logging.info("Added search_query filter: %s", search_query)
query = db_session.query(model)
logging.info("Created base query with model %s", model.__name__)
eager_loading_options = get_eager_loading_options(model)
query = query.options(*eager_loading_options)
if conditions:
query = query.filter(and_(*conditions))
logging.info("Applied conditions: %s", conditions)
query = query.order_by(model.provider, model.stable_id)
if offset is not None:
query = query.offset(offset)
if limit is not None:
query = query.limit(limit)
logging.info("Generated SQL Query: %s", query)
return query
except Exception as e:
logging.error("Error building query: %s", str(e))
raise
def get_datasets_with_missing_reports_query(
db_session: Session,
filter_after: datetime | None = None,
) -> Query:
"""
Get datasets with missing validation reports.
Args:
db_session: SQLAlchemy session
filter_after: Optional date to filter datasets
Returns:
A SQLAlchemy query object for datasets with missing validation reports order by feed and dataset stable id.
"""
query = (
db_session.query(
Gtfsfeed.stable_id,
Gtfsdataset.stable_id,
)
.select_from(Gtfsfeed)
.join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id)
.outerjoin(Validationreport, Gtfsdataset.validation_reports)
.filter(Validationreport.id.is_(None))
)
if filter_after:
query = query.filter(Gtfsdataset.downloaded_at >= filter_after)
query = query.distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id).order_by(
Gtfsdataset.stable_id, Gtfsfeed.stable_id
)
return query
def get_feeds_with_missing_bounding_boxes_query(
db_session: Session,
) -> Query:
"""
Get GTFS feeds with datasets missing bounding boxes.
Args:
db_session: SQLAlchemy session
Returns:
A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes
ordered by feed stable id.
"""
query = (
db_session.query(Gtfsfeed)
.filter(Gtfsfeed.bounding_box.is_(None))
.filter(~Gtfsfeed.feedlocationgrouppoints.any())
.distinct(Gtfsfeed.stable_id)
.order_by(Gtfsfeed.stable_id)
)
return query
def normalize_url(url_column) -> str:
"""
Normalize a URL by removing the protocol (http:// or https://), 'www.' prefix, and trailing slash.
This function generates a SQLAlchemy expression that can be used in queries.
Args:
url_column: The SQLAlchemy column representing the URL.
Returns:
A SQLAlchemy expression that normalizes the URL.
"""
return func.regexp_replace(
func.regexp_replace(
func.regexp_replace(url_column, r"^https?://", "", "gi"),
r"^www\.",
"",
"gi",
),
r"/$",
"",
"g",
)
def normalize_url_str(url: str | None) -> str:
"""Normalize a license URL for matching.
Steps:
- Trim whitespace and quotes
- Remove BOM characters
- Strip fragments and query parameters
- Remove scheme (http/https) and www prefix
- Lowercase the host
"""
u = (url or "").strip().strip("'\"").replace("\ufeff", "")
u = re.sub(r"#.*$", "", u)
u = re.sub(r"\?.*$", "", u)
u = re.sub(r"^https?://", "", u, flags=re.I)
u = re.sub(r"^www\.", "", u, flags=re.I)
# remove trailing slashes
u = re.sub(r"/+$", "", u)
if "/" in u:
host, rest = u.split("/", 1)
return host.lower() + "/" + rest
return u.lower()
def get_feed_by_normalized_url(url: str, db_session: Session) -> Feed | None:
"""
Query the feed by normalized URL and exclude deprecated feeds.
Args:
url: The URL to normalize and search for.
db_session: SQLAlchemy session.
"""
normalized_url = normalize_url_str(url)
return (
db_session.query(Feed)
.filter(
func.lower(func.trim(normalize_url(Feed.producer_url))) == normalized_url,
Feed.status != "deprecated",
)
.first()
)