Skip to content

Commit 22b72c2

Browse files
committed
InfluxDB wide format, v0.2.0
Update pyproject.toml
1 parent 7f70f4b commit 22b72c2

6 files changed

Lines changed: 362 additions & 64 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "slicks"
7-
version = "0.1.5"
7+
version = "0.2.0"
88
description = "The home baked data pipeline for Western Formula Racing"
99
readme = "README.md"
1010
authors = [
@@ -19,6 +19,8 @@ requires-python = ">=3.11"
1919
dependencies = [
2020
"pandas>=2.0.0",
2121
"influxdb3-python>=0.1.0",
22+
"influxdb-client>=1.30.0",
23+
"cantools>=39.0.0",
2224
"python-dotenv>=1.0.0",
2325
"matplotlib>=3.0.0",
2426
"tqdm>=4.0.0",

src/slicks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from .movement_detector import detect_movement_ratio, get_movement_segments, filter_data_in_movement
44
from .config import connect_influxdb3
55
from .scanner import scan_data_availability
6+
from .can_decode import DecodedFrame, decode_frame, load_dbc, resolve_dbc_path
7+
from .writer import WideWriter, frame_to_line_protocol, NON_SIGNAL_COLS
68

79
# New analysis modules
810
from . import battery

src/slicks/can_decode.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""CAN frame decoding — shared logic for all telemetry writers."""
2+
from __future__ import annotations
3+
4+
import os
5+
from dataclasses import dataclass, field
6+
from pathlib import Path
7+
from typing import Optional
8+
9+
import cantools
10+
11+
12+
@dataclass
class DecodedFrame:
    """Result of decoding a single CAN frame against a DBC database.

    Instances are produced by ``decode_frame``; ``signals`` only ever holds
    values that could be represented as ``float``.
    """

    # Name of the DBC message the frame was matched to.
    message_name: str
    # CAN identifier associated with the frame.
    can_id: int
    # Signal name -> numeric value. A default_factory is used so every
    # instance gets its own dict rather than sharing one mutable default.
    signals: dict[str, float] = field(default_factory=dict)
17+
18+
19+
def resolve_dbc_path(env_var: str = "DBC_FILE_PATH", fallback: str = "example.dbc") -> Path:
    """Resolve the DBC file path from the environment or common locations.

    Search order (the first existing regular file wins):

    1. The path stored in the ``env_var`` environment variable, or
       ``fallback`` when the variable is unset or empty.
    2. ``/app/example.dbc``, ``/installer/example.dbc`` and ``example.dbc``
       three directories above this module.
    3. The most recently modified ``*.dbc`` file in the current directory.

    Args:
        env_var: Name of the environment variable holding the DBC path.
        fallback: Path tried when the environment variable is unset or empty.

    Returns:
        Path to the first matching DBC file.

    Raises:
        FileNotFoundError: If no DBC file is found in any location.
    """
    # An empty variable must behave like an unset one: Path("") is the current
    # directory, which exists but is not a DBC file.
    configured = os.getenv(env_var) or fallback

    candidates = [
        Path(configured),
        Path("/app/example.dbc"),
        Path("/installer/example.dbc"),
        Path(__file__).parent.parent.parent / "example.dbc",
    ]
    for candidate in candidates:
        # is_file() rather than exists(): a directory is never a usable DBC.
        if candidate.is_file():
            return candidate

    # Last resort: newest .dbc in the current directory.
    local_dbcs = [p for p in Path(".").glob("*.dbc") if p.is_file()]
    if local_dbcs:
        return max(local_dbcs, key=lambda p: p.stat().st_mtime)

    raise FileNotFoundError(
        f"Could not find DBC file. Set {env_var} or place example.dbc in /app/."
    )
43+
44+
45+
def load_dbc(path: Optional[Path] = None) -> cantools.Database:
    """Load a cantools DBC database.

    Args:
        path: Location of the DBC file. When ``None``, the path is obtained
            from ``resolve_dbc_path()`` with its default arguments.

    Returns:
        The database object returned by ``cantools.database.load_file``.
    """
    dbc_path = resolve_dbc_path() if path is None else path
    # load_file is given a plain string, not a Path object.
    return cantools.database.load_file(str(dbc_path))
50+
51+
52+
def decode_frame(db: cantools.Database, can_id: int, data: bytes) -> Optional[DecodedFrame]:
    """Decode one CAN frame with a loaded DBC database.

    The frame ID is masked to its low 29 bits before the DBC lookup, which
    strips the extended-ID flag. Decoded values are then reduced to floats:

    - values exposing both ``.value`` and ``.name`` (cantools
      ``NamedSignalValue`` choices) contribute ``float(value.value)``, and
      are dropped if that conversion fails;
    - plain ``int`` / ``float`` values are converted with ``float()``;
    - anything else is skipped.

    Args:
        db: Loaded DBC database.
        can_id: Frame identifier as received (may include the extended flag).
        data: Raw frame payload.

    Returns:
        A ``DecodedFrame`` carrying the numeric signals and the original,
        unmasked ``can_id``. ``None`` if the ID is not in the DBC or the
        payload cannot be decoded.
    """
    lookup_id = can_id & 0x1FFFFFFF  # keep the 29 identifier bits only

    try:
        msg = db.get_message_by_frame_id(lookup_id)
    except KeyError:
        # Unknown frame ID: nothing to decode.
        return None

    try:
        decoded = msg.decode(data)
    except Exception:
        # Best-effort: a payload the DBC cannot decode is dropped, not raised.
        return None

    numeric: dict[str, float] = {}
    for sig_name, sig_val in decoded.items():
        looks_named = hasattr(sig_val, "value") and hasattr(sig_val, "name")
        if not looks_named:
            if isinstance(sig_val, (int, float)):
                numeric[sig_name] = float(sig_val)
            # Non-numeric plain values are ignored.
            continue
        # NamedSignalValue-style choice: use its underlying raw value.
        try:
            numeric[sig_name] = float(sig_val.value)
        except (ValueError, TypeError):
            pass

    return DecodedFrame(message_name=msg.name, can_id=can_id, signals=numeric)

src/slicks/discovery.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
Sensor discovery module.
33
44
Scans the database for all unique sensor names within a time range.
5-
Uses adaptive chunking with parallel execution.
5+
Uses adaptive chunking with parallel execution for narrow schema.
6+
For wide schema, uses instant information_schema.columns metadata lookup.
67
"""
78

89
from __future__ import annotations
@@ -16,6 +17,7 @@
1617

1718
from . import config
1819
from .query_utils import adaptive_query, run_chunks_parallel, PermanentQueryError, quote_table
20+
from .writer import NON_SIGNAL_COLS
1921

2022

2123
def discover_sensors(
@@ -24,23 +26,53 @@ def discover_sensors(
2426
chunk_size_days: int = 7,
2527
client: Optional[InfluxDBClient3] = None,
2628
show_progress: bool = True,
29+
schema: str = "narrow",
2730
) -> List[str]:
2831
"""
2932
Scan the database for ALL unique sensor names within the time range.
3033
31-
Uses adaptive chunking with parallel execution to handle server
32-
resource limits efficiently.
34+
For ``schema="wide"``, uses an instant ``information_schema.columns`` metadata
35+
lookup (no data scan, no adaptive bisection, ignores time range and chunk params).
36+
37+
For ``schema="narrow"`` (default), uses adaptive chunking with parallel execution
38+
to handle server resource limits efficiently.
3339
3440
Args:
35-
start_time: Start of scan range.
36-
end_time: End of scan range.
37-
chunk_size_days: Days per chunk (default 7).
41+
start_time: Start of scan range (narrow schema only).
42+
end_time: End of scan range (narrow schema only).
43+
chunk_size_days: Days per chunk (default 7, narrow schema only).
3844
client: Ignored (kept for backward compatibility).
39-
show_progress: Show progress bar (default True).
45+
show_progress: Show progress bar (default True, narrow schema only).
46+
schema: "narrow" (legacy EAV) or "wide" (one field per signal).
4047
4148
Returns:
4249
Sorted list of unique sensor name strings.
4350
"""
51+
db_schema = config.INFLUX_SCHEMA or "iox"
52+
table = config.INFLUX_TABLE or config.INFLUX_DB
53+
54+
if schema == "wide":
55+
# Instant metadata lookup — no data scan needed
56+
cli = InfluxDBClient3(
57+
host=config.INFLUX_URL,
58+
token=config.INFLUX_TOKEN,
59+
database=config.INFLUX_DB,
60+
)
61+
sql = (
62+
f"SELECT column_name FROM information_schema.columns "
63+
f"WHERE table_schema = '{db_schema}' AND table_name = '{table}'"
64+
)
65+
result = cli.query(query=sql)
66+
if result.num_rows == 0:
67+
return []
68+
col = result.column("column_name")
69+
return sorted(
70+
v.as_py()
71+
for v in col
72+
if v.as_py() is not None and v.as_py() not in NON_SIGNAL_COLS
73+
)
74+
75+
# --- narrow (legacy EAV) path ---
4476

4577
def _make_client() -> InfluxDBClient3:
4678
return InfluxDBClient3(
@@ -52,21 +84,17 @@ def _make_client() -> InfluxDBClient3:
5284
def _query_distinct(
5385
client: InfluxDBClient3, t0: datetime, t1: datetime,
5486
) -> List[str]:
55-
# Ensure safe defaults if config vars are missing or empty
56-
schema = config.INFLUX_SCHEMA or "iox"
57-
table = config.INFLUX_TABLE or config.INFLUX_DB
58-
table_ref = quote_table(schema, table)
59-
87+
table_ref = quote_table(db_schema, table)
6088
sql = f"""
6189
SELECT DISTINCT "signalName"
6290
FROM {table_ref}
6391
WHERE time >= '{t0.isoformat()}Z'
6492
AND time < '{t1.isoformat()}Z'
6593
"""
66-
table = client.query(query=sql)
67-
if table.num_rows == 0:
94+
tbl = client.query(query=sql)
95+
if tbl.num_rows == 0:
6896
return []
69-
col = table.column("signalName")
97+
col = tbl.column("signalName")
7098
return [v.as_py() for v in col if v.as_py() is not None]
7199

72100
def _process_chunk(

0 commit comments

Comments
 (0)