Skip to content

Commit 77fdeb7

Browse files
authored
chore: Add consistency checks and result hashing to TPC benchmarks (#3582)
1 parent 9d2ad62 commit 77fdeb7

28 files changed

Lines changed: 266 additions & 94 deletions
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[engine]
19+
name = "comet-hashjoin"
20+
21+
[env]
22+
required = ["COMET_JAR"]
23+
24+
[spark_submit]
25+
jars = ["$COMET_JAR"]
26+
driver_class_path = ["$COMET_JAR"]
27+
28+
[spark_conf]
29+
"spark.driver.extraClassPath" = "$COMET_JAR"
30+
"spark.executor.extraClassPath" = "$COMET_JAR"
31+
"spark.plugins" = "org.apache.spark.CometPlugin"
32+
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33+
"spark.comet.scan.impl" = "native_datafusion"
34+
"spark.comet.exec.replaceSortMergeJoin" = "true"
35+
"spark.comet.expression.Cast.allowIncompatible" = "true"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[engine]
19+
name = "comet-iceberg-hashjoin"
20+
21+
[env]
22+
required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]
23+
24+
[env.defaults]
25+
ICEBERG_CATALOG = "local"
26+
27+
[spark_submit]
28+
jars = ["$COMET_JAR", "$ICEBERG_JAR"]
29+
driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
30+
31+
[spark_conf]
32+
"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
33+
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
34+
"spark.plugins" = "org.apache.spark.CometPlugin"
35+
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
36+
"spark.comet.exec.replaceSortMergeJoin" = "true"
37+
"spark.comet.expression.Cast.allowIncompatible" = "true"
38+
"spark.comet.enabled" = "true"
39+
"spark.comet.exec.enabled" = "true"
40+
"spark.comet.scan.icebergNative.enabled" = "true"
41+
"spark.comet.explainFallback.enabled" = "true"
42+
"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog"
43+
"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop"
44+
"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE"
45+
"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}"
46+
47+
[tpcbench_args]
48+
use_iceberg = true

benchmarks/tpc/engines/comet-iceberg.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
3333
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
3434
"spark.plugins" = "org.apache.spark.CometPlugin"
3535
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
36-
"spark.comet.exec.replaceSortMergeJoin" = "true"
3736
"spark.comet.expression.Cast.allowIncompatible" = "true"
3837
"spark.comet.enabled" = "true"
3938
"spark.comet.exec.enabled" = "true"

benchmarks/tpc/engines/comet.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,4 @@ driver_class_path = ["$COMET_JAR"]
3131
"spark.plugins" = "org.apache.spark.CometPlugin"
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
3333
"spark.comet.scan.impl" = "native_datafusion"
34-
"spark.comet.exec.replaceSortMergeJoin" = "true"
3534
"spark.comet.expression.Cast.allowIncompatible" = "true"

benchmarks/tpc/generate-comparison.py

Lines changed: 102 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,88 @@
1717

1818
import argparse
1919
import json
20+
import logging
2021
import matplotlib.pyplot as plt
2122
import numpy as np
2223

24+
logging.basicConfig(level=logging.INFO)
25+
logger = logging.getLogger(__name__)
26+
2327
def geomean(data):
    """Return the geometric mean of a 1-D sequence of numbers.

    For strictly positive input the mean is computed in the log domain
    (``exp(mean(log(x)))``). The naive ``prod(x) ** (1 / n)`` form overflows
    to ``inf`` (or silently wraps for integer input) once the running product
    exceeds the floating-point range, which happens quickly with many or
    large timings.

    Inputs containing zero or negative values, and empty input, fall back to
    the plain product formula so their behavior is unchanged; in particular
    an empty sequence still raises ``ZeroDivisionError``.
    """
    values = np.asarray(data, dtype=float)
    count = len(values)
    if count and np.all(values > 0):
        # Summing logarithms cannot overflow the way a running product does.
        return np.exp(np.mean(np.log(values)))
    return np.prod(values) ** (1 / count)
2529

26-
def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
30+
def get_durations(result, query_key):
    """Return the duration samples stored under ``query_key``.

    Two entry layouts are handled: a dict entry, whose samples live under its
    ``"durations"`` key, and any other entry (the older list layout), which
    is returned unchanged.
    """
    entry = result[query_key]
    return entry["durations"] if isinstance(entry, dict) else entry
36+
37+
def get_all_queries(results):
    """Collect every numeric query id that appears in any result set.

    Keys that ``int()`` rejects with ``ValueError`` (non-numeric metadata
    keys) are ignored. The ids are returned as integers in ascending order.
    """
    seen_keys = set().union(*(result.keys() for result in results))
    query_numbers = []
    for raw_key in seen_keys:
        try:
            number = int(raw_key)
        except ValueError:
            # Not a query id; skip it.
            continue
        query_numbers.append(number)
    query_numbers.sort()
    return query_numbers
50+
51+
def get_common_queries(results, labels):
    """Return the query ids that every result set contains.

    Each id found in at least one result set is checked against all of them.
    An id absent from any set is left out of the returned list, and a warning
    naming the labels that have it and the labels that lack it is logged.
    """
    shared = []
    for query in get_all_queries(results):
        key = str(query)
        present, missing = [], []
        # Sort each result set's label into the matching bucket.
        for position, result in enumerate(results):
            bucket = present if key in result else missing
            bucket.append(labels[position])
        if missing:
            logger.warning(f"Query {query}: present in [{', '.join(present)}] but missing from [{', '.join(missing)}]")
        else:
            shared.append(query)
    return shared
64+
65+
def check_result_consistency(results, labels, benchmark):
    """Warn when result sets disagree on a query's row count or result hash.

    Only dict-style entries are inspected, and only for the ``row_count`` and
    ``result_hash`` fields they actually carry. Result sets that lack the
    query, or store it in a non-dict form, are ignored. Nothing is returned;
    mismatches are reported through the module logger. ``benchmark`` is
    accepted but not used here.
    """
    for query in get_all_queries(results):
        key = str(query)
        # (index, entry) for each result set holding a dict-style entry.
        detailed = [
            (position, result[key])
            for position, result in enumerate(results)
            if key in result and isinstance(result[key], dict)
        ]
        row_counts = [
            (labels[position], entry["row_count"])
            for position, entry in detailed
            if "row_count" in entry
        ]
        hashes = [
            (labels[position], entry["result_hash"])
            for position, entry in detailed
            if "result_hash" in entry
        ]

        # A comparison needs at least two reported values.
        if len(row_counts) > 1 and len({count for _, count in row_counts}) > 1:
            details = ", ".join(f"{label}={rc}" for label, rc in row_counts)
            logger.warning(f"Query {query}: row count mismatch: {details}")

        if len(hashes) > 1 and len({digest for _, digest in hashes}) > 1:
            details = ", ".join(f"{label}={h}" for label, h in hashes)
            logger.warning(f"Query {query}: result hash mismatch: {details}")
94+
95+
def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
96+
if common_queries is None:
97+
common_queries = range(1, query_count(benchmark)+1)
2798
results = []
28-
for query in range(1, query_count(benchmark)+1):
29-
if query == 999:
30-
continue
31-
a = np.median(np.array(baseline[str(query)]))
32-
b = np.median(np.array(comparison[str(query)]))
99+
for query in common_queries:
100+
a = np.median(np.array(get_durations(baseline, str(query))))
101+
b = np.median(np.array(get_durations(comparison, str(query))))
33102
if a > b:
34103
speedup = a/b-1
35104
else:
@@ -80,13 +149,13 @@ def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2:
80149
# Save the plot as an image file
81150
plt.savefig(f'{benchmark}_queries_speedup_rel.png', format='png')
82151

83-
def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
152+
def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
153+
if common_queries is None:
154+
common_queries = range(1, query_count(benchmark)+1)
84155
results = []
85-
for query in range(1, query_count(benchmark)+1):
86-
if query == 999:
87-
continue
88-
a = np.median(np.array(baseline[str(query)]))
89-
b = np.median(np.array(comparison[str(query)]))
156+
for query in common_queries:
157+
a = np.median(np.array(get_durations(baseline, str(query))))
158+
b = np.median(np.array(get_durations(comparison, str(query))))
90159
speedup = a-b
91160
results.append(("q" + str(query), round(speedup, 1)))
92161

@@ -130,17 +199,17 @@ def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2:
130199
# Save the plot as an image file
131200
plt.savefig(f'{benchmark}_queries_speedup_abs.png', format='png')
132201

133-
def generate_query_comparison_chart(results, labels, benchmark: str, title: str):
202+
def generate_query_comparison_chart(results, labels, benchmark: str, title: str, common_queries=None):
203+
if common_queries is None:
204+
common_queries = range(1, query_count(benchmark)+1)
134205
queries = []
135206
benches = []
136207
for _ in results:
137208
benches.append([])
138-
for query in range(1, query_count(benchmark)+1):
139-
if query == 999:
140-
continue
209+
for query in common_queries:
141210
queries.append("q" + str(query))
142211
for i in range(0, len(results)):
143-
benches[i].append(np.median(np.array(results[i][str(query)])))
212+
benches[i].append(np.median(np.array(get_durations(results[i], str(query)))))
144213

145214
# Define the width of the bars
146215
bar_width = 0.3
@@ -168,25 +237,25 @@ def generate_query_comparison_chart(results, labels, benchmark: str, title: str)
168237
# Save the plot as an image file
169238
plt.savefig(f'{benchmark}_queries_compare.png', format='png')
170239

171-
def generate_summary(results, labels, benchmark: str, title: str):
240+
def generate_summary(results, labels, benchmark: str, title: str, common_queries=None):
241+
if common_queries is None:
242+
common_queries = range(1, query_count(benchmark)+1)
172243
timings = []
173244
for _ in results:
174245
timings.append(0)
175246

176-
num_queries = query_count(benchmark)
177-
for query in range(1, num_queries + 1):
178-
if query == 999:
179-
continue
247+
num_queries = len([q for q in common_queries])
248+
for query in common_queries:
180249
for i in range(0, len(results)):
181-
timings[i] += np.median(np.array(results[i][str(query)]))
250+
timings[i] += np.median(np.array(get_durations(results[i], str(query))))
182251

183252
# Create figure and axis
184253
fig, ax = plt.subplots()
185254
fig.set_size_inches(10, 6)
186255

187256
# Add title and labels
188257
ax.set_title(title)
189-
ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark} queries (lower is better)')
258+
ax.set_ylabel(f'Time in seconds to run {num_queries} {benchmark} queries (lower is better)')
190259

191260
times = [round(x,0) for x in timings]
192261

@@ -213,11 +282,16 @@ def main(files, labels, benchmark: str, title: str):
213282
for filename in files:
214283
with open(filename) as f:
215284
results.append(json.load(f))
216-
generate_summary(results, labels, benchmark, title)
217-
generate_query_comparison_chart(results, labels, benchmark, title)
285+
check_result_consistency(results, labels, benchmark)
286+
common_queries = get_common_queries(results, labels)
287+
if not common_queries:
288+
logger.error("No queries found in common across all result files")
289+
return
290+
generate_summary(results, labels, benchmark, title, common_queries)
291+
generate_query_comparison_chart(results, labels, benchmark, title, common_queries)
218292
if len(files) == 2:
219-
generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
220-
generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
293+
generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)
294+
generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)
221295

222296
if __name__ == '__main__':
223297
argparse = argparse.ArgumentParser(description='Generate comparison')

benchmarks/tpc/infra/docker/docker-compose-laptop.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,6 @@ services:
9393
- ICEBERG_JAR=/jars/iceberg.jar
9494
- TPCH_DATA=/data
9595
- TPCDS_DATA=/data
96+
- SPARK_EVENT_LOG_DIR=/results/spark-events
9697
mem_limit: 4g
9798
memswap_limit: 4g

benchmarks/tpc/infra/docker/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ services:
107107
- ICEBERG_JAR=/jars/iceberg.jar
108108
- TPCH_DATA=/data
109109
- TPCDS_DATA=/data
110+
- SPARK_EVENT_LOG_DIR=/results/spark-events
110111
mem_limit: ${BENCH_MEM_LIMIT:-10g}
111112
memswap_limit: ${BENCH_MEM_LIMIT:-10g}
112113

benchmarks/tpc/queries/tpcds/q12.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ where
1818
and i_category in ('Jewelry', 'Books', 'Women')
1919
and ws_sold_date_sk = d_date_sk
2020
and d_date between cast('2002-03-22' as date)
21-
and (cast('2002-03-22' as date) + 30 days)
21+
and (cast('2002-03-22' as date) + INTERVAL '30 DAYS')
2222
group by
2323
i_item_id
2424
,i_item_desc

benchmarks/tpc/queries/tpcds/q16.sql

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
33
-- This query was generated at scale factor 1.
44
select
5-
count(distinct cs_order_number) as "order count"
6-
,sum(cs_ext_ship_cost) as "total shipping cost"
7-
,sum(cs_net_profit) as "total net profit"
5+
count(distinct cs_order_number) as `order count`
6+
,sum(cs_ext_ship_cost) as `total shipping cost`
7+
,sum(cs_net_profit) as `total net profit`
88
from
99
catalog_sales cs1
1010
,date_dim
1111
,customer_address
1212
,call_center
1313
where
1414
d_date between '1999-5-01' and
15-
(cast('1999-5-01' as date) + 60 days)
15+
(cast('1999-5-01' as date) + INTERVAL '60 DAYS')
1616
and cs1.cs_ship_date_sk = d_date_sk
1717
and cs1.cs_ship_addr_sk = ca_address_sk
1818
and ca_state = 'ID'

benchmarks/tpc/queries/tpcds/q20.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ select i_item_id
1616
and i_category in ('Children', 'Sports', 'Music')
1717
and cs_sold_date_sk = d_date_sk
1818
and d_date between cast('2002-04-01' as date)
19-
and (cast('2002-04-01' as date) + 30 days)
19+
and (cast('2002-04-01' as date) + INTERVAL '30 DAYS')
2020
group by i_item_id
2121
,i_item_desc
2222
,i_category

0 commit comments

Comments
 (0)