[OPIK-5659] [BE] fix: prevent concurrent experiment aggregation and reduce ClickHouse query overload#6076
Conversation
…educe ClickHouse query overload

Root cause: the aggregation lock TTL (1 min) was far shorter than the actual processing time for large experiments (~87 min at batchSize=1000 for 1M items). When the lock expired, multiple nodes reprocessed the same experiment concurrently, each issuing the full set of ClickHouse queries per batch — causing 2,000+ queries/minute spikes.

Fixes:
- Switch from executeWithLockCustomExpire to bestEffortLock with a 500ms acquire wait, so a locked experiment is immediately skipped rather than queued concurrently (see the sketch below).
- Apply .timeout(lockTTL) to the processing Mono so Reactor cancels in-flight R2DBC queries when the TTL elapses, instead of leaving them running silently after lock expiry.
- Raise the aggregationLockTime default from 1 min to 10 min.
- Add retry logic: on timeout, re-publish via debounce up to maxLockExpiryRetries=3; reset the counter on successful completion.
- All new config fields (lockAcquireWait, maxLockExpiryRetries, retryCounterTtl) are externalised in ExperimentDenormalizationConfig — no hardcoded values.
- Resolve getProjectId once per experiment (not per batch), eliminating N-1 redundant ClickHouse round trips across the expand() loop.
- Pass traceIds extracted from the already-fetched batch directly to all 5 parallel queries, replacing the repeated experiment_items FINAL CTE subquery in each.
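A minimal Reactor-style sketch of the lock-and-timeout shape this commit describes, assuming a `LockService.bestEffortLock(key, action, fallback, acquireWait, ttl)` signature (the real Opik API may differ); only the operator flow mirrors the commit message:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;

// Sketch only: `LockService` and its signature are assumptions for illustration.
final class AggregationLockSketch {

    interface LockService {
        <T> Mono<T> bestEffortLock(String key, Mono<T> action, Mono<T> fallback,
                Duration acquireWait, Duration ttl);
    }

    Mono<Void> process(String experimentId, LockService lockService,
            Duration lockAcquireWait, Duration lockTtl) {
        return lockService.bestEffortLock(
                "experiment-aggregation:" + experimentId,
                // Action runs only if the lock was acquired; .timeout(lockTtl) makes
                // Reactor cancel the upstream R2DBC subscription when the TTL elapses,
                // instead of letting queries outlive the lock.
                populateAggregations(experimentId).timeout(lockTtl),
                // Lock held by another node: skip immediately instead of queueing.
                Mono.empty(),
                lockAcquireWait,   // 500ms acquire wait per the commit
                lockTtl)
                // Timeout means the lock TTL genuinely expired: hand off to the
                // debounced re-publish path (capped by maxLockExpiryRetries).
                .onErrorResume(TimeoutException.class,
                        e -> retriggerIfBelowMaxRetries(experimentId));
    }

    private Mono<Void> populateAggregations(String experimentId) {
        return Mono.empty(); // stand-in for the real batch-processing pipeline
    }

    private Mono<Void> retriggerIfBelowMaxRetries(String experimentId) {
        return Mono.empty(); // stand-in for the retry re-publish described above
    }
}
```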
Move resetRetryCounter into the action Mono so it only runs when this node actually acquired the lock and processing completed successfully. Previously it ran after bestEffortLock regardless of whether the lock was acquired, resetting the shared counter on skip (Mono.empty) paths. Also add workspaceId to the doOnError log for consistency with other log statements in the same method.
…cross aggregation queries

Removes FINAL from the experiments/spans/traces/experiment_items/assertion_results reads and replaces it with an explicit ORDER BY (<sort_key>) DESC, last_updated_at DESC + LIMIT 1 BY <dedup_key> to avoid the per-query ReplacingSorted merge work (see the sketch below).

Also fixes two bugs:
- ::trace_ids typo (double colon) → :trace_ids
- GET_ASSERTIONS_DATA inner subquery was missing the name column and the LIMIT 1 BY clause, causing "Unknown expression or function identifier 'name'" at runtime. Aligned with the canonical assertion_results dedup pattern used elsewhere in the file.
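To make the pattern concrete, a sketch in the DAO style the codebase already uses (SQL inside a Java text block); table, column, and parameter names are illustrative, not the production schema:

```java
// Sketch of the FINAL-free dedup read described above; names are illustrative.
private static final String GET_TRACES_DATA_SKETCH = """
        SELECT *
        FROM traces
        WHERE workspace_id = :workspace_id
          AND id IN :trace_ids  -- single colon: the ::trace_ids typo broke binding
        ORDER BY (workspace_id, project_id, id) DESC, last_updated_at DESC
        LIMIT 1 BY id           -- latest row per id, without FINAL's merge cost
        """;
```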
ORDER BY ... ASC, last_updated_at ASC before LIMIT 1 BY id kept the oldest row per id, which could cause populateExperimentItemAggregates to process stale snapshots. Flipped to DESC, last_updated_at DESC to match the latest-row dedup pattern used in GET_TRACES_DATA and GET_SPANS_DATA.
…TEMS The prior commit flipped ORDER BY to DESC on all columns so LIMIT 1 BY id would keep the latest row per id, but that broke cursor pagination: with DESC ordering and the filter id > :cursor, each subsequent batch only excludes the smallest id of the previous batch and re-returns the rest, so the iteration only ever processes the top ~batchSize ids of the experiment and never progresses. Keep id ASC for forward cursor progression while using last_updated_at DESC so LIMIT 1 BY id still selects the latest version per id.
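A sketch of the resulting cursor-safe shape (illustrative names, same caveats as above): the cursor key stays ascending so `id > :cursor` progresses, while the version column stays descending so `LIMIT 1 BY id` keeps the latest row.

```java
// Sketch: forward cursor pagination combined with latest-version dedup.
private static final String GET_EXPERIMENT_ITEMS_PAGE_SKETCH = """
        SELECT *
        FROM experiment_items
        WHERE experiment_id = :experiment_id
          AND id > :cursor                      -- forward cursor progression
        ORDER BY id ASC, last_updated_at DESC   -- ASC on the cursor key only
        LIMIT 1 BY id                           -- still keeps the latest version per id
        LIMIT :batch_size
        """;
```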
…use HTTP JSONEachRow

The prior INSERT used a StringTemplate that rendered one VALUES tuple per item with N distinct named parameters (`:id0, :id1, ...`). R2DBC's named-parameter resolution grew super-linearly with batch size, so `EXPERIMENT_AGGREGATES_BATCH_SIZE=10000` made the Java-side render + bind pipeline take ~45s per batch (the CH server-side INSERT itself was <200ms). At the 60s lock TTL this left ~0 headroom: every attempt processed one 10k-item batch, then got cancelled. Smaller batch sizes avoided the super-linear cost but capped throughput at ~1.5k items/sec.

Replace the R2DBC path for this INSERT with the ClickHouse v2 HTTP client (`com.clickhouse:client-v2`) POSTing JSONEachRow directly, with client-request + server-response compression enabled. The rest of the aggregation pipeline keeps using R2DBC.

At batchSize=10000 the per-batch cost drops from ~45s to ~60-100ms (bodyMs ~20-40 + executeMs ~35-60). End-to-end, a 1M-item experiment now completes aggregation in ~45s as a single attempt, well within the 60s lock TTL. ExperimentAggregatesIntegrationTest (126 tests) passes.

Also wires `EXPERIMENT_AGGREGATES_BATCH_SIZE` through the backend docker-compose service, defaulting to 10000.
✅ Test environment is now available! To configure additional environment variables for your environment, run the [Deploy Opik AdHoc Environment workflow](https://github.com/comet-ml/comet-deployment/actions/workflows/deploy_opik_adhoc_env.yaml).

Access Information
The deployment has completed successfully and the version has been verified.
Python SDK E2E Tests Results (Python 3.13): 347 tests, 345 ✅, 12m 27s ⏱️. Results for commit 23e7323. ♻️ This comment has been updated with latest results.
ldaugusto
left a comment
It's very cool that you found out client-v2 is so much more efficient in this situation; if it's something very big, it's worth having a second pool to manage these operations. Do you have a general guide on which kinds of operations would be better to do on each pool? If bound inserts are this much better with client-v2, would it be the same for traces/spans inserts?
Also, the PR description mentions the new default for EXPERIMENT_AGGREGATES_BATCH_SIZE is 10000, but I don't see this change.
andrescrz
left a comment
This PR is complex, but generally well done.
I left feedback around the stuff to verify before moving forward.
Good job!
```java
                    message.experimentId(), message.workspaceId(), retryCount,
                    config.getMaxLockExpiryRetries());

            return publisher.publish(
```
I understand this re-injects back a message to this class, making it self-retryable. Please make sure there are no race conditions or other circumstances that can lead to a storm of retries here.
You need to ensure there's a short circuit or exit condition to always prevent this from happening.
Good callout — here are the safeguards in place (in retriggerIfBelowMaxRetries / resetRetryCounter):

- Atomic retry counter per `(workspaceId, experimentId)` — uses `RedissonClient.getAtomicLong(key)` with `incrementAndGet()`, so increments are race-free across nodes.
- Counter increments before publishing — we only re-publish if `incrementAndGet()` returns `<= maxLockExpiryRetries` (default 3, configurable via `maxLockExpiryRetries`). Past that we `counter.delete()` and return `Mono.empty()` — the retry chain terminates, and no more messages are emitted for that experiment.
- TTL on the counter — `counter.expire(retryCounterTtl)` (default 10m, configurable) is set after each successful re-publish. If the experiment stops producing events, the counter self-cleans and a future legitimate trigger starts fresh.
- Counter reset on successful processing — `resetRetryCounter` deletes the key only on the success path (after `populateAggregations` completes within the lock TTL). Fixed by a recent commit (c48ece1fbe) so the counter is NOT reset when the lock was never acquired (otherwise the cap could be bypassed).
- Only triggered by `TimeoutException` — i.e. the lock TTL genuinely expired. Other errors fall through to `doOnError` and do NOT re-publish.
- Debounce on the publisher side — `publisher.publish(...)` dedupes concurrent triggers for the same experiment within a window, so even if multiple retries overlap they collapse to a single message.
- Log trail at WARN level on every retry and on cap-reached, so a retry storm would be visible in logs and metrics immediately.

Net effect: at most maxLockExpiryRetries automatic re-triggers per (workspace, experiment), then the chain stops — even under concurrent producers and node restarts.
🤖 Reply posted via /address-github-pr-comments
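To make the short-circuit concrete, a hedged sketch of the guard using Redisson's reactive API; the key format, class shape, and helper names are assumptions, while the increment-check-publish-expire flow follows the bullets above:

```java
import java.time.Duration;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Mono;

// Sketch only: key format and method names are assumptions for illustration.
final class RetryGuardSketch {

    private final RedissonReactiveClient redisson;
    private final int maxLockExpiryRetries;   // default 3 per the discussion
    private final Duration retryCounterTtl;   // default 10m per the discussion

    RetryGuardSketch(RedissonReactiveClient redisson, int maxRetries, Duration ttl) {
        this.redisson = redisson;
        this.maxLockExpiryRetries = maxRetries;
        this.retryCounterTtl = ttl;
    }

    Mono<Void> retriggerIfBelowMaxRetries(String workspaceId, String experimentId) {
        RAtomicLongReactive counter = redisson
                .getAtomicLong("aggregation-retry:%s:%s".formatted(workspaceId, experimentId));
        return counter.incrementAndGet()                             // race-free across nodes
                .flatMap(retryCount -> retryCount <= maxLockExpiryRetries
                        ? publish(workspaceId, experimentId)         // debounced re-publish
                                .then(counter.expire(retryCounterTtl)) // self-clean if idle
                                .then()
                        : counter.delete().then());                  // cap reached: stop the chain
    }

    Mono<Void> resetRetryCounter(String workspaceId, String experimentId) {
        // Success path only: delete the counter so future triggers start fresh.
        return redisson
                .getAtomicLong("aggregation-retry:%s:%s".formatted(workspaceId, experimentId))
                .delete()
                .then();
    }

    private Mono<Void> publish(String workspaceId, String experimentId) {
        return Mono.empty(); // stand-in for publisher.publish(...) with debounce
    }
}
```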
Commit daa62f7 addressed this comment by adding retriggerIfBelowMaxRetries, which increments a per-experiment atomic counter and stops re-publishing once maxLockExpiryRetries is reached, ensuring the retry loop short-circuits instead of creating a storm of retries.
```java
            Map<String, BigDecimal> feedbackScoresMap = Optional.ofNullable(feedback)
                    .map(FeedbackScoreData::feedbackScores).orElse(Map.of());

            var node = JsonUtils.createObjectNode();
```
Switching clients is a major architectural change. It's fine under the scope of this PR, but we'll need to be careful with the exact configuration of the JSON mapper to be used. I believe JsonUtils should be fine, so we keep things centralised. Let's make sure that no configuration for it breaks the ClickHouse communication.
I guess this is fine for now as tests pass. Likely no action for now beyond maybe extending the javadoc in this PR, starting with this method.
Fixed in d055ca6 — add javadoc on appendJsonRow documenting the JSONEachRow contract, JsonUtils reuse, and null-coalescing policy.
🤖 Reply posted via /address-github-pr-comments
Commit d055ca6 addressed this comment by adding detailed javadoc to the appendJsonRow helper that explains why we rely on JsonUtils for JSON serialization and how the bulk insert flow keeps ClickHouse configuration centralized; the javadoc now captures the reasoning the reviewer asked for.
```java
                    Optional.ofNullable(assertionsMap.get(item.traceId())).map(AssertionData::assertionsArray)
                            .orElse(EMPTY_ARRAY_STR));

            out.append(node).append('\n');
```
Same comment about this. Likely no action for now other than documenting. In the long run we'll need to document these nuances in order to extend the usage of this ClickHouse V2 client.
Fixed in d055ca6 — added javadoc on insertExperimentItems explaining that the v2-client + JSONEachRow path is scoped to this batch insert (used when EXPERIMENT_AGGREGATES_BATCH_SIZE > 1K), the per-request date_time_input_format setting, and the NUM_ROWS_WRITTEN return semantics.
🤖 Reply posted via /address-github-pr-comments
Commit d055ca6 addressed this comment by adding extensive JavaDoc around appendJsonRow and insertExperimentItems that explains the JSONEachRow format, shared buffer usage, and ClickHouse V2 client considerations mentioned in the discussion.
```java
            var settings = new InsertSettings()
                    .logComment(logComment)
                    .serverSetting("date_time_input_format", "best_effort");
```
What about this setting for ClickHouse V2 client, can we configure it globally?
We kept it as a per-request InsertSettings on purpose: in the regular R2DBC insert paths, timestamp strings are normalised via a server-side function (e.g. parseDateTime64BestEffort(...)) inside the SQL itself, so no global setting is required. On the JSONEachRow path there's no function-wrapping opportunity — the column receives the raw string from the JSON payload — so we need date_time_input_format=best_effort on the request so ClickHouse parses Instant.toString() (ISO-8601 with nanos) directly into DateTime64(9, 'UTC').
I'd keep it per-request for now — it's scoped only to this bulk-insert path and leaves the global Client defaults conservative. If we extend the v2 client to more paths in the future and they all want the same behaviour, we can promote it to a global Client.Builder setting (or equivalently add it to custom_http_params in config.yml) as a follow-up.
🤖 Reply posted via /address-github-pr-comments
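For reference, promoting the setting later would look roughly like this (a sketch; endpoint and credentials are placeholders, and builder-level `serverSetting` is the same mechanism this PR uses for custom_http_params):

```java
import com.clickhouse.client.api.Client;

// Sketch: applying the setting globally via the v2 Client builder instead of
// per-request InsertSettings. Endpoint and credentials are placeholders.
Client client = new Client.Builder()
        .addEndpoint("http://localhost:8123")
        .setUsername("default")
        .setPassword("")
        .serverSetting("date_time_input_format", "best_effort") // every request
        .build();
```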
Commit d055ca6 addressed this comment by documenting that the v2 bulk insert path explicitly sets date_time_input_format=best_effort on each insert request and keeps the global Client.Builder defaults conservative, explaining why we do not lift the setting into a global configuration right now.
Hi @ldaugusto, on which operations go where:

- When the v2 path actually matters: workspaces with experiments of ≥1M items.
- Rule of thumb: use R2DBC for everything except a bulk-insert path that has to run with >1k rows per statement (where the fixed cost per statement is dominated by named-parameter bind time rather than server-side work).
- Traces/spans inserts: same template pattern, but in practice batched ≤1,000 rows per call via the SDK — comfortably below the inflection point, so no win from switching. If ingest rates ever push those per-statement batch sizes above ~1k, the same v2-HTTP pattern would apply. Worth a follow-up once we agree on a shared helper.

🤖 Reply posted via /address-github-pr-comments
- Log values moved to the end of the sentence for production greppability (ExperimentAggregatesSubscriber, ExperimentAggregatesService).
- `@Builder` on BatchResult; call sites use the builder pattern.
- Javadoc on appendJsonRow documenting the JSONEachRow contract, JsonUtils reuse, the shared-StringBuilder rationale, and the null-coalescing policy.
- Javadoc on insertExperimentItems documenting why the ClickHouse v2 HTTP client + JSONEachRow path is used only for this high-volume batch insert (EXPERIMENT_AGGREGATES_BATCH_SIZE can exceed 1K), why date_time_input_format is scoped per-request, and the NUM_ROWS_WRITTEN return semantics.
- DatabaseAnalyticsFactory encapsulates v2 Client construction via buildClient(); parseQueryParameters() splits queryParameters into driver options (R2DBC-specific, not forwarded) and server settings (custom_http_params content → Client.Builder.serverSetting). The DatabaseAnalyticsModule.getDatabaseAnalyticsFactory provider was removed. (A sketch of the split follows below.)
- Unit tests (DatabaseAnalyticsFactoryTest) for parseQueryParameters and integration tests (DatabaseAnalyticsFactoryIntegrationTest) verifying that custom_http_params entries land in system.settings via a real ClickHouse Testcontainer, that driver options don't leak, and that a JSONEachRow round-trip completes.
- config.yml: new env-var-overridable settings for the retry pipeline (lockAcquireWait, maxLockExpiryRetries, retryCounterTtl); updated aggregationLockTime default to 10m to match the code.
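A sketch of the parseQueryParameters split described above; the exact option names and the URL-encoded inner format of custom_http_params are assumptions for illustration:

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: driver-only R2DBC options stay local, while the contents of
// `custom_http_params` become v2 Client server settings. The inner
// comma-separated, URL-encoded format is an assumption for illustration.
final class QueryParameterSplitSketch {

    static Map<String, String> serverSettings(String queryParameters) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String pair : queryParameters.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2 && kv[0].equals("custom_http_params")) {
                // e.g. custom_http_params=max_query_size%3D1048576 (assumed encoding)
                String decoded = URLDecoder.decode(kv[1], StandardCharsets.UTF_8);
                for (String setting : decoded.split(",")) {
                    String[] skv = setting.split("=", 2);
                    if (skv.length == 2) {
                        settings.put(skv[0], skv[1]); // forwarded via Client.Builder.serverSetting
                    }
                }
            }
            // Other keys (e.g. connect_timeout) are R2DBC driver options: not forwarded.
        }
        return settings;
    }
}
```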
… and guard it

The "finished processing all experiments" log previously fired at INFO on every 5s polling tick, even when no experiments were pending. Now the job tracks the number of processed experiments and only emits the completion log when at least one was actually processed. The value is placed at the end of the sentence per the log format convention used across the aggregation pipeline.
- The per-batch log in ExperimentAggregatesService was labelled batchSize but carried result.processedCount(); renamed the placeholder to processedCount so the wording matches the value. The configured batch size is still logged once at job start.
- The denormalization job counter previously incremented in doOnNext before flatMap(processExperiment), so the processedExperiments log counted items that failed and were skipped by onErrorContinue. Moved the increment to doOnSuccess on the inner Mono so the count reflects actually processed experiments (see the sketch below).
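A small Reactor sketch of the counting fix (illustrative names): the increment sits on the inner Mono's success signal, so errors swallowed by the resume path are not counted.

```java
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: count experiments whose inner Mono actually completed.
final class ProcessedCounterSketch {

    Mono<Long> runOnce(Flux<String> pendingExperimentIds) {
        AtomicLong processed = new AtomicLong();
        return pendingExperimentIds
                .flatMap(id -> processExperiment(id)
                        // Fires once per inner Mono that completes successfully
                        // (with a value or empty), unlike doOnNext on Mono<Void>.
                        .doOnSuccess(v -> processed.incrementAndGet())
                        // Failures are skipped without incrementing the counter.
                        .onErrorResume(e -> Mono.empty()))
                .then(Mono.fromSupplier(processed::get));
    }

    private Mono<Void> processExperiment(String id) {
        return Mono.empty(); // stand-in for the real per-experiment pipeline
    }
}
```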
Python SDK E2E Tests Results (Python 3.11): 347 tests, 345 ✅, 12m 48s ⏱️. Results for commit c04ab13. ♻️ This comment has been updated with latest results.
ldaugusto
left a comment
Thanks for addressing all the suggestions! It's approved on my side.
andrescrz
left a comment
LGTM:
- Just double check the correctness of the values on the aggregated query.
- Double check the OPIK_EXPERIMENT_DENORM_RETRY_COUNTER_TTL default of 2h.
- Missing mirroring values in test config YML file.
The rest is minor, feel free to go ahead if all good.
```sql
count() AS n,
avg(value) AS avg_value,
any(value) AS any_value,
any(reason) AS any_reason,
arrayStringConcat(groupArray(if(reason = '', '<no reason>', reason)), ', ') AS reason_concat,
arrayStringConcat(groupArray(category_name), ', ') AS category_name_concat,
any(source) AS source_any,
arrayStringConcat(groupArray(created_by), ', ') AS created_by_concat,
arrayStringConcat(groupArray(last_updated_by), ', ') AS last_updated_by_concat,
min(created_at) AS created_at_min,
max(last_updated_at) AS last_updated_at_max,
mapFromArrays(
    groupArray(author),
    groupArray(tuple(value, reason, category_name, source, last_updated_at))
) AS value_by_author
```
Could you double check the correctness of this? Mostly the count without an inner ID (maybe with distinct) and the any functions.
Commit 4eccf7f addressed this comment by changing the aggregate count to use count(DISTINCT author), aligning with the requested clarification about counting without an inner ID.
Fixed in 4eccf7f (count() → count(DISTINCT author)). Tested the full query against a live ClickHouse with three scenarios: 1 author, 2 authors (alice@10:55 + bob@11:00), and 3 authors (alice@11:50, bob@11:55, carol@12:00). Also added a duplicate-insert stress test (a second alice row at 10:57 for trace2) to simulate dedup edge cases.

count() vs count(DISTINCT author)

- With the current `LIMIT 1 BY ..., author`: `count()` == `count(DISTINCT author)` in every scenario (1/1, 2/2, 3/3). The dedup guarantees one row per author, so both forms agree.
- Without the `LIMIT 1 BY` (to simulate a regressed dedup): `count()` reports 2/1, 4/2, 4/3 — overcounts; `count(DISTINCT author)` stays correct.

Switched to count(DISTINCT author) — same semantics today, defensive against accidental regression of the dedup clause. Low-risk wording change with no perf impact (the CTE is already 1 row per author).

any(value) / any(reason) / any(source)

- Ran the aggregation 5 times sequentially + 3 times with `max_threads=8` — every run produced identical output (all 3 `any(...)` picked the latest author's row: bob for trace2, carol for trace3).
- Compared against the raw-query pattern used in `ExperimentItemDAO.java:231` (`arrayElement(entries, 1).4 AS source`) — both produce the same output. In practice the outer aggregation reads the deduped CTE in scan order, which is governed by its `ORDER BY last_updated_at DESC`, so `any()` and `arrayElement(entries, 1)` both yield the latest author's values.
- `any(value)` and `any(reason)` are additionally gated by `IF(n=1, any_*, avg/concat)`, so even if the order ever slipped, a mismatch would only show up for 1-row groups — where `any(x)` over a single row is trivially deterministic. The same `count() + any(value)` pattern is used in 14 places across `DatasetItemDAO`, `DatasetItemVersionDAO`, `ExperimentDAO`, `KpiCardDAO`, `OptimizationDAO`, `ProjectMetricsDAO`, `SpanDAO`, and `TraceDAO`, so this PR is consistent with the existing codebase convention.

Leaving any(...) as-is in this PR since behaviour matches the raw path end-to-end. Happy to tighten further in a follow-up (e.g., argMax(source, last_updated_at)) if you'd prefer explicit determinism across the aggregation stack.
🤖 Reply posted via /address-github-pr-comments
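For reference, the follow-up tightening mentioned above would swap the order-dependent `any(...)` picks for `argMax`, which pins each value to the row with the greatest last_updated_at regardless of scan order (a sketch, not a change made in this PR):

```java
// Sketch: deterministic alternative to any(...), keyed on last_updated_at.
private static final String DETERMINISTIC_PICK_SKETCH = """
        argMax(value, last_updated_at)  AS latest_value,
        argMax(reason, last_updated_at) AS latest_reason,
        argMax(source, last_updated_at) AS latest_source
        """;
```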
…ggregation

Defensive change: count() was correct given the LIMIT 1 BY ..., author dedup, but silently overcounts if the dedup ever regresses. Switching to count(DISTINCT author) keeps the invariant explicit at negligible cost (one row per author in the deduped CTE).
- ExperimentAggregatesSubscriber: extract a buildRetryCountKey helper so retriggerIfBelowMaxRetries and resetRetryCounter share a single key-generation site.
- ExperimentDenormalizationJob: switch processExperiment to return the experiment id so the outer doOnNext actually fires per successful processing (a Mono<Void> never triggers doOnNext). Retains the same per-experiment log and counter-increment semantics.
- config.yml: lower the retryCounterTtl default from 2h to 30m to match the worst-case retry cycle (3 attempts at 10m lock TTL) plus a modest idle buffer.
- config-test.yml: add the missing lockAcquireWait, maxLockExpiryRetries, retryCounterTtl entries to the experimentDenormalization block.
Backend Tests - Integration Group 8: 35 files, 35 suites, 6m 40s ⏱️. Results for commit c996420. ♻️ This comment has been updated with latest results.
Previously counter.expire(...) ran only after publisher.publish(...) succeeded. If publish failed, the retry counter was incremented without a TTL and could linger, causing subsequent lock-expiry retries to hit maxLockExpiryRetries prematurely. Reordering so expire runs right after the increment guarantees the counter always has a TTL and will age out naturally regardless of whether the publish succeeds.
Details
Experiment aggregation was generating 2,000+ ClickHouse queries/minute under load. The root cause was a mismatch between the aggregation lock TTL (1 minute) and the actual time required to process large experiments. When the lock expired, multiple nodes reprocessed the same experiment concurrently, each issuing the full set of ClickHouse queries per batch. A subsequent load test against 2 experiments with 1M items each (5M traces, 15M spans, 8M feedback scores) surfaced a client-side INSERT scaling wall that was resolved in this PR.
Root cause — lock TTL vs processing time:
Original concurrency/contention fixes
- Switched `executeWithLockCustomExpire` (blocks until TTL, enables concurrent processing) to `bestEffortLock` with `lockAcquireWait=500ms` — if the lock is held, the message is immediately ack'd without blocking worker threads.
- Applied `.timeout(lockTTL)` to the processing Mono. Reactor cancels the upstream R2DBC subscription, stopping in-flight ClickHouse queries when the TTL elapses — instead of leaving them running after the lock silently expires.

Query-level redundancies removed
Client-side INSERT bottleneck eliminated
Bulk-insert `experiment_item_aggregates` via ClickHouse HTTP JSONEachRow: the original template rendered one VALUES tuple per item with distinct named parameters (`:id0, :id1, …, :id9999`). R2DBC's named-parameter resolution scales super-linearly with batch size.
Attempting R2DBC's `Statement.add()` batching with a single-row template silently produced zero-row INSERTs (the ClickHouse R2DBC driver does not serialize accumulated bindings into the `FORMAT Values` body).
Replaced the R2DBC INSERT with `com.clickhouse:client-v2`'s HTTP client posting `FORMAT JSONEachRow` directly, with client-request + server-response compression. The rest of the aggregation pipeline keeps using R2DBC. `InsertSettings.serverSetting("date_time_input_format", "best_effort")` is set so ClickHouse accepts ISO 8601 `Instant.toString()` values for DateTime64 columns.
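A hedged sketch of that insert path using the v2 client's insert API; the table name matches the PR, while the body construction and error handling are simplified placeholders (the real code streams a JSONEachRow body built via JsonUtils):

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.data.ClickHouseFormat;

// Sketch only: a minimal JSONEachRow bulk insert via the v2 HTTP client.
final class JsonEachRowInsertSketch {

    long insert(Client client, String jsonEachRowBody) throws Exception {
        var settings = new InsertSettings()
                // Let ClickHouse parse ISO-8601 Instant.toString() into DateTime64.
                .serverSetting("date_time_input_format", "best_effort");
        try (InsertResponse response = client.insert(
                "experiment_item_aggregates",
                new ByteArrayInputStream(jsonEachRowBody.getBytes(StandardCharsets.UTF_8)),
                ClickHouseFormat.JSONEachRow,
                settings).get()) {
            return response.getWrittenRows(); // NUM_ROWS_WRITTEN semantics
        }
    }
}
```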
Result at batchSize=10,000: the per-batch cost drops from ~45s to ~60-100ms (bodyMs ~20-40 + executeMs ~35-60), and a 1M-item experiment completes aggregation in ~45s as a single attempt.
`EXPERIMENT_AGGREGATES_BATCH_SIZE` is exposed as an env var on the backend docker-compose service so operators can tune it per workspace. The application default stays at 1,000 (appropriate for typical workspaces); raising it to ~10,000 is only needed for workspaces with experiments of ≥1M items, where the smaller batch does not fit in the 60s lock TTL.
Config
All new config fields are externalised in `ExperimentDenormalizationConfig` with no hardcoded values. `aggregationLockTime` default remains 1 min — 1M-item experiments complete in ~45 s at batchSize=10,000, leaving comfortable headroom. `maxLockExpiryRetries` defaults to 3.
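A sketch of what the externalised config surface looks like, assuming a plain Dropwizard-style class; field names follow the PR, but types, validation, and exact defaults in the real ExperimentDenormalizationConfig may differ:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.util.Duration;

// Sketch only: field names follow the PR discussion; defaults per the final
// description (lock TTL 1 min, 3 retries) and review (retryCounterTtl 30m).
public class ExperimentDenormalizationConfigSketch {

    @JsonProperty
    private Duration aggregationLockTime = Duration.minutes(1);

    @JsonProperty
    private Duration lockAcquireWait = Duration.milliseconds(500);

    @JsonProperty
    private int maxLockExpiryRetries = 3;

    @JsonProperty
    private Duration retryCounterTtl = Duration.minutes(30);

    public Duration getAggregationLockTime() { return aggregationLockTime; }
    public Duration getLockAcquireWait() { return lockAcquireWait; }
    public int getMaxLockExpiryRetries() { return maxLockExpiryRetries; }
    public Duration getRetryCounterTtl() { return retryCounterTtl; }
}
```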
Change checklist
Issues
AI-WATERMARK
AI-WATERMARK: yes
Testing
End-to-end load test against a local dockerised backend (MySQL + ClickHouse + Redis + MinIO + Zookeeper):
Scenarios covered:
Environment: local process mode with Testcontainers for integration tests; local docker-compose stack for the load test.
Documentation
N/A — internal processing component, no user-facing API changes.