
[OPIK-5659] [BE] fix: prevent concurrent experiment aggregation and reduce ClickHouse query overload #6076

Merged

thiagohora merged 18 commits into main from thiagohora/OPIK-5659-fix-experiment-aggregation-query-overload on Apr 22, 2026

Conversation

@thiagohora (Contributor) commented Apr 4, 2026

Experiment aggregation was generating 2,000+ ClickHouse queries/minute under load. The root cause was a mismatch between the aggregation lock TTL (1 minute) and the actual time required to process large experiments. When the lock expired, multiple nodes reprocessed the same experiment concurrently, each issuing the full set of ClickHouse queries per batch. A subsequent load test against 2 experiments with 1M items each (5M traces, 15M spans, 8M feedback scores) surfaced a client-side INSERT scaling wall that was resolved in this PR.

Root cause — lock TTL vs processing time:

  • Per-batch cost: ~1.6 s (fetch items) + ~3.3 s (parallel span / trace / feedback / comments / assertions) + ~0.3 s (INSERT) ≈ 5.2 s/batch at batchSize=1,000.
  • 1M-item experiment → 1,000 batches × 5.2 s ≈ 87 minutes, against a 1-minute lock TTL.
  • Lock expired → other nodes acquired it and started over → 2,000+ queries/minute.

Original concurrency/contention fixes

  1. Fail-fast on lock contention: switched from executeWithLockCustomExpire (blocks until TTL, enables concurrent processing) to bestEffortLock with lockAcquireWait=500ms — if the lock is held, the message is immediately ack'd without blocking worker threads.
  2. Actual cancellation on timeout: applied .timeout(lockTTL) to the processing Mono. Reactor cancels the upstream R2DBC subscription, stopping in-flight ClickHouse queries when the TTL elapses — instead of leaving them running after the lock silently expires.
  3. Retry logic with max attempts: on timeout the experiment is re-published via the debounce publisher. A per-experiment Redis `RAtomicLong` counter caps retries at `maxLockExpiryRetries=3`; resets on success.
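
Put together, the three changes above give the message handler roughly this shape. This is a minimal sketch using raw Redisson reactive calls; `processExperiment`, `resetRetryCounter`, `retriggerIfBelowMaxRetries` and the lock key format are stand-ins, and the real implementation lives in `ExperimentAggregatesSubscriber` behind the project's lock service (`bestEffortLock`).

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.redisson.api.RLockReactive;
import org.redisson.api.RedissonReactiveClient;

import reactor.core.publisher.Mono;

// Sketch only: method names, key format and config values are stand-ins.
class AggregationLockSketch {

    private final RedissonReactiveClient redisson;
    private final Duration lockTtl = Duration.ofMinutes(1);          // aggregationLockTime
    private final Duration lockAcquireWait = Duration.ofMillis(500); // lockAcquireWait

    AggregationLockSketch(RedissonReactiveClient redisson) {
        this.redisson = redisson;
    }

    Mono<Void> handle(String workspaceId, String experimentId) {
        RLockReactive lock = redisson.getLock("experiment-aggregation:" + experimentId);

        // Fail fast: wait at most lockAcquireWait for the lock. If another node already
        // holds it, the filter drops the message and it is ack'd without blocking a worker.
        return lock.tryLock(lockAcquireWait.toMillis(), lockTtl.toMillis(), TimeUnit.MILLISECONDS)
                .filter(Boolean::booleanValue)
                .flatMap(acquired -> processExperiment(workspaceId, experimentId)
                        // Cancel the in-flight R2DBC subscription when the lock TTL elapses,
                        // instead of letting queries keep running after the lock expires.
                        .timeout(lockTtl)
                        // Reset the retry counter only on the success path.
                        .then(Mono.defer(() -> resetRetryCounter(workspaceId, experimentId)))
                        // On timeout, re-trigger via the debounce publisher, capped at
                        // maxLockExpiryRetries.
                        .onErrorResume(TimeoutException.class,
                                e -> retriggerIfBelowMaxRetries(workspaceId, experimentId)));
    }

    private Mono<Void> processExperiment(String workspaceId, String experimentId) {
        return Mono.empty();
    }

    private Mono<Void> resetRetryCounter(String workspaceId, String experimentId) {
        return Mono.empty();
    }

    private Mono<Void> retriggerIfBelowMaxRetries(String workspaceId, String experimentId) {
        return Mono.empty();
    }
}
```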

Query-level redundancies removed

  1. `getProjectId` resolved once per experiment: was called inside the per-batch loop — for 1,000 batches that's 999 redundant ClickHouse round trips returning the same value. Now resolved once before the `expand()` loop and threaded through.
  2. Batch trace IDs passed directly to parallel queries: each of the 5 parallel queries per batch was re-deriving the batch's trace IDs via a `SELECT DISTINCT trace_id FROM experiment_items FINAL WHERE ... ORDER BY id LIMIT :batchSize` CTE. Those IDs are already in memory after `getExperimentItems`. They are now passed as a UUID array, eliminating 5 `experiment_items FINAL` scans per batch (5,000 per 1M-item experiment at batchSize=1,000).
  3. BatchResult count fixed: ClickHouse R2DBC `getRowsUpdated()` always returns 0 for INSERTs — logging showed `processedCount=0` every batch. Fixed to use `items.size()`.
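
For orientation, the per-batch pipeline after the three redundancy fixes above has roughly this shape. The DAO methods and the `Item` record are stand-ins, not the actual `ExperimentAggregatesService` code.

```java
import java.util.List;
import java.util.UUID;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Sketch only: DAO calls and types are illustrative.
class BatchPipelineSketch {

    record Item(UUID id, UUID traceId) {}

    Flux<Integer> aggregate(UUID experimentId, int batchSize) {
        // Fix 1: resolve the project id once per experiment, before the batch loop,
        // instead of once per batch.
        return getProjectId(experimentId)
                .flatMapMany(projectId -> getExperimentItems(experimentId, null, batchSize)
                        // expand() keeps fetching the next page until an empty page comes back.
                        .expand(batch -> batch.isEmpty()
                                ? Mono.empty()
                                : getExperimentItems(experimentId, lastId(batch), batchSize))
                        .filter(batch -> !batch.isEmpty())
                        .concatMap(batch -> processBatch(projectId, batch)));
    }

    private Mono<Integer> processBatch(UUID projectId, List<Item> batch) {
        // Fix 2: the batch's trace ids are already in memory after getExperimentItems;
        // pass them straight into the five parallel queries instead of re-deriving them
        // with a SELECT DISTINCT ... FROM experiment_items FINAL CTE inside each one.
        List<UUID> traceIds = batch.stream().map(Item::traceId).distinct().toList();
        return Mono.zip(
                        getTracesData(projectId, traceIds),
                        getSpansData(projectId, traceIds),
                        getFeedbackScores(projectId, traceIds),
                        getComments(projectId, traceIds),
                        getAssertions(projectId, traceIds))
                .flatMap(data -> insertAggregates(batch, data))
                // Fix 3: ClickHouse R2DBC getRowsUpdated() reports 0 for INSERTs,
                // so the processed count comes from the in-memory batch size.
                .thenReturn(batch.size());
    }

    private UUID lastId(List<Item> batch) {
        return batch.get(batch.size() - 1).id();
    }

    // Stand-ins for the real DAO calls.
    private Mono<UUID> getProjectId(UUID experimentId) { return Mono.empty(); }
    private Mono<List<Item>> getExperimentItems(UUID experimentId, UUID afterId, int batchSize) { return Mono.just(List.of()); }
    private Mono<Object> getTracesData(UUID projectId, List<UUID> traceIds) { return Mono.just(new Object()); }
    private Mono<Object> getSpansData(UUID projectId, List<UUID> traceIds) { return Mono.just(new Object()); }
    private Mono<Object> getFeedbackScores(UUID projectId, List<UUID> traceIds) { return Mono.just(new Object()); }
    private Mono<Object> getComments(UUID projectId, List<UUID> traceIds) { return Mono.just(new Object()); }
    private Mono<Object> getAssertions(UUID projectId, List<UUID> traceIds) { return Mono.just(new Object()); }
    private Mono<Void> insertAggregates(List<Item> batch, Object data) { return Mono.empty(); }
}
```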

Client-side INSERT bottleneck eliminated

  1. Bulk-insert `experiment_item_aggregates` via ClickHouse HTTP JSONEachRow: the original template rendered one VALUES tuple per item with distinct named parameters (`:id0, :id1, …, :id9999`). R2DBC's named-parameter resolution scales super-linearly in batch size:

    | batchSize | Wall/batch | CH active | Java overhead | Items/sec | Items per 60s TTL | Overhead per item |
    |-----------|------------|-----------|---------------|-----------|-------------------|-------------------|
    | 1,000     | ~500 ms    | ~100 ms   | ~400 ms       | 2,000     | 90,000            | 0.4 ms/item       |
    | 2,000     | ~2.75 s    | ~263 ms   | ~2.5 s        | 727       | 44,000            | 1.25 ms/item      |
    | 5,000     | ~14 s      | ~300 ms   | ~13.7 s       | 333       | 20,000            | 2.7 ms/item       |
    | 10,000    | ~50 s      | ~500 ms   | ~49.5 s       | 200       | 10,000            | 4.9 ms/item       |

    Attempting R2DBC's `Statement.add()` batching with a single-row template silently produced zero-row INSERTs (the ClickHouse R2DBC driver does not serialize accumulated bindings into the `FORMAT Values` body).

    Replaced the R2DBC INSERT with `com.clickhouse:client-v2`'s HTTP client posting `FORMAT JSONEachRow` directly, with client-request + server-response compression. The rest of the aggregation pipeline keeps using R2DBC. `InsertSettings.serverSetting("date_time_input_format", "best_effort")` is set so ClickHouse accepts ISO 8601 `Instant.toString()` values for DateTime64 columns.

    Result at batchSize=10,000:

    | Metric                            | Before (R2DBC template) | After (HTTP JSONEachRow)    | Speedup |
    |-----------------------------------|-------------------------|-----------------------------|---------|
    | Wall/batch                        | ~50 s                   | ~60–100 ms                  | ~500×   |
    | Items/sec per experiment          | ~200                    | ~166,000                    | ~830×   |
    | Items per 60 s TTL attempt        | 10,000                  | 1,000,000 (full experiment) | 100×    |
    | Completion for 1M-item experiment | never (hit max retries) | single attempt in ~45–48 s  | n/a     |

    `EXPERIMENT_AGGREGATES_BATCH_SIZE` is exposed as an env var on the backend docker-compose service so operators can tune it per workspace. The application default stays at 1,000 (appropriate for typical workspaces); raising it to ~10,000 is only needed for workspaces with experiments of ≥1M items, where the smaller batch does not fit in the 60s lock TTL.
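
The bulk-insert path itself looks roughly like the sketch below. The endpoint, credentials, table columns and the inline string building are illustrative; the real code obtains the `Client` from `DatabaseAnalyticsFactory` and builds rows via `appendJsonRow`/`JsonUtils` with a shared `StringBuilder`.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.UUID;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.data.ClickHouseFormat;

// Sketch only: table name, columns and endpoint are illustrative.
class BulkInsertSketch {

    private final Client client = new Client.Builder()
            .addEndpoint("http://localhost:8123") // assumption: local ClickHouse over HTTP
            .setUsername("default")
            .setPassword("")
            .compressClientRequest(true)          // compress the request body
            .compressServerResponse(true)         // ask for a compressed response
            .build();

    long insertAggregates(UUID experimentId, List<UUID> itemIds) throws Exception {
        // FORMAT JSONEachRow: one JSON object per row, newline separated.
        StringBuilder body = new StringBuilder();
        for (UUID itemId : itemIds) {
            body.append("{\"experiment_id\":\"").append(experimentId)
                    .append("\",\"id\":\"").append(itemId)
                    // Instant.toString() is ISO 8601; accepted by DateTime64 columns
                    // because of date_time_input_format=best_effort below.
                    .append("\",\"created_at\":\"").append(Instant.now())
                    .append("\"}\n");
        }

        InsertSettings settings = new InsertSettings()
                .logComment("experiment_item_aggregates bulk insert")
                .serverSetting("date_time_input_format", "best_effort");

        InputStream data = new ByteArrayInputStream(body.toString().getBytes(StandardCharsets.UTF_8));
        try (InsertResponse response = client
                .insert("experiment_item_aggregates", data, ClickHouseFormat.JSONEachRow, settings)
                .get()) {
            return response.getWrittenRows(); // rows accepted by the server
        }
    }
}
```

At batchSize=10,000 this keeps the Java-side work to string building plus one HTTP POST, which is where the ~60–100 ms per-batch figure above comes from.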

Config

All new config fields are externalised in `ExperimentDenormalizationConfig` with no hardcoded values. `aggregationLockTime` default remains 1 min — 1M-item experiments complete in ~45 s at batchSize=10,000, leaving comfortable headroom. `maxLockExpiryRetries` defaults to 3.


Change checklist

  • User facing
  • Documentation update

Issues

  • OPIK-5659

AI-WATERMARK

AI-WATERMARK: yes

  • Tools: Claude Code
  • Model(s): claude-sonnet-4-6, claude-opus-4-7
  • Scope: root cause analysis, full implementation, query optimisation, HTTP bulk-insert rewrite, load-test harness and test coverage
  • Human verification: code review + ExperimentAggregatesIntegrationTest (126 tests) + ExperimentAggregatesSubscriberTest (7 tests) + end-to-end load test

Testing

  • `mvn test -Dtest="ExperimentAggregatesSubscriberTest"` — 7 tests, all pass (lifecycle, lock contention, timeout/retry, max-retry-stop).
  • `mvn test -Dtest="ExperimentAggregatesIntegrationTest"` — 126 tests, all pass (includes the HTTP bulk-insert path for item aggregates).
  • `mvn spotless:check` — no formatting issues.

End-to-end load test against a local dockerised backend (MySQL + ClickHouse + Redis + MinIO + Zookeeper):

  1. Ingested 5,000,000 traces (3 spans each = 15M spans) under a load-test project using `tests_load/tests/test_experiment_aggregation_load.py` with 16 multiprocessing workers (~1,490 traces/s sustained).
  2. Created a dataset with 1,000,000 items using a single `dataset.insert()` call (SDK splits internally with a single `batch_group_id` so the backend keeps a single version).
  3. Created 2 experiments × 1,000,000 linked items (2M `experiment_items` rows in ClickHouse).
  4. Triggered lazy aggregation on both experiments via `GET /v1/private/experiments/{id}` at `EXPERIMENT_AGGREGATES_BATCH_SIZE=10000` and verified:
    • Both experiments completed on first attempt: exp₁ in 53.1 s, exp₂ in 49.3 s (running concurrently, sharing backend CPU).
    • `experiment_item_aggregates` contains 1,000,000 unique ids per experiment.
    • `experiment_aggregates` rows populated for both experiments.
    • Typical per-batch insert: `bodyMs=18–40, executeMs=34–61, bodyBytes=7.7–10.4 MB, rowsWritten=10000`.

Scenarios covered:

  • Lock already held → message ack'd immediately, no processing started.
  • Processing completes within TTL → retry counter reset.
  • Processing exceeds TTL → Reactor cancels R2DBC queries, experiment re-triggered via debounce.
  • Max retries exhausted → counter deleted, no further re-triggers.
  • Concurrent subscriber instances → only one processes at a time.

Environment: local process mode with Testcontainers for integration tests; local docker-compose stack for the load test.

Documentation

N/A — internal processing component, no user-facing API changes.

…educe ClickHouse query overload

Root cause: aggregation lock TTL (1 min) was far shorter than the actual processing
time for large experiments (~87 min at batchSize=1000 for 1M items). When the lock
expired, multiple nodes reprocessed the same experiment concurrently, each issuing the
full set of ClickHouse queries per batch — causing 2000+ queries/minute spikes.

Fixes:
- Switch from executeWithLockCustomExpire to bestEffortLock with 500ms acquire wait,
  so a locked experiment is immediately skipped rather than queued concurrently
- Apply .timeout(lockTTL) to the processing Mono so Reactor cancels in-flight R2DBC
  queries when TTL elapses, instead of leaving them running silently after lock expiry
- Raise aggregationLockTime default from 1 min to 10 min
- Add retry logic: on timeout, re-publish via debounce up to maxLockExpiryRetries=3;
  reset counter on successful completion
- All new config fields (lockAcquireWait, maxLockExpiryRetries, retryCounterTtl) are
  externalised in ExperimentDenormalizationConfig — no hardcoded values
- Resolve getProjectId once per experiment (not per batch) eliminating N-1 redundant
  ClickHouse round trips across the expand() loop
- Pass traceIds extracted from the already-fetched batch directly to all 5 parallel
  queries, replacing the repeated experiment_items FINAL CTE subquery in each
@github-actions github-actions Bot added the java, Backend, and tests labels Apr 4, 2026
github-actions Bot commented Apr 4, 2026

Backend Tests - Integration Group 15

209 tests  ±0   207 ✅ ±0   4m 27s ⏱️ +10s
 31 suites ±0     2 💤 ±0 
 31 files   ±0     0 ❌ ±0 

Results for commit 54fd3f6. ± Comparison against base commit 6bc1a91.

♻️ This comment has been updated with latest results.

Move resetRetryCounter into the action Mono so it only runs when this
node actually acquired the lock and processing completed successfully.
Previously it ran after bestEffortLock regardless of whether the lock
was acquired, resetting the shared counter on skip (Mono.empty) paths.

Also add workspaceId to the doOnError log for consistency with other
log statements in the same method.
github-actions Bot commented Apr 9, 2026

Backend Tests - Integration Group 14

248 tests  ±0   248 ✅ ±0   8m 59s ⏱️ +10s
 21 suites ±0     0 💤 ±0 
 21 files   ±0     0 ❌ ±0 

Results for commit 416ab1b. ± Comparison against base commit 3b943e8.

♻️ This comment has been updated with latest results.

…cross aggregation queries

Removes FINAL from experiments/spans/traces/experiment_items/assertion_results
reads and replaces with explicit ORDER BY (<sort_key>) DESC, last_updated_at DESC
+ LIMIT 1 BY <dedup_key> to avoid the per-query ReplacingSorted merge work.

Also fixes two bugs:
- ::trace_ids typo (double colon) → :trace_ids
- GET_ASSERTIONS_DATA inner subquery was missing the name column and
  LIMIT 1 BY clause, causing "Unknown expression or function identifier
  'name'" at runtime. Aligned with the canonical assertion_results dedup
  pattern used elsewhere in the file.
ORDER BY ... ASC, last_updated_at ASC before LIMIT 1 BY id kept the
oldest row per id, which could cause populateExperimentItemAggregates
to process stale snapshots. Flipped to DESC, last_updated_at DESC to
match the latest-row dedup pattern used in GET_TRACES_DATA and
GET_SPANS_DATA.
…TEMS

The prior commit flipped ORDER BY to DESC on all columns so LIMIT 1 BY id
would keep the latest row per id, but that broke cursor pagination: with
DESC ordering and the filter id > :cursor, each subsequent batch only
excludes the smallest id of the previous batch and re-returns the rest,
so the iteration only ever processes the top ~batchSize ids of the
experiment and never progresses.

Keep id ASC for forward cursor progression while using last_updated_at
DESC so LIMIT 1 BY id still selects the latest version per id.
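
A simplified illustration of the resulting pattern, as a Java text-block constant in the style the DAOs use for their queries; the real query selects specific columns and carries more filters. Only the ORDER BY / LIMIT 1 BY shape is the point:

```java
// Sketch only: simplified query showing the cursor-friendly dedup shape.
class ExperimentItemsQuerySketch {

    static final String SELECT_EXPERIMENT_ITEMS_PAGE = """
            SELECT *
            FROM experiment_items
            WHERE experiment_id = :experiment_id
              AND id > :cursor                      -- forward cursor needs ids in ASC order
            ORDER BY id ASC, last_updated_at DESC   -- latest version first within each id
            LIMIT 1 BY id                           -- keep one (the latest) row per id
            LIMIT :batchSize
            """;
}
```
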
@github-actions github-actions Bot added the dependencies and Infrastructure labels Apr 21, 2026
…use HTTP JSONEachRow

The prior INSERT used a StringTemplate that rendered one VALUES tuple per
item with N distinct named parameters (`:id0, :id1, ...`). R2DBC's
named-parameter resolution grew super-linearly with batch size, so
`EXPERIMENT_AGGREGATES_BATCH_SIZE=10000` made the Java-side render + bind
pipeline take ~45s per batch (CH server-side INSERT itself was <200ms).
At the 60s lock TTL this left ~0 headroom: every attempt processed one
10k-item batch, then got cancelled. Smaller batch sizes avoided the
super-linear cost but capped throughput at ~1.5k items/sec.

Replace the R2DBC path for this INSERT with the ClickHouse v2 HTTP
client (`com.clickhouse:client-v2`) POSTing JSONEachRow directly, with
client-request + server-response compression enabled. The rest of the
aggregation pipeline keeps using R2DBC.

At batchSize=10000 the per-batch cost drops from ~45s to ~60-100ms
(bodyMs ~20-40 + executeMs ~35-60). End-to-end a 1M-item experiment
now completes aggregation in ~45s as a single attempt, well within
the 60s lock TTL. ExperimentAggregatesIntegrationTest (126 tests)
passes.

Also wires `EXPERIMENT_AGGREGATES_BATCH_SIZE` through the backend
docker-compose service, defaulting to 10000.
@thiagohora thiagohora force-pushed the thiagohora/OPIK-5659-fix-experiment-aggregation-query-overload branch from 88a7223 to 67ee4d1 Compare April 21, 2026 11:52
@thiagohora thiagohora marked this pull request as ready for review April 21, 2026 12:06
@thiagohora thiagohora requested a review from a team as a code owner April 21, 2026 12:06
@thiagohora thiagohora added the test-environment label Apr 21, 2026
@CometActions (Collaborator) commented:

Test environment is now available!

To configure additional environment variables for your environment, run the [Deploy Opik AdHoc Environment workflow](https://github.com/comet-ml/comet-deployment/actions/workflows/deploy_opik_adhoc_env.yaml).

Access Information

The deployment has completed successfully and the version has been verified.

github-actions Bot commented Apr 21, 2026

Python SDK E2E Tests Results (Python 3.13)

347 tests   345 ✅  12m 27s ⏱️
  1 suites    2 💤
  1 files      0 ❌

Results for commit 23e7323.

♻️ This comment has been updated with latest results.

@ldaugusto (Contributor) left a comment

It's very cool that you found out client-v2 is so much more efficient in this situation; if it's something very big, it's worth having a second pool to manage these operations. Do you have a general guide on which kind of operations would be better to do on each pool? If bound inserts are this much better with client-v2, would it be the same for traces/spans inserts?

Also, the PR description mentions the new default for EXPERIMENT_AGGREGATES_BATCH_SIZE is 10000, but I don't see this change.

Comment thread apps/opik-backend/pom.xml
@andrescrz (Member) left a comment

This PR is complex, but generally well done.

I left feedback around the stuff to verify before moving forward.

Good job!

message.experimentId(), message.workspaceId(), retryCount,
config.getMaxLockExpiryRetries());

return publisher.publish(
Member:

I understand this re-injects back a message to this class, making it self-retryable. Please make sure there are no race conditions or other circumstances that can lead to a storm of retries here.

You need to ensure there's a short circuit or exit condition to always prevent this from happening.

Contributor Author (thiagohora):

Good callout — here are the safeguards in place (in retriggerIfBelowMaxRetries / resetRetryCounter):

  1. Atomic retry counter per (workspaceId, experimentId) — uses RedissonClient.getAtomicLong(key) with incrementAndGet(), so increments are race-free across nodes.
  2. Counter increments before publishing — we only re-publish if incrementAndGet() returns <= maxLockExpiryRetries (default 3, configurable via maxLockExpiryRetries). Past that we counter.delete() and return Mono.empty() — the retry chain terminates, no more messages are emitted for that experiment.
  3. TTL on the counter: counter.expire(retryCounterTtl) (default 10m, configurable) is set after each successful re-publish. If the experiment stops producing events, the counter self-cleans and a future legitimate trigger starts fresh.
  4. Counter reset on successful processing: resetRetryCounter deletes the key only on the success path (after populateAggregations completes within the lock TTL). Fixed by a recent commit (c48ece1fbe) so the counter is NOT reset when the lock was never acquired (otherwise the cap could be bypassed).
  5. Only triggered by TimeoutException — i.e. the lock TTL genuinely expired. Other errors fall through to doOnError and do NOT re-publish.
  6. Debounce on the publisher side: publisher.publish(...) dedupes concurrent triggers for the same experiment within a window, so even if multiple retries overlap they collapse to a single message.
  7. Log trail at WARN level on every retry and on cap-reached, so a retry storm would be visible in logs and metrics immediately.

Net effect: at most maxLockExpiryRetries automatic re-triggers per (workspace, experiment), then the chain stops — even under concurrent producers and node restarts.
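
A rough sketch of safeguards 1–4; the key format, config values and `publishDebounced` are illustrative, and the real methods are `retriggerIfBelowMaxRetries` / `resetRetryCounter` in `ExperimentAggregatesSubscriber`:

```java
import java.time.Duration;

import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RedissonReactiveClient;

import reactor.core.publisher.Mono;

// Sketch only: key format and config values are stand-ins.
class RetryCounterSketch {

    private static final int MAX_LOCK_EXPIRY_RETRIES = 3;
    private static final Duration RETRY_COUNTER_TTL = Duration.ofMinutes(30);

    private final RedissonReactiveClient redisson;

    RetryCounterSketch(RedissonReactiveClient redisson) {
        this.redisson = redisson;
    }

    Mono<Void> retriggerIfBelowMaxRetries(String workspaceId, String experimentId) {
        RAtomicLongReactive counter =
                redisson.getAtomicLong("experiment-aggregation-retries:" + workspaceId + ":" + experimentId);

        // Atomic increment is race-free across nodes; the cap is checked *before* publishing.
        return counter.incrementAndGet()
                // Arm the TTL right after the increment so the counter ages out
                // even if the publish below fails.
                .flatMap(count -> counter.expire(RETRY_COUNTER_TTL).thenReturn(count))
                .flatMap(count -> count <= MAX_LOCK_EXPIRY_RETRIES
                        ? publishDebounced(workspaceId, experimentId) // re-trigger via debounce publisher
                        : counter.delete().then());                   // cap reached: stop the chain
    }

    Mono<Void> resetRetryCounter(String workspaceId, String experimentId) {
        // Called only on the success path, after processing completed within the lock TTL.
        return redisson.getAtomicLong("experiment-aggregation-retries:" + workspaceId + ":" + experimentId)
                .delete()
                .then();
    }

    private Mono<Void> publishDebounced(String workspaceId, String experimentId) {
        return Mono.empty(); // stand-in for the debounce publisher
    }
}
```

Arming the TTL immediately after the increment matches the later commit that reordered expire before publish so the counter always ages out.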

🤖 Reply posted via /address-github-pr-comments

Contributor:

Commit daa62f7 addressed this comment by adding retriggerIfBelowMaxRetries, which increments a per-experiment atomic counter and stops re-publishing once maxLockExpiryRetries is reached, ensuring the retry loop short-circuits instead of creating a storm of retries.

Map<String, BigDecimal> feedbackScoresMap = Optional.ofNullable(feedback)
.map(FeedbackScoreData::feedbackScores).orElse(Map.of());

var node = JsonUtils.createObjectNode();
Member:

Switching clients is a major architectural change. It's fine under the scope of this PR, but we'll need to be careful with the exact configuration of the JSON mapper to be used. I believe JsonUtils should be fine, so we keep things centralised. Let's make sure that no configuration for it breaks the ClickHouse communication.

I guess this is fine for now as tests pass. Likely no action needed now beyond maybe extending the javadoc documentation in this PR, starting with this method.

Contributor Author (thiagohora):

Fixed in d055ca6 — add javadoc on appendJsonRow documenting the JSONEachRow contract, JsonUtils reuse, and null-coalescing policy.

🤖 Reply posted via /address-github-pr-comments

Contributor:

Commit d055ca6 addressed this comment by adding detailed javadoc to the appendJsonRow helper that explains why we rely on JsonUtils for JSON serialization and how the bulk insert flow keeps ClickHouse configuration centralized; the javadoc now captures the reasoning the reviewer asked for.

Optional.ofNullable(assertionsMap.get(item.traceId())).map(AssertionData::assertionsArray)
.orElse(EMPTY_ARRAY_STR));

out.append(node).append('\n');
Member:

Same comment as above. Likely no action for now other than documenting. In the long run we'll need to document these nuances in order to extend usage of this ClickHouse V2 client.

Contributor Author (thiagohora):

Done

Contributor Author (thiagohora):

Fixed in d055ca6 — add javadoc on insertExperimentItems explaining the v2-client + JSONEachRow path is scoped to this batch-insert (used when EXPERIMENT_AGGREGATES_BATCH_SIZE > 1K), the per-request date_time_input_format setting, and NUM_ROWS_WRITTEN return semantics.

🤖 Reply posted via /address-github-pr-comments

Contributor:

Commit d055ca6 addressed this comment by adding extensive JavaDoc around appendJsonRow and insertExperimentItems that explains the JSONEachRow format, shared buffer usage, and ClickHouse V2 client considerations mentioned in the discussion.


var settings = new InsertSettings()
.logComment(logComment)
.serverSetting("date_time_input_format", "best_effort");
Member:

What about this setting for the ClickHouse V2 client? Can we configure it globally?

Contributor Author (thiagohora):

We kept it as a per-request InsertSettings on purpose: in the regular R2DBC insert paths, timestamp strings are normalised via a server-side function (e.g. parseDateTime64BestEffort(...)) inside the SQL itself, so no global setting is required. On the JSONEachRow path there's no function-wrapping opportunity — the column receives the raw string from the JSON payload — so we need date_time_input_format=best_effort on the request so ClickHouse parses Instant.toString() (ISO-8601 with nanos) directly into DateTime64(9, 'UTC').

I'd keep it per-request for now — it's scoped only to this bulk-insert path and leaves the global Client defaults conservative. If we extend the v2 client to more paths in the future and they all want the same behaviour, we can promote it to a global Client.Builder setting (or equivalently add it to custom_http_params in config.yml) as a follow-up.
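
In code terms, the two options being weighed look roughly like this (the endpoint and credentials are placeholders):

```java
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.insert.InsertSettings;

class DateTimeSettingSketch {

    // Current choice: per-request, scoped to the JSONEachRow bulk-insert path only.
    InsertSettings perRequestSetting() {
        return new InsertSettings()
                .serverSetting("date_time_input_format", "best_effort");
    }

    // Possible follow-up: promote it to the global client (equivalent to adding it to
    // custom_http_params in config.yml) if more v2-client paths need the same behaviour.
    Client globalSetting() {
        return new Client.Builder()
                .addEndpoint("http://localhost:8123") // placeholder endpoint
                .setUsername("default")
                .setPassword("")
                .serverSetting("date_time_input_format", "best_effort")
                .build();
    }
}
```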

🤖 Reply posted via /address-github-pr-comments

Contributor:

Commit d055ca6 addressed this comment by documenting that the v2 bulk insert path explicitly sets date_time_input_format=best_effort on each insert request and keeps the global Client.Builder defaults conservative, explaining why we do not lift the setting into a global configuration right now.

@thiagohora (Contributor Author) commented:

It's very cool that you found out client-v2 is so much more efficient in this situation. If it's something very big, it's worth having a second pool to manage these operations. Do you have a general guide on which kind of operations would be better to do on each pool? If bound inserts are this much better with client-v2, would it be the same for traces/spans inserts?

Also, the PR description mentions the new default for EXPERIMENT_AGGREGATES_BATCH_SIZE is 10000, but I don't see this change.

Hi @ldaugusto

On which operations go where:

  • R2DBC stays the primary path. From our measurements the per-batch bind cost only becomes a real problem above ~1,000 items per INSERT statement — that's the inflection point where R2DBC's named-parameter resolution starts growing super-linearly with batch size.
  • Client-v2 HTTP is only used for the single INSERT INTO experiment_item_aggregates path, and only matters when you need batches >1k to keep processing within the 60s lock TTL. The win comes from FORMAT JSONEachRow over HTTP with compression — the driver serializes column-by-column without per-parameter parsing.

When the v2 path actually matters: workspaces with experiments of ≥1M items — at batchSize=1000 a full aggregation does 1,000 batches × 5–6s each and misses the TTL window, so operators need to raise EXPERIMENT_AGGREGATES_BATCH_SIZE (we tested 10,000 as a reasonable ceiling). Everyone else stays on batchSize=1000 where R2DBC is perfectly fine.

Rule of thumb: use R2DBC for everything except a bulk-insert path that has to run with >1k rows per statement (where the fixed cost per statement is dominated by named-parameter bind time rather than server-side work).

Traces/spans inserts: same template pattern, but in practice batched ≤1,000 rows per call via the SDK — comfortably below the inflection point, so no win from switching. If ingest rates ever push those per-statement batch sizes above ~1k, the same v2-HTTP pattern would apply. Worth a follow-up once we agree on a shared helper (e.g., a ClickHouseBulkInserter that hides the v2 client behind a small interface).

On EXPERIMENT_AGGREGATES_BATCH_SIZE=10000: good catch — 10,000 was just what I used for the load test against a 1M-item experiment, not something the PR actually changes as a default. The runtime default stays 1,000; the docker-compose file just exposes the env var so large-workspace operators can tune it without code changes. I'll correct the PR description to reflect that.

🤖 Reply posted via /address-github-pr-comments

- Log values moved to end of sentence for production greppability
  (ExperimentAggregatesSubscriber, ExperimentAggregatesService).
- @Builder on BatchResult; call sites use the builder pattern.
- Javadoc on appendJsonRow documenting JSONEachRow contract, JsonUtils
  reuse, shared-StringBuilder rationale, and null-coalescing policy.
- Javadoc on insertExperimentItems documenting why the ClickHouse v2
  HTTP client + JSONEachRow path is used only for this high-volume
  batch insert (EXPERIMENT_AGGREGATES_BATCH_SIZE can exceed 1K), why
  date_time_input_format is scoped per-request, and the NUM_ROWS_WRITTEN
  return semantics.
- DatabaseAnalyticsFactory encapsulates v2 Client construction via
  buildClient(), parseQueryParameters() splits queryParameters into
  driver options (R2DBC-specific, not forwarded) and server settings
  (custom_http_params content → Client.Builder.serverSetting).
  DatabaseAnalyticsModule.getDatabaseAnalyticsFactory provider removed.
- Unit tests (DatabaseAnalyticsFactoryTest) for parseQueryParameters
  and integration tests (DatabaseAnalyticsFactoryIntegrationTest)
  verifying custom_http_params entries land in system.settings via a
  real ClickHouse Testcontainer, that driver options don't leak, and
  that a JSONEachRow round-trip completes.
- config.yml: new env-var-overridable settings for the retry pipeline
  (lockAcquireWait, maxLockExpiryRetries, retryCounterTtl); updated
  aggregationLockTime default to 10m to match code.
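
A loose sketch of the `parseQueryParameters` split described above; the exact parsing rules, including how `custom_http_params` content is unpacked, live in `DatabaseAnalyticsFactory`, so treat the handling below as an assumption about the shape rather than the actual code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: illustrates the split, not the factory's exact parsing rules.
class QueryParametersSketch {

    record Split(Map<String, String> driverOptions, Map<String, String> serverSettings) {}

    static Split parseQueryParameters(String queryParameters) {
        Map<String, String> driverOptions = new HashMap<>();
        Map<String, String> serverSettings = new HashMap<>();
        if (queryParameters == null || queryParameters.isBlank()) {
            return new Split(driverOptions, serverSettings);
        }
        for (String pair : queryParameters.split("&")) {
            String[] kv = pair.split("=", 2);
            String key = kv[0];
            String value = kv.length > 1 ? kv[1] : "";
            if ("custom_http_params".equals(key)) {
                // Assumption: custom_http_params carries ClickHouse settings as "k1=v1,k2=v2";
                // each entry would be forwarded via Client.Builder.serverSetting.
                for (String setting : value.split(",")) {
                    String[] skv = setting.split("=", 2);
                    serverSettings.put(skv[0], skv.length > 1 ? skv[1] : "");
                }
            } else {
                // Everything else stays an R2DBC driver option and is not forwarded.
                driverOptions.put(key, value);
            }
        }
        return new Split(driverOptions, serverSettings);
    }
}
```
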
… and guard it

The "finished processing all experiments" log previously fired at INFO on every
5s polling tick, even when no experiments were pending. Now the job tracks the
number of processed experiments and only emits the completion log when at least
one was actually processed. Value placed at the end of the sentence per the log
format convention used across the aggregation pipeline.
- Per-batch log in ExperimentAggregatesService was labelled batchSize but
  carried result.processedCount(); rename placeholder to processedCount so
  the wording matches the value. Configured batch size is still logged once
  at job start.
- Denormalization job counter previously incremented in doOnNext before
  flatMap(processExperiment), so the processedExperiments log counted items
  that failed and were skipped by onErrorContinue. Move the increment to
  doOnSuccess on the inner Mono so the count reflects actually processed
  experiments.
github-actions Bot commented Apr 22, 2026

Python SDK E2E Tests Results (Python 3.11)

347 tests   345 ✅  12m 48s ⏱️
  1 suites    2 💤
  1 files      0 ❌

Results for commit c04ab13.

♻️ This comment has been updated with latest results.

ldaugusto previously approved these changes Apr 22, 2026
@ldaugusto (Contributor) left a comment

Thanks for addressing all the suggestions! It's approved on my side.

andrescrz previously approved these changes Apr 22, 2026
@andrescrz (Member) left a comment

LGTM:

  1. Just double check the correctness of the values on the aggregated query.
  2. Double check the OPIK_EXPERIMENT_DENORM_RETRY_COUNTER_TTL default of 2h.
  3. Missing mirroring values in test config YML file.

The rest is minor, feel free to go ahead if all good.

Comment on lines +771 to +785
count() AS n,
avg(value) AS avg_value,
any(value) AS any_value,
any(reason) AS any_reason,
arrayStringConcat(groupArray(if(reason = '', '\\<no reason>', reason)), ', ') AS reason_concat,
arrayStringConcat(groupArray(category_name), ', ') AS category_name_concat,
any(source) AS source_any,
arrayStringConcat(groupArray(created_by), ', ') AS created_by_concat,
arrayStringConcat(groupArray(last_updated_by), ', ') AS last_updated_by_concat,
min(created_at) AS created_at_min,
max(last_updated_at) AS last_updated_at_max,
mapFromArrays(
groupArray(author),
groupArray(tuple(value, reason, category_name, source, last_updated_at))
) AS value_by_author
Member:

Could you double check the correctness of this? Mostly the count without an inner ID (maybe with distinct) and the any functions.

Contributor:

Commit 4eccf7f addressed this comment by changing the aggregate count to use count(DISTINCT author), aligning with the requested clarification about counting without an inner ID.

Contributor Author (thiagohora):

Fixed in 4eccf7f (count() → count(DISTINCT author)). Tested the full query against a live ClickHouse with three scenarios: 1 author, 2 authors (alice@10:55 + bob@11:00), and 3 authors (alice@11:50, bob@11:55, carol@12:00). Also added a duplicate-insert stress test (second alice row at 10:57 for trace2) to simulate dedup edge cases.

count() vs count(DISTINCT author)

  • With the current LIMIT 1 BY ..., author: count() == count(DISTINCT author) in every scenario (1/1, 2/2, 3/3). The dedup guarantees one row per author so both forms agree.
  • Without the LIMIT 1 BY (to simulate a regressed dedup): count() reports 2/1, 4/2, 4/3 — overcounts; count(DISTINCT author) stays correct.

Switched to count(DISTINCT author) — same semantics today, defensive against accidental regression of the dedup clause. Low-risk wording change with no perf impact (the CTE is already 1 row per author).

any(value) / any(reason) / any(source)

  • Ran the aggregation 5 times sequentially + 3 times with max_threads=8 — every run produced identical output (all 3 any(...) picked the latest author's row: bob for trace2, carol for trace3).
  • Compared against the raw-query pattern used in ExperimentItemDAO.java:231 (arrayElement(entries, 1).4 AS source) — both produce the same output. In practice the outer aggregation reads the deduped CTE in scan order which is governed by its ORDER BY last_updated_at DESC, so any() and arrayElement(entries, 1) both yield the latest author's values.
  • any(value) and any(reason) are additionally gated by IF(n=1, any_*, avg/concat) so even if the order ever slipped, a mismatch would only show up for 1-row groups — where any(x) over a single row is trivially deterministic. Same count() + any(value) pattern is used in 14 places across DatasetItemDAO, DatasetItemVersionDAO, ExperimentDAO, KpiCardDAO, OptimizationDAO, ProjectMetricsDAO, SpanDAO, and TraceDAO, so this PR is consistent with the existing codebase convention.

Leaving any(...) as-is in this PR since behaviour matches the raw path end-to-end. Happy to tighten further in a follow-up (e.g., argMax(source, last_updated_at)) if you'd prefer explicit determinism across the aggregation stack.

🤖 Reply posted via /address-github-pr-comments

Comment thread apps/opik-backend/config.yml Outdated
Comment thread apps/opik-backend/config.yml
…ggregation

Defensive change: count() was correct given the LIMIT 1 BY ..., author
dedup but silently overcounts if the dedup ever regresses. Switching
to count(DISTINCT author) keeps the invariant explicit at negligible
cost (one row per author in the deduped CTE).
@thiagohora thiagohora dismissed stale reviews from andrescrz and ldaugusto via 4eccf7f April 22, 2026 11:56
- ExperimentAggregatesSubscriber: extract buildRetryCountKey helper so
  retriggerIfBelowMaxRetries and resetRetryCounter share a single key
  generation site.
- ExperimentDenormalizationJob: switch processExperiment to return the
  experiment id so the outer doOnNext actually fires per successful
  processing (Mono<Void> never triggers doOnNext). Retains the same
  per-experiment log and counter increment semantic.
- config.yml: lower retryCounterTtl default from 2h to 30m to match
  the worst-case retry cycle (3 attempts at 10m lock TTL) plus a
  modest idle buffer.
- config-test.yml: add missing lockAcquireWait, maxLockExpiryRetries,
  retryCounterTtl entries to the experimentDenormalization block.
github-actions Bot commented Apr 22, 2026

Backend Tests - Integration Group 8

 35 files   35 suites   6m 40s ⏱️
466 tests 465 ✅ 1 💤 0 ❌
450 runs  449 ✅ 1 💤 0 ❌

Results for commit c996420.

♻️ This comment has been updated with latest results.

Previously counter.expire(...) ran only after publisher.publish(...)
succeeded. If publish failed, the retry counter was incremented
without a TTL and could linger, causing subsequent lock-expiry
retries to hit maxLockExpiryRetries prematurely.

Reordering so expire runs right after the increment guarantees the
counter always has a TTL and will age out naturally regardless of
whether the publish succeeds.
@thiagohora thiagohora merged commit f858952 into main Apr 22, 2026
76 checks passed
@thiagohora thiagohora deleted the thiagohora/OPIK-5659-fix-experiment-aggregation-query-overload branch April 22, 2026 13:34