Commit 85fe1c5

sinks: walk the input arrangement via cursor per batch (#36165)
## Summary Refactor the sink rendering path so sinks walk their input arrangement's cursors directly, eliminating the `Vec<DiffPair<Row>>`-per-group materialization and the trace-reader overhead that `zip_into_diff_pairs` → `combine_at_timestamp` → flat_map previously incurred. Motivated by profiles of very large snapshot sinks where that pipeline dominated allocation and caused major page faults. ## What this buys - **No per-`(key, time)` `Vec<DiffPair<Row>>` allocation.** Biggest direct win — this was the dominant allocator in `zip_into_diff_pairs` on the profile. - **Key owned once per key, not per `(key, time)` group.** - **Fewer operator boundaries.** Kafka: arrangement → encode (was: arrangement → `combine_at_timestamp` → flat_map → encode). Iceberg: one fewer hop too. - **Spine compacts aggressively.** Dropping the `TraceAgent` lets spine compaction advance its frontier, releasing historical batch state rather than accumulating it. ## What this does *not* fix - The arrangement's **pre-spine batcher** still buffers un-sealed updates (unavoidable at this layer). - For a pure-insertion snapshot with no retractions, spine compaction has limited work to do — savings come primarily from the `Vec`/operator overhead, not from the spine itself shrinking. - Persist snapshot forwarding semantics and sink commit-on-frontier semantics are unchanged. Follow-ups (not in this PR) worth considering: - A batcher-only operator that skips the spine entirely for sinks that don't need a trace. - An append-mode Iceberg fast-path that skips arrangement altogether when no key is configured. 
## Test plan

- [x] `cargo check --workspace --all-targets`
- [x] `cargo clippy -p mz-interchange -p mz-storage --tests`
- [x] `bin/lint` (check-no-diff fails locally due to jj colocation; all substantive checks pass)
- [x] `cargo test -p mz-interchange --lib envelopes::` (7 tests, all pass)
- [ ] Full CI (pending)
- [ ] Kafka sink testdrive (snapshot + steady-state, Upsert and Debezium envelopes)
- [ ] Iceberg sink testdrive (snapshot + steady-state, Upsert and Append envelopes)
- [ ] Re-profile a very large snapshot sink to confirm the `zip_into_diff_pairs` hotspot is gone

🤖 Generated with [Claude Code](https://claude.com/claude-code)
1 parent 8ef14a9 commit 85fe1c5

5 files changed

Lines changed: 637 additions & 258 deletions

misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py

Lines changed: 113 additions & 0 deletions
@@ -1538,6 +1538,119 @@ def benchmark(self) -> MeasurementSource:

```python
            """)


class IcebergSnapshot(Sink):
    """Measure time for a freshly-created Iceberg sink to snapshot 1M pre-existing records.

    Mirrors ExactlyOnce's shape: data pre-exists in a source table; `benchmark()` then
    creates a brand-new sink that must emit the whole snapshot from as_of. This
    exercises the sink's snapshot path (arrangement build, batch emission) rather
    than the steady-state streaming path that IcebergSink measures.
    """

    FIXED_SCALE = True

    def __init__(
        self, scale: float, mz_version: MzVersion, default_size: int, seed: int
    ) -> None:
        super().__init__(scale, mz_version, default_size, seed)
        self._run_counter = 0
        self._invocation_id = uuid.uuid4()
        self._current_table_name: str | None = None

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        return version > MzVersion.create(26, 9, 0)

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 0, 0)

    def init(self) -> Action:
        # Set up connections, a dedicated sink cluster, and a source table
        # pre-populated with the records the sink will snapshot on each
        # iteration. Population happens once — `benchmark()` only measures sink
        # creation + snapshot emission.
        return TdAction(f"""
            > CREATE SECRET iceberg_secret AS '${{arg.s3-access-key}}';

            > CREATE CONNECTION aws_conn TO AWS (
                ACCESS KEY ID = 'tduser',
                SECRET ACCESS KEY = SECRET iceberg_secret,
                ENDPOINT = '${{arg.aws-iceberg-endpoint}}',
                REGION = 'us-east-1'
              );

            > CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (
                CATALOG TYPE = 'REST',
                URL = 'http://polaris:8181/api/catalog',
                CREDENTIAL = 'root:root',
                WAREHOUSE = 'default_catalog',
                SCOPE = 'PRINCIPAL_ROLE:ALL'
              );

            > DROP CLUSTER IF EXISTS sink_cluster CASCADE

            > CREATE CLUSTER sink_cluster SIZE 'scale={self._default_size},workers=1', REPLICATION FACTOR 1;

            > DROP TABLE IF EXISTS snapshot_source_tbl CASCADE

            > CREATE TABLE snapshot_source_tbl (pk BIGINT, data BIGINT);

            > INSERT INTO snapshot_source_tbl SELECT x, x * 2 FROM generate_series(1, {self.n()}) AS x;

            > SELECT COUNT(*) FROM snapshot_source_tbl;
            {self.n()}
            """)

    def before(self) -> Action:
        # Use a unique Iceberg table name per run so that each snapshot writes
        # to a fresh destination — no catalog conflicts, no lingering state
        # from previous iterations.
        self._run_counter += 1
        version_str = f"v{self._mz_version.major}_{self._mz_version.minor}_{self._mz_version.patch}"
        self._current_table_name = f"snapshot_${{testdrive.seed}}_{self._invocation_id}_{version_str}_{self._run_counter}"
        return TdAction("""
            > DROP SINK IF EXISTS iceberg_sink;
            """)

    def benchmark(self) -> MeasurementSource:
        # Measure creation of a fresh sink and completion of its snapshot emission.
        # Total rows the sink must emit = n (pre-populated at init()).
        table_name = self._current_table_name
        n = self.n()
        return Td(f"""
            > SELECT 1;
              /* A */
            1

            > CREATE SINK iceberg_sink
              IN CLUSTER sink_cluster
              FROM snapshot_source_tbl
              INTO ICEBERG CATALOG CONNECTION polaris_conn (
                NAMESPACE 'default_namespace',
                TABLE '{table_name}'
              )
              USING AWS CONNECTION aws_conn
              KEY (pk) NOT ENFORCED
              MODE UPSERT
              WITH (COMMIT INTERVAL '1s');

            > SELECT messages_committed >= {n}
              FROM mz_internal.mz_sink_statistics
              JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
              WHERE mz_sinks.name = 'iceberg_sink';
              /* B */
            true

            $ duckdb-execute name=iceberg
            CREATE SECRET s3_secret (TYPE S3, KEY_ID '${{arg.s3-access-user}}', SECRET '${{arg.s3-access-key}}', ENDPOINT '${{arg.aws-iceberg-endpoint}}', URL_STYLE 'path', USE_SSL false, REGION 'minio');
            SET unsafe_enable_version_guessing = true;

            $ duckdb-query name=iceberg
            SELECT COUNT(*) FROM iceberg_scan('s3://test-bucket/default_namespace/{table_name}')
            {n}
            """)


class ManyKafkaSourcesOnSameCluster(Scenario):
    """Measure the time it takes to ingest data from many Kafka sources"""
```
