feat(connectors): Clickhouse Sink Connector #2886
Conversation
Codecov Report ❌
Patch coverage details and impacted files:
@@ Coverage Diff @@
## master #2886 +/- ##
=============================================
- Coverage 74.48% 52.09% -22.39%
Complexity 943 943
=============================================
Files 1188 1192 +4
Lines 106530 94673 -11857
Branches 83560 71721 -11839
=============================================
- Hits 79350 49324 -30026
- Misses 24433 42636 +18203
+ Partials 2747 2713 -34
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution!
hubcio
left a comment
overall good direction, just needs a bit of polishing
Well, the Clickhouse native client situation is kinda sad, we should definitely create a tracking issue for that + TCP support. I don't see a reason to hold this PR anymore, I think we should merge it as it is.
    escape_backtick(&self.database),
    escape_backtick(table),
    format,
);
Non-idempotent INSERT with no insert_deduplication_token: the retry loop at lines 219-252 resends the same body on transient network/HTTP failures. If the server applied the batch but the response was lost, the retry produces duplicate rows.
Pass a deterministic token derived from (partition_id, first_offset, last_offset) as a query setting; this works on ReplicatedMergeTree by default, and on plain MergeTree with non_replicated_deduplication_window set. Otherwise, document the at-least-once semantics loudly.
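A minimal sketch of the token approach. The field names and URL layout are illustrative, not the connector's actual API; only `insert_deduplication_token` itself is a real ClickHouse query setting.

```rust
// Sketch: derive a deterministic deduplication token from batch coordinates,
// assuming the sink knows (partition_id, first_offset, last_offset) per batch.
// Function and parameter names here are illustrative, not the connector API.

fn dedup_token(partition_id: u32, first_offset: u64, last_offset: u64) -> String {
    // Any stable encoding works; ClickHouse only compares tokens for equality
    // within the deduplication window, so a retried batch maps to the same token.
    format!("iggy-{partition_id}-{first_offset}-{last_offset}")
}

fn insert_url(base: &str, database: &str, token: &str) -> String {
    // Pass the token as a query setting on the HTTP interface so a retried
    // POST of the same body is recognized as a duplicate by the server.
    format!("{base}/?database={database}&insert_deduplication_token={token}")
}

fn main() {
    let token = dedup_token(3, 1000, 1999);
    assert_eq!(token, "iggy-3-1000-1999");
    println!("{}", insert_url("http://localhost:8123", "analytics", &token));
}
```

Because the token is a pure function of the batch coordinates, a retry after a lost response hashes to the same token and the server drops the duplicate insert.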
}
fn escape_single_quote(s: &str) -> String {
    s.replace('\'', "\\'")
SQL injection vector: escape_single_quote only handles '; a backslash passes through unchanged. The ClickHouse string-literal grammar requires escaping both ' and \ (per https://clickhouse.com/docs/sql-reference/syntax). An adversarial config like database = "x\" followed by table = "y' OR 1=1 --" produces a literal that closes early, because the trailing \ swallows the next '.
Fix: validate identifiers against a strict allowlist (^[A-Za-z_][A-Za-z0-9_]*$) at config parse, and escape both ' and \ in any literal that gets concatenated into SQL.
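A sketch of the suggested fix, escaping the backslash before the quote and hand-rolling the allowlist check to stay dependency-free:

```rust
// Escape backslash FIRST: doing it second would re-escape the backslashes
// that the quote pass just introduced.
fn escape_single_quote(s: &str) -> String {
    s.replace('\\', "\\\\").replace('\'', "\\'")
}

// Equivalent of ^[A-Za-z_][A-Za-z0-9_]*$ without pulling in a regex crate.
fn is_valid_identifier(s: &str) -> bool {
    let mut chars = s.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

fn main() {
    // The adversarial pair from the review: once backslashes are escaped too,
    // a trailing '\' can no longer swallow the next quote.
    assert_eq!(escape_single_quote("x\\"), "x\\\\");
    assert_eq!(escape_single_quote("y' OR 1=1 --"), "y\\' OR 1=1 --");
    assert!(is_valid_identifier("events_v2"));
    assert!(!is_valid_identifier("y' OR 1=1 --"));
}
```

With the allowlist enforced at config parse, the escaping becomes defense in depth rather than the only barrier.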
}
fn escape_backtick(s: &str) -> String {
    s.replace('`', "\\`")
Same backslash issue as escape_single_quote. ClickHouse's lexer accepts both doubled-backtick and \`-escapes inside backtick-quoted identifiers, so the produced SQL parses, but the unescaped backslash means an attacker-controlled identifier containing a backslash before a ` can break out of the identifier just like the literal case.
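The same escape-order fix applies to the identifier escaper (sketch; the allowlist at config parse remains the primary defense):

```rust
// Escape '\' before '`' so a backslash preceding an attacker-supplied
// backtick cannot neutralize the escape the second pass adds.
fn escape_backtick(s: &str) -> String {
    s.replace('\\', "\\\\").replace('`', "\\`")
}

fn main() {
    // Input is x\` (trailing backslash, then a backtick). The output is
    // x\\\` : an escaped backslash followed by an escaped backtick, so the
    // identifier can no longer be closed early.
    assert_eq!(escape_backtick("x\\`"), "x\\\\\\`");
}
```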
for (k, v) in obj {
    // Map keys must be serialisable as the key type. JSON object
    // keys are always strings, so we wrap them in OwnedValue::String.
    let key_val = OwnedValue::String(k.clone());
OwnedValue::String(k.clone()) allocates per map entry per row, then runs the full serialize_value recursion. When key_type == ChType::String (the common case for Map(String, _)), fast-path to write_string(k.as_bytes(), buf) directly.
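Shape of the fast path, with stand-in types since the real OwnedValue / ChType / write_string definitions live in the connector crate (the stand-in encoding is illustrative, not the connector's wire format):

```rust
// Stand-ins for the connector's types, just to make the sketch compile.
enum OwnedValue {
    String(String),
    Null,
}

enum ChType {
    String,
    UInt64,
}

// Stand-in for the connector's string writer (length byte + payload here;
// the real encoding is whatever serialize_value emits).
fn write_string(bytes: &[u8], buf: &mut Vec<u8>) {
    buf.push(bytes.len() as u8);
    buf.extend_from_slice(bytes);
}

fn serialize_key(key_type: &ChType, k: &str, buf: &mut Vec<u8>) {
    match key_type {
        // Common case Map(String, _): write the borrowed key directly and
        // skip the OwnedValue::String(k.clone()) allocation entirely.
        ChType::String => write_string(k.as_bytes(), buf),
        // Other key types: wrap and recurse as before (recursion elided).
        _ => {
            let _key_val = OwnedValue::String(k.to_owned());
            // serialize_value(&_key_val, key_type, buf) in the real code
        }
    }
}

fn main() {
    let mut buf = Vec::new();
    serialize_key(&ChType::String, "user_id", &mut buf);
    assert_eq!(&buf[1..], &b"user_id"[..]);
}
```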
    error!("Cannot decode UUID hex: {s}");
    Error::InvalidRecord
})?;
let bytes = hex::decode(hex_str).map_err(|_| {
The UUID path allocates a Vec<u8> via hex::decode for every UUID cell. A few u64::from_str_radix calls on the hex segments plus to_le_bytes() is zero-alloc and avoids the inline mod hex reimplementation at lines 1215-1238.
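One possible zero-alloc shape, parsing the canonical 8-4-4-4-12 form segment by segment. The little-endian two-u64 output layout is an assumption; verify it against what the existing hex::decode path writes before swapping in. Error type simplified to () for the sketch.

```rust
// Parse "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" into 16 bytes without any
// heap allocation: split borrows, from_str_radix works on &str directly.
fn parse_uuid_le(s: &str) -> Result<[u8; 16], ()> {
    let mut it = s.split('-');
    let a = it.next().ok_or(())?;
    let b = it.next().ok_or(())?;
    let c = it.next().ok_or(())?;
    let d = it.next().ok_or(())?;
    let e = it.next().ok_or(())?;
    if it.next().is_some()
        || a.len() != 8 || b.len() != 4 || c.len() != 4 || d.len() != 4 || e.len() != 12
    {
        return Err(());
    }
    // High half covers aaaaaaaa-bbbb-cccc, low half covers dddd-eeeeeeeeeeee.
    let hi = (u64::from_str_radix(a, 16).map_err(|_| ())? << 32)
        | (u64::from_str_radix(b, 16).map_err(|_| ())? << 16)
        | u64::from_str_radix(c, 16).map_err(|_| ())?;
    let lo = (u64::from_str_radix(d, 16).map_err(|_| ())? << 48)
        | u64::from_str_radix(e, 16).map_err(|_| ())?;
    // ASSUMPTION: two little-endian u64s, high half first; confirm against
    // the connector's current output before adopting.
    let mut out = [0u8; 16];
    out[..8].copy_from_slice(&hi.to_le_bytes());
    out[8..].copy_from_slice(&lo.to_le_bytes());
    Ok(out)
}

fn main() {
    let bytes = parse_uuid_le("00112233-4455-6677-8899-aabbccddeeff").unwrap();
    assert_eq!(bytes[0], 0x77); // LE: last byte of the high half comes first
    assert!(parse_uuid_le("not-a-uuid").is_err());
}
```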
    (s, "00:00:00")
};

let date_parts: Vec<&str> = date_part.splitn(3, '-').collect();
splitn(...).collect::<Vec<&str>>() allocates per call (also at lines 491, 583, 605), on the per-row hot path when timestamps arrive as ISO strings.
Use iterator destructuring (let mut it = s.split('-'); let y = it.next()...) for zero allocations.
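The suggestion as a compilable sketch: pulling parts straight off the split iterator avoids the intermediate Vec entirely.

```rust
// Zero-alloc date split: splitn returns a borrowing iterator, so consuming
// it with next() never touches the heap, unlike collect::<Vec<&str>>().
fn parse_date(s: &str) -> Option<(i32, u32, u32)> {
    let mut it = s.splitn(3, '-');
    let y = it.next()?.parse().ok()?;
    let m = it.next()?.parse().ok()?;
    let d = it.next()?.parse().ok()?;
    Some((y, m, d))
}

fn main() {
    assert_eq!(parse_date("2024-07-15"), Some((2024, 7, 15)));
    assert_eq!(parse_date("2024-07"), None); // missing day component
}
```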
    return Ok(());
}

let query = format!(
format! rebuilds the INSERT query and the URL on every batch, yet both depend only on (database, table, format), which are fixed after open().
Precompute them in ClickHouseClient::new (or via OnceLock keyed on (table, format) if the format is allowed to change per call). Saves two small allocations per batch.
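Rough shape of the precompute (struct and field names are stand-ins for the connector's types; identifier escaping from the other comments is elided here):

```rust
// Hoist the per-batch format! calls into construction: both strings depend
// only on values that are fixed once the client is opened.
struct ClickHouseClient {
    insert_query: String, // built once in new(), borrowed on every batch
    url: String,
}

impl ClickHouseClient {
    fn new(endpoint: &str, database: &str, table: &str, format: &str) -> Self {
        // NOTE: real code must still escape/validate database and table.
        let insert_query = format!("INSERT INTO `{database}`.`{table}` FORMAT {format}");
        let url = format!("{endpoint}/?database={database}");
        Self { insert_query, url }
    }

    fn send_batch(&self, _body: &[u8]) {
        // The hot path now only borrows the precomputed strings.
        let _ = (&self.insert_query, &self.url);
    }
}

fn main() {
    let c = ClickHouseClient::new("http://localhost:8123", "db", "events", "JSONEachRow");
    assert_eq!(c.insert_query, "INSERT INTO `db`.`events` FORMAT JSONEachRow");
    c.send_batch(b"{}");
}
```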
/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
/// Each `Payload::Json` message becomes one line. Other payload types are skipped.
pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(messages.len() * 64);
A fresh Vec<u8> is allocated and dropped per batch (also at lines 64, 89). At the default batch_length = 1000 and poll_interval = 5ms this is meaningful allocator pressure.
Reuse a sink-owned buffer instead. Note: consume(&self) may run concurrently for multi-topic configs, so the buffer needs Mutex<BytesMut> or per-task allocation; check the runtime contract first.
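A sketch of the sink-owned buffer under the concurrency caveat above, using std::sync::Mutex and Vec<u8> rather than BytesMut to keep the sketch dependency-free; Sink and its method are stand-ins for the connector types:

```rust
use std::sync::Mutex;

// Sink owns one growable buffer; clear() keeps the capacity from previous
// batches, so after warm-up no per-batch allocation happens.
struct Sink {
    body_buf: Mutex<Vec<u8>>,
}

impl Sink {
    fn build_json_body(&self, lines: &[&str]) -> Vec<u8> {
        let mut buf = self.body_buf.lock().unwrap();
        buf.clear();
        for line in lines {
            buf.extend_from_slice(line.as_bytes());
            buf.push(b'\n');
        }
        // Returning a copy here only for the sketch; the real sink would
        // write straight from the guarded buffer to the HTTP client.
        buf.clone()
    }
}

fn main() {
    let sink = Sink { body_buf: Mutex::new(Vec::with_capacity(64 * 1024)) };
    let body = sink.build_json_body(&[r#"{"a":1}"#, r#"{"a":2}"#]);
    assert_eq!(body, b"{\"a\":1}\n{\"a\":2}\n");
}
```

If consume turns out to be serialized per sink by the runtime, the Mutex can be dropped for a plain `&mut self` buffer.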
|
Which issue does this PR close?
Closes #2539
Rationale
Clickhouse is a real-time analytical database engine that is very popular in modern analytics architectures.
What changed?
This PR introduces a Clickhouse Sink Connector that enables writing data from Iggy to Clickhouse.
The Clickhouse writing logic is heavily inspired by the official Clickhouse Kafka Connector.
Local Execution
Images 1&2: Produced 30456 + 29060 rows into Iggy in two batches
Image 3: Verified schema and number of rows in Clickhouse
AI Usage