feat(streaming): query cache #4636

Draft
tothandras wants to merge 3 commits into main from feat/ch-query-cache

tothandras commented Jul 2, 2026

Contributor

Summary by CodeRabbit

  • New Features
    • Added ClickHouse-backed meter query-result caching for eligible requests, including parity-check sampling and OpenTelemetry telemetry.
    • Introduced a cache opt-in flag and updated meter query endpoints and internal query flows to enable cached execution when eligible.
  • Bug Fixes
    • Improved cache safety with late-event invalidation and automatic fallback to live queries when caching isn’t reliable or is gated out.
  • Configuration
    • Added query-cache settings (disabled by default) with validation rules tied to decimal precision and async insert behavior.
  • Tests
    • Added unit and integration coverage for caching correctness, parity, invalidation, routing, and cache opt-out behavior.

tothandras requested a review from a team as a code owner July 2, 2026 13:14
tothandras added the release-note/misc Miscellaneous changes label Jul 2, 2026
coderabbitai Bot commented Jul 2, 2026

Contributor

Walkthrough

This PR adds a feature-gated ClickHouse meter query cache, including cache configuration, query opt-in flags, connector wiring, cache storage and invalidation, cached query execution, parity monitoring, and tests.

Changes

Meter query-result cache feature

Layer: Cache config and query opt-in
File(s): app/config/aggregation.go, app/config/config_test.go, app/config/testdata/complete.yaml, config.example.yaml, openmeter/streaming/query_params.go, openmeter/streaming/testutils/streaming.go, api/v3/handlers/meters/query*.go, openmeter/meter/httphandler/query*.go, openmeter/cost/adapter/adapter.go, openmeter/credit/balance/usage.go, openmeter/entitlement/metered/balance*.go, openmeter/billing/charges/usagebased/service/rating/service_test.go
Summary: Adds query-cache config and validation, adds QueryParams.Cachable, marks meter-related query paths cacheable, and keeps billing rating on the uncached path.

Layer: Connector wiring and feature gate
File(s): app/common/streaming.go, cmd/*/wire.go, cmd/*/wire_gen.go
Summary: Wires feature-gate, meter, and tracer dependencies into streaming connector construction across the app and worker binaries.

Layer: Connector cache routing and SQL helpers
File(s): openmeter/streaming/clickhouse/connector.go, openmeter/streaming/clickhouse/meter_query.go
Summary: Extends the ClickHouse connector with cache config, eligibility, table creation, query routing, batch invalidation, and shared meter SQL helpers.

Layer: Cache tables and planning
File(s): openmeter/streaming/clickhouse/meterqueryrow_cache.go, openmeter/streaming/clickhouse/meterqueryrow_cache_coverage.go, openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go, openmeter/streaming/clickhouse/*_test.go
Summary: Adds the rollup and coverage table schemas, shape hashing, population planning, eligibility checks, and supporting unit tests.

Layer: Cached execution, monitoring, and parity tests
File(s): openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go, openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go, openmeter/streaming/clickhouse/meterqueryrow_cache_query.go, openmeter/streaming/clickhouse/*_integration_test.go, openmeter/streaming/clickhouse/zz_audit_test.go
Summary: Adds cached query execution, invalidation, parity monitoring, cached SQL generation, and integration and audit tests for the cache path.

Estimated code review effort: 5 (Critical) | ~120 minutes

Sequence Diagram(s)

sequenceDiagram
  participant Handler as QueryMeter
  participant Connector as Connector.QueryMeter
  participant Gate as canQueryBeCached
  participant CacheExec as queryMeterCached
  participant Coverage as cache coverage table
  participant Rollup as meter query cache table

  Handler->>Connector: QueryMeter(params.Cachable=true)
  Connector->>Gate: evaluate cache eligibility
  Gate-->>Connector: eligible / not eligible
  alt eligible
    Connector->>CacheExec: queryMeterCached
    CacheExec->>Coverage: read coverage and invalidation markers
    CacheExec->>Coverage: store coverage / insert invalidation markers
    CacheExec->>Rollup: populate missing hour ranges
    CacheExec->>Rollup: execute cached UNION query
  else not eligible
    Connector->>Connector: queryMeter live path
  end
  Connector-->>Handler: meter query rows

Possibly related PRs

Suggested labels: kind/feature

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title is concise and accurately captures the main change: adding query caching to streaming.
Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
TestAuditLiveTenantBinding assumed the openmeter database and om_events
table exist (true on a dev machine, not in CI's fresh ClickHouse). The
test now creates its own database and both tables through the production
DDL builders, so it carries no environment dependency.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
greptile-apps Bot commented Jul 2, 2026

Contributor

Greptile Summary

This PR introduces an optional meter query-result cache backed by a ClickHouse ReplacingMergeTree rollup table that stores pre-aggregated hourly windows. The cache serves the settled history of cacheable queries (SUM/COUNT/MIN/MAX, with decimal precision enabled) while the fresh tail is scanned live and merged in a single SQL statement.

  • New cache layer: Lazy population via INSERT…SELECT, per-namespace coverage tracking, parity shadow-verification, and feature-gate rollout control. The cache is off by default and requires decimal precision and, when async insert is enabled, async insert wait.
  • Invalidation: BatchInsert detects late events and wipes cached rollups for affected namespaces; markers are written before the DELETE so racing claim writers are defeated.
  • Opt-in callers: HTTP meter-query handlers and entitlement balance queries set Cachable = true; billing rating paths explicitly never do.
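
As a rough illustration of the "settled history plus fresh tail" idea in the summary above, the sketch below splits one query window into a live head, an hour-aligned cached range, and a live tail. It is not the PR's planner: `windowSplit`, `splitWindow`, `exampleSplit`, and the 24-hour settling threshold are hypothetical names and values chosen for the example, and the real code additionally consults the coverage table.

```go
package main

import (
	"fmt"
	"time"
)

// windowSplit describes how one query window is divided between live scans
// and the hourly rollup. All names here are hypothetical.
type windowSplit struct {
	UseCache           bool
	HeadFrom, HeadTo   time.Time // live scan before the first whole cached hour
	CacheFrom, CacheTo time.Time // hour-aligned range read from the rollup
	TailFrom, TailTo   time.Time // live scan of the fresh, unsettled tail
}

// splitWindow carves [from, to) into a live head, a cached middle, and a live
// tail. Only whole hours that ended at least settleAfter ago are cached.
func splitWindow(from, to, now time.Time, settleAfter time.Duration) windowSplit {
	// Round the start up to the next hour boundary.
	cacheFrom := from.Truncate(time.Hour)
	if cacheFrom.Before(from) {
		cacheFrom = cacheFrom.Add(time.Hour)
	}

	// Round the end down, capped at the newest settled instant.
	cacheTo := to
	if settled := now.Add(-settleAfter); settled.Before(cacheTo) {
		cacheTo = settled
	}
	cacheTo = cacheTo.Truncate(time.Hour)

	// No whole settled hour in range: scan everything live.
	if !cacheFrom.Before(cacheTo) {
		return windowSplit{TailFrom: from, TailTo: to}
	}

	return windowSplit{
		UseCache:  true,
		HeadFrom:  from,
		HeadTo:    cacheFrom,
		CacheFrom: cacheFrom,
		CacheTo:   cacheTo,
		TailFrom:  cacheTo,
		TailTo:    to,
	}
}

// exampleSplit builds a fixed scenario so the result is reproducible.
// The 24h threshold is an assumption made for this example only.
func exampleSplit() windowSplit {
	from := time.Date(2026, 7, 1, 10, 30, 0, 0, time.UTC)
	to := time.Date(2026, 7, 2, 13, 0, 0, 0, time.UTC)
	now := time.Date(2026, 7, 2, 13, 10, 0, 0, time.UTC)
	return splitWindow(from, to, now, 24*time.Hour)
}

func main() {
	s := exampleSplit()
	fmt.Println("head: ", s.HeadFrom.Format(time.RFC3339), "to", s.HeadTo.Format(time.RFC3339))
	fmt.Println("cache:", s.CacheFrom.Format(time.RFC3339), "to", s.CacheTo.Format(time.RFC3339))
	fmt.Println("tail: ", s.TailFrom.Format(time.RFC3339), "to", s.TailTo.Format(time.RFC3339))
}
```

In this scenario only the two whole hours from 11:00 to 13:00 on July 1 would come from the rollup; the half hour before them and the most recent 24 hours would be scanned live.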

Confidence Score: 4/5

Safe to merge with one fix: findNamespacesToInvalidateCache lacks a QueryCacheEnabled guard, causing spurious ClickHouse queries and misleading invalidation metrics whenever events older than 24h arrive on a deployment where the cache is disabled (the default).

The cache logic itself — lazy population, coverage tracking, in-flight invalidation guard, parity shadow check, and the ReplacingMergeTree dedup design — is carefully implemented and well-tested. The one concrete defect is that findNamespacesToInvalidateCache is not gated on QueryCacheEnabled, so every BatchInsert with late events on a cache-disabled deployment fires 3 failed ClickHouse queries and increments the streaming.query_cache.invalidations metric even though the cache does not exist. A one-line guard at the top of that function fixes it completely.

openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go — the findNamespacesToInvalidateCache function needs the QueryCacheEnabled guard.
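
A minimal sketch of the kind of early-return guard described above, assuming a simplified connector. The `connector` struct, its `queryCacheEnabled` field, the `lookups` counter, and the `run` helper are hypothetical stand-ins; only the function name `findNamespacesToInvalidateCache` comes from the review, and its real signature is not shown in this conversation.

```go
package main

import (
	"context"
	"fmt"
)

// connector is a hypothetical stand-in for the ClickHouse connector.
// lookups counts simulated ClickHouse round trips.
type connector struct {
	queryCacheEnabled bool
	lookups           int
}

// findNamespacesToInvalidateCache returns the namespaces whose cached rollups
// would need invalidating. The signature is an assumption for this sketch.
func (c *connector) findNamespacesToInvalidateCache(ctx context.Context, lateNamespaces []string) ([]string, error) {
	// Guard: with the cache disabled there is nothing to invalidate, so
	// return before issuing any query or recording any metric.
	if !c.queryCacheEnabled {
		return nil, nil
	}

	if err := ctx.Err(); err != nil {
		return nil, err
	}

	c.lookups++ // placeholder for the real coverage-table lookup
	return lateNamespaces, nil
}

// run reports how many namespaces were returned and how many lookups ran.
func run(enabled bool) (namespaces, lookups int) {
	c := &connector{queryCacheEnabled: enabled}
	ns, _ := c.findNamespacesToInvalidateCache(context.Background(), []string{"ns-a", "ns-b"})
	return len(ns), c.lookups
}

func main() {
	n, l := run(false)
	fmt.Println("disabled:", n, "namespaces,", l, "lookups")
	n, l = run(true)
	fmt.Println("enabled: ", n, "namespaces,", l, "lookups")
}
```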

Important Files Changed

Filename | Overview
openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go | Core cache execution: lazy populate, coverage tracking, in-flight invalidation guard, and live-fallback on error. Missing QueryCacheEnabled guard in findNamespacesToInvalidateCache causes spurious ClickHouse queries and metrics when the cache is disabled.
openmeter/streaming/clickhouse/connector.go | Wires QueryCache config into the Connector struct; BatchInsert now calls findNamespacesToInvalidateCache unconditionally, which runs even when QueryCacheEnabled=false.
openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go | canQueryBeCached gate logic: exhaustive guards for aggregation type, window size, timezone, subject/customer_id, feature gate. DST handling via day-boundary sampling is thorough.
openmeter/streaming/clickhouse/meterqueryrow_cache_query.go | UNION-based merge query builder: cache leg (argMax collapse) + live legs (head and tail). filterStringWhere guards against empty WHERE fragment. Column/arg ordering is carefully documented and tested.
openmeter/streaming/clickhouse/meterqueryrow_cache_coverage.go | Coverage claim DDL, planCachePopulation logic, and marker/claim insert/delete SQL. Trust window and clock-skew margin are well-reasoned.
openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go | OTel metrics and shadow parity check. All metric methods are nil-receiver safe. Parity goroutine correctly detaches from request context with its own timeout.
openmeter/streaming/clickhouse/meterqueryrow_cache.go | DDL for rollup table (ReplacingMergeTree) and helper types. meterShapeHash fingerprints the meter's extraction shape to orphan rows on definition changes.
app/config/aggregation.go | Adds AggregationQueryCacheConfiguration with validation; enforces decimal precision and async insert wait requirements. Defaults QueryCache to disabled.
openmeter/billing/charges/usagebased/service/rating/service_test.go | New test asserts billing rating path never sets Cachable=true, an important safety guard preventing billing quantities from being served from the cache.
openmeter/entitlement/metered/balance.go | Sets params.Cachable = true for entitlement balance queries, opting them into the cache. Fresh tail is always live, so this is safe for entitlement accuracy.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant C as HTTP/Entitlement Caller
    participant Conn as Connector
    participant Gate as canQueryBeCached
    participant CH as ClickHouse

    C->>Conn: "QueryMeter(namespace, meter, params{Cachable:true})"
    Conn->>Gate: canQueryBeCached(namespace, meter, params)
    Gate-->>Conn: true (all guards pass)
    Conn->>CH: readMeterQueryRowCacheCoverage (plan)
    CH-->>Conn: coverage claim + invalidatedAt
    Note over Conn: planCachePopulation — missing ranges
    loop For each unpopulated range
        Conn->>CH: INSERT INTO cache SELECT FROM events (populate)
    end
    Conn->>CH: INSERT coverage claim
    Conn->>CH: SELECT merge (cache leg UNION live legs)
    CH-->>Conn: merged rows
    Conn->>CH: readMeterQueryRowCacheCoverage (post-merge guard)
    CH-->>Conn: latest invalidatedAt
    alt invalidated during read
        Conn->>CH: queryMeter (live fallback)
        CH-->>Conn: live rows
    end
    Conn-->>C: rows

    Note over Conn,CH: BatchInsert late-event invalidation
    C->>Conn: BatchInsert(events)
    Conn->>CH: INSERT events
    Conn->>CH: INSERT invalidation markers (coverage table)
    Conn->>CH: DELETE coverage claims
    Conn->>CH: DELETE rollup rows

Reviews (2): Last reviewed commit: "fix(streaming): address query cache PR r..."


coderabbitai Bot left a comment

Contributor

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
app/config/aggregation.go (1)

50-80: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Fail-fast Validate() skips later checks.

The new checks (query cache validation, decimal-precision requirement, async-insert-wait requirement) are appended to a fail-fast chain, so a config with multiple problems only ever surfaces the first one — someone fixing EventsTableName won't see the queryCache misconfiguration until the next run. As per coding guidelines, Validate() error methods should collect all issues into var errs []error and return them joined instead of failing fast.

♻️ Proposed refactor to collect all validation errors
 func (c AggregationConfiguration) Validate() error {
-	if err := c.ClickHouse.Validate(); err != nil {
-		return fmt.Errorf("clickhouse: %w", err)
-	}
-
-	if c.EventsTableName == "" {
-		return errors.New("events table is required")
-	}
-
-	if c.AsyncInsertWait && !c.AsyncInsert {
-		return errors.New("async insert wait is set but async insert is not")
-	}
-
-	if err := c.QueryCache.Validate(); err != nil {
-		return fmt.Errorf("query cache: %w", err)
-	}
-
-	if c.QueryCache.Enabled && !c.EnableDecimalPrecision {
-		return errors.New("query cache requires enableDecimalPrecision to be true")
-	}
-
-	// The cache's late-event invalidation must run after the event is visible
-	// to SELECTs; async insert without wait acks before the buffer flush, which
-	// would let a read cache rollups missing the event with no invalidation
-	// ever coming for it.
-	if c.QueryCache.Enabled && c.AsyncInsert && !c.AsyncInsertWait {
-		return errors.New("query cache requires asyncInsertWait when asyncInsert is enabled")
-	}
-
-	return nil
+	var errs []error
+
+	if err := c.ClickHouse.Validate(); err != nil {
+		errs = append(errs, fmt.Errorf("clickhouse: %w", err))
+	}
+
+	if c.EventsTableName == "" {
+		errs = append(errs, errors.New("events table is required"))
+	}
+
+	if c.AsyncInsertWait && !c.AsyncInsert {
+		errs = append(errs, errors.New("async insert wait is set but async insert is not"))
+	}
+
+	if err := c.QueryCache.Validate(); err != nil {
+		errs = append(errs, fmt.Errorf("query cache: %w", err))
+	}
+
+	if c.QueryCache.Enabled && !c.EnableDecimalPrecision {
+		errs = append(errs, errors.New("query cache requires enableDecimalPrecision to be true"))
+	}
+
+	// The cache's late-event invalidation must run after the event is visible
+	// to SELECTs; async insert without wait acks before the buffer flush, which
+	// would let a read cache rollups missing the event with no invalidation
+	// ever coming for it.
+	if c.QueryCache.Enabled && c.AsyncInsert && !c.AsyncInsertWait {
+		errs = append(errs, errors.New("query cache requires asyncInsertWait when asyncInsert is enabled"))
+	}
+
+	return errors.Join(errs...)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/config/aggregation.go` around lines 50 - 80,
AggregationConfiguration.Validate currently returns on the first error, so
multiple misconfigurations are hidden until later runs. Update Validate to
accumulate all validation failures into a local error slice instead of failing
fast, including checks for ClickHouse, EventsTableName,
AsyncInsertWait/AsyncInsert, QueryCache.Validate, EnableDecimalPrecision, and
the asyncInsertWait requirement. Then return the combined errors at the end so
callers of AggregationConfiguration.Validate can see every issue at once.

Source: Coding guidelines
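
To make the difference from fail-fast validation concrete, here is a small self-contained sketch of the collect-then-join pattern using the standard library's `errors.Join` (Go 1.20+). The `aggregationConfig` struct is a trimmed, hypothetical stand-in for `AggregationConfiguration`; the error messages are copied from the diff above.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregationConfig is a trimmed, hypothetical stand-in for the real
// AggregationConfiguration; only the fields needed for the demo are kept.
type aggregationConfig struct {
	EventsTableName        string
	AsyncInsert            bool
	AsyncInsertWait        bool
	QueryCacheEnabled      bool
	EnableDecimalPrecision bool
}

// Validate collects every problem instead of returning at the first one.
func (c aggregationConfig) Validate() error {
	var errs []error

	if c.EventsTableName == "" {
		errs = append(errs, errors.New("events table is required"))
	}

	if c.AsyncInsertWait && !c.AsyncInsert {
		errs = append(errs, errors.New("async insert wait is set but async insert is not"))
	}

	if c.QueryCacheEnabled && !c.EnableDecimalPrecision {
		errs = append(errs, errors.New("query cache requires enableDecimalPrecision to be true"))
	}

	if c.QueryCacheEnabled && c.AsyncInsert && !c.AsyncInsertWait {
		errs = append(errs, errors.New("query cache requires asyncInsertWait when asyncInsert is enabled"))
	}

	// errors.Join returns nil when errs is empty, so a valid config still
	// yields a nil error.
	return errors.Join(errs...)
}

func main() {
	bad := aggregationConfig{QueryCacheEnabled: true}
	fmt.Println(bad.Validate()) // both problems are reported, one per line

	good := aggregationConfig{EventsTableName: "om_events"}
	fmt.Println(good.Validate() == nil) // prints "true"
}
```

With the fail-fast version, the first configuration would report only the missing events table; the joined error reports the decimal-precision requirement in the same run.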

openmeter/streaming/clickhouse/connector.go (1)

90-139: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Missing FeatureGate nil-check when cache is enabled.

The QueryCacheEnabled validation block checks decimal precision, async-insert visibility, and duration/rate bounds, but never validates that c.FeatureGate is non-nil. Per the context snippet from meterqueryrow_cache_gate.go, canQueryBeCached unconditionally calls c.config.FeatureGate.Enabled(namespace, queryCacheFeatureFlag) — if FeatureGate is nil, that's a nil-pointer panic on every eligible meter query once the cache is enabled without a properly wired gate.

All current wire_gen.go call sites do construct a featureGateChecker, so production is safe today, but Validate() is exactly the place meant to catch this class of misconfiguration (e.g., direct construction in tests/tools, future call sites).

🛡️ Proposed fix
 	if c.QueryCacheEnabled {
 		if !c.EnableDecimalPrecision {
 			return fmt.Errorf("query cache requires decimal precision to be enabled")
 		}
+
+		if c.FeatureGate == nil {
+			return fmt.Errorf("query cache requires a feature gate checker")
+		}
 
 		// The late-event invalidation must run AFTER the event is visible to

Separately, this Validate() (including the newly added block) fails fast with individual returns rather than collecting all issues. As per coding guidelines, **/*.go Go Validate() error methods should collect all validation issues into var errs []error and return models.NewNillableGenericValidationError(errors.Join(errs...)) instead of failing fast. Since the whole method already predates this PR and follows fail-fast, I'd treat that broader cleanup as separate/optional, but worth keeping in mind for the next pass over this file.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/streaming/clickhouse/connector.go` around lines 90 - 139, The
QueryCacheEnabled validation in Config.Validate is missing a nil check for
FeatureGate, which can lead to a nil-pointer panic when canQueryBeCached calls
FeatureGate.Enabled. Add a non-nil validation for c.FeatureGate alongside the
existing cache-related checks in Config.Validate, so misconfigured setups fail
fast before meter query caching is used.

Source: Coding guidelines

🧹 Nitpick comments (2)
openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go (1)

281-281: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Drop the local ptr wrapper.

Please use the repo’s preferred pointer-literal approach instead of adding a package helper that hides &v. As per coding guidelines, "do not add local wrappers like ptr, loPtr, must, or loMust when the standard helpers or lo already suffice."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go` at line
281, The local ptr helper should be removed because it adds an unnecessary
wrapper around taking an address. Delete ptr[T any] and update the call sites in
meterqueryrow_cache_parity_test.go to use the repo’s preferred pointer-literal
approach or existing lo helpers directly, keeping the test code explicit without
introducing a new package-level helper.

Source: Coding guidelines

openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go (1)

1184-1191: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Assert the parity span outcome too.

The test description pins the outcome telemetry contract, but the assertions only check span names. If outcome=match|mismatch|error disappears, this test still passes. As per path instructions, "**/*_test.go: Make sure the tests are comprehensive and cover the changes."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go`
around lines 1184 - 1191, The cached-query test only asserts span names, so it
can miss regressions in the parity span telemetry contract. Update the
assertions around the spanRecorder.Ended() checks in
meterqueryrow_cache_integration_test.go to also verify the
streaming.query_cache.parity_check span’s outcome value is one of the expected
states (match, mismatch, or error), using the existing parity-check span
inspection logic so the test covers both name and outcome.

Source: Path instructions

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go`:
- Around line 332-353: The cleanup in meterQueryRowCacheExec currently deletes
both coverage claims and rollup rows, which can race with an in-flight cached
query in the populate/merge flow. Update the cleanup path in
meterQueryRowCacheExec to stop removing rollup rows there and only clear the
dead claims/markers, leaving rollup retention to the next populate cycle so
active queries are not undercounted.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go`:
- Line 4: The cache gate currently rejects only customer_id, so unknown GroupBy
keys can still reach the cached SQL path and fail later in
queryCachedMeter.toSQL. Update the cacheability check in
meterqueryrow_cache_gate.go to validate every requested GroupBy key against the
supported set before returning true, and route any unknown key through the live
validation path instead of marking the query cacheable.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go`:
- Line 229: The parity mismatch log in meterqueryrow_cache_monitor.go is
exposing raw row-key data through the diff value, which can leak
subject/customer/group-by identifiers. Update the logging in the parity check
paths that use logger.Error to keep the mismatch signal but sanitize diff into
field/index/value deltas instead of raw keys. Use the existing parity comparison
helpers around the cache validation flow to transform the reported mismatch
before logging, and apply the same fix to the other parity-mismatch logging site
mentioned in the diff.
- Around line 301-321: The parity key builder in the meter query row cache
monitor currently serializes optional fields so that nil and empty strings look
identical, which can hide shadow mismatches. Update the key construction logic
in the parity key method on meterqueryrow_cache_monitor to encode an explicit
nil/empty distinction for Subject, CustomerID, and each GroupBy value, using a
dedicated marker or length-prefixed optional encoding while preserving the
existing stable ordering from the GroupBy key sort.
- Around line 231-237: The self-healing invalidation path in
meterqueryrow_cache_monitor.go is reusing the shadow request context, so the
invalidate call can inherit the live query’s deadline and get canceled too
early. Update the parity-mismatch branch around invalidateMeterQueryRowCache to
derive a fresh bounded context from context.WithoutCancel(ctx) with its own
timeout budget before calling invalidateMeterQueryRowCache, and keep the
existing error handling and metrics in the same flow.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go`:
- Around line 194-213: The parity key builder in the test comparator currently
collapses nil and empty values for Subject, CustomerID, and GroupBy entries, so
update the key generation logic in this helper to encode nullability explicitly.
Modify the existing parity-key construction around the Subject/CustomerID
handling and the GroupBy loop so nil pointers and empty strings produce
different byte sequences, using a dedicated nil marker or length prefix for each
nullable field.
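
One way the nil-versus-empty distinction requested for the parity keys above could be encoded is with a nil marker plus a length prefix, sketched below. The `row` type, `writeString`, `writeOptional`, and `parityKey` are hypothetical; the PR's actual key builder and row type are not shown in this conversation.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// row is a hypothetical stand-in for a meter query row's identifying fields.
type row struct {
	Subject    *string
	CustomerID *string
	GroupBy    map[string]*string
}

// writeString appends a length-prefixed string, e.g. "s3:abc;". The length
// prefix keeps values containing ':' or ';' unambiguous.
func writeString(b *strings.Builder, v string) {
	b.WriteString("s")
	b.WriteString(strconv.Itoa(len(v)))
	b.WriteString(":")
	b.WriteString(v)
	b.WriteString(";")
}

// writeOptional appends "n;" for nil and a length-prefixed value otherwise,
// so nil and the empty string ("s0:;") never collide.
func writeOptional(b *strings.Builder, v *string) {
	if v == nil {
		b.WriteString("n;")
		return
	}
	writeString(b, *v)
}

// parityKey builds a stable comparison key; GroupBy keys are sorted so map
// iteration order does not affect the result.
func parityKey(r row) string {
	var b strings.Builder
	writeOptional(&b, r.Subject)
	writeOptional(&b, r.CustomerID)

	keys := make([]string, 0, len(r.GroupBy))
	for k := range r.GroupBy {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	for _, k := range keys {
		writeString(&b, k)
		writeOptional(&b, r.GroupBy[k])
	}
	return b.String()
}

func main() {
	empty := ""
	fmt.Println(parityKey(row{}))                // prints "n;n;"
	fmt.Println(parityKey(row{Subject: &empty})) // prints "s0:;n;"
}
```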

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 05b4063a-2581-4177-a112-6446ce7f7256

📥 Commits

Reviewing files that changed from the base of the PR and between fc2ec36 and c523711.

📒 Files selected for processing (38)
  • api/v3/handlers/meters/query.go
  • api/v3/handlers/meters/query_csv.go
  • app/common/streaming.go
  • app/config/aggregation.go
  • app/config/config_test.go
  • app/config/testdata/complete.yaml
  • cmd/balance-worker/wire.go
  • cmd/balance-worker/wire_gen.go
  • cmd/billing-worker/wire_gen.go
  • cmd/jobs/internal/wire_gen.go
  • cmd/notification-service/wire.go
  • cmd/notification-service/wire_gen.go
  • cmd/server/wire_gen.go
  • cmd/sink-worker/wire.go
  • cmd/sink-worker/wire_gen.go
  • config.example.yaml
  • openmeter/billing/charges/usagebased/service/rating/service_test.go
  • openmeter/cost/adapter/adapter.go
  • openmeter/credit/balance/usage.go
  • openmeter/entitlement/metered/balance.go
  • openmeter/entitlement/metered/balance_test.go
  • openmeter/meter/httphandler/query.go
  • openmeter/meter/httphandler/query_csv.go
  • openmeter/streaming/clickhouse/connector.go
  • openmeter/streaming/clickhouse/meter_query.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_coverage.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_coverage_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_query.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_test.go
  • openmeter/streaming/clickhouse/zz_audit_test.go
  • openmeter/streaming/query_params.go
  • openmeter/streaming/testutils/streaming.go

CodeRabbit:
- Post-merge invalidation guard: a namespace invalidation landing between
  a cached read's populate and its merge SELECT could blank the settled
  range for that one response (the marker only orders plans). The read
  now re-checks the marker after the merge and falls back to the live
  query when it moved.
- Gate unknown GroupBy keys to the live path so they surface as the live
  path's validation error instead of a cached-path SQL-build error.
- Parity mismatch logs name the differing fields instead of dumping raw
  row keys (subject/customer identifiers stay out of error logs).
- Self-healing invalidation after a parity mismatch runs on its own
  bounded context instead of the shadow check's possibly-exhausted one.
- Parity comparison keys distinguish nil from empty-string values
  (monitor and test comparator).
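One way to keep nil and empty-string values apart in a comparison key is to tag each field with a presence marker before joining. The following is only a minimal sketch under that assumption; `parityKey` and its encoding are hypothetical and are not taken from the PR's monitor or test comparator.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parityKey builds a comparison key from optional string fields.
// Hypothetical helper: a nil field is encoded as "-", a present field as
// "+" followed by its quoted value, so nil and "" can never collide.
func parityKey(fields ...*string) string {
	parts := make([]string, 0, len(fields))
	for _, f := range fields {
		if f == nil {
			parts = append(parts, "-")
			continue
		}
		// Quoting escapes embedded quotes, so a value containing the
		// separator cannot be mistaken for two separate fields.
		parts = append(parts, "+"+strconv.Quote(*f))
	}
	return strings.Join(parts, "|")
}

func main() {
	empty := ""
	subject := "customer-1"
	fmt.Println(parityKey(nil, &subject))
	fmt.Println(parityKey(&empty, &subject))
}
```

With a plain string join, both rows above would produce the same key; the presence marker is what makes them differ.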

Greptile:
- ClickHouse UNKNOWN_TABLE detection uses the typed exception with a
  string fallback for transports that wrap server errors as text.
- filterStringWhere fails loudly when the WHERE fragment cannot be
  extracted instead of silently dropping the filter.
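The typed-check-then-text-fallback pattern described for UNKNOWN_TABLE can be sketched roughly as below. This is an illustration, not the PR's `isUnknownTableError`: `serverException` is a hypothetical stand-in for the driver's typed exception, `flattenedError` simulates a transport that wraps server errors as text, and matching on the `UNKNOWN_TABLE` substring is an assumption about what such text contains. Code 60 is the value the review comment quotes for a missing table.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// codeUnknownTable is the ClickHouse error code for a missing table.
const codeUnknownTable = 60

// serverException is a hypothetical stand-in for a driver's typed exception.
type serverException struct {
	Code    int32
	Message string
}

func (e *serverException) Error() string {
	return fmt.Sprintf("code: %d, message: %s", e.Code, e.Message)
}

// flattenedError simulates a transport that reports server errors as text.
type flattenedError string

func (e flattenedError) Error() string { return string(e) }

// looksLikeUnknownTable prefers the typed exception and falls back to a
// text match only when no typed exception is present in the error chain.
func looksLikeUnknownTable(err error) bool {
	if err == nil {
		return false
	}
	var ex *serverException
	if errors.As(err, &ex) {
		return ex.Code == codeUnknownTable
	}
	// Assumption: text-only errors carry the symbolic error name.
	return strings.Contains(err.Error(), "UNKNOWN_TABLE")
}

func main() {
	typed := fmt.Errorf("exec marker: %w", &serverException{Code: 60, Message: "no such table"})
	text := flattenedError("code: 60, message: table does not exist (UNKNOWN_TABLE)")
	fmt.Println(looksLikeUnknownTable(typed), looksLikeUnknownTable(text))
}
```

Checking the typed exception first means a typed error with a different code is never misclassified by an incidental substring in its message.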

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Contributor


♻️ Duplicate comments (1)
openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go (1)

351-399: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Only delete rollup rows after the marker insert succeeds.

Nice post-merge guard, but the fallback path still has a gap: if the marker INSERT fails and the claim-delete fallback succeeds, Step 2 still deletes rollup rows. In-flight cached reads planned before that failure won’t see a newer marker in invalidatedSince, so they can merge after rows were deleted and return undercounted data. Keep claim cleanup, but skip rollup cleanup unless the marker was committed.

Suggested patch
 	markerSQL, markerArgs := markers.toSQL()
+	markerInserted := true
 	if err := c.config.ClickHouse.Exec(ctx, markerSQL, markerArgs...); err != nil {
+		markerInserted = false
 		// A missing table (code 60) means nothing is cached: invalidation is
 		// always-on even with the cache disabled.
 		if !isUnknownTableError(err) {
@@
 	claimsSQL, claimsArgs := deleteMeterQueryRowCacheCoverageClaims{
 		Database:   c.config.Database,
 		TableName:  meterQueryRowCacheCoverageTable,
 		Namespaces: namespaces,
 	}.toSQL()
 
-	rollupSQL, rollupArgs := deleteMeterQueryRowCacheForNamespaces{
-		Database:   c.config.Database,
-		TableName:  meterQueryRowCacheTable,
-		Namespaces: namespaces,
-	}.toSQL()
-
-	for _, cleanup := range []struct {
+	type cleanupStmt struct {
 		sql  string
 		args []interface{}
-	}{
+	}
+	cleanups := []cleanupStmt{
 		{claimsSQL, claimsArgs},
-		{rollupSQL, rollupArgs},
-	} {
+	}
+	if markerInserted {
+		rollupSQL, rollupArgs := deleteMeterQueryRowCacheForNamespaces{
+			Database:   c.config.Database,
+			TableName:  meterQueryRowCacheTable,
+			Namespaces: namespaces,
+		}.toSQL()
+		cleanups = append(cleanups, cleanupStmt{rollupSQL, rollupArgs})
+	}
+
+	for _, cleanup := range cleanups {

This is the same rollup-delete race class as the earlier review note, now narrowed to the marker-insert fallback path.
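The ordering rule in this comment (always clean up claims, delete rollup rows only once the marker is committed) can be exercised in isolation with a small sketch. Everything here is hypothetical scaffolding: `execFunc` stands in for the ClickHouse `Exec` call, the statement strings are placeholders, and treating a failed marker insert as non-fatal is a simplification of the elided fallback logic in the real function.

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder statements; the real code builds SQL via toSQL().
const (
	stmtMarker = "INSERT marker"
	stmtClaims = "DELETE claims"
	stmtRollup = "DELETE rollup"
)

var errMarkerFailed = errors.New("marker insert failed")

// execFunc is a hypothetical stand-in for the database Exec call.
type execFunc func(sql string) error

// invalidate returns the cleanup statements that actually ran.
// Rollup rows are deleted only when the marker insert succeeded, so
// in-flight cached reads can always detect the invalidation.
func invalidate(exec execFunc) ([]string, error) {
	markerInserted := exec(stmtMarker) == nil

	cleanups := []string{stmtClaims}
	if markerInserted {
		cleanups = append(cleanups, stmtRollup)
	}

	var ran []string
	for _, stmt := range cleanups {
		if err := exec(stmt); err != nil {
			return ran, fmt.Errorf("cleanup %q: %w", stmt, err)
		}
		ran = append(ran, stmt)
	}
	return ran, nil
}

func main() {
	ran, err := invalidate(func(sql string) error {
		if sql == stmtMarker {
			return errMarkerFailed
		}
		return nil
	})
	fmt.Println(ran, err)
}
```

When the marker insert fails, only the claim cleanup runs, which is the behavior the suggested patch aims for.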

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go` around lines
351-399: The cleanup path in `invalidateMeterQueryRowCache` still deletes rollup
rows even when the marker insert fails and only the claim-delete fallback
succeeds. Keep the existing fallback claim cleanup, but gate the Step 2 rollup
deletion loop so it runs only after the marker write has actually committed
successfully; use the marker insert result and the
`deleteMeterQueryRowCacheCoverageClaims` /
`deleteMeterQueryRowCacheForNamespaces` flow to place that guard correctly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 43c505a7-9232-4972-94cf-985d68a987d9

📥 Commits

Reviewing files that changed from the base of the PR and between 72ae914 and 92f66cc.

📒 Files selected for processing (7)
  • openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_query.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • openmeter/streaming/clickhouse/meterqueryrow_cache_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_query.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go
  • openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go

@tothandras tothandras marked this pull request as draft July 2, 2026 22:11
Comment on lines +86 to +87
// FeatureGate gates the query cache per namespace.
FeatureGate *featuregate.FeatureGateChecker

Collaborator


Personally, I would love to see feature-flag (FF) information attached to the Context, resolved by an HTTP middleware, instead of wiring the FeatureGateChecker service into each and every service.

Collaborator


Use ContextResolver instead, which does what I mentioned above.


Labels

release-note/misc Miscellaneous changes


2 participants