Skip to content

Commit 67ee4d1

Browse files
committed
perf(aggregation): bulk-insert experiment_item_aggregates via ClickHouse HTTP JSONEachRow
The prior INSERT used a StringTemplate that rendered one VALUES tuple per item with N distinct named parameters (`:id0, :id1, ...`). R2DBC's named-parameter resolution grew super-linearly with batch size, so `EXPERIMENT_AGGREGATES_BATCH_SIZE=10000` made the Java-side render + bind pipeline take ~45s per batch (CH server-side INSERT itself was <200ms). At the 60s lock TTL this left ~0 headroom: every attempt processed one 10k-item batch, then got cancelled. Smaller batch sizes avoided the super-linear cost but capped throughput at ~1.5k items/sec. Replace the R2DBC path for this INSERT with the ClickHouse v2 HTTP client (`com.clickhouse:client-v2`) POSTing JSONEachRow directly, with client-request + server-response compression enabled. The rest of the aggregation pipeline keeps using R2DBC. At batchSize=10000 the per-batch cost drops from ~45s to ~60-100ms (bodyMs ~20-40 + executeMs ~35-60). End-to-end a 1M-item experiment now completes aggregation in ~45s as a single attempt, well within the 60s lock TTL. ExperimentAggregatesIntegrationTest (126 tests) passes. Also wires `EXPERIMENT_AGGREGATES_BATCH_SIZE` through the backend docker-compose service, defaulting to 10000.
1 parent cab39db commit 67ee4d1

3 files changed

Lines changed: 124 additions & 153 deletions

File tree

apps/opik-backend/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@
250250
<artifactId>clickhouse-http-client</artifactId>
251251
<version>${clickhouse-java.version}</version>
252252
</dependency>
253+
<dependency>
254+
<groupId>com.clickhouse</groupId>
255+
<artifactId>client-v2</artifactId>
256+
<version>${clickhouse-java.version}</version>
257+
</dependency>
253258
<dependency>
254259
<groupId>org.apache.httpcomponents.client5</groupId>
255260
<artifactId>httpclient5</artifactId>

apps/opik-backend/src/main/java/com/comet/opik/domain/experiments/aggregations/ExperimentAggregatesDAO.java

Lines changed: 97 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package com.comet.opik.domain.experiments.aggregations;
22

3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.insert.InsertResponse;
5+
import com.clickhouse.client.api.insert.InsertSettings;
6+
import com.clickhouse.client.api.metrics.ServerMetrics;
7+
import com.clickhouse.data.ClickHouseFormat;
38
import com.comet.opik.api.AssertionScoreAverage;
49
import com.comet.opik.api.DatasetItem.DatasetItemPage;
510
import com.comet.opik.api.EvaluationMethod;
@@ -35,7 +40,6 @@
3540
import com.comet.opik.infrastructure.auth.RequestContext;
3641
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
3742
import com.comet.opik.utils.JsonUtils;
38-
import com.comet.opik.utils.template.TemplateUtils;
3943
import com.fasterxml.jackson.core.type.TypeReference;
4044
import com.fasterxml.jackson.databind.JsonNode;
4145
import com.google.inject.ImplementedBy;
@@ -54,8 +58,11 @@
5458
import org.stringtemplate.v4.ST;
5559
import reactor.core.publisher.Flux;
5660
import reactor.core.publisher.Mono;
61+
import reactor.core.scheduler.Schedulers;
5762

63+
import java.io.ByteArrayInputStream;
5864
import java.math.BigDecimal;
65+
import java.nio.charset.StandardCharsets;
5966
import java.time.Instant;
6067
import java.util.Arrays;
6168
import java.util.List;
@@ -83,7 +90,6 @@
8390
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
8491
import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
8592
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
86-
import static com.comet.opik.utils.template.TemplateUtils.getQueryItemPlaceHolder;
8793

8894
@ImplementedBy(ExperimentAggregatesDAOImpl.class)
8995
public interface ExperimentAggregatesDAO {
@@ -126,6 +132,7 @@ class ExperimentAggregatesDAOImpl implements ExperimentAggregatesDAO {
126132
private final @NonNull TransactionTemplateAsync asyncTemplate;
127133
private final @NonNull FilterQueryBuilder filterQueryBuilder;
128134
private final @NonNull GroupingQueryBuilder groupingQueryBuilder;
135+
private final @NonNull Client clickHouseClient;
129136

130137
/**
131138
* Filter strategies used for experiment aggregates search binding.
@@ -945,71 +952,6 @@ ORDER BY (workspace_id, project_id, entity_id, id) DESC, last_updated_at DESC
945952
;
946953
""";
947954

948-
/**
949-
* Insert experiment item aggregate
950-
*/
951-
private static final String INSERT_EXPERIMENT_ITEM_AGGREGATE = """
952-
INSERT INTO experiment_item_aggregates
953-
(
954-
workspace_id,
955-
id,
956-
project_id,
957-
experiment_id,
958-
dataset_item_id,
959-
trace_id,
960-
input,
961-
output,
962-
input_slim,
963-
output_slim,
964-
metadata,
965-
duration,
966-
total_estimated_cost,
967-
usage,
968-
feedback_scores,
969-
feedback_scores_array,
970-
comments_array_agg,
971-
visibility_mode,
972-
created_at,
973-
last_updated_at,
974-
created_by,
975-
last_updated_by,
976-
execution_policy,
977-
assertions_array
978-
)
979-
SETTINGS log_comment = '<log_comment>'
980-
FORMAT Values
981-
<items:{item |
982-
(
983-
:workspace_id,
984-
:id<item.index>,
985-
:project_id,
986-
:experiment_id<item.index>,
987-
:dataset_item_id<item.index>,
988-
:trace_id<item.index>,
989-
:input<item.index>,
990-
:output<item.index>,
991-
:input_slim<item.index>,
992-
:output_slim<item.index>,
993-
:metadata<item.index>,
994-
:duration<item.index>,
995-
:total_estimated_cost<item.index>,
996-
if(:has_usage<item.index>, mapFromArrays(:usage_keys<item.index>, :usage_values<item.index>), CAST(map() AS Map(String, Int64)) ),
997-
if(:has_feedback_scores<item.index>, mapFromArrays(:feedback_scores_keys<item.index>, CAST(:feedback_scores_values<item.index> AS Array(Decimal64(9)))), CAST(map() AS Map(String, Decimal64(9))) ),
998-
:feedback_scores_array<item.index>,
999-
:comments_array_agg<item.index>,
1000-
:visibility_mode<item.index>,
1001-
parseDateTime64BestEffort(:created_at<item.index>, 9),
1002-
parseDateTime64BestEffort(:last_updated_at<item.index>, 9),
1003-
:created_by<item.index>,
1004-
:last_updated_by<item.index>,
1005-
:execution_policy<item.index>,
1006-
:assertions_array<item.index>
1007-
)
1008-
<if(item.hasNext)>,<endif>
1009-
}>
1010-
;
1011-
""";
1012-
1013955
/**
1014956
* Get experiment items count
1015957
*/
@@ -1852,79 +1794,67 @@ private AssertionScoreAggregations createEmptyAssertionScoreAggregations(UUID ex
18521794
.build();
18531795
}
18541796

1855-
private void bindItemsParameters(Statement statement,
1856-
List<ExperimentItemData> items,
1797+
private void appendJsonRow(StringBuilder out,
1798+
String workspaceId,
1799+
UUID projectId,
1800+
ExperimentItemData item,
18571801
Map<UUID, TraceData> tracesMap,
18581802
Map<UUID, SpanData> spansMap,
18591803
Map<UUID, FeedbackScoreData> feedbackMap,
18601804
Map<UUID, CommentsData> commentsMap,
18611805
Map<UUID, AssertionData> assertionsMap) {
18621806

1863-
for (int i = 0; i < items.size(); i++) {
1864-
var item = items.get(i);
1865-
1866-
TraceData trace = tracesMap.get(item.traceId());
1867-
SpanData span = spansMap.get(item.traceId());
1868-
FeedbackScoreData feedback = feedbackMap.get(item.traceId());
1869-
1870-
Map<String, Long> usageMap = Optional.ofNullable(span)
1871-
.map(SpanData::usage)
1872-
.orElse(Map.of());
1873-
Map<String, BigDecimal> feedbackScoresMap = Optional.ofNullable(feedback)
1874-
.map(FeedbackScoreData::feedbackScores)
1875-
.orElse(Map.of());
1876-
String feedbackScoresArray = Optional.ofNullable(feedback)
1877-
.map(FeedbackScoreData::feedbackScoresArray)
1878-
.orElse(EMPTY_ARRAY_STR);
1879-
1880-
statement.bind("id" + i, item.id())
1881-
.bind("has_usage" + i, !usageMap.isEmpty())
1882-
.bind("has_feedback_scores" + i, !feedbackScoresMap.isEmpty())
1883-
.bind("experiment_id" + i, item.experimentId())
1884-
.bind("dataset_item_id" + i, item.datasetItemId())
1885-
.bind("trace_id" + i, item.traceId())
1886-
.bind("input" + i, Optional.ofNullable(trace).map(TraceData::input).orElse(""))
1887-
.bind("output" + i, Optional.ofNullable(trace).map(TraceData::output).orElse(""))
1888-
.bind("input_slim" + i, Optional.ofNullable(trace).map(TraceData::inputSlim).orElse(""))
1889-
.bind("output_slim" + i, Optional.ofNullable(trace).map(TraceData::outputSlim).orElse(""))
1890-
.bind("metadata" + i, Optional.ofNullable(trace).map(TraceData::metadata).orElse(""))
1891-
.bind("duration" + i, Optional.ofNullable(trace).map(TraceData::duration).orElse(BigDecimal.ZERO))
1892-
.bind("total_estimated_cost" + i,
1893-
Optional.ofNullable(span).map(SpanData::totalEstimatedCost).orElse(BigDecimal.ZERO))
1894-
.bind("feedback_scores_array" + i, feedbackScoresArray)
1895-
.bind("comments_array_agg" + i,
1896-
Optional.ofNullable(commentsMap.get(item.traceId()))
1897-
.map(CommentsData::commentsArrayAgg)
1898-
.orElse(EMPTY_ARRAY_STR))
1899-
.bind("visibility_mode" + i,
1900-
Optional.ofNullable(trace)
1901-
.map(TraceData::visibilityMode)
1902-
.map(VisibilityMode::getValue)
1903-
.orElse(VisibilityMode.DEFAULT.getValue()))
1904-
.bind("created_at" + i, item.createdAt().toString())
1905-
.bind("last_updated_at" + i, item.lastUpdatedAt().toString())
1906-
.bind("created_by" + i, item.createdBy())
1907-
.bind("last_updated_by" + i, item.lastUpdatedBy())
1908-
.bind("execution_policy" + i,
1909-
Optional.ofNullable(item.executionPolicy())
1910-
.filter(StringUtils::isNotBlank)
1911-
.orElse(""))
1912-
.bind("assertions_array" + i,
1913-
Optional.ofNullable(assertionsMap.get(item.traceId()))
1914-
.map(AssertionData::assertionsArray)
1915-
.orElse(EMPTY_ARRAY_STR));
1916-
1917-
// Bind array parameters only if maps are not empty
1918-
var usageArrays = mapToArrays(usageMap, String[]::new, Long[]::new, Long::longValue);
1919-
statement.bind("usage_keys" + i, usageArrays.keys());
1920-
statement.bind("usage_values" + i, usageArrays.values());
1921-
1922-
var feedbackScoresArrays = mapToArrays(feedbackScoresMap,
1923-
String[]::new, BigDecimal[]::new,
1924-
v -> BigDecimal.valueOf(v.doubleValue()));
1925-
statement.bind("feedback_scores_keys" + i, feedbackScoresArrays.keys());
1926-
statement.bind("feedback_scores_values" + i, feedbackScoresArrays.values());
1927-
}
1807+
TraceData trace = tracesMap.get(item.traceId());
1808+
SpanData span = spansMap.get(item.traceId());
1809+
FeedbackScoreData feedback = feedbackMap.get(item.traceId());
1810+
1811+
Map<String, Long> usageMap = Optional.ofNullable(span).map(SpanData::usage).orElse(Map.of());
1812+
Map<String, BigDecimal> feedbackScoresMap = Optional.ofNullable(feedback)
1813+
.map(FeedbackScoreData::feedbackScores).orElse(Map.of());
1814+
1815+
var node = JsonUtils.createObjectNode();
1816+
node.put("workspace_id", workspaceId);
1817+
node.put("id", item.id().toString());
1818+
node.put("project_id", projectId.toString());
1819+
node.put("experiment_id", item.experimentId().toString());
1820+
node.put("dataset_item_id", item.datasetItemId().toString());
1821+
node.put("trace_id", item.traceId().toString());
1822+
node.put("input", Optional.ofNullable(trace).map(TraceData::input).orElse(""));
1823+
node.put("output", Optional.ofNullable(trace).map(TraceData::output).orElse(""));
1824+
node.put("input_slim", Optional.ofNullable(trace).map(TraceData::inputSlim).orElse(""));
1825+
node.put("output_slim", Optional.ofNullable(trace).map(TraceData::outputSlim).orElse(""));
1826+
node.put("metadata", Optional.ofNullable(trace).map(TraceData::metadata).orElse(""));
1827+
node.put("duration",
1828+
Optional.ofNullable(trace).map(TraceData::duration).orElse(BigDecimal.ZERO).doubleValue());
1829+
node.put("total_estimated_cost",
1830+
Optional.ofNullable(span).map(SpanData::totalEstimatedCost).orElse(BigDecimal.ZERO)
1831+
.toPlainString());
1832+
1833+
var usageNode = node.putObject("usage");
1834+
usageMap.forEach(usageNode::put);
1835+
1836+
var feedbackNode = node.putObject("feedback_scores");
1837+
feedbackScoresMap.forEach((k, v) -> feedbackNode.put(k, v.toPlainString()));
1838+
1839+
node.put("feedback_scores_array",
1840+
Optional.ofNullable(feedback).map(FeedbackScoreData::feedbackScoresArray).orElse(EMPTY_ARRAY_STR));
1841+
node.put("comments_array_agg",
1842+
Optional.ofNullable(commentsMap.get(item.traceId())).map(CommentsData::commentsArrayAgg)
1843+
.orElse(EMPTY_ARRAY_STR));
1844+
node.put("visibility_mode",
1845+
Optional.ofNullable(trace).map(TraceData::visibilityMode).map(VisibilityMode::getValue)
1846+
.orElse(VisibilityMode.DEFAULT.getValue()));
1847+
node.put("created_at", item.createdAt().toString());
1848+
node.put("last_updated_at", item.lastUpdatedAt().toString());
1849+
node.put("created_by", item.createdBy());
1850+
node.put("last_updated_by", item.lastUpdatedBy());
1851+
node.put("execution_policy",
1852+
Optional.ofNullable(item.executionPolicy()).filter(StringUtils::isNotBlank).orElse(""));
1853+
node.put("assertions_array",
1854+
Optional.ofNullable(assertionsMap.get(item.traceId())).map(AssertionData::assertionsArray)
1855+
.orElse(EMPTY_ARRAY_STR));
1856+
1857+
out.append(node).append('\n');
19281858
}
19291859

19301860
private Mono<Long> insertExperimentItemAggregates(
@@ -1948,27 +1878,41 @@ private Mono<Long> insertExperimentItemAggregates(
19481878
Map<UUID, AssertionData> assertionsMap = assertionsData.stream()
19491879
.collect(Collectors.toMap(AssertionData::traceId, Function.identity()));
19501880

1951-
return asyncTemplate.nonTransaction(connection -> {
1952-
return makeMonoContextAware((userName, workspaceId) -> {
1953-
1954-
List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(items.size());
1955-
1956-
var template = getSTWithLogComment(INSERT_EXPERIMENT_ITEM_AGGREGATE,
1957-
"insertExperimentItemAggregate", workspaceId, userName, items.size())
1958-
.add("items", queryItems);
1959-
1960-
var statement = connection.createStatement(template.render())
1961-
.bind("workspace_id", workspaceId)
1962-
.bind("project_id", projectId);
1881+
return insertExperimentItems(projectId, items, tracesMap, spansMap, feedbackMap, commentsMap, assertionsMap);
1882+
}
19631883

1964-
// Bind item parameters in batch
1965-
bindItemsParameters(statement, items, tracesMap, spansMap, feedbackMap, commentsMap, assertionsMap);
1884+
private Mono<Long> insertExperimentItems(UUID projectId,
1885+
List<ExperimentItemData> items,
1886+
Map<UUID, TraceData> tracesMap,
1887+
Map<UUID, SpanData> spansMap,
1888+
Map<UUID, FeedbackScoreData> feedbackMap,
1889+
Map<UUID, CommentsData> commentsMap,
1890+
Map<UUID, AssertionData> assertionsMap) {
19661891

1967-
return Mono.from(statement.execute())
1968-
.flatMapMany(Result::getRowsUpdated)
1969-
.reduce(0L, Long::sum);
1970-
});
1971-
});
1892+
return makeMonoContextAware((userName, workspaceId) -> Mono.fromCallable(() -> {
1893+
StringBuilder body = new StringBuilder();
1894+
items.forEach(item -> appendJsonRow(body, workspaceId, projectId, item,
1895+
tracesMap, spansMap, feedbackMap, commentsMap, assertionsMap));
1896+
byte[] payload = body.toString().getBytes(StandardCharsets.UTF_8);
1897+
1898+
String logComment = "insert_experiment_item_aggregate:%s:%s:%d".formatted(
1899+
workspaceId, userName == null ? "" : userName, items.size());
1900+
1901+
var settings = new InsertSettings()
1902+
.logComment(logComment)
1903+
.serverSetting("date_time_input_format", "best_effort");
1904+
1905+
try (InsertResponse response = clickHouseClient.insert(
1906+
"experiment_item_aggregates",
1907+
new ByteArrayInputStream(payload),
1908+
ClickHouseFormat.JSONEachRow,
1909+
settings).get()) {
1910+
return response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong();
1911+
}
1912+
}).subscribeOn(Schedulers.boundedElastic())
1913+
.doOnError(err -> log.error(
1914+
"Failed to insert experiment item aggregates: items='{}'",
1915+
items.size(), err)));
19721916
}
19731917

19741918
// Row mapping methods

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.comet.opik.infrastructure.db;
22

3+
import com.clickhouse.client.api.Client;
34
import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig;
45
import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
56
import com.comet.opik.infrastructure.OpikConfiguration;
@@ -38,6 +39,27 @@ public ConnectionFactory getConnectionFactory() {
3839
return connectionFactory;
3940
}
4041

42+
@Provides
43+
@Singleton
44+
public DatabaseAnalyticsFactory getDatabaseAnalyticsFactory() {
45+
return databaseAnalyticsFactory;
46+
}
47+
48+
@Provides
49+
@Singleton
50+
public Client getClickHouseClient() {
51+
var config = databaseAnalyticsFactory;
52+
String endpoint = "%s://%s:%d/".formatted(config.getProtocol().getValue(), config.getHost(), config.getPort());
53+
return new Client.Builder()
54+
.addEndpoint(endpoint)
55+
.setUsername(config.getUsername())
56+
.setPassword(config.getPassword())
57+
.setDefaultDatabase(config.getDatabaseName())
58+
.compressClientRequest(true)
59+
.compressServerResponse(true)
60+
.build();
61+
}
62+
4163
@Provides
4264
@Singleton
4365
@Named("Database Analytics Database Name")

0 commit comments

Comments
 (0)