sinks: walk the input arrangement via cursor per batch #36165
Open
DAlperin wants to merge 4 commits into MaterializeInc:main from
Conversation
Force-pushed 264cfea to 36243b0
Extracts the batch-level cursor walking logic from combine_at_timestamp into a standalone helper that yields DiffPairs one at a time via callback. This lets sinks consume an arrangement directly inside their own operator without materializing a Vec<DiffPair> per (key, timestamp) group. combine_at_timestamp is left in place; subsequent commits will migrate the Kafka and Iceberg sinks to consume arrangements via the new helper.
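A minimal, self-contained sketch of the callback shape this commit describes: one DiffPair yielded per key as a sorted, consolidated batch is walked, with no intermediate Vec per group. Everything here is illustrative — `for_each_pair`, the `DiffPair` fields, and the flat `(key, value, diff)` input are stand-ins, not the helper's real signature or the arrangement cursor API.

```rust
// Toy model: consolidated updates sorted by key, diff < 0 = retraction,
// diff > 0 = insertion (at most one of each per key in this sketch).
#[derive(Debug, PartialEq)]
struct DiffPair<V> {
    before: Option<V>,
    after: Option<V>,
}

/// Walk key-sorted updates and invoke `f` once per key with its
/// retraction/insertion pair, without allocating a Vec per key.
fn for_each_pair<K: Eq, V: Clone>(
    updates: &[(K, V, i64)],
    mut f: impl FnMut(&K, DiffPair<V>),
) {
    let mut i = 0;
    while i < updates.len() {
        let (key, _, _) = &updates[i];
        let mut before = None;
        let mut after = None;
        // Consume the run of updates sharing this key.
        while i < updates.len() && &updates[i].0 == key {
            let (_, v, diff) = &updates[i];
            if *diff < 0 {
                before = Some(v.clone());
            } else {
                after = Some(v.clone());
            }
            i += 1;
        }
        f(key, DiffPair { before, after });
    }
}

fn main() {
    // Key 1: value updated 10 -> 11; key 2: fresh insert.
    let updates = vec![(1, 10, -1), (1, 11, 1), (2, 20, 1)];
    let mut seen = Vec::new();
    for_each_pair(&updates, |k, pair| seen.push((*k, pair)));
    assert_eq!(seen.len(), 2);
    assert_eq!(seen[0].1, DiffPair { before: Some(10), after: Some(11) });
    assert_eq!(seen[1].1, DiffPair { before: None, after: Some(20) });
    println!("ok");
}
```

The point of the shape is that the consumer decides what to do with each pair as the cursor advances, rather than receiving a materialized `Vec<DiffPair>` per (key, timestamp) group.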
Force-pushed c94fb74 to bc5c160
def- (Contributor) reviewed Apr 22, 2026:
When a Kafka or Iceberg sink is configured with a non-unique key (KEY (...) NOT ENFORCED), Materialize is supposed to log a warning ("primary key error") whenever multiple rows share the same key at the same timestamp. With this PR the warning doesn't appear when no new data arrives afterwards. Not sure how bad that is.
Reproducer:
diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index ac2b7c3e92..5a9bbeb7ee 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -13,6 +13,7 @@ Disruption and then checking the mz_internal.mz_*_statuses tables
 """
 import random
+import time
 from collections.abc import Callable
 from dataclasses import dataclass
 from textwrap import dedent
@@ -561,3 +562,52 @@ def unsupported_pg_table(c: Composition) -> None:
         $ postgres-execute connection=postgres://postgres:postgres@postgres
         INSERT INTO source1 VALUES (3, '[2:3]={2,2}')
         """))
+
+
+def workflow_test_pk_violation_warning(c: Composition) -> None:
+    seed = random.randint(0, 256**4)
+    c.up("redpanda", "materialized", Service("testdrive", idle=True))
+
+    with c.override(
+        Testdrive(
+            no_reset=True,
+            seed=seed,
+            entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
+        )
+    ):
+        c.testdrive(dedent("""
+            > CREATE CONNECTION kafka_conn
+              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
+
+            > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
+                URL '${testdrive.schema-registry-url}'
+              );
+
+            > CREATE TABLE t (a INT, b INT);
+
+            > CREATE SINK pk_test_sink FROM t
+              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-pk-test-${testdrive.seed}')
+              KEY (a) NOT ENFORCED
+              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+              ENVELOPE UPSERT;
+
+            > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'pk_test_sink'
+            running
+            """))
+
+        # PkViolationWarner rate-limits to once per 10s from construction.
+        time.sleep(12)
+
+        c.testdrive(dedent("""
+            > INSERT INTO t VALUES (1, 10), (1, 20);
+
+            $ set-sql-timeout duration=60s
+            $ kafka-verify-topic sink=materialize.public.pk_test_sink await-value-schema=true
+            """))
+
+        time.sleep(5)
+
+        logs = c.invoke("logs", "materialized", capture=True)
+        assert "primary key error" in logs.stdout, (
+            "PkViolationWarner::flush() not called after batch"
+        )

Replaces the pre-grouped VecCollection<(Option<Row>, DiffPair<Row>)> that render_sink previously handed sinks with an Arranged<SinkTrace> — each sink now walks cursors itself via for_each_diff_pair.
- The shared key-extraction + arrangement lives in render_sink in sinks.rs; zip_into_diff_pairs and combine_at_timestamp are gone.
- Kafka's encode_collection walks the arrangement inside its async operator, emitting a KafkaMessage per DiffPair.
- Iceberg gains a small walker operator at its input that walks the arrangement into the same (Option<Row>, DiffPair<Row>) stream shape its mint / write / commit pipeline already consumed.
- Per-(key, timestamp) primary-key-violation detection is lifted into a new PkViolationWarner in sinks.rs, observed incrementally as each sink walks the arrangement instead of after a Vec<DiffPair> materialization.

The immediate win is skipping the Vec<DiffPair> allocation per (key, time) in combine_at_timestamp; the larger architectural win is that sinks can now make envelope- and batch-specific decisions about cursor navigation (e.g., snapshot fast paths) without touching shared plumbing.
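The rate-limiting behavior the reproducer relies on (warn at most once per interval, measured from construction) can be sketched as a toy. The struct name matches the commit message, but the fields and the `observe` method are assumptions for illustration, not the sinks.rs implementation:

```rust
use std::time::{Duration, Instant};

// Hedged sketch only: a key seen more than once at one timestamp is a
// primary-key violation; the warning is rate-limited, with the clock
// starting at construction (hence the sleep(12) in the reproducer).
struct PkViolationWarner {
    last_warned: Instant,
    interval: Duration,
}

impl PkViolationWarner {
    fn new(interval: Duration) -> Self {
        // Counting from construction means a violation observed within
        // `interval` of startup is suppressed.
        PkViolationWarner { last_warned: Instant::now(), interval }
    }

    /// Observe one (key, timestamp) group as the sink walks its cursor.
    /// Returns true if a warning was emitted.
    fn observe(&mut self, rows_for_key_at_ts: usize) -> bool {
        if rows_for_key_at_ts > 1 && self.last_warned.elapsed() >= self.interval {
            self.last_warned = Instant::now();
            eprintln!("primary key error: multiple rows for one key");
            true
        } else {
            false
        }
    }
}

fn main() {
    // Zero interval: warn as soon as a duplicate key appears.
    let mut w = PkViolationWarner::new(Duration::from_secs(0));
    assert!(!w.observe(1)); // unique key: no warning
    assert!(w.observe(2)); // duplicate key: warning emitted
    // Ten-second interval: a duplicate right after construction is
    // suppressed, matching the reviewer's observation above.
    let mut w2 = PkViolationWarner::new(Duration::from_secs(10));
    assert!(!w2.observe(2));
    println!("ok");
}
```

Observing incrementally, per group, is what makes this compatible with cursor walking: there is no materialized Vec left at the end whose length could be checked after the fact.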
SinkRender::render_sink now takes a Stream<Vec<SinkBatch>> instead of an Arranged<SinkTrace>. Sinks only ever walk incoming batches (via for_each_diff_pair) — they never use the TraceAgent for random access — so the reader handle is dead weight. arrange_sink_input still calls arrange_named, but immediately extracts arranged.stream and lets the surrounding Arranged (and its TraceAgent) drop. With no reader holding compaction frontiers, the arrange operator's spine can compact to the empty antichain as batches flow, releasing historical batch state instead of accumulating it. Mirrors the pattern used by DD's consolidate_named, which builds an arrangement only to call as_collection on it and drop the trace.
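The compaction argument can be illustrated with a toy stand-in for the reader-handle mechanics: a spine that may only discard historical batches while no reader handle is alive. All types here are illustrative (real compaction in differential dataflow works through frontiers held by trace handles, not reference counts); only the design point — drop the handle, unblock compaction — carries over:

```rust
use std::rc::Rc;

// Toy spine: batches of historical state that can be released only when
// nothing might still read them.
struct Spine {
    batches: Vec<&'static str>,
}

impl Spine {
    /// Compact: release historical batches unless a reader handle
    /// (modeled as an extra Rc clone) is still outstanding.
    fn compact(&mut self, readers: &Rc<()>) {
        if Rc::strong_count(readers) == 1 {
            self.batches.clear();
        }
    }
}

fn main() {
    let mut spine = Spine { batches: vec!["batch-0", "batch-1"] };
    let readers = Rc::new(());

    // A sink holding a trace handle pins historical state.
    let trace_agent = Rc::clone(&readers);
    spine.compact(&readers);
    assert_eq!(spine.batches.len(), 2); // reader alive: nothing released

    // Mirrors letting the Arranged (and its TraceAgent) drop while
    // keeping only the batch stream.
    drop(trace_agent);
    spine.compact(&readers);
    assert!(spine.batches.is_empty()); // state released as batches flow
    println!("ok");
}
```

The commit's observation is exactly this: since sinks only ever walk incoming batches, the reader handle was pinning state nobody would read.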
IcebergSink measures steady-state streaming (1 seed row in before() then INSERT of 1M rows inside benchmark()), which exercises incremental emission but not the snapshot path where the sink reads a pre-existing source from as_of. IcebergSnapshot mirrors ExactlyOnce's shape: init() pre-populates a table with 1M rows, before() generates a unique Iceberg destination per run, and benchmark() creates a fresh sink and measures until the full snapshot is committed. This is the path where the zip_into_diff_pairs allocation hotspot lived, and therefore the relevant micro-benchmark for the sink refactor.
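A hedged sketch of the init/before/benchmark lifecycle this paragraph assumes — the trait and driver names are illustrative, not the real benchmark harness:

```rust
use std::time::{Duration, Instant};

// Illustrative lifecycle: init once (e.g. pre-populate 1M rows), before
// each run (e.g. a fresh, unique destination), then time only benchmark
// (e.g. create the sink and wait for the snapshot commit).
trait Scenario {
    fn init(&mut self);
    fn before(&mut self);
    fn benchmark(&mut self);
}

/// Drive `iters` measured runs, timing only the benchmark region.
fn run(scenario: &mut dyn Scenario, iters: usize) -> Vec<Duration> {
    scenario.init();
    let mut timings = Vec::with_capacity(iters);
    for _ in 0..iters {
        scenario.before();
        let start = Instant::now();
        scenario.benchmark();
        timings.push(start.elapsed());
    }
    timings
}

// A counting scenario to show how often each phase fires.
struct Counting { inits: usize, befores: usize, benches: usize }
impl Scenario for Counting {
    fn init(&mut self) { self.inits += 1; }
    fn before(&mut self) { self.befores += 1; }
    fn benchmark(&mut self) { self.benches += 1; }
}

fn main() {
    let mut s = Counting { inits: 0, befores: 0, benches: 0 };
    let timings = run(&mut s, 3);
    assert_eq!((s.inits, s.befores, s.benches), (1, 3, 3));
    assert_eq!(timings.len(), 3);
    println!("ok");
}
```

Separating the phases this way is what lets IcebergSnapshot put the 1M-row pre-population outside the measured region, so the timing isolates the snapshot-read path.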
Force-pushed bc5c160 to 46f9ca4
Summary

Refactor the sink rendering path so sinks walk their input arrangement's cursors directly, eliminating the Vec<DiffPair<Row>>-per-group materialization and the trace-reader overhead that zip_into_diff_pairs → combine_at_timestamp → flat_map previously incurred. Motivated by profiles of very large snapshot sinks where that pipeline dominated allocation and caused major page faults.

What this buys

- No per-(key, time) Vec<DiffPair<Row>> allocation. Biggest direct win — this was the dominant allocator in zip_into_diff_pairs on the profile.
- Sinks observe one DiffPair at a time rather than a materialized (key, time) group.
- Fewer operator hops for Kafka (combine_at_timestamp → flat_map → encode). Iceberg: one fewer hop too.
- Dropping the TraceAgent lets spine compaction advance its frontier, releasing historical batch state rather than accumulating it.

What this does not fix

- The profiled improvement comes from the removed Vec/operator overhead, not from the spine itself shrinking.

Follow-ups (not in this PR) worth considering:

Test plan

- cargo check --workspace --all-targets
- cargo clippy -p mz-interchange -p mz-storage --tests
- bin/lint (check-no-diff fails locally due to jj colocation; all substantive checks pass)
- cargo test -p mz-interchange --lib envelopes:: (7 tests, all pass)

🤖 Generated with Claude Code