Skip to content

Commit 4530574

Browse files
committed
refactor(clickhouse): use Builder bulkInsert for addBatch, drop manual JSONEachRow body assembly
addBatch() now hands its built rows directly to the typed bulkInsert(Format::JSONEachRow, ...) entry point on the ClickHouse builder and ships the eager $statement->body to the HTTP layer, replacing the hand-rolled array_map(json_encode, ...) + implode("\n", ...) assembly. The runtime instanceof guard on FormattedInsertStatement is gone too - bulkInsert() returns the typed statement by signature.

insert() now takes the serialized body string instead of an array of pre-encoded rows; the only caller is addBatch().

createDailyMaterializedView() picks up the createMaterializedView() argument-order change in the new query branch (name, body, targetTable, ifNotExists). The snapshot test for the MV path is updated to match.

New snapshots:
- testAddBatchEmitsBulkInsertQueryAndBody asserts the envelope query and the serialized JSONEachRow body for a two-row fixture.
- testNestedColumnDotQuoting validates that columns containing a dot (ClickHouse nested-array convention) remain single-backtick-wrapped atomic identifiers, exercising the QuotesIdentifiers::quoteLiteral() fix shipped in utopia-php/query#13.
1 parent f88c8e0 commit 4530574

2 files changed

Lines changed: 76 additions & 35 deletions

File tree

src/Usage/Adapter/ClickHouse.php

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use InvalidArgumentException;
99
use Utopia\Fetch\Client;
1010
use Utopia\Query\Builder\ClickHouse as ClickHouseBuilder;
11-
use Utopia\Query\Builder\ClickHouse\FormattedInsertStatement;
11+
use Utopia\Query\Builder\ClickHouse\Format;
1212
use Utopia\Query\Method;
1313
use Utopia\Query\Query;
1414
use Utopia\Query\Schema\ClickHouse as ClickHouseSchema;
@@ -27,7 +27,7 @@
2727
* the utopia-php/query 0.3.x builder / schema layer:
2828
*
2929
* - DDL (setup, daily table, materialized view) to Utopia\Query\Schema\ClickHouse
30-
* - INSERT (addBatch) to Utopia\Query\Builder\ClickHouse::insertFormat
30+
* - INSERT (addBatch) to Utopia\Query\Builder\ClickHouse::bulkInsert
3131
* - SELECT (find, count, sum, daily, totals, time series) to Utopia\Query\Builder\ClickHouse
3232
* with `useNamedBindings()` for `{paramN:Type}` placeholders
3333
* - DELETE (purge) to Utopia\Query\Builder\ClickHouse::delete
@@ -719,19 +719,20 @@ function (Exception $e, int $attempt) use ($sql): Exception {
719719
}
720720

721721
/**
722-
* Stream a list of JSON rows into a `INSERT ... FORMAT JSONEachRow` statement.
722+
* Ship a serialized `INSERT ... FORMAT <name>` body to ClickHouse's HTTP
723+
* interface. The body must already be formatted to match the FORMAT
724+
* envelope declared in $sql (the builder's `bulkInsert()` emits both).
723725
*
724-
* @param array<string> $data Pre-encoded JSON rows
725726
* @throws Exception
726727
*/
727-
private function insert(string $sql, array $data): void
728+
private function insert(string $sql, string $body): void
728729
{
729-
if (empty($data)) {
730+
if ($body === '') {
730731
return;
731732
}
732733

733734
$this->executeWithRetry(
734-
function (int $attempt) use ($sql, $data): void {
735+
function (int $attempt) use ($sql, $body): void {
735736
$startTime = microtime(true);
736737
$scheme = $this->secure ? 'https' : 'http';
737738

@@ -759,9 +760,9 @@ function (int $attempt) use ($sql, $data): void {
759760
$this->requestCount++;
760761
}
761762

762-
$body = implode("\n", $data);
763+
$rowCount = \substr_count($body, "\n") + 1;
763764

764-
$params = ['rows' => count($data), 'bytes' => strlen($body)];
765+
$params = ['rows' => $rowCount, 'bytes' => strlen($body)];
765766

766767
try {
767768
$response = $this->client->fetch(
@@ -776,7 +777,6 @@ function (int $attempt) use ($sql, $data): void {
776777
$bodyStr = $response->getBody();
777778
$bodyStr = is_string($bodyStr) ? $bodyStr : '';
778779
$duration = microtime(true) - $startTime;
779-
$rowCount = count($data);
780780
$baseError = "ClickHouse insert failed with HTTP {$httpCode}: {$bodyStr}";
781781
$errorMsg = $this->buildErrorMessage($baseError, null, $sql . " ({$rowCount} rows)");
782782
$this->logQuery($sql, $params, $duration, false, $errorMsg, $attempt);
@@ -793,15 +793,15 @@ function (int $attempt) use ($sql, $data): void {
793793
function (Exception $e, ?int $httpCode): bool {
794794
return false;
795795
},
796-
function (Exception $e, int $attempt) use ($sql, $data): Exception {
796+
function (Exception $e, int $attempt) use ($sql, $body): Exception {
797797
$cleanMessage = preg_replace('/\|HTTP_CODE:\d+$/', '', $e->getMessage());
798798
$cleanMessage = is_string($cleanMessage) ? $cleanMessage : $e->getMessage();
799799

800800
if (strpos($cleanMessage, '[Operation:') !== false) {
801801
return new Exception($cleanMessage, 0, $e);
802802
}
803803

804-
$rowCount = count($data);
804+
$rowCount = \substr_count($body, "\n") + 1;
805805
$baseError = "ClickHouse insert execution failed after " . ($attempt + 1) . " attempt(s): {$cleanMessage}";
806806
$errorMsg = $this->buildErrorMessage($baseError, null, $sql . " ({$rowCount} rows)");
807807
return new Exception($errorMsg, 0, $e);
@@ -1139,8 +1139,8 @@ private function createDailyMaterializedView(): void
11391139

11401140
$statement = $this->newSchema()->createMaterializedView(
11411141
$this->getEventsDailyMvName(),
1142-
$this->getEventsDailyTableName(),
11431142
$body,
1143+
$this->getEventsDailyTableName(),
11441144
true,
11451145
);
11461146

@@ -1330,17 +1330,6 @@ public function addBatch(array $metrics, string $type, int $batchSize = self::IN
13301330
$tableName = $this->getTableForType($type);
13311331
$columns = $this->getInsertColumns($type);
13321332

1333-
$statement = $this->newBuilder($type)
1334-
->into($tableName)
1335-
->insertFormat('JSONEachRow', $columns)
1336-
->insert();
1337-
1338-
if (!$statement instanceof FormattedInsertStatement) {
1339-
throw new Exception('Expected FormattedInsertStatement from builder insertFormat()');
1340-
}
1341-
1342-
$sql = $this->qualifyDdl($statement->query, $tableName);
1343-
13441333
foreach (\array_chunk($metrics, $batchSize) as $metricsBatch) {
13451334
$rows = [];
13461335

@@ -1392,14 +1381,16 @@ public function addBatch(array $metrics, string $type, int $batchSize = self::IN
13921381
$row['tenant'] = $tenant;
13931382
}
13941383

1395-
$encoded = json_encode($row);
1396-
if ($encoded === false) {
1397-
throw new Exception("Failed to JSON encode metric row: " . json_last_error_msg());
1398-
}
1399-
$rows[] = $encoded;
1384+
$rows[] = $row;
14001385
}
14011386

1402-
$this->insert($sql, $rows);
1387+
$statement = $this->newBuilder($type)
1388+
->into($tableName)
1389+
->bulkInsert(Format::JSONEachRow, $rows, $columns);
1390+
1391+
$sql = $this->qualifyDdl($statement->query, $tableName);
1392+
1393+
$this->insert($sql, $statement->body ?? '');
14031394
}
14041395

14051396
return true;

tests/Usage/Adapter/ClickHouseSqlSnapshotTest.php

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use PHPUnit\Framework\TestCase;
66
use Utopia\Query\Builder\ClickHouse as ClickHouseBuilder;
7+
use Utopia\Query\Builder\ClickHouse\Format;
78
use Utopia\Query\Builder\ClickHouse\FormattedInsertStatement;
89
use Utopia\Query\Query;
910
use Utopia\Query\Schema\ClickHouse as ClickHouseSchema;
@@ -111,8 +112,8 @@ public function testDailyMaterializedViewDdl(): void
111112

112113
$statement = (new ClickHouseSchema())->createMaterializedView(
113114
'usage_events_daily_mv',
114-
'usage_events_daily',
115115
$body,
116+
'usage_events_daily',
116117
true,
117118
);
118119

@@ -197,14 +198,32 @@ public function testFindAggregatedWithGroupByTimeBucket(): void
197198
}
198199

199200
/**
200-
* Snapshot for `addBatch()` INSERT … FORMAT JSONEachRow.
201+
* Snapshot for `addBatch()`: `bulkInsert(Format::JSONEachRow, ...)`
202+
* emits both the INSERT envelope and the serialized JSONEachRow body
203+
* in one typed call.
201204
*/
202-
public function testAddBatchInsertFormat(): void
205+
public function testAddBatchEmitsBulkInsertQueryAndBody(): void
203206
{
207+
$rows = [
208+
[
209+
'id' => 'row1',
210+
'metric' => 'requests',
211+
'value' => 7,
212+
'time' => '2026-03-01 00:00:00',
213+
'tags' => ['region' => 'us-east'],
214+
],
215+
[
216+
'id' => 'row2',
217+
'metric' => 'requests',
218+
'value' => 3,
219+
'time' => '2026-03-01 00:01:00',
220+
'tags' => ['region' => 'eu-west'],
221+
],
222+
];
223+
204224
$statement = (new ClickHouseBuilder())
205225
->into('events')
206-
->insertFormat('JSONEachRow', ['id', 'metric', 'value', 'time', 'tags'])
207-
->insert();
226+
->bulkInsert(Format::JSONEachRow, $rows, ['id', 'metric', 'value', 'time', 'tags']);
208227

209228
$this->assertInstanceOf(FormattedInsertStatement::class, $statement);
210229
$this->assertSame(
@@ -214,6 +233,37 @@ public function testAddBatchInsertFormat(): void
214233
$this->assertSame([], $statement->bindings);
215234
$this->assertSame('JSONEachRow', $statement->format);
216235
$this->assertSame(['id', 'metric', 'value', 'time', 'tags'], $statement->columns);
236+
$this->assertSame(
237+
'{"id":"row1","metric":"requests","value":7,"time":"2026-03-01 00:00:00","tags":{"region":"us-east"}}' . "\n"
238+
. '{"id":"row2","metric":"requests","value":3,"time":"2026-03-01 00:01:00","tags":{"region":"eu-west"}}',
239+
$statement->body,
240+
);
241+
$this->assertStringEndsNotWith("\n", (string) $statement->body);
242+
}
243+
244+
/**
245+
* Snapshot for atomic identifier quoting: column names containing a dot
246+
* (ClickHouse nested-array convention, e.g. `meta.key Array(String)`)
247+
* must remain single-backtick-wrapped atomic identifiers, not split
248+
* across `.` into `` `meta`.`key` ``.
249+
*/
250+
public function testNestedColumnDotQuoting(): void
251+
{
252+
$table = (new ClickHouseSchema())->table('events_with_meta');
253+
$table->string('id')->primary();
254+
$table->string('metric');
255+
$table->datetime('time', 3)->nullable();
256+
$table->addColumn('meta.key', ColumnType::String);
257+
$table->addColumn('meta.value', ColumnType::String);
258+
$table->engine(Engine::MergeTree)
259+
->orderBy(['metric', 'time', 'id']);
260+
261+
$statement = $table->createIfNotExists();
262+
263+
$this->assertStringContainsString('`meta.key` String', $statement->query);
264+
$this->assertStringContainsString('`meta.value` String', $statement->query);
265+
$this->assertStringNotContainsString('`meta`.`key`', $statement->query);
266+
$this->assertStringNotContainsString('`meta`.`value`', $statement->query);
217267
}
218268

219269
/**

0 commit comments

Comments
 (0)