Skip to content

Commit dd45a77

Browse files
committed
Add adaptive query utilities and parallel scan
Introduce slicks.query_utils with error classification, adaptive_query (recursive split/fallback) and run_chunks_parallel (threaded chunk execution with client_factory). Refactor discovery.discover_sensors and scanner._fetch_bins_adaptive to use these utilities, add progress bar handling, per-thread InfluxDBClient3 creation/closing, and surface permanent errors as RuntimeError. Add comprehensive unit tests for discovery, query_utils and update scanner tests to match parallel client usage.
1 parent cf73e63 commit dd45a77

6 files changed

Lines changed: 813 additions & 120 deletions

File tree

src/slicks/discovery.py

Lines changed: 109 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,116 @@
1-
from datetime import timedelta
1+
"""
2+
Sensor discovery module.
3+
4+
Scans the database for all unique sensor names within a time range.
5+
Uses adaptive chunking with parallel execution.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import threading
11+
from datetime import datetime, timedelta
12+
from typing import List, Optional
13+
14+
from influxdb_client_3 import InfluxDBClient3
15+
from tqdm.auto import tqdm
16+
217
from . import config
3-
from .fetcher import get_influx_client
18+
from .query_utils import adaptive_query, run_chunks_parallel, PermanentQueryError
419

5-
def discover_sensors(start_time, end_time, chunk_size_days=1, client=None):
20+
21+
def discover_sensors(
22+
start_time: datetime,
23+
end_time: datetime,
24+
chunk_size_days: int = 7,
25+
client: Optional[InfluxDBClient3] = None,
26+
show_progress: bool = True,
27+
) -> List[str]:
628
"""
7-
Scans the database for ALL unique sensor names within the time range.
8-
Uses recursive splitting to handle server resource limits.
29+
Scan the database for ALL unique sensor names within the time range.
30+
31+
Uses adaptive chunking with parallel execution to handle server
32+
resource limits efficiently.
33+
34+
Args:
35+
start_time: Start of scan range.
36+
end_time: End of scan range.
37+
chunk_size_days: Days per chunk (default 7).
38+
client: Ignored (kept for backward compatibility).
39+
show_progress: Show progress bar (default True).
40+
41+
Returns:
42+
Sorted list of unique sensor name strings.
943
"""
10-
if client is None:
11-
client = get_influx_client()
12-
13-
unique_sensors = set()
14-
15-
def _scan_recursive(start, end, depth=0):
16-
# Stop recursion if interval is too small (< 10 seconds) or depth too high
17-
if (end - start).total_seconds() < 10 or depth > 5:
18-
# print(f" Skipping small/deep chunk: {start} to {end}")
19-
return
20-
21-
query = f"""
44+
45+
def _make_client() -> InfluxDBClient3:
46+
return InfluxDBClient3(
47+
host=config.INFLUX_URL,
48+
token=config.INFLUX_TOKEN,
49+
database=config.INFLUX_DB,
50+
)
51+
52+
def _query_distinct(
53+
client: InfluxDBClient3, t0: datetime, t1: datetime,
54+
) -> List[str]:
55+
sql = f"""
2256
SELECT DISTINCT "signalName"
2357
FROM "iox"."{config.INFLUX_DB}"
24-
WHERE time >= '{start.isoformat()}Z'
25-
AND time < '{end.isoformat()}Z'
58+
WHERE time >= '{t0.isoformat()}Z'
59+
AND time < '{t1.isoformat()}Z'
2660
"""
27-
28-
try:
29-
# print(f"Scanning {start} -> {end} (Depth {depth})...")
30-
table = client.query(query=query, mode="all")
31-
df = table.to_pandas()
32-
33-
if not df.empty and "signalName" in df.columns:
34-
batch_sensors = set(df["signalName"].unique())
35-
unique_sensors.update(batch_sensors)
36-
37-
except Exception as e:
38-
# print(f" Chunk failed ({e}). Splitting...")
39-
mid_point = start + (end - start) / 2
40-
_scan_recursive(start, mid_point, depth + 1)
41-
_scan_recursive(mid_point, end, depth + 1)
42-
43-
print(f"Discovering sensors from {start_time} to {end_time}...")
44-
current = start_time
45-
while current < end_time:
46-
next_step = min(current + timedelta(days=chunk_size_days), end_time)
47-
if next_step <= current: break
48-
49-
# Start recursion for this chunk
50-
_scan_recursive(current, next_step)
51-
current = next_step
52-
53-
sorted_sensors = sorted(list(unique_sensors))
54-
print(f"Discovery Complete. Found {len(sorted_sensors)} unique sensors.")
55-
return sorted_sensors
61+
table = client.query(query=sql)
62+
if table.num_rows == 0:
63+
return []
64+
col = table.column("signalName")
65+
return [v.as_py() for v in col if v.as_py() is not None]
66+
67+
def _process_chunk(
68+
client: InfluxDBClient3, t0: datetime, t1: datetime,
69+
) -> List[str]:
70+
return adaptive_query(
71+
client=client,
72+
t0=t0,
73+
t1=t1,
74+
primary_fn=_query_distinct,
75+
fallback_fn=None,
76+
min_span=timedelta(seconds=10),
77+
max_depth=5,
78+
)
79+
80+
# Build chunk list
81+
chunks = []
82+
cur = start_time
83+
while cur < end_time:
84+
nxt = min(cur + timedelta(days=chunk_size_days), end_time)
85+
if nxt <= cur:
86+
break
87+
chunks.append((cur, nxt))
88+
cur = nxt
89+
90+
pbar = tqdm(
91+
total=len(chunks),
92+
desc="Discovering sensors",
93+
unit="chunk",
94+
disable=not show_progress,
95+
)
96+
pbar_lock = threading.Lock()
97+
98+
def on_chunk_done(idx: int) -> None:
99+
with pbar_lock:
100+
pbar.update(1)
101+
102+
try:
103+
all_names = run_chunks_parallel(
104+
client_factory=_make_client,
105+
chunks=chunks,
106+
query_fn=_process_chunk,
107+
max_workers=4,
108+
on_chunk_done=on_chunk_done,
109+
)
110+
except PermanentQueryError as e:
111+
raise RuntimeError(f"Sensor discovery aborted: {e}") from e
112+
finally:
113+
pbar.close()
114+
115+
unique = sorted(set(all_names))
116+
return unique

src/slicks/query_utils.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""
2+
Shared utilities for adaptive chunked querying against InfluxDB 3.x (IOx).
3+
4+
Provides:
5+
- Error classification (recoverable vs permanent)
6+
- Parallel chunk execution via ThreadPoolExecutor
7+
- Adaptive recursive splitting on resource-limit failures
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import threading
13+
from concurrent.futures import ThreadPoolExecutor, as_completed
14+
from datetime import datetime, timedelta
15+
from typing import Callable, List, Optional, Sequence, Tuple, TypeVar
16+
17+
from influxdb_client_3 import InfluxDBClient3
18+
19+
T = TypeVar("T")
20+
21+
# ---------------------------------------------------------------------------
22+
# Error classification
23+
# ---------------------------------------------------------------------------
24+
25+
_PERMANENT_ERROR_PATTERNS = (
26+
"table not found",
27+
"not found",
28+
"unauthorized",
29+
"unauthenticated",
30+
"permission denied",
31+
"invalid token",
32+
"database not found",
33+
"bucket not found",
34+
"syntax error",
35+
)
36+
37+
38+
class PermanentQueryError(Exception):
39+
"""An error that will not resolve by splitting the time range."""
40+
41+
42+
def is_permanent_error(exc: Exception) -> bool:
43+
"""Classify an exception as permanent (non-retryable) vs recoverable."""
44+
msg = str(exc).lower()
45+
return any(pattern in msg for pattern in _PERMANENT_ERROR_PATTERNS)
46+
47+
48+
# ---------------------------------------------------------------------------
49+
# Adaptive recursive query
50+
# ---------------------------------------------------------------------------
51+
52+
def adaptive_query(
53+
client: InfluxDBClient3,
54+
t0: datetime,
55+
t1: datetime,
56+
primary_fn: Callable[[InfluxDBClient3, datetime, datetime], List[T]],
57+
fallback_fn: Optional[Callable[[InfluxDBClient3, datetime, datetime], List[T]]] = None,
58+
min_span: Optional[timedelta] = None,
59+
max_depth: int = 10,
60+
_depth: int = 0,
61+
) -> List[T]:
62+
"""
63+
Execute *primary_fn* on [t0, t1). On a recoverable failure the range is
64+
split in half and each half is retried recursively.
65+
66+
When the remaining span is smaller than *min_span* (or *max_depth* is
67+
reached) *fallback_fn* is used instead — if provided — otherwise an empty
68+
list is returned.
69+
70+
Raises ``PermanentQueryError`` immediately for non-retryable errors such
71+
as authentication failures or missing tables.
72+
"""
73+
if min_span and (t1 - t0) <= min_span:
74+
if fallback_fn:
75+
return fallback_fn(client, t0, t1)
76+
return []
77+
78+
if _depth > max_depth:
79+
if fallback_fn:
80+
return fallback_fn(client, t0, t1)
81+
return []
82+
83+
try:
84+
return primary_fn(client, t0, t1)
85+
except Exception as exc:
86+
if is_permanent_error(exc):
87+
raise PermanentQueryError(str(exc)) from exc
88+
89+
mid = t0 + (t1 - t0) / 2
90+
if mid <= t0 or mid >= t1:
91+
if fallback_fn:
92+
return fallback_fn(client, t0, t1)
93+
return []
94+
95+
left = adaptive_query(
96+
client, t0, mid, primary_fn, fallback_fn,
97+
min_span, max_depth, _depth + 1,
98+
)
99+
right = adaptive_query(
100+
client, mid, t1, primary_fn, fallback_fn,
101+
min_span, max_depth, _depth + 1,
102+
)
103+
return left + right
104+
105+
106+
# ---------------------------------------------------------------------------
107+
# Parallel chunk execution
108+
# ---------------------------------------------------------------------------
109+
110+
def run_chunks_parallel(
111+
client_factory: Callable[[], InfluxDBClient3],
112+
chunks: Sequence[Tuple[datetime, datetime]],
113+
query_fn: Callable[[InfluxDBClient3, datetime, datetime], List[T]],
114+
max_workers: int = 4,
115+
on_chunk_done: Optional[Callable[[int], None]] = None,
116+
) -> List[T]:
117+
"""
118+
Execute *query_fn* across time-range *chunks* in parallel.
119+
120+
Each worker thread receives its own ``InfluxDBClient3`` instance
121+
(via *client_factory*) because the client is not guaranteed thread-safe.
122+
123+
Results are returned in chunk order regardless of completion order.
124+
125+
Raises ``PermanentQueryError`` immediately, cancelling remaining work.
126+
"""
127+
if not chunks:
128+
return []
129+
130+
results: dict[int, List[T]] = {}
131+
lock = threading.Lock()
132+
133+
with ThreadPoolExecutor(max_workers=max_workers) as pool:
134+
future_to_idx: dict = {}
135+
clients: list[InfluxDBClient3] = []
136+
137+
for idx, (t0, t1) in enumerate(chunks):
138+
client = client_factory()
139+
clients.append(client)
140+
future = pool.submit(query_fn, client, t0, t1)
141+
future_to_idx[future] = idx
142+
143+
try:
144+
for future in as_completed(future_to_idx):
145+
idx = future_to_idx[future]
146+
result = future.result() # raises on exception
147+
with lock:
148+
results[idx] = result
149+
if on_chunk_done:
150+
on_chunk_done(idx)
151+
except PermanentQueryError:
152+
for f in future_to_idx:
153+
f.cancel()
154+
raise
155+
finally:
156+
for c in clients:
157+
try:
158+
c.close()
159+
except Exception:
160+
pass
161+
162+
ordered: List[T] = []
163+
for idx in sorted(results.keys()):
164+
ordered.extend(results[idx])
165+
return ordered

0 commit comments

Comments
 (0)