Skip to content

Commit 36243b0

Browse files
committed
feature-benchmark: add IcebergSnapshot scenario
IcebergSink measures steady-state streaming (1 seed row in before() then INSERT of 1M rows inside benchmark()), which exercises incremental emission but not the snapshot path where the sink reads a pre-existing source from as_of. IcebergSnapshot mirrors ExactlyOnce's shape: init() pre-populates a table with 1M rows, before() generates a unique Iceberg destination per run, and benchmark() creates a fresh sink and measures until the full snapshot is committed. This is the path where the zip_into_diff_pairs allocation hotspot lived, and therefore the relevant micro-benchmark for the sink refactor.
1 parent 8d7354f commit 36243b0

1 file changed

Lines changed: 113 additions & 0 deletions

File tree

misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,119 @@ def benchmark(self) -> MeasurementSource:
15341534
""")
15351535

15361536

1537+
class IcebergSnapshot(Sink):
    """Benchmark the Iceberg sink's snapshot path over 1M pre-existing rows.

    Complements IcebergSink, which measures steady-state streaming: here the
    source table is fully populated once up front, and every measurement
    creates a brand-new sink that must read and emit the entire snapshot from
    as_of. That exercises arrangement build and batch emission rather than
    incremental update flow.
    """

    FIXED_SCALE = True

    def __init__(
        self, scale: float, mz_version: MzVersion, default_size: int, seed: int
    ) -> None:
        super().__init__(scale, mz_version, default_size, seed)
        # State combined into a unique Iceberg destination table name per run.
        self._run_counter = 0
        self._invocation_id = uuid.uuid4()
        self._current_table_name: str | None = None

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 0, 0)

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # Iceberg sinks are only available past this Materialize version.
        return version > MzVersion.create(26, 9, 0)

    def init(self) -> Action:
        # One-time setup: secrets/connections, a dedicated sink cluster, and a
        # source table holding every record the sink will snapshot. Population
        # happens here, not in benchmark(), so the measured interval covers
        # only sink creation plus snapshot emission.
        return TdAction(f"""
> CREATE SECRET iceberg_secret AS '${{arg.s3-access-key}}';

> CREATE CONNECTION aws_conn TO AWS (
    ACCESS KEY ID = 'tduser',
    SECRET ACCESS KEY = SECRET iceberg_secret,
    ENDPOINT = '${{arg.aws-iceberg-endpoint}}',
    REGION = 'us-east-1'
  );

> CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (
    CATALOG TYPE = 'REST',
    URL = 'http://polaris:8181/api/catalog',
    CREDENTIAL = 'root:root',
    WAREHOUSE = 'default_catalog',
    SCOPE = 'PRINCIPAL_ROLE:ALL'
  );

> DROP CLUSTER IF EXISTS sink_cluster CASCADE

> CREATE CLUSTER sink_cluster SIZE 'scale={self._default_size},workers=1', REPLICATION FACTOR 1;

> DROP TABLE IF EXISTS snapshot_source_tbl CASCADE

> CREATE TABLE snapshot_source_tbl (pk BIGINT, data BIGINT);

> INSERT INTO snapshot_source_tbl SELECT x, x * 2 FROM generate_series(1, {self.n()}) AS x;

> SELECT COUNT(*) FROM snapshot_source_tbl;
{self.n()}
""")

    def before(self) -> Action:
        # Derive a fresh Iceberg table name for this run so successive
        # iterations never collide in the catalog or inherit leftover state.
        # ${testdrive.seed} is left for testdrive to substitute at run time.
        self._run_counter += 1
        mz = self._mz_version
        version_tag = f"v{mz.major}_{mz.minor}_{mz.patch}"
        self._current_table_name = (
            f"snapshot_${{testdrive.seed}}_{self._invocation_id}"
            f"_{version_tag}_{self._run_counter}"
        )
        return TdAction("""
> DROP SINK IF EXISTS iceberg_sink;
""")

    def benchmark(self) -> MeasurementSource:
        # Measured interval: create a brand-new sink, then wait until it has
        # committed the full snapshot — all rows populated in init(). The
        # trailing duckdb check independently confirms the Iceberg table
        # really contains every row.
        destination = self._current_table_name
        row_count = self.n()
        return Td(f"""
> SELECT 1;
  /* A */
1

> CREATE SINK iceberg_sink
  IN CLUSTER sink_cluster
  FROM snapshot_source_tbl
  INTO ICEBERG CATALOG CONNECTION polaris_conn (
    NAMESPACE 'default_namespace',
    TABLE '{destination}'
  )
  USING AWS CONNECTION aws_conn
  KEY (pk) NOT ENFORCED
  MODE UPSERT
  WITH (COMMIT INTERVAL '1s');

> SELECT messages_committed >= {row_count}
  FROM mz_internal.mz_sink_statistics
  JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
  WHERE mz_sinks.name = 'iceberg_sink';
  /* B */
true

$ duckdb-execute name=iceberg
CREATE SECRET s3_secret (TYPE S3, KEY_ID '${{arg.s3-access-user}}', SECRET '${{arg.s3-access-key}}', ENDPOINT '${{arg.aws-iceberg-endpoint}}', URL_STYLE 'path', USE_SSL false, REGION 'minio');
SET unsafe_enable_version_guessing = true;

$ duckdb-query name=iceberg
SELECT COUNT(*) FROM iceberg_scan('s3://test-bucket/default_namespace/{destination}')
{row_count}
""")
15371650
class ManyKafkaSourcesOnSameCluster(Scenario):
15381651
"""Measure the time it takes to ingest data from many Kafka sources"""
15391652

0 commit comments

Comments
 (0)