Commit abd689b

Merge branch 'main' into feature/iceberg-compaction-benchmark
2 parents e2e9c33 + 92f6eaa commit abd689b

205 files changed

Lines changed: 8035 additions & 1491 deletions

.github/dependabot.yml

Lines changed: 16 additions & 0 deletions
@@ -37,6 +37,22 @@ updates:
         patterns:
           - "prost*"
           - "pbjson*"
+      # Catch-all: group only minor/patch into a single PR,
+      # excluding deps we want always separate (and excluding arrow/parquet which have their own group)
+      all-other-cargo-deps:
+        applies-to: version-updates
+        patterns:
+          - "*"
+        exclude-patterns:
+          - "arrow*"
+          - "parquet"
+          - "object_store"
+          - "sqlparser"
+          - "prost*"
+          - "pbjson*"
+        update-types:
+          - "minor"
+          - "patch"
   - package-ecosystem: "github-actions"
     directory: "/"
     schedule:
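
Note on the `all-other-cargo-deps` group: it can be read as "every dependency name that matches `*`, minus anything matching an exclude pattern". The sketch below illustrates that reading in Python. It assumes Dependabot's `*` wildcard behaves like shell-style globbing, approximated here with `fnmatch`; the real matcher may differ. The helper name `in_catch_all_group` is hypothetical, and the `update-types` restriction to minor/patch is not modelled.

```python
from fnmatch import fnmatchcase

# Patterns copied from the all-other-cargo-deps group in this diff.
PATTERNS = ["*"]
EXCLUDE_PATTERNS = ["arrow*", "parquet", "object_store", "sqlparser", "prost*", "pbjson*"]


def in_catch_all_group(dep_name: str) -> bool:
    """Return True if dep_name would land in the catch-all group.

    Assumption: glob matching via fnmatch approximates Dependabot's pattern
    semantics. This is an illustration, not Dependabot's implementation.
    """
    included = any(fnmatchcase(dep_name, p) for p in PATTERNS)
    excluded = any(fnmatchcase(dep_name, p) for p in EXCLUDE_PATTERNS)
    return included and not excluded


if __name__ == "__main__":
    # Example crate names chosen for illustration only.
    for name in ["tokio", "arrow-array", "parquet", "prost-build"]:
        print(name, in_catch_all_group(name))
```

Under this reading, `parquet` and `object_store` are excluded only as exact names because their patterns carry no wildcard, while `arrow*`, `prost*` and `pbjson*` exclude whole crate families.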

.github/workflows/stale.yml

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ jobs:
       issues: write
       pull-requests: write
     steps:
-      - uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
+      - uses: actions/stale@b5d41d4e1d5dceea10e7104786b73624c18a190f # v10.2.0
         with:
           stale-pr-message: "Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days."
           days-before-pr-stale: 60
Lines changed: 65 additions & 19 deletions
@@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C
 
 [Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html
 
+## Usage
+
+All benchmarks are run via `run.py`:
+
+```
+python3 run.py --engine <engine> --benchmark <tpch|tpcds> [options]
+```
+
+| Option         | Description                                      |
+| -------------- | ------------------------------------------------ |
+| `--engine`     | Engine name (matches a TOML file in `engines/`)  |
+| `--benchmark`  | `tpch` or `tpcds`                                |
+| `--iterations` | Number of iterations (default: 1)                |
+| `--output`     | Output directory (default: `.`)                  |
+| `--query`      | Run a single query number                        |
+| `--no-restart` | Skip Spark master/worker restart                 |
+| `--dry-run`    | Print the spark-submit command without executing |
+
+Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`
+
 ## Example usage
 
 Set Spark environment variables:
@@ -47,7 +67,7 @@ Run Spark benchmark:
 ```shell
 export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
 sudo ./drop-caches.sh
-./spark-tpch.sh
+python3 run.py --engine spark --benchmark tpch
 ```
 
 Run Comet benchmark:
@@ -56,7 +76,7 @@ Run Comet benchmark:
 export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
 export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
 sudo ./drop-caches.sh
-./comet-tpch.sh
+python3 run.py --engine comet --benchmark tpch
 ```
 
 Run Gluten benchmark:
@@ -65,7 +85,13 @@ Run Gluten benchmark:
 export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
 export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar
 sudo ./drop-caches.sh
-./gluten-tpch.sh
+python3 run.py --engine gluten --benchmark tpch
+```
+
+Preview a command without running it:
+
+```shell
+python3 run.py --engine comet --benchmark tpch --dry-run
 ```
 
 Generating charts:
@@ -74,6 +100,11 @@ Generating charts:
 python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
 ```
 
+## Engine Configuration
+
+Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
+required environment variables, and optional defaults/exports. See existing files for examples.
+
 ## Iceberg Benchmarking
 
 Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
@@ -90,14 +121,16 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
 
 Note: Table creation uses `--packages` which auto-downloads the dependency.
 
-### Create Iceberg TPC-H tables
+### Create Iceberg tables
 
-Convert existing Parquet TPC-H data to Iceberg format:
+Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`.
+The script configures the Iceberg catalog automatically -- no `--conf` flags needed.
 
 ```shell
 export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
-export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+mkdir -p $ICEBERG_WAREHOUSE
 
+# TPC-H
 $SPARK_HOME/bin/spark-submit \
     --master $SPARK_MASTER \
     --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
@@ -106,13 +139,24 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.executor.cores=8 \
     --conf spark.cores.max=8 \
     --conf spark.executor.memory=16g \
-    --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
-    --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
-    --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
-    create-iceberg-tpch.py \
+    create-iceberg-tables.py \
+    --benchmark tpch \
     --parquet-path $TPCH_DATA \
-    --catalog $ICEBERG_CATALOG \
-    --database tpch
+    --warehouse $ICEBERG_WAREHOUSE
+
+# TPC-DS
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=2 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=16 \
+    --conf spark.executor.memory=16g \
+    create-iceberg-tables.py \
+    --benchmark tpcds \
+    --parquet-path $TPCDS_DATA \
+    --warehouse $ICEBERG_WAREHOUSE
 ```
 
 ### Run Iceberg benchmark
@@ -124,20 +168,22 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
 export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
 export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
 sudo ./drop-caches.sh
-./comet-tpch-iceberg.sh
+python3 run.py --engine comet-iceberg --benchmark tpch
 ```
 
 The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
 integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
 physical plan output.
 
-### Iceberg-specific options
+### create-iceberg-tables.py options
 
-| Environment Variable | Default    | Description                         |
-| -------------------- | ---------- | ----------------------------------- |
-| `ICEBERG_CATALOG`    | `local`    | Iceberg catalog name                |
-| `ICEBERG_DATABASE`   | `tpch`     | Database containing TPC-H tables    |
-| `ICEBERG_WAREHOUSE`  | (required) | Path to Iceberg warehouse directory |
+| Option           | Required | Default        | Description                         |
+| ---------------- | -------- | -------------- | ----------------------------------- |
+| `--benchmark`    | Yes      |                | `tpch` or `tpcds`                   |
+| `--parquet-path` | Yes      |                | Path to source Parquet data         |
+| `--warehouse`    | Yes      |                | Path to Iceberg warehouse directory |
+| `--catalog`      | No       | `local`        | Iceberg catalog name                |
+| `--database`     | No       | benchmark name | Database name for the tables        |
 
 ### Comparing Parquet vs Iceberg performance
 
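Note on the "Engine Configuration" section: the README says each engine file lists JARs, Spark conf overrides, and required environment variables, but the diff shows neither the TOML schema nor `run.py`. The sketch below is one way such a config might be turned into a `spark-submit` command for `--dry-run`. The key names (`required_env`, `jars`, `spark_conf`), the helper `build_spark_submit`, and the script name `benchmark_script.py` are all assumptions for illustration. The dict stands in for a parsed TOML file (for example from `tomllib.load` on Python 3.11+).

```python
import shlex
from string import Template

# Hypothetical engine config, shaped like a parsed TOML file.
# Key names are assumptions; the real schema lives in engines/*.toml.
COMET_ICEBERG_CFG = {
    "required_env": ["SPARK_HOME", "SPARK_MASTER", "COMET_JAR", "ICEBERG_JAR"],
    "jars": ["$COMET_JAR", "$ICEBERG_JAR"],
    "spark_conf": {"spark.comet.scan.icebergNative.enabled": "true"},
}


def build_spark_submit(cfg, env, script_args):
    """Build a spark-submit argument list from an engine config (sketch)."""
    # Fail early if a required environment variable is missing.
    missing = [name for name in cfg.get("required_env", []) if name not in env]
    if missing:
        raise ValueError("missing required environment variables: " + ", ".join(missing))

    cmd = [f"{env['SPARK_HOME']}/bin/spark-submit", "--master", env["SPARK_MASTER"]]

    # Expand $VAR references in JAR paths using the supplied environment.
    jars = [Template(jar).substitute(env) for jar in cfg.get("jars", [])]
    if jars:
        cmd += ["--jars", ",".join(jars)]

    # Each Spark conf override becomes one --conf key=value pair.
    for key, value in cfg.get("spark_conf", {}).items():
        cmd += ["--conf", f"{key}={value}"]

    return cmd + list(script_args)


if __name__ == "__main__":
    env = {
        "SPARK_HOME": "/opt/spark",
        "SPARK_MASTER": "spark://localhost:7077",
        "COMET_JAR": "/opt/comet/comet.jar",
        "ICEBERG_JAR": "/opt/iceberg/iceberg.jar",
    }
    # Dry run: print the command instead of executing it.
    print(shlex.join(build_spark_submit(COMET_ICEBERG_CFG, env, ["benchmark_script.py"])))
```

Building the command as a list and only joining it for display keeps the dry-run output and the executed command identical.
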
Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Convert TPC-H or TPC-DS Parquet data to Iceberg tables.
+
+Usage:
+    spark-submit \
+        --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+        create-iceberg-tables.py \
+        --benchmark tpch \
+        --parquet-path /path/to/tpch/parquet \
+        --warehouse /path/to/iceberg-warehouse
+
+    spark-submit \
+        --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+        create-iceberg-tables.py \
+        --benchmark tpcds \
+        --parquet-path /path/to/tpcds/parquet \
+        --warehouse /path/to/iceberg-warehouse
+"""
+
+import argparse
+import os
+import sys
+from pyspark.sql import SparkSession
+import time
+
+TPCH_TABLES = [
+    "customer",
+    "lineitem",
+    "nation",
+    "orders",
+    "part",
+    "partsupp",
+    "region",
+    "supplier",
+]
+
+TPCDS_TABLES = [
+    "call_center",
+    "catalog_page",
+    "catalog_returns",
+    "catalog_sales",
+    "customer",
+    "customer_address",
+    "customer_demographics",
+    "date_dim",
+    "time_dim",
+    "household_demographics",
+    "income_band",
+    "inventory",
+    "item",
+    "promotion",
+    "reason",
+    "ship_mode",
+    "store",
+    "store_returns",
+    "store_sales",
+    "warehouse",
+    "web_page",
+    "web_returns",
+    "web_sales",
+    "web_site",
+]
+
+BENCHMARK_TABLES = {
+    "tpch": TPCH_TABLES,
+    "tpcds": TPCDS_TABLES,
+}
+
+
+def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str):
+    table_names = BENCHMARK_TABLES[benchmark]
+
+    # Validate paths before starting Spark
+    errors = []
+    if not os.path.isdir(parquet_path):
+        errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory")
+    if not os.path.isdir(warehouse):
+        errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. "
+                      "Create it with: mkdir -p " + warehouse)
+    if errors:
+        for e in errors:
+            print(e, file=sys.stderr)
+        sys.exit(1)
+
+    spark = SparkSession.builder \
+        .appName(f"Create Iceberg {benchmark.upper()} Tables") \
+        .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
+        .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \
+        .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \
+        .getOrCreate()
+
+    # Set the Iceberg catalog as the current catalog so that
+    # namespace operations are routed correctly
+    spark.sql(f"USE {catalog}")
+
+    # Create namespace if it doesn't exist
+    try:
+        spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}")
+    except Exception:
+        # Namespace may already exist
+        pass
+
+    for table in table_names:
+        parquet_table_path = f"{parquet_path}/{table}.parquet"
+        iceberg_table = f"{catalog}.{database}.{table}"
+
+        print(f"Converting {parquet_table_path} -> {iceberg_table}")
+        start_time = time.time()
+
+        # Drop table if exists to allow re-running
+        spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")
+
+        # Read parquet and write as Iceberg
+        df = spark.read.parquet(parquet_table_path)
+        df.writeTo(iceberg_table).using("iceberg").create()
+
+        row_count = spark.table(iceberg_table).count()
+        elapsed = time.time() - start_time
+        print(f"  Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")
+
+    print(f"\nAll {benchmark.upper()} tables created successfully!")
+    print(f"Tables available at: {catalog}.{database}.*")
+
+    spark.stop()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables"
+    )
+    parser.add_argument(
+        "--benchmark", required=True, choices=["tpch", "tpcds"],
+        help="Benchmark whose tables to convert (tpch or tpcds)"
+    )
+    parser.add_argument(
+        "--parquet-path", required=True,
+        help="Path to Parquet data directory"
+    )
+    parser.add_argument(
+        "--warehouse", required=True,
+        help="Path to Iceberg warehouse directory"
+    )
+    parser.add_argument(
+        "--catalog", default="local",
+        help="Iceberg catalog name (default: 'local')"
+    )
+    parser.add_argument(
+        "--database", default=None,
+        help="Database name to create tables in (defaults to benchmark name)"
+    )
+    args = parser.parse_args()
+
+    database = args.database if args.database else args.benchmark
+    main(args.benchmark, args.parquet_path, args.warehouse, args.catalog, database)
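
Note on `create-iceberg-tables.py`: the path checks, the database default, and the Parquet-to-Iceberg naming rule do not depend on Spark, so they can be exercised on their own. The sketch below assumes they are pulled out into standalone helpers. `validate_paths`, `resolve_database` and `table_mapping` are hypothetical names, not functions in the commit; the logic mirrors the diff above.

```python
import os
from typing import Dict, List, Optional


def validate_paths(parquet_path: str, warehouse: str) -> List[str]:
    """Collect error messages for missing directories (same checks as the script)."""
    errors = []
    if not os.path.isdir(parquet_path):
        errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory")
    if not os.path.isdir(warehouse):
        errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. "
                      "Create it with: mkdir -p " + warehouse)
    return errors


def resolve_database(database: Optional[str], benchmark: str) -> str:
    """Fall back to the benchmark name when --database is not given."""
    return database if database else benchmark


def table_mapping(parquet_path: str, catalog: str, database: str,
                  tables: List[str]) -> Dict[str, str]:
    """Map each source Parquet path to its fully qualified Iceberg table name."""
    return {
        f"{parquet_path}/{table}.parquet": f"{catalog}.{database}.{table}"
        for table in tables
    }


if __name__ == "__main__":
    db = resolve_database(None, "tpch")
    for src, dst in table_mapping("/data/tpch", "local", db, ["nation", "region"]).items():
        print(f"{src} -> {dst}")
```

Because `os.path.isdir` only inspects the local filesystem, these checks assume both the Parquet data and the warehouse are local directories. A remote location such as an object-store URI would be reported as missing.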
