Fix mean and sum on influxdb 1.x#1510
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates Mycodo’s Flux query handling to avoid InfluxDB 1.8.x crash bugs around MEAN/SUM, by performing those aggregates (and aggregateWindow-style grouping) in Python for InfluxDB 1.x while keeping native Flux aggregates for InfluxDB 2.x.
Changes:
- Replace the prior
mediansubstitution with manualmeancomputation for InfluxDB 1.x (including a manual grouped-mean path whengroup_secis used). - Move the InfluxDB 1.x
SUMworkaround fully intoquery_flux()so callers no longer need to manually sum. - Reduce log severity for known/expected InfluxDB 1.x aggregate issues and simplify downstream consumers (
output_sec_on,average_*,sum_past_seconds).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| first_ts = measurements[0][0] | ||
| if hasattr(first_ts, 'timestamp'): | ||
| start_epoch = first_ts.timestamp() | ||
| else: | ||
| start_epoch = first_ts | ||
|
|
||
| current_group_start = start_epoch | ||
| current_group_values = [] | ||
|
|
There was a problem hiding this comment.
_manual_aggregate_mean() bins windows starting at the first returned timestamp (current_group_start = start_epoch) and uses the first point’s timestamp as the group time. This does not match Flux aggregateWindow() semantics (windows are aligned to fixed boundaries and _time is typically a window boundary), so 1.x and 2.x grouped results can differ. Align buckets to a consistent epoch boundary (optionally supporting an offset) and set _time to the window start/stop to better emulate aggregateWindow().
| # First, extract all measurements | ||
| measurements = [] | ||
| for table in tables: | ||
| for row in table.records: | ||
| timestamp = row.values['_time'] | ||
| value = row.values['_value'] | ||
| if value is not None: | ||
| measurements.append((timestamp, value)) | ||
|
|
||
| if not measurements: | ||
| return [] | ||
|
|
||
| # Sort by timestamp | ||
| measurements.sort(key=lambda x: x[0].timestamp() if hasattr(x[0], 'timestamp') else float(x[0])) | ||
|
|
There was a problem hiding this comment.
_manual_aggregate_mean() materializes all rows into a list and then sorts them, which can be very expensive for large queries (high memory use + O(n log n)). If results are already time-ordered (or can be forced with a Flux sort(columns:["_time"])), consider aggregating in a single pass without building/sorting the full measurements list.
| # Start new group (advance to the group containing this point) | ||
| while epoch >= current_group_start + group_sec: | ||
| current_group_start += group_sec | ||
| current_group_values = [(ts, value)] |
There was a problem hiding this comment.
The loop that advances current_group_start one window at a time can take many iterations when there are large gaps between points and group_sec is small. You can jump directly to the correct window start using integer arithmetic (e.g., compute the number of whole windows to skip) to avoid potentially long loops.
| def _manual_aggregate_mean(tables, group_sec): | ||
| """ | ||
| Manually aggregate data into time groups and calculate mean for each group. | ||
| Used as workaround for InfluxDB 1.8.10 Flux bug with mean/aggregateWindow. | ||
| """ | ||
| # First, extract all measurements | ||
| measurements = [] | ||
| for table in tables: | ||
| for row in table.records: | ||
| timestamp = row.values['_time'] | ||
| value = row.values['_value'] | ||
| if value is not None: | ||
| measurements.append((timestamp, value)) | ||
|
|
||
| if not measurements: | ||
| return [] | ||
|
|
||
| # Sort by timestamp | ||
| measurements.sort(key=lambda x: x[0].timestamp() if hasattr(x[0], 'timestamp') else float(x[0])) | ||
|
|
||
| # Aggregate into time groups and calculate means | ||
| aggregated = [] | ||
| if measurements: | ||
| first_ts = measurements[0][0] | ||
| if hasattr(first_ts, 'timestamp'): | ||
| start_epoch = first_ts.timestamp() | ||
| else: | ||
| start_epoch = first_ts | ||
|
|
||
| current_group_start = start_epoch | ||
| current_group_values = [] | ||
|
|
||
| for ts, value in measurements: | ||
| if hasattr(ts, 'timestamp'): | ||
| epoch = ts.timestamp() | ||
| else: | ||
| epoch = ts | ||
|
|
||
| # Check if this measurement belongs to current group | ||
| if epoch < current_group_start + group_sec: | ||
| current_group_values.append((ts, value)) | ||
| else: | ||
| # Finalize current group | ||
| if current_group_values: | ||
| mean_val = sum(v for _, v in current_group_values) / len(current_group_values) | ||
| group_time = current_group_values[0][0] # Use first timestamp in group | ||
| aggregated.append(_ManualRecord(group_time, mean_val)) | ||
|
|
||
| # Start new group (advance to the group containing this point) | ||
| while epoch >= current_group_start + group_sec: | ||
| current_group_start += group_sec | ||
| current_group_values = [(ts, value)] | ||
|
|
||
| # Don't forget the last group | ||
| if current_group_values: | ||
| mean_val = sum(v for _, v in current_group_values) / len(current_group_values) | ||
| group_time = current_group_values[0][0] | ||
| aggregated.append(_ManualRecord(group_time, mean_val)) | ||
|
|
||
| return [_ManualTable(aggregated)] | ||
|
|
||
|
|
||
|
|
||
| def _manual_calculate_mean(tables): | ||
| """ | ||
| Manually calculate mean of all values in query results. | ||
| Used as workaround for InfluxDB 1.8.10 Flux bug with mean(). | ||
| """ | ||
| values = [] | ||
| last_time = None | ||
| for table in tables: | ||
| for row in table.records: | ||
| value = row.values['_value'] | ||
| if value is not None: | ||
| values.append(value) | ||
| last_time = row.values['_time'] | ||
|
|
||
| if not values: | ||
| return [] | ||
|
|
||
| mean_val = sum(values) / len(values) | ||
| return [_ManualTable([_ManualRecord(last_time, mean_val)])] | ||
|
|
||
|
|
||
| def _manual_calculate_sum(tables): | ||
| """ | ||
| Manually calculate sum of all values in query results. | ||
| Used as workaround for InfluxDB 1.8.10 Flux bug with sum(). | ||
| """ |
There was a problem hiding this comment.
New manual aggregation paths (_manual_aggregate_mean, _manual_calculate_mean, _manual_calculate_sum) aren’t covered by existing tests. Since the repo already runs pytest (including an InfluxDB integration test), add unit tests that feed synthetic table/record objects into these helpers to validate bucketing, mean/sum correctness, and empty-input behavior (especially for InfluxDB 1.x).
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
Honestly that's a pretty good review by Copilot, but at this point I no longer have access to my setup (it was a gift for a friend) so I can't apply or test changes. You could get AI to finish it or something, or just bin it :) For reference, I had a fan that needs around 15% uptime, but since it could only be either on or off, the average uptime was being substituted with the median and causing it to report either 100% uptime or 0%, constantly alternating on and off and ending up with an uncontrollable 50% uptime. So I think there is value in this (or a similar) fix, if the remaining issues can be resolved. All the best! |
Context:
InfluxDB 1.8.x has a crash bug if MEAN or SUM are used, so Mycodo works around the issue by substituting with MEDIAN and by manually summing respectively. InfluxDB 1.8.x is EoL, so these issues won't be fixed.
What this PR does:
For InfluxDB 1.8.x, querying the MEAN will now return the mean. There are minor code and logging improvements too, notably moving the manual calculation for SUM so that callers don't need to participate in the workaround.
Summary of changes: