
sinks: walk the input arrangement via cursor per batch #36165

Open
DAlperin wants to merge 4 commits into MaterializeInc:main from DAlperin:dov/sink-cursor-walk

Conversation


@DAlperin DAlperin commented Apr 20, 2026

Summary

Refactor the sink rendering path so sinks walk their input arrangement's cursors directly, eliminating the per-group Vec<DiffPair<Row>> materialization and the trace-reader overhead that the zip_into_diff_pairs → combine_at_timestamp → flat_map pipeline previously incurred. Motivated by profiles of very large snapshot sinks where that pipeline dominated allocation and caused major page faults.

What this buys

  • No per-(key, time) Vec<DiffPair<Row>> allocation. Biggest direct win — this was the dominant allocator in zip_into_diff_pairs on the profile.
  • Key owned once per key, not per (key, time) group.
  • Fewer operator boundaries. Kafka: arrangement → encode (was: arrangement → combine_at_timestamp → flat_map → encode). Iceberg: one fewer hop too.
  • Spine compacts aggressively. Dropping the TraceAgent lets spine compaction advance its frontier, releasing historical batch state rather than accumulating it.
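The per-pair callback shape described above can be modeled in a few lines of std-only Rust. This is an illustrative sketch, not Materialize's actual API: the names DiffPair and for_each_diff_pair mirror the PR, but the types here (String keys/rows, i64 diffs over a pre-sorted slice) are simplified stand-ins for cursor navigation over a real batch. The point it demonstrates is that each (key, time) group is handed to the callback as it is scanned, with no per-group Vec ever built.

```rust
#[derive(Debug, Clone, PartialEq)]
struct DiffPair {
    before: Option<String>, // retracted row at this (key, time), if any
    after: Option<String>,  // inserted row at this (key, time), if any
}

/// Walk updates sorted by (key, time); `diff` is -1 (retract) or +1 (insert).
/// Each contiguous (key, time) group is folded into one DiffPair and handed
/// to `emit` immediately — no Vec<DiffPair> is materialized per group.
fn for_each_diff_pair(
    updates: &[(String, u64, String, i64)],
    mut emit: impl FnMut(&str, u64, DiffPair),
) {
    let mut i = 0;
    while i < updates.len() {
        let key = updates[i].0.clone(); // key owned once per key group
        let time = updates[i].1;
        let mut pair = DiffPair { before: None, after: None };
        while i < updates.len() && updates[i].0 == key && updates[i].1 == time {
            if updates[i].3 < 0 {
                pair.before = Some(updates[i].2.clone());
            } else {
                pair.after = Some(updates[i].2.clone());
            }
            i += 1;
        }
        emit(&key, time, pair);
    }
}
```

An upsert-style update (a retraction and an insertion for the same key at the same time) thus arrives at the sink as a single pair with both sides populated.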

What this does not fix

  • The arrangement's pre-spine batcher still buffers unsealed updates (unavoidable at this layer).
  • For a pure-insertion snapshot with no retractions, spine compaction has limited work to do — savings come primarily from the Vec/operator overhead, not from the spine itself shrinking.
  • Persist snapshot forwarding semantics and sink commit-on-frontier semantics are unchanged.

Follow-ups (not in this PR) worth considering:

  • A batcher-only operator that skips the spine entirely for sinks that don't need a trace.
  • An append-mode Iceberg fast-path that skips arrangement altogether when no key is configured.

Test plan

  • cargo check --workspace --all-targets
  • cargo clippy -p mz-interchange -p mz-storage --tests
  • bin/lint (check-no-diff fails locally due to jj colocation; all substantive checks pass)
  • cargo test -p mz-interchange --lib envelopes:: (7 tests, all pass)
  • Full CI (pending)
  • Kafka sink testdrive (snapshot + steady-state, Upsert and Debezium envelopes)
  • Iceberg sink testdrive (snapshot + steady-state, Upsert and Append envelopes)
  • Re-profile a very large snapshot sink to confirm the `zip_into_diff_pairs` hotspot is gone

🤖 Generated with Claude Code

@DAlperin DAlperin force-pushed the dov/sink-cursor-walk branch 2 times, most recently from 264cfea to 36243b0 on April 21, 2026 01:02
Extracts the batch-level cursor walking logic from combine_at_timestamp into
a standalone helper that yields DiffPairs one at a time via callback. This
lets sinks consume an arrangement directly inside their own operator without
materializing a Vec<DiffPair> per (key, timestamp) group.

combine_at_timestamp is left in place; subsequent commits will migrate the
Kafka and Iceberg sinks to consume arrangements via the new helper.
@DAlperin DAlperin force-pushed the dov/sink-cursor-walk branch from c94fb74 to bc5c160 on April 21, 2026 18:40
@DAlperin DAlperin marked this pull request as ready for review April 21, 2026 21:53
@DAlperin DAlperin requested a review from a team as a code owner April 21, 2026 21:53

@def- def- left a comment


When a Kafka or Iceberg sink is configured with a non-unique key (KEY (...) NOT ENFORCED), Materialize is supposed to log a warning ("primary key error") whenever multiple rows share the same key at the same timestamp. With this PR the warning doesn't appear when no new data arrives afterwards. Not sure how bad that is.

Reproducer:

diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index ac2b7c3e92..5a9bbeb7ee 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -13,6 +13,7 @@ Disruption and then checking the mz_internal.mz_*_statuses tables
 """

 import random
+import time
 from collections.abc import Callable
 from dataclasses import dataclass
 from textwrap import dedent
@@ -561,3 +562,52 @@ def unsupported_pg_table(c: Composition) -> None:
                  $ postgres-execute connection=postgres://postgres:postgres@postgres
                  INSERT INTO source1 VALUES (3, '[2:3]={2,2}')
                  """))
+
+
+def workflow_test_pk_violation_warning(c: Composition) -> None:
+    seed = random.randint(0, 256**4)
+    c.up("redpanda", "materialized", Service("testdrive", idle=True))
+
+    with c.override(
+        Testdrive(
+            no_reset=True,
+            seed=seed,
+            entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
+        )
+    ):
+        c.testdrive(dedent("""
+                > CREATE CONNECTION kafka_conn
+                  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
+
+                > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
+                  URL '${testdrive.schema-registry-url}'
+                  );
+
+                > CREATE TABLE t (a INT, b INT);
+
+                > CREATE SINK pk_test_sink FROM t
+                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-pk-test-${testdrive.seed}')
+                  KEY (a) NOT ENFORCED
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE UPSERT;
+
+                > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'pk_test_sink'
+                running
+        """))
+
+        # PkViolationWarner rate-limits to once per 10s from construction.
+        time.sleep(12)
+
+        c.testdrive(dedent("""
+                > INSERT INTO t VALUES (1, 10), (1, 20);
+
+                $ set-sql-timeout duration=60s
+                $ kafka-verify-topic sink=materialize.public.pk_test_sink await-value-schema=true
+        """))
+
+        time.sleep(5)
+
+        logs = c.invoke("logs", "materialized", capture=True)
+        assert "primary key error" in logs.stdout, (
+            "PkViolationWarner::flush() not called after batch"
+        )
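The reproducer's 12-second sleep exists because, per the comment in the diff, the warner rate-limits from construction. A minimal std-only sketch of that behavior — the name PkViolationWarner matches the PR, but the fields and method here are illustrative assumptions, not the actual implementation:

```rust
use std::time::{Duration, Instant};

/// Warns about primary-key violations at most once per `interval`,
/// with the window measured from construction time — which is why the
/// reproducer sleeps past the 10s window before inserting.
struct PkViolationWarner {
    last_warned: Instant,
    interval: Duration,
}

impl PkViolationWarner {
    fn new(now: Instant, interval: Duration) -> Self {
        Self { last_warned: now, interval }
    }

    /// Record a violation; returns true when a warning should be logged.
    fn observe_violation(&mut self, now: Instant) -> bool {
        if now.duration_since(self.last_warned) >= self.interval {
            self.last_warned = now;
            true
        } else {
            false
        }
    }
}
```

Under this model, a violation observed inside the initial window is silently dropped — consistent with the reported symptom that the warning never appears if no further data arrives to trigger another observation later.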

Replaces the pre-grouped VecCollection<(Option<Row>, DiffPair<Row>)> that
render_sink previously handed sinks with an Arranged<SinkTrace> — each sink
now walks cursors itself via for_each_diff_pair.

- The shared key-extraction + arrangement lives in render_sink on sinks.rs;
  zip_into_diff_pairs and combine_at_timestamp are gone.
- Kafka's encode_collection walks the arrangement inside its async operator,
  emitting a KafkaMessage per DiffPair.
- Iceberg gains a small walker operator at its input that walks the arrangement
  into the same (Option<Row>, DiffPair<Row>) stream shape its mint / write /
  commit pipeline already consumed.
- Per-(key, timestamp) primary-key-violation detection is lifted into a new
  PkViolationWarner in sinks.rs, observed incrementally as each sink walks the
  arrangement instead of after a Vec<DiffPair> materialization.

The immediate win is skipping the Vec<DiffPair> allocation per (key, time) in
combine_at_timestamp; the larger architectural win is that sinks can now make
envelope- and batch-specific decisions about cursor navigation (e.g., snapshot
fast paths) without touching shared plumbing.
SinkRender::render_sink now takes a Stream<Vec<SinkBatch>> instead of an
Arranged<SinkTrace>. Sinks only ever walk incoming batches (via
for_each_diff_pair) — they never use the TraceAgent for random access — so the
reader handle is dead weight.

arrange_sink_input still calls arrange_named, but immediately extracts
arranged.stream and lets the surrounding Arranged (and its TraceAgent) drop.
With no reader holding compaction frontiers, the arrange operator's spine can
compact to the empty antichain as batches flow, releasing historical batch
state instead of accumulating it.

Mirrors the pattern used by DD's consolidate_named, which builds an
arrangement only to call as_collection on it and drop the trace.
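The reader-handle mechanics can be illustrated with a toy std-only model (not differential dataflow's actual types): batches are reference-counted, a reader handle playing the role of the TraceAgent pins them, and compaction can merge history away only once nothing else holds a batch — the situation this commit creates by dropping the Arranged immediately after taking its stream.

```rust
use std::rc::Rc;

/// Toy spine: batches are Rc'd so an outstanding reader handle can pin
/// them, standing in for a TraceAgent holding a compaction frontier.
struct Spine {
    batches: Vec<Rc<Vec<i64>>>,
}

impl Spine {
    /// Hand out a reader that pins every current batch.
    fn reader(&self) -> Vec<Rc<Vec<i64>>> {
        self.batches.clone()
    }

    /// Merge all batches into one; refuses while a reader pins history.
    fn try_compact(&mut self) -> bool {
        if self.batches.iter().all(|b| Rc::strong_count(b) == 1) {
            let merged: Vec<i64> =
                self.batches.iter().flat_map(|b| b.iter().copied()).collect();
            self.batches = vec![Rc::new(merged)];
            true
        } else {
            false
        }
    }
}
```

Dropping the reader is what unlocks compaction — the same effect as letting the Arranged (and its TraceAgent) fall out of scope after extracting the batch stream.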
IcebergSink measures steady-state streaming (1 seed row in before() then
INSERT of 1M rows inside benchmark()), which exercises incremental emission
but not the snapshot path where the sink reads a pre-existing source from
as_of.

IcebergSnapshot mirrors ExactlyOnce's shape: init() pre-populates a table
with 1M rows, before() generates a unique Iceberg destination per run, and
benchmark() creates a fresh sink and measures until the full snapshot is
committed. This is the path where the zip_into_diff_pairs allocation
hotspot lived, and therefore the relevant micro-benchmark for the sink
refactor.
@DAlperin DAlperin force-pushed the dov/sink-cursor-walk branch from bc5c160 to 46f9ca4 on April 22, 2026 12:40
@DAlperin DAlperin requested a review from a team as a code owner April 22, 2026 12:40
