Skip to content

Commit 7f70f4b

Browse files
committed
Add chunked telemetry fetch with adaptive queries
Introduce fetch_telemetry_chunked to fetch telemetry over large time ranges by splitting queries into time chunks and using adaptive_query to recursively halve ranges when server file limits are hit. Supports optional resampling, movement filtering, parallel top-level chunks via run_chunks_parallel, and progress output. Export the new function in the package __init__ and add .DS_Store to .gitignore. Also import the needed utilities (adaptive_query, run_chunks_parallel) and typing hints, and add a sequential execution path to run_chunks_parallel for the single-worker case.
1 parent 7b2307d commit 7f70f4b

5 files changed

Lines changed: 157 additions & 3 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ build/
1515
coverage.xml
1616
htmlcov/
1717
/examples/__pycache__
18+
.DS_Store

src/slicks/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .fetcher import fetch_telemetry, bulk_fetch_season, list_target_sensors, get_influx_client
1+
from .fetcher import fetch_telemetry, fetch_telemetry_chunked, bulk_fetch_season, list_target_sensors, get_influx_client
22
from .discovery import discover_sensors
33
from .movement_detector import detect_movement_ratio, get_movement_segments, filter_data_in_movement
44
from .config import connect_influxdb3

src/slicks/fetcher.py

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import os
22
from datetime import datetime, timedelta
3+
from typing import List, Optional
34
import pandas as pd
45
from influxdb_client_3 import InfluxDBClient3
56
from . import config
6-
from .query_utils import quote_table
7+
from .query_utils import quote_table, adaptive_query, run_chunks_parallel
78
from .movement_detector import filter_data_in_movement
89

910

@@ -107,6 +108,135 @@ def fetch_telemetry(start_time, end_time, signals=None, client=None, filter_move
107108
print(f"Error fetching data: {e}")
108109
return None
109110

111+
def fetch_telemetry_chunked(
    start_time: datetime,
    end_time: datetime,
    signals=None,
    client=None,
    filter_movement: bool = True,
    resample: Optional[str] = "1s",
    chunk_size: timedelta = timedelta(hours=6),
    max_workers: int = 1,
    show_progress: bool = True,
) -> Optional[pd.DataFrame]:
    """
    Fetch telemetry with automatic time-splitting when InfluxDB's per-query
    file limit is exceeded.

    Accepts the same leading arguments as ``fetch_telemetry`` but routes every
    time window through ``adaptive_query``: if a window hits the server's
    parquet-file cap the range is recursively halved until each sub-query
    succeeds, then the results are concatenated. Suitable for ranges that
    span many test sessions.

    Args:
        start_time: Start of the query range (inclusive). Naive datetimes are
            treated as UTC; tz-aware datetimes are converted to UTC.
        end_time: End of the query range (exclusive). Same timezone handling
            as ``start_time``.
        signals: Sensor name or list of names (defaults to config.SIGNALS).
        client: Existing InfluxDBClient3 instance. Only used when
            ``max_workers <= 1``; if None, a client is created for the call
            and closed afterwards. Parallel runs obtain their clients from
            ``get_influx_client`` via ``run_chunks_parallel``.
        filter_movement: Apply movement-detection filtering to the final result.
        resample: Pandas frequency string, e.g. "1s", "100ms", or None for raw.
        chunk_size: Initial time window per adaptive-query call. Each chunk is
            split further on file-limit errors. Default: 6 hours.
        max_workers: Parallel workers for top-level chunks (1 = sequential).
        show_progress: Print progress messages. Per-chunk lines are only
            printed on the sequential path.

    Returns:
        Combined DataFrame indexed by time with one column per signal, or
        None if no signals were requested or no data was found.

    Raises:
        ValueError: If ``chunk_size`` is not a positive timedelta.
    """
    # Function-scope import: the module header is only known to import
    # ``datetime`` and ``timedelta`` from the datetime module.
    from datetime import timezone

    # A zero or negative step would keep the chunking loop below from ever
    # reaching ``end_time``.
    if chunk_size <= timedelta(0):
        raise ValueError("chunk_size must be a positive timedelta")

    if signals is None:
        signals = config.SIGNALS
    if isinstance(signals, str):
        signals = [signals]
    if not signals:
        return None

    # Signal names end up inside single-quoted SQL literals, so embedded
    # single quotes are doubled to keep the statement well-formed.
    signal_list = "', '".join(str(name).replace("'", "''") for name in signals)
    schema = config.INFLUX_SCHEMA or "iox"
    table = config.INFLUX_TABLE or config.INFLUX_DB
    table_ref = quote_table(schema, table)

    def _fmt(dt: datetime) -> str:
        """Format a datetime as a UTC ISO string for SQL.

        Aware datetimes are converted to UTC first; naive datetimes are
        assumed to already be UTC. Sub-second precision is truncated.
        """
        if dt.tzinfo is not None and dt.utcoffset() is not None:
            dt = dt.astimezone(timezone.utc)
        return dt.strftime("%Y-%m-%dT%H:%M:%S") + "Z"

    def _fetch_chunk(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.DataFrame]:
        """Fetch one half-open window [t0, t1); return a list for adaptive_query."""
        query = (
            f"SELECT time, \"signalName\", \"sensorReading\" "
            f"FROM {table_ref} "
            f"WHERE \"signalName\" IN ('{signal_list}') "
            f"AND time >= '{_fmt(t0)}' AND time < '{_fmt(t1)}' "
            f"ORDER BY time ASC"
        )
        raw = cli.query(query=query, mode="pandas")
        if raw.empty:
            return []
        # Long -> wide: one column per signal; repeated readings for the same
        # (time, signal) pair are averaged.
        wide = raw.pivot_table(
            index="time",
            columns="signalName",
            values="sensorReading",
            aggfunc="mean",
        )
        return [wide]

    def _fetch_adaptive(cli: InfluxDBClient3, t0: datetime, t1: datetime) -> List[pd.DataFrame]:
        """Run ``_fetch_chunk`` for [t0, t1) through ``adaptive_query``."""
        return adaptive_query(
            client=cli,
            t0=t0,
            t1=t1,
            primary_fn=_fetch_chunk,
            min_span=timedelta(minutes=1),
        )

    # Split the full range into top-level chunks; each one is then handed to
    # adaptive_query, which may subdivide it further.
    chunks: List[tuple] = []
    t = start_time
    while t < end_time:
        chunks.append((t, min(t + chunk_size, end_time)))
        t += chunk_size

    if show_progress:
        print(f"Fetching {len(chunks)} chunk(s) from {start_time.date()} to {end_time.date()}...")

    all_dfs: List[pd.DataFrame] = []

    if max_workers > 1:
        # run_chunks_parallel builds its own clients from the factory, so no
        # client is created here for the parallel path.
        all_dfs = run_chunks_parallel(
            client_factory=get_influx_client,
            chunks=chunks,
            query_fn=_fetch_adaptive,
            max_workers=max_workers,
        )
    else:
        owns_client = client is None
        if owns_client:
            client = get_influx_client()
        try:
            for i, (t0, t1) in enumerate(chunks):
                if show_progress:
                    print(f"  chunk {i + 1}/{len(chunks)}: {t0} -> {t1}")
                all_dfs.extend(_fetch_adaptive(client, t0, t1))
        finally:
            # Only close a client this call created; a caller-supplied client
            # stays open for the caller to manage.
            if owns_client:
                try:
                    client.close()
                except Exception:
                    # Best-effort cleanup: a failure while closing must not
                    # mask the query result or the original exception.
                    pass

    if not all_dfs:
        if show_progress:
            print("No data found.")
        return None

    df = pd.concat(all_dfs).sort_index()
    # Windows are half-open so overlaps are not expected, but guard against
    # duplicate timestamps at chunk boundaries anyway.
    df = df[~df.index.duplicated(keep="first")]

    if resample:
        df = df.resample(resample).mean().dropna(how="all")

    if filter_movement:
        df = filter_data_in_movement(df)

    if show_progress:
        print(f"Fetched {len(df)} rows.")
    return df
238+
239+
110240
def bulk_fetch_season(start_date, end_date, output_file="telemetry_season.csv"):
111241
"""
112242
Fetch data day-by-day.

src/slicks/query_utils.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,26 @@ def run_chunks_parallel(
134134
if not chunks:
135135
return []
136136

137+
# Sequential path — avoids nested ThreadPoolExecutor when called from inside
138+
# another executor thread (e.g. asyncio run_in_executor). Creating gRPC clients
139+
# inside nested thread pools causes "bad value(s) in fds_to_keep" on macOS.
140+
if max_workers == 1:
141+
ordered: List[T] = []
142+
for idx, (t0, t1) in enumerate(chunks):
143+
client = client_factory()
144+
try:
145+
ordered.extend(query_fn(client, t0, t1))
146+
except PermanentQueryError:
147+
raise
148+
finally:
149+
try:
150+
client.close()
151+
except Exception:
152+
pass
153+
if on_chunk_done:
154+
on_chunk_done(idx)
155+
return ordered
156+
137157
results: dict[int, List[T]] = {}
138158
lock = threading.Lock()
139159

src/slicks/scanner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ def scan_data_availability(
325325
bin_size: str = "hour",
326326
include_counts: bool = True,
327327
show_progress: bool = True,
328+
max_workers: int = 4,
328329
) -> ScanResult:
329330
"""
330331
Scan the database for data availability windows.
@@ -397,6 +398,7 @@ def scan_data_availability(
397398
initial_chunk_days=initial_chunk_days,
398399
show_progress=show_progress,
399400
total_chunks=total_chunks,
401+
max_workers=max_workers,
400402
))
401403
except PermanentQueryError as e:
402404
raise RuntimeError(
@@ -439,6 +441,7 @@ def _fetch_bins_adaptive(
439441
initial_chunk_days: int = 31,
440442
show_progress: bool = True,
441443
total_chunks: int = 1,
444+
max_workers: int = 4,
442445
) -> Iterable[Tuple[datetime, int]]:
443446
"""Iterate over bucket start times with counts using parallel adaptive chunking."""
444447

@@ -541,7 +544,7 @@ def on_chunk_done(idx: int) -> None:
541544
client_factory=_make_client,
542545
chunks=chunks,
543546
query_fn=process_chunk,
544-
max_workers=4,
547+
max_workers=max_workers,
545548
on_chunk_done=on_chunk_done,
546549
)
547550
finally:

0 commit comments

Comments
 (0)