feat(connectors): add Apache Doris sink connector#3215
Conversation
a9f3652 to
b5434dd
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3215 +/- ##
=============================================
- Coverage 73.57% 49.73% -23.84%
Complexity 943 943
=============================================
Files 1222 1221 -1
Lines 114939 100437 -14502
Branches 91675 77202 -14473
=============================================
- Hits 84561 49957 -34604
- Misses 27532 47867 +20335
+ Partials 2846 2613 -233
🚀 New features to boost your workflow:
|
03ed46c to
9fd85d6
Compare
|
There's a performance optimization(may or may not works) that I want to leave as a follow-up PR: |
|
we'll check this in upcoming 2-3 days. |
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
besides below findings, please try to improve the tests time :( currently, iggy CI takes 11 minutes in github actions CI. if we merge this PR, it'll increase to 19 minutes, which is unacceptable.
f5f1208 to
9906800
Compare
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
second pass of review below, you're getting there :)
Sink connector that writes Iggy messages to Apache Doris via the HTTP
Stream Load API. v1 scope: JSON payloads only, HTTP Basic auth,
pre-created tables only (no DDL).
Behaviour:
- Manual 307/308 redirect following (capped at 5) so the Authorization
header survives the FE -> BE hop, which reqwest strips by default.
- Deterministic per-batch label
({prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset})
so replays are deduplicated by Doris within label_keep_max_second.
- Response body Status field drives error classification: Success and
"Label Already Exists" -> Ok; Publish Timeout -> CannotStoreData
(transient); Fail or any unknown status -> PermanentHttpError so the
runtime DLQs the batch instead of looping.
- Optional columns / where / max_filter_ratio / batch_size / timeout
forwarded as Stream Load headers.
- Password held as secrecy::SecretString; auth header wrapped in
SecretString so Debug derivation never leaks the base64 credential.
- Client built in open() with InitError on failure; fe_url validated
there too so a bad config fails at startup rather than first batch.
Tests: 6 integration tests under core/integration/tests/connectors/doris
backed by an apache/doris all-in-one testcontainer (FE HTTP + FE MySQL).
Coverage includes happy path, 1k-row bulk, max_filter_ratio skip path,
label-replay dedupe, missing-target-table (proves no auto-create), and
the columns derived-expression header. The container must bind host:8040
1:1 because the FE 307-redirects to 127.0.0.1:8040; tests are serialized
via a 'doris' nextest test-group (max-threads = 1) so concurrent test
processes don't race for that port.
Addresses review feedback on the Doris sink connector before merge.
Correctness:
- Label format now appends an 8-hex blake3 of the *raw* stream/topic names,
so streams that sanitize identically (e.g. `events.v1` vs `events_v1`)
can no longer collide and silently dedupe against each other in Doris.
Each variable-length segment is also truncated; total label is bounded
under Doris's 128-char cap regardless of input length.
- `build_label` is now a pure `pub` free function. The integration test's
manual label construction (used to verify server-side dedupe) now calls
it directly, so the test cannot drift from the production format.
- `consume` tracks the *most severe* error across chunks via `record_error`:
permanent shadows transient. The previous first-error strategy let a
transient error from chunk N hide a permanent error from chunk M and
caused the runtime to retry forever instead of routing to DLQ.
- HTTP 408 (Request Timeout) and 429 (Too Many Requests) classified as
`CannotStoreData` (transient). They are 4xx but recoverable; the old
code lumped them with all 4xx and DLQ'd retryable conditions.
- Parse failures on the response body now return `PermanentHttpError`.
An unparseable 200-OK is almost always a Doris bug or proxy interference
— retrying the same bytes won't help.
Security:
- `open()` rejects `database`/`table` values outside `[A-Za-z0-9_]+`.
Doris would reject them server-side anyway, but rejecting at config-load
also prevents path traversal in the `/api/{db}/{table}/_stream_load` URL.
- `open()` emits a `warn!` when `fe_url` is `http://` and the host is
not loopback. README's new "Security notes" section spells out the
trust boundary the manual-redirect-following implies (a compromised FE
could exfiltrate credentials via a hostile `Location` header).
- Response body truncated to 4 KB at a UTF-8 boundary before being
formatted into errors or logs, so a misbehaving proxy that returns a
giant body cannot OOM the connector or flood logs.
Robustness:
- Explicit `connect_timeout` (5 s) so an unreachable FE fails fast
instead of consuming the full request timeout on the handshake alone.
- `send_stream_load` takes `bytes::Bytes`; clones inside the redirect
loop are now refcount bumps instead of full `Vec<u8>` copies.
Observability:
- `warn!` when Doris reports `number_filtered_rows > 0` — schema drift
in upstream messages was previously logged at `info!` and easy to miss.
- Per-batch success log demoted from `info!` to `debug!`.
- README documents `Expect: 100-continue`, `label_keep_max_second`
guidance, and the filtered-row alert.
Tests: 21 unit tests pass (was 13, added 8 covering hash-suffix label
collision resistance, label length cap, severity ordering, identifier
validation, and log truncation). All 6 testcontainer integration tests
pass against a real Doris all-in-one image.
hubcio
left a comment
There was a problem hiding this comment.
i think after fixing these PR would be ready to merge
…eroization, CI image, test assertion - validate_redirect: reject non-HTTP(S) redirect targets up front so an http FE redirecting to ftp://, file://, etc. can't slip past the downgrade rule (base isn't https) and the skipped host check (no allowlist) to get credentials re-attached. - new(): keep the cleartext credential, its base64 form, and the Basic <b64> header value in Zeroizing buffers so they are wiped on drop instead of lingering in freed heap. - CI: bump the shared Doris container to apache/doris:4.0.3-all-slim to match the fixture self-boot image (the 2.1.0 all-in-one no longer matches the HDD-override-free DDL). - wait_for_rows: succeed only on an exact count match and surface an overshoot (duplicate / label-window replay) as a clear error instead of an opaque call-site assert_eq panic.
|
Just fix the issues mentioned. Thank @hubcio for the review!!! |
The shared-container wait block ran `FE_PORT=$(docker port ...)` under `set -e`/`pipefail`. If the container exited during boot, `docker port` returns non-zero and the whole test step aborted before any test ran — contradicting the block's own promise that a launch failure falls back to self-boot. Guard both `docker port` substitutions with `|| VAR=""` so a dead/unhealthy shared container degrades to self-boot instead of failing CI.
Resolve Cargo.toml workspace-deps conflict (keep both windows-native-keyring-store from master and wiremock from this branch, in sorted order) and migrate the Doris integration fixtures to sqlx 0.9's SqlSafeStr API (wrap test-controlled dynamic SQL in AssertSqlSafe).
|
/author |
After bumping the shared CI container from doris-all-in-one-2.1.0 to 4.0.3-all-slim (so CI matches the fixture image), the doris tests began timing out the 1h job cap. Root cause, found by reproducing the container locally: 4.0.3's start_be.sh hard-`exit 1`s unless vm.max_map_count >= 2000000 (it also gates on swap and ulimit). The 2.1.0 image had no such gate, which is exactly why it passed in 13min and 4.0.3 did not. On the runner's low default the BE exits on start; its death ends the entrypoint's `wait $child_pid`, so the container exits 0 and never reports healthy — forcing every test onto the slow per-test self-boot path (480s nextest timeout x 4 retries). - Raise vm.max_map_count to 2000000 on the runner before launch (inherited by all containers) and pass SKIP_CHECK_ULIMIT=true to bypass the swap/ulimit gates without swapoff'ing the runner. Verified locally: BE reaches alive=true. - Self-boot fixture sets SKIP_CHECK_ULIMIT=true too, so the fallback path and local Linux dev aren't blocked by the same gates. - Cap FE heap to 2GB and BE mem_limit to 4GB: the container boots beside the cargo build on a 16GB runner, and 4.0.3 otherwise sizes for a dedicated host (8GB FE heap + ~90%-of-RAM BE), so bounding it keeps room for rustc. - Make the shared-container wait non-fatal and dump docker logs/inspect on the unhealthy path, so a launch failure degrades to self-boot instead of aborting the step. Also rustfmt the sqlx AssertSqlSafe call sites introduced by the sqlx 0.9 migration in the preceding master merge. Verified in CI: shared container reports healthy and test-1/test-2 pass (~13min).
ab7ae60 to
734bfe8
Compare
|
/ready |
|
/author |
…ers fixture
Per hubcio's review feedback, the elaborate `docker run` + wait + diagnostics
dance in .github/actions/rust/pre-merge had no business living in CI bash: an
ordinary dev running `cargo test` got none of it, the failure modes were opaque,
and we had two parallel "how to boot Doris" implementations to keep in sync.
Pull everything that *can* live in Rust into the testcontainers fixture so
`cargo test` and CI follow exactly the same path:
* One shared Doris cluster per nextest test binary, cached in a `SHARED_DORIS`
`OnceCell`. First doris test pays the ~40s boot; the rest reuse the cluster
and just create their own per-test UUID database. Drops the per-test
`DORIS_LOCK` along with the per-test container.
* FE heap cap (-Xmx2048m) and BE mem_limit (4096M) move into the testcontainers
builder via a `with_entrypoint("bash")` + `with_cmd(["-c", ...])` override
that patches the confs before delegating to the image's normal entry_point.sh.
* `SKIP_CHECK_ULIMIT=true` was already there.
* `vm.max_map_count` is the one thing no container can raise. A Linux-only
precheck reads `/proc/sys/vm/max_map_count` at boot and returns a
self-describing error pointing at the exact `sudo sysctl` command if the host
is below Doris's 2_000_000 threshold.
CI collapses from ~80 lines (launch + wait + diagnostics + cleanup) to a single
`sudo sysctl -w vm.max_map_count=2000000` step, since that kernel param can
only live on the host. The trade-off vs the prior CI pre-launch is losing the
~30-60s "boot during build" overlap; in exchange the boot is amortized over
every doris test in the binary, dev and CI run identical code, and the failure
mode on a too-low `vm.max_map_count` is now a one-line message instead of an
hour-long test timeout.
…via testcontainers reuse First CI run after the orchestration move uncovered that an in-process `OnceCell` singleton does not span nextest's per-test-process model: test-1 booted the container fine, exited, dropped the container, then test-2 raced the previous container's port-8040 release on a fresh boot and failed with `port is already allocated`. Enable testcontainers' `reusable-containers` feature, give the container a stable name (`iggy-doris-test`), and mark it `ReuseDirective::Always`. The Docker daemon now holds the shared state: the first test process creates the container; every later test process (in this binary, in test-2's partition, or in a re-run) inspects by name+labels, finds it running, and attaches. The 1:1 BE port is therefore held continuously by one container instead of racing against itself across restarts. This also drops the `SharedDoris`/`OnceCell`/`Arc` indirection — the shared state is in Docker, not in process memory — and inlines its behavior into `DorisContainer::start`. `wait_for_be_alive` fast-paths in well under a second on the attach path. The container intentionally outlives the test session (that is what enables reuse). CI runners are ephemeral so it dies with them; locally, `docker rm -f iggy-doris-test` forces a fresh boot.
Which issue does this PR close?
Closes #3112
Rationale
Adds an Apache Doris sink so Iggy streams can be written into Doris for analytical querying.
What changed?
Iggy had no path to land messages in Apache Doris. A new
iggy_connector_doris_sinkcrate consumes JSON payloads and writes them via Doris's HTTP Stream Load API (PUT /api/{db}/{table}/_stream_load).The non-obvious bits the connector handles: re-attaching
Authorizationacross the FE→BE 307 redirect (whichreqweststrips by default), parsing the JSONStatusbody to classify success /Label Already Exists/ transient (Publish Timeout, 5xx) / permanent (Fail, 4xx, unknown), and emitting a deterministic per-batch label so replays are deduplicated by Doris's label-keep window. v1 is sink-only, JSON-only, HTTP Basic auth only, and assumes pre-created tables — no DDL.Local Execution
AI Usage
quickwit_sink/influxdb_sink, testcontainer fixture, and iteration on the Stream Load redirect +Status-body classification.apache/doris:doris-all-in-one-2.1.0container, covering happy path, 1k-row bulk,max_filter_ratio, label-replay dedupe, missing-target-table, andcolumnsderived expressions; row state verified via the MySQL frontend. help write docs.Some Image!