feat(streaming): query cache#4636
Conversation
📝 WalkthroughWalkthroughThis PR adds a feature-gated ClickHouse meter query cache, including cache configuration, query opt-in flags, connector wiring, cache storage and invalidation, cached query execution, parity monitoring, and tests. ChangesMeter query-result cache feature
Estimated code review effort: 5 (Critical) | ~120 minutes Sequence Diagram(s)sequenceDiagram
participant Handler as QueryMeter
participant Connector as Connector.QueryMeter
participant Gate as canQueryBeCached
participant CacheExec as queryMeterCached
participant Coverage as cache coverage table
participant Rollup as meter query cache table
Handler->>Connector: QueryMeter(params.Cachable=true)
Connector->>Gate: evaluate cache eligibility
Gate-->>Connector: eligible / not eligible
alt eligible
Connector->>CacheExec: queryMeterCached
CacheExec->>Coverage: read coverage and invalidation markers
CacheExec->>Coverage: store coverage / insert invalidation markers
CacheExec->>Rollup: populate missing hour ranges
CacheExec->>Rollup: execute cached UNION query
else not eligible
Connector->>Connector: queryMeter live path
end
Connector-->>Handler: meter query rows
Possibly related PRs
Suggested labels: 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
TestAuditLiveTenantBinding assumed the openmeter database and om_events table exist (true on a dev machine, not in CI's fresh ClickHouse). The test now creates its own database and both tables through the production DDL builders, so it carries no environment dependency. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Greptile SummaryThis PR introduces an optional meter query-result cache backed by a ClickHouse
Confidence Score: 4/5Safe to merge with one fix: findNamespacesToInvalidateCache lacks a QueryCacheEnabled guard, causing spurious ClickHouse queries and misleading invalidation metrics whenever events older than 24h arrive on a deployment where the cache is disabled (the default). The cache logic itself — lazy population, coverage tracking, in-flight invalidation guard, parity shadow check, and the ReplacingMergeTree dedup design — is carefully implemented and well-tested. The one concrete defect is that findNamespacesToInvalidateCache is not gated on QueryCacheEnabled, so every BatchInsert with late events on a cache-disabled deployment fires 3 failed ClickHouse queries and increments the streaming.query_cache.invalidations metric even though the cache does not exist. A one-line guard at the top of that function fixes it completely. openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go — the findNamespacesToInvalidateCache function needs the QueryCacheEnabled guard. Important Files Changed
|
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
app/config/aggregation.go (1)
50-80: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick winFail-fast
Validate()skips later checks.The new checks (query cache validation, decimal-precision requirement, async-insert-wait requirement) are appended to a fail-fast chain, so a config with multiple problems only ever surfaces the first one — someone fixing
EventsTableNamewon't see thequeryCachemisconfiguration until the next run. As per coding guidelines,Validate() errormethods should collect all issues intovar errs []errorand return them joined instead of failing fast.♻️ Proposed refactor to collect all validation errors
func (c AggregationConfiguration) Validate() error { - if err := c.ClickHouse.Validate(); err != nil { - return fmt.Errorf("clickhouse: %w", err) - } - - if c.EventsTableName == "" { - return errors.New("events table is required") - } - - if c.AsyncInsertWait && !c.AsyncInsert { - return errors.New("async insert wait is set but async insert is not") - } - - if err := c.QueryCache.Validate(); err != nil { - return fmt.Errorf("query cache: %w", err) - } - - if c.QueryCache.Enabled && !c.EnableDecimalPrecision { - return errors.New("query cache requires enableDecimalPrecision to be true") - } - - // The cache's late-event invalidation must run after the event is visible - // to SELECTs; async insert without wait acks before the buffer flush, which - // would let a read cache rollups missing the event with no invalidation - // ever coming for it. - if c.QueryCache.Enabled && c.AsyncInsert && !c.AsyncInsertWait { - return errors.New("query cache requires asyncInsertWait when asyncInsert is enabled") - } - - return nil + var errs []error + + if err := c.ClickHouse.Validate(); err != nil { + errs = append(errs, fmt.Errorf("clickhouse: %w", err)) + } + + if c.EventsTableName == "" { + errs = append(errs, errors.New("events table is required")) + } + + if c.AsyncInsertWait && !c.AsyncInsert { + errs = append(errs, errors.New("async insert wait is set but async insert is not")) + } + + if err := c.QueryCache.Validate(); err != nil { + errs = append(errs, fmt.Errorf("query cache: %w", err)) + } + + if c.QueryCache.Enabled && !c.EnableDecimalPrecision { + errs = append(errs, errors.New("query cache requires enableDecimalPrecision to be true")) + } + + // The cache's late-event invalidation must run after the event is visible + // to SELECTs; async insert without wait acks before the buffer flush, which + // would let a read cache rollups missing the event with no invalidation + // ever coming for it. + if c.QueryCache.Enabled && c.AsyncInsert && !c.AsyncInsertWait { + errs = append(errs, errors.New("query cache requires asyncInsertWait when asyncInsert is enabled")) + } + + return errors.Join(errs...) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/config/aggregation.go` around lines 50 - 80, AggregationConfiguration.Validate currently returns on the first error, so multiple misconfigurations are hidden until later runs. Update Validate to accumulate all validation failures into a local error slice instead of failing fast, including checks for ClickHouse, EventsTableName, AsyncInsertWait/AsyncInsert, QueryCache.Validate, EnableDecimalPrecision, and the asyncInsertWait requirement. Then return the combined errors at the end so callers of AggregationConfiguration.Validate can see every issue at once.Source: Coding guidelines
openmeter/streaming/clickhouse/connector.go (1)
90-139: 🩺 Stability & Availability | 🟠 Major | ⚡ Quick winMissing
FeatureGatenil-check when cache is enabled.The
QueryCacheEnabledvalidation block checks decimal precision, async-insert visibility, and duration/rate bounds, but never validates thatc.FeatureGateis non-nil. Per the context snippet frommeterqueryrow_cache_gate.go,canQueryBeCachedunconditionally callsc.config.FeatureGate.Enabled(namespace, queryCacheFeatureFlag)— ifFeatureGateis nil, that's a nil-pointer panic on every eligible meter query once the cache is enabled without a properly wired gate.All current
wire_gen.gocall sites do construct afeatureGateChecker, so production is safe today, butValidate()is exactly the place meant to catch this class of misconfiguration (e.g., direct construction in tests/tools, future call sites).🛡️ Proposed fix
if c.QueryCacheEnabled { if !c.EnableDecimalPrecision { return fmt.Errorf("query cache requires decimal precision to be enabled") } + + if c.FeatureGate == nil { + return fmt.Errorf("query cache requires a feature gate checker") + } // The late-event invalidation must run AFTER the event is visible toSeparately, this
Validate()(including the newly added block) fails fast with individualreturns rather than collecting all issues. As per coding guidelines,**/*.goGoValidate() errormethods should collect all validation issues intovar errs []errorand returnmodels.NewNillableGenericValidationError(errors.Join(errs...))instead of failing fast. Since the whole method already predates this PR and follows fail-fast, I'd treat that broader cleanup as separate/optional, but worth keeping in mind for the next pass over this file.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/streaming/clickhouse/connector.go` around lines 90 - 139, The QueryCacheEnabled validation in Config.Validate is missing a nil check for FeatureGate, which can lead to a nil-pointer panic when canQueryBeCached calls FeatureGate.Enabled. Add a non-nil validation for c.FeatureGate alongside the existing cache-related checks in Config.Validate, so misconfigured setups fail fast before meter query caching is used.Source: Coding guidelines
🧹 Nitpick comments (2)
openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go (1)
281-281: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDrop the local
ptrwrapper.Please use the repo’s preferred pointer-literal approach instead of adding a package helper that hides
&v. As per coding guidelines, "do not add local wrappers likeptr,loPtr,must, orloMustwhen the standard helpers orloalready suffice."🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go` at line 281, The local ptr helper should be removed because it adds an unnecessary wrapper around taking an address. Delete ptr[T any] and update the call sites in meterqueryrow_cache_parity_test.go to use the repo’s preferred pointer-literal approach or existing lo helpers directly, keeping the test code explicit without introducing a new package-level helper.Source: Coding guidelines
openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go (1)
1184-1191: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAssert the parity span outcome too.
The test description pins the
outcometelemetry contract, but the assertions only check span names. Ifoutcome=match|mismatch|errordisappears, this test still passes. As per path instructions, "**/*_test.go: Make sure the tests are comprehensive and cover the changes."🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go` around lines 1184 - 1191, The cached-query test only asserts span names, so it can miss regressions in the parity span telemetry contract. Update the assertions around the spanRecorder.Ended() checks in meterqueryrow_cache_integration_test.go to also verify the streaming.query_cache.parity_check span’s outcome value is one of the expected states (match, mismatch, or error), using the existing parity-check span inspection logic so the test covers both name and outcome.Source: Path instructions
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go`:
- Around line 332-353: The cleanup in meterQueryRowCacheExec currently deletes
both coverage claims and rollup rows, which can race with an in-flight cached
query in the populate/merge flow. Update the cleanup path in
meterQueryRowCacheExec to stop removing rollup rows there and only clear the
dead claims/markers, leaving rollup retention to the next populate cycle so
active queries are not undercounted.
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go`:
- Line 4: The cache gate currently rejects only customer_id, so unknown GroupBy
keys can still reach the cached SQL path and fail later in
queryCachedMeter.toSQL. Update the cacheability check in
meterqueryrow_cache_gate.go to validate every requested GroupBy key against the
supported set before returning true, and route any unknown key through the live
validation path instead of marking the query cacheable.
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_monitor.go`:
- Line 229: The parity mismatch log in meterqueryrow_cache_monitor.go is
exposing raw row-key data through the diff value, which can leak
subject/customer/group-by identifiers. Update the logging in the parity check
paths that use logger.Error to keep the mismatch signal but sanitize diff into
field/index/value deltas instead of raw keys. Use the existing parity comparison
helpers around the cache validation flow to transform the reported mismatch
before logging, and apply the same fix to the other parity-mismatch logging site
mentioned in the diff.
- Around line 301-321: The parity key builder in the meter query row cache
monitor currently serializes optional fields so that nil and empty strings look
identical, which can hide shadow mismatches. Update the key construction logic
in the parity key method on meterqueryrow_cache_monitor to encode an explicit
nil/empty distinction for Subject, CustomerID, and each GroupBy value, using a
dedicated marker or length-prefixed optional encoding while preserving the
existing stable ordering from the GroupBy key sort.
- Around line 231-237: The self-healing invalidation path in
meterqueryrow_cache_monitor.go is reusing the shadow request context, so the
invalidate call can inherit the live query’s deadline and get canceled too
early. Update the parity-mismatch branch around invalidateMeterQueryRowCache to
derive a fresh bounded context from context.WithoutCancel(ctx) with its own
timeout budget before calling invalidateMeterQueryRowCache, and keep the
existing error handling and metrics in the same flow.
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go`:
- Around line 194-213: The parity key builder in the test comparator currently
collapses nil and empty values for Subject, CustomerID, and GroupBy entries, so
update the key generation logic in this helper to encode nullability explicitly.
Modify the existing parity-key construction around the Subject/CustomerID
handling and the GroupBy loop so nil pointers and empty strings produce
different byte sequences, using a dedicated nil marker or length prefix for each
nullable field.
---
Outside diff comments:
In `@app/config/aggregation.go`:
- Around line 50-80: AggregationConfiguration.Validate currently returns on the
first error, so multiple misconfigurations are hidden until later runs. Update
Validate to accumulate all validation failures into a local error slice instead
of failing fast, including checks for ClickHouse, EventsTableName,
AsyncInsertWait/AsyncInsert, QueryCache.Validate, EnableDecimalPrecision, and
the asyncInsertWait requirement. Then return the combined errors at the end so
callers of AggregationConfiguration.Validate can see every issue at once.
In `@openmeter/streaming/clickhouse/connector.go`:
- Around line 90-139: The QueryCacheEnabled validation in Config.Validate is
missing a nil check for FeatureGate, which can lead to a nil-pointer panic when
canQueryBeCached calls FeatureGate.Enabled. Add a non-nil validation for
c.FeatureGate alongside the existing cache-related checks in Config.Validate, so
misconfigured setups fail fast before meter query caching is used.
---
Nitpick comments:
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.go`:
- Around line 1184-1191: The cached-query test only asserts span names, so it
can miss regressions in the parity span telemetry contract. Update the
assertions around the spanRecorder.Ended() checks in
meterqueryrow_cache_integration_test.go to also verify the
streaming.query_cache.parity_check span’s outcome value is one of the expected
states (match, mismatch, or error), using the existing parity-check span
inspection logic so the test covers both name and outcome.
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go`:
- Line 281: The local ptr helper should be removed because it adds an
unnecessary wrapper around taking an address. Delete ptr[T any] and update the
call sites in meterqueryrow_cache_parity_test.go to use the repo’s preferred
pointer-literal approach or existing lo helpers directly, keeping the test code
explicit without introducing a new package-level helper.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 05b4063a-2581-4177-a112-6446ce7f7256
📒 Files selected for processing (38)
api/v3/handlers/meters/query.goapi/v3/handlers/meters/query_csv.goapp/common/streaming.goapp/config/aggregation.goapp/config/config_test.goapp/config/testdata/complete.yamlcmd/balance-worker/wire.gocmd/balance-worker/wire_gen.gocmd/billing-worker/wire_gen.gocmd/jobs/internal/wire_gen.gocmd/notification-service/wire.gocmd/notification-service/wire_gen.gocmd/server/wire_gen.gocmd/sink-worker/wire.gocmd/sink-worker/wire_gen.goconfig.example.yamlopenmeter/billing/charges/usagebased/service/rating/service_test.goopenmeter/cost/adapter/adapter.goopenmeter/credit/balance/usage.goopenmeter/entitlement/metered/balance.goopenmeter/entitlement/metered/balance_test.goopenmeter/meter/httphandler/query.goopenmeter/meter/httphandler/query_csv.goopenmeter/streaming/clickhouse/connector.goopenmeter/streaming/clickhouse/meter_query.goopenmeter/streaming/clickhouse/meterqueryrow_cache.goopenmeter/streaming/clickhouse/meterqueryrow_cache_coverage.goopenmeter/streaming/clickhouse/meterqueryrow_cache_coverage_test.goopenmeter/streaming/clickhouse/meterqueryrow_cache_exec.goopenmeter/streaming/clickhouse/meterqueryrow_cache_gate.goopenmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.goopenmeter/streaming/clickhouse/meterqueryrow_cache_monitor.goopenmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.goopenmeter/streaming/clickhouse/meterqueryrow_cache_query.goopenmeter/streaming/clickhouse/meterqueryrow_cache_test.goopenmeter/streaming/clickhouse/zz_audit_test.goopenmeter/streaming/query_params.goopenmeter/streaming/testutils/streaming.go
CodeRabbit: - Post-merge invalidation guard: a namespace invalidation landing between a cached read's populate and its merge SELECT could blank the settled range for that one response (the marker only orders plans). The read now re-checks the marker after the merge and falls back to the live query when it moved. - Gate unknown GroupBy keys to the live path so they surface as the live path's validation error instead of a cached-path SQL-build error. - Parity mismatch logs name the differing fields instead of dumping raw row keys (subject/customer identifiers stay out of error logs). - Self-healing invalidation after a parity mismatch runs on its own bounded context instead of the shadow check's possibly-exhausted one. - Parity comparison keys distinguish nil from empty-string values (monitor and test comparator). Greptile: - ClickHouse UNKNOWN_TABLE detection uses the typed exception with a string fallback for transports that wrap server errors as text. - filterStringWhere fails loudly when the WHERE fragment cannot be extracted instead of silently dropping the filter. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
There was a problem hiding this comment.
♻️ Duplicate comments (1)
openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go (1)
351-399: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick winOnly delete rollup rows after the marker insert succeeds.
Nice post-merge guard, but the fallback path still has a gap: if the marker INSERT fails and the claim-delete fallback succeeds, Step 2 still deletes rollup rows. In-flight cached reads planned before that failure won’t see a newer marker in
invalidatedSince, so they can merge after rows were deleted and return undercounted data. Keep claim cleanup, but skip rollup cleanup unless the marker was committed.Suggested patch
markerSQL, markerArgs := markers.toSQL() + markerInserted := true if err := c.config.ClickHouse.Exec(ctx, markerSQL, markerArgs...); err != nil { + markerInserted = false // A missing table (code 60) means nothing is cached: invalidation is // always-on even with the cache disabled. if !isUnknownTableError(err) { @@ claimsSQL, claimsArgs := deleteMeterQueryRowCacheCoverageClaims{ Database: c.config.Database, TableName: meterQueryRowCacheCoverageTable, Namespaces: namespaces, }.toSQL() - rollupSQL, rollupArgs := deleteMeterQueryRowCacheForNamespaces{ - Database: c.config.Database, - TableName: meterQueryRowCacheTable, - Namespaces: namespaces, - }.toSQL() - - for _, cleanup := range []struct { + type cleanupStmt struct { sql string args []interface{} - }{ + } + cleanups := []cleanupStmt{ {claimsSQL, claimsArgs}, - {rollupSQL, rollupArgs}, - } { + } + if markerInserted { + rollupSQL, rollupArgs := deleteMeterQueryRowCacheForNamespaces{ + Database: c.config.Database, + TableName: meterQueryRowCacheTable, + Namespaces: namespaces, + }.toSQL() + cleanups = append(cleanups, cleanupStmt{rollupSQL, rollupArgs}) + } + + for _, cleanup := range cleanups {This is the same rollup-delete race class as the earlier review note, now narrowed to the marker-insert fallback path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go` around lines 351 - 399, The cleanup path in `invalidateMeterQueryRowCache` still deletes rollup rows even when the marker insert fails and only the claim-delete fallback succeeds. Keep the existing fallback claim cleanup, but gate the Step 2 rollup deletion loop so it runs only after the marker write has actually committed successfully; use the marker insert result and the `deleteMeterQueryRowCacheCoverageClaims` / `deleteMeterQueryRowCacheForNamespaces` flow to place that guard correctly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@openmeter/streaming/clickhouse/meterqueryrow_cache_exec.go`:
- Around line 351-399: The cleanup path in `invalidateMeterQueryRowCache` still
deletes rollup rows even when the marker insert fails and only the claim-delete
fallback succeeds. Keep the existing fallback claim cleanup, but gate the Step 2
rollup deletion loop so it runs only after the marker write has actually
committed successfully; use the marker insert result and the
`deleteMeterQueryRowCacheCoverageClaims` /
`deleteMeterQueryRowCacheForNamespaces` flow to place that guard correctly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 43c505a7-9232-4972-94cf-985d68a987d9
📒 Files selected for processing (7)
openmeter/streaming/clickhouse/meterqueryrow_cache_exec.goopenmeter/streaming/clickhouse/meterqueryrow_cache_gate.goopenmeter/streaming/clickhouse/meterqueryrow_cache_integration_test.goopenmeter/streaming/clickhouse/meterqueryrow_cache_monitor.goopenmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.goopenmeter/streaming/clickhouse/meterqueryrow_cache_query.goopenmeter/streaming/clickhouse/meterqueryrow_cache_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
- openmeter/streaming/clickhouse/meterqueryrow_cache_test.go
- openmeter/streaming/clickhouse/meterqueryrow_cache_query.go
- openmeter/streaming/clickhouse/meterqueryrow_cache_parity_test.go
- openmeter/streaming/clickhouse/meterqueryrow_cache_gate.go
| // FeatureGate gates the query cache per namespace. | ||
| FeatureGate *featuregate.FeatureGateChecker |
There was a problem hiding this comment.
Personally I would love to see FF information being attached to Context resolved by a HTTP middleware instead of wiring the FeatureGateChecker service into each and every service.
There was a problem hiding this comment.
Use ContextResolver instead which does what I meantioned above.
Summary by CodeRabbit