Skip to content

Fix mean and sum on influxdb 1.x#1510

Open
20chimps wants to merge 2 commits into
kizniche:masterfrom
20chimps:fix_influxdb_1x_mean
Open

Fix mean and sum on influxdb 1.x#1510
20chimps wants to merge 2 commits into
kizniche:masterfrom
20chimps:fix_influxdb_1x_mean

Conversation

@20chimps
Copy link
Copy Markdown

@20chimps 20chimps commented Feb 2, 2026

Context:
InfluxDB 1.8.x has a crash bug if MEAN or SUM are used, so Mycodo works around the issue by substituting with MEDIAN and by manually summing respectively. InfluxDB 1.8.x is EoL, so these issues won't be fixed.

What this PR does:
For InfluxDB 1.8.x, querying the MEAN will now return the mean. There are minor code and logging improvements too, notably moving the manual calculation for SUM so that callers don't need to participate in the workaround.

Summary of changes:

  1. Swaps the MEDIAN substitution with manually calculating the MEAN. If aggregateWindow is used, the aggregated results are simulated too.
  2. Moves the workarounds to occur within the query execution method (query_flux), that way the callers don't need to handle half the workaround themselves (previous SUM query behavior).
  3. Demotes related error logs to debug logs. For an error that won't be officially fixed, something like this may be all it gets.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates Mycodo’s Flux query handling to avoid InfluxDB 1.8.x crash bugs around MEAN/SUM, by performing those aggregates (and aggregateWindow-style grouping) in Python for InfluxDB 1.x while keeping native Flux aggregates for InfluxDB 2.x.

Changes:

  • Replace the prior median substitution with manual mean computation for InfluxDB 1.x (including a manual grouped-mean path when group_sec is used).
  • Move the InfluxDB 1.x SUM workaround fully into query_flux() so callers no longer need to manually sum.
  • Reduce log severity for known/expected InfluxDB 1.x aggregate issues and simplify downstream consumers (output_sec_on, average_*, sum_past_seconds).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread mycodo/utils/influx.py
Comment on lines +652 to +660
first_ts = measurements[0][0]
if hasattr(first_ts, 'timestamp'):
start_epoch = first_ts.timestamp()
else:
start_epoch = first_ts

current_group_start = start_epoch
current_group_values = []

Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_manual_aggregate_mean() bins windows starting at the first returned timestamp (current_group_start = start_epoch) and uses the first point’s timestamp as the group time. This does not match Flux aggregateWindow() semantics (windows are aligned to fixed boundaries and _time is typically a window boundary), so 1.x and 2.x grouped results can differ. Align buckets to a consistent epoch boundary (optionally supporting an offset) and set _time to the window start/stop to better emulate aggregateWindow().

Copilot uses AI. Check for mistakes.
Comment thread mycodo/utils/influx.py
Comment on lines +634 to +648
# First, extract all measurements
measurements = []
for table in tables:
for row in table.records:
timestamp = row.values['_time']
value = row.values['_value']
if value is not None:
measurements.append((timestamp, value))

if not measurements:
return []

# Sort by timestamp
measurements.sort(key=lambda x: x[0].timestamp() if hasattr(x[0], 'timestamp') else float(x[0]))

Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_manual_aggregate_mean() materializes all rows into a list and then sorts them, which can be very expensive for large queries (high memory use + O(n log n)). If results are already time-ordered (or can be forced with a Flux sort(columns:["_time"])), consider aggregating in a single pass without building/sorting the full measurements list.

Copilot uses AI. Check for mistakes.
Comment thread mycodo/utils/influx.py
Comment on lines +677 to +680
# Start new group (advance to the group containing this point)
while epoch >= current_group_start + group_sec:
current_group_start += group_sec
current_group_values = [(ts, value)]
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop that advances current_group_start one window at a time can take many iterations when there are large gaps between points and group_sec is small. You can jump directly to the correct window start using integer arithmetic (e.g., compute the number of whole windows to skip) to avoid potentially long loops.

Copilot uses AI. Check for mistakes.
Comment thread mycodo/utils/influx.py
Comment on lines +629 to +717
def _manual_aggregate_mean(tables, group_sec):
"""
Manually aggregate data into time groups and calculate mean for each group.
Used as workaround for InfluxDB 1.8.10 Flux bug with mean/aggregateWindow.
"""
# First, extract all measurements
measurements = []
for table in tables:
for row in table.records:
timestamp = row.values['_time']
value = row.values['_value']
if value is not None:
measurements.append((timestamp, value))

if not measurements:
return []

# Sort by timestamp
measurements.sort(key=lambda x: x[0].timestamp() if hasattr(x[0], 'timestamp') else float(x[0]))

# Aggregate into time groups and calculate means
aggregated = []
if measurements:
first_ts = measurements[0][0]
if hasattr(first_ts, 'timestamp'):
start_epoch = first_ts.timestamp()
else:
start_epoch = first_ts

current_group_start = start_epoch
current_group_values = []

for ts, value in measurements:
if hasattr(ts, 'timestamp'):
epoch = ts.timestamp()
else:
epoch = ts

# Check if this measurement belongs to current group
if epoch < current_group_start + group_sec:
current_group_values.append((ts, value))
else:
# Finalize current group
if current_group_values:
mean_val = sum(v for _, v in current_group_values) / len(current_group_values)
group_time = current_group_values[0][0] # Use first timestamp in group
aggregated.append(_ManualRecord(group_time, mean_val))

# Start new group (advance to the group containing this point)
while epoch >= current_group_start + group_sec:
current_group_start += group_sec
current_group_values = [(ts, value)]

# Don't forget the last group
if current_group_values:
mean_val = sum(v for _, v in current_group_values) / len(current_group_values)
group_time = current_group_values[0][0]
aggregated.append(_ManualRecord(group_time, mean_val))

return [_ManualTable(aggregated)]



def _manual_calculate_mean(tables):
"""
Manually calculate mean of all values in query results.
Used as workaround for InfluxDB 1.8.10 Flux bug with mean().
"""
values = []
last_time = None
for table in tables:
for row in table.records:
value = row.values['_value']
if value is not None:
values.append(value)
last_time = row.values['_time']

if not values:
return []

mean_val = sum(values) / len(values)
return [_ManualTable([_ManualRecord(last_time, mean_val)])]


def _manual_calculate_sum(tables):
"""
Manually calculate sum of all values in query results.
Used as workaround for InfluxDB 1.8.10 Flux bug with sum().
"""
Copy link

Copilot AI Mar 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New manual aggregation paths (_manual_aggregate_mean, _manual_calculate_mean, _manual_calculate_sum) aren’t covered by existing tests. Since the repo already runs pytest (including an InfluxDB integration test), add unit tests that feed synthetic table/record objects into these helpers to validate bucketing, mean/sum correctness, and empty-input behavior (especially for InfluxDB 1.x).

Copilot uses AI. Check for mistakes.
Comment thread mycodo/utils/influx.py Outdated
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@20chimps
Copy link
Copy Markdown
Author

Honestly that's a pretty good review by Copilot, but at this point I no longer have access to my setup (it was a gift for a friend) so I can't apply or test changes. You could get AI to finish it or something, or just bin it :)

For reference, I had a fan that needs around 15% uptime, but since it could only be either on or off, the average uptime was being substituted with the median and causing it to report either 100% uptime or 0%, constantly alternating on and off and ending up with an uncontrollable 50% uptime. So I think there is value in this (or a similar) fix, if the remaining issues can be resolved.

All the best!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants