Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,119 @@ def benchmark(self) -> MeasurementSource:
""")


class IcebergSnapshot(Sink):
    """Measure time for a freshly-created Iceberg sink to snapshot 1M pre-existing records.

    Mirrors ExactlyOnce's shape: data pre-exists in a source table; `benchmark()` then
    creates a brand-new sink that must emit the whole snapshot from as_of. This
    exercises the sink's snapshot path (arrangement build, batch emission) rather
    than the steady-state streaming path that IcebergSink measures.
    """

    # NOTE(review): presumably pins the scenario to a single scale so n() is
    # constant regardless of the requested benchmark scale — confirm against
    # the Scenario base class's handling of FIXED_SCALE.
    FIXED_SCALE = True

    def __init__(
        self, scale: float, mz_version: MzVersion, default_size: int, seed: int
    ) -> None:
        """Initialize per-invocation state used to build unique Iceberg table names.

        Args:
            scale: benchmark scale factor (passed through to the Sink base).
            mz_version: Materialize version under test; also embedded in the
                destination table name so runs against different versions
                cannot collide in the catalog.
            default_size: cluster size used for the dedicated sink cluster.
            seed: testdrive seed (passed through to the Sink base).
        """
        super().__init__(scale, mz_version, default_size, seed)
        # Incremented once per measured iteration (see before()); part of the
        # destination table name so each iteration writes to a fresh table.
        self._run_counter: int = 0
        # Random per-process id: disambiguates table names across separate
        # benchmark invocations that might share a catalog.
        self._invocation_id = uuid.uuid4()
        # Set by before(); read by benchmark(). None until the first iteration.
        self._current_table_name: str | None = None

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # Strictly greater than 26.9.0 — NOTE(review): presumably the release
        # in which Iceberg sinks became available; confirm the boundary and
        # whether ">=" of the next release was intended.
        return version > MzVersion.create(26, 9, 0)

    def version(self) -> ScenarioVersion:
        # Scenario definition version; bump when the measured workload changes.
        return ScenarioVersion.create(1, 0, 0)

    def init(self) -> Action:
        # Set up connections, a dedicated sink cluster, and a source table
        # pre-populated with the records the sink will snapshot on each
        # iteration. Population happens once — `benchmark()` only measures sink
        # creation + snapshot emission.
        #
        # `${arg.*}` placeholders (doubled braces in this f-string) are left
        # verbatim for testdrive to expand; `{self._default_size}` and
        # `{self.n()}` are interpolated here in Python.
        return TdAction(f"""
> CREATE SECRET iceberg_secret AS '${{arg.s3-access-key}}';

> CREATE CONNECTION aws_conn TO AWS (
    ACCESS KEY ID = 'tduser',
    SECRET ACCESS KEY = SECRET iceberg_secret,
    ENDPOINT = '${{arg.aws-iceberg-endpoint}}',
    REGION = 'us-east-1'
  );

> CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (
    CATALOG TYPE = 'REST',
    URL = 'http://polaris:8181/api/catalog',
    CREDENTIAL = 'root:root',
    WAREHOUSE = 'default_catalog',
    SCOPE = 'PRINCIPAL_ROLE:ALL'
  );

> DROP CLUSTER IF EXISTS sink_cluster CASCADE

> CREATE CLUSTER sink_cluster SIZE 'scale={self._default_size},workers=1', REPLICATION FACTOR 1;

> DROP TABLE IF EXISTS snapshot_source_tbl CASCADE

> CREATE TABLE snapshot_source_tbl (pk BIGINT, data BIGINT);

> INSERT INTO snapshot_source_tbl SELECT x, x * 2 FROM generate_series(1, {self.n()}) AS x;

> SELECT COUNT(*) FROM snapshot_source_tbl;
{self.n()}
""")

    def before(self) -> Action:
        # Use a unique Iceberg table name per run so that each snapshot writes
        # to a fresh destination — no catalog conflicts, no lingering state
        # from previous iterations.
        #
        # The `${testdrive.seed}` placeholder (doubled braces here) is expanded
        # later by testdrive when benchmark() embeds the name in its script;
        # uuid, version, and run counter are baked in now.
        self._run_counter += 1
        version_str = f"v{self._mz_version.major}_{self._mz_version.minor}_{self._mz_version.patch}"
        self._current_table_name = f"snapshot_${{testdrive.seed}}_{self._invocation_id}_{version_str}_{self._run_counter}"
        # Drop any sink left over from the previous iteration so benchmark()
        # can re-create it under the same name.
        return TdAction("""
> DROP SINK IF EXISTS iceberg_sink;
""")

    def benchmark(self) -> MeasurementSource:
        # Measure creation of a fresh sink and completion of its snapshot emission.
        # Total rows the sink must emit = n (pre-populated at init()).
        # Timing window: marker A (trivial SELECT, just before sink creation)
        # to marker B (statistics report >= n messages committed).
        table_name = self._current_table_name
        n = self.n()
        return Td(f"""
> SELECT 1;
  /* A */
1

> CREATE SINK iceberg_sink
  IN CLUSTER sink_cluster
  FROM snapshot_source_tbl
  INTO ICEBERG CATALOG CONNECTION polaris_conn (
    NAMESPACE 'default_namespace',
    TABLE '{table_name}'
  )
  USING AWS CONNECTION aws_conn
  KEY (pk) NOT ENFORCED
  MODE UPSERT
  WITH (COMMIT INTERVAL '1s');

> SELECT messages_committed >= {n}
  FROM mz_internal.mz_sink_statistics
  JOIN mz_sinks ON mz_sink_statistics.id = mz_sinks.id
  WHERE mz_sinks.name = 'iceberg_sink';
  /* B */
true

$ duckdb-execute name=iceberg
CREATE SECRET s3_secret (TYPE S3, KEY_ID '${{arg.s3-access-user}}', SECRET '${{arg.s3-access-key}}', ENDPOINT '${{arg.aws-iceberg-endpoint}}', URL_STYLE 'path', USE_SSL false, REGION 'minio');
SET unsafe_enable_version_guessing = true;

$ duckdb-query name=iceberg
SELECT COUNT(*) FROM iceberg_scan('s3://test-bucket/default_namespace/{table_name}')
{n}
""")


class ManyKafkaSourcesOnSameCluster(Scenario):
"""Measure the time it takes to ingest data from many Kafka sources"""

Expand Down
Loading
Loading