Skip to content

Commit 46f9ca4

Browse files
committed
feature-benchmark: add IcebergSnapshot scenario
IcebergSink measures steady-state streaming (1 seed row in before() then INSERT of 1M rows inside benchmark()), which exercises incremental emission but not the snapshot path where the sink reads a pre-existing source from as_of. IcebergSnapshot mirrors ExactlyOnce's shape: init() pre-populates a table with 1M rows, before() generates a unique Iceberg destination per run, and benchmark() creates a fresh sink and measures until the full snapshot is committed. This is the path where the zip_into_diff_pairs allocation hotspot lived, and therefore the relevant micro-benchmark for the sink refactor.
1 parent 8a4ed1e commit 46f9ca4

1 file changed

Lines changed: 113 additions & 0 deletions

File tree

misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1520,6 +1520,119 @@ def benchmark(self) -> MeasurementSource:
15201520
""")
15211521

15221522

1523+
class IcebergSnapshot(Sink):
    """Benchmark the snapshot path of a freshly-created Iceberg sink over 1M rows.

    Follows the ExactlyOnce pattern rather than IcebergSink's streaming one:
    the source table is populated once up front, and the measured region
    covers creating a brand-new sink plus emission of the complete snapshot
    from as_of — i.e. arrangement build and batch emission, not incremental
    updates.
    """

    FIXED_SCALE = True

    def __init__(
        self, scale: float, mz_version: MzVersion, default_size: int, seed: int
    ) -> None:
        super().__init__(scale, mz_version, default_size, seed)
        # Incremented by before() so every run targets its own Iceberg table.
        self._run_counter = 0
        # Distinguishes destinations across separate benchmark invocations.
        self._invocation_id = uuid.uuid4()
        # Destination table for the current run; assigned in before().
        self._current_table_name: str | None = None

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # Requires a Materialize version newer than 26.9.0.
        return version > MzVersion.create(26, 9, 0)

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 0, 0)

    def init(self) -> Action:
        # One-time setup: connections, a dedicated sink cluster, and the
        # source table holding every record the sink will snapshot. Only
        # sink creation + snapshot emission is timed in benchmark().
        return TdAction(f"""
> CREATE SECRET iceberg_secret AS '${{arg.s3-access-key}}';

> CREATE CONNECTION aws_conn TO AWS (
    ACCESS KEY ID = 'tduser',
    SECRET ACCESS KEY = SECRET iceberg_secret,
    ENDPOINT = '${{arg.aws-iceberg-endpoint}}',
    REGION = 'us-east-1'
  );

> CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (
    CATALOG TYPE = 'REST',
    URL = 'http://polaris:8181/api/catalog',
    CREDENTIAL = 'root:root',
    WAREHOUSE = 'default_catalog',
    SCOPE = 'PRINCIPAL_ROLE:ALL'
  );

> DROP CLUSTER IF EXISTS sink_cluster CASCADE

> CREATE CLUSTER sink_cluster SIZE 'scale={self._default_size},workers=1', REPLICATION FACTOR 1;

> DROP TABLE IF EXISTS snapshot_source_tbl CASCADE

> CREATE TABLE snapshot_source_tbl (pk BIGINT, data BIGINT);

> INSERT INTO snapshot_source_tbl SELECT x, x * 2 FROM generate_series(1, {self.n()}) AS x;

> SELECT COUNT(*) FROM snapshot_source_tbl;
{self.n()}
""")

    def before(self) -> Action:
        # Each run gets a never-before-used Iceberg destination so that no
        # catalog entries or files from earlier iterations can interfere.
        # NOTE: ${testdrive.seed} is deliberately left unexpanded here; it is
        # substituted by testdrive when the name is spliced into benchmark().
        self._run_counter += 1
        mz = self._mz_version
        release_tag = f"v{mz.major}_{mz.minor}_{mz.patch}"
        self._current_table_name = (
            f"snapshot_${{testdrive.seed}}_{self._invocation_id}"
            f"_{release_tag}_{self._run_counter}"
        )
        return TdAction("""
> DROP SINK IF EXISTS iceberg_sink;
""")

    def benchmark(self) -> MeasurementSource:
        # Timed region: marker A fires immediately; marker B completes once
        # the sink reports having committed at least every pre-populated row.
        # The trailing duckdb check independently verifies the Iceberg table.
        destination = self._current_table_name
        expected_rows = self.n()
        return Td(f"""
> SELECT 1;
  /* A */
1

> CREATE SINK iceberg_sink
  IN CLUSTER sink_cluster
  FROM snapshot_source_tbl
  INTO ICEBERG CATALOG CONNECTION polaris_conn (
    NAMESPACE 'default_namespace',
    TABLE '{destination}'
  )
  USING AWS CONNECTION aws_conn
  KEY (pk) NOT ENFORCED
  MODE UPSERT
  WITH (COMMIT INTERVAL '1s');

> SELECT messages_committed >= {expected_rows}
  FROM mz_internal.mz_sink_statistics
  JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
  WHERE mz_sinks.name = 'iceberg_sink';
  /* B */
true

$ duckdb-execute name=iceberg
CREATE SECRET s3_secret (TYPE S3, KEY_ID '${{arg.s3-access-user}}', SECRET '${{arg.s3-access-key}}', ENDPOINT '${{arg.aws-iceberg-endpoint}}', URL_STYLE 'path', USE_SSL false, REGION 'minio');
SET unsafe_enable_version_guessing = true;

$ duckdb-query name=iceberg
SELECT COUNT(*) FROM iceberg_scan('s3://test-bucket/default_namespace/{destination}')
{expected_rows}
""")
1634+
1635+
15231636
class ManyKafkaSourcesOnSameCluster(Scenario):
15241637
"""Measure the time it takes to ingest data from many Kafka sources"""
15251638

0 commit comments

Comments
 (0)