Skip to content

Commit 0f67fe2

Browse files
committed
test(benchmark): DFP on-vs-off harness + cluster rebuilder + fact-side index
Extends TpcdsBenchmarkRunner to sweep DFP state per lance run: - --dfp-mode {on,off,both,default} flag. When "both", each lance query is run twice (once pinning spark.lance.runtime.filtering.enabled=true, once false); non-lance formats run once with dfp_mode="n/a" - dfp_mode column in the output CSV - BenchmarkReporter emits a separate "DFP On-vs-Off Comparison (Lance only)" table with per-query median wall-clock, speedup, fragmentsScanned on/off, pruning %, plus a geometric-mean summary - TpcdsQueryRunner now captures the fragmentsScanned SQLMetric by walking the executed plan (reaching through AdaptiveSparkPlanExec via reflection) and stores it on QueryMetrics.lanceFragmentsScanned Supporting pieces: - DfpClusterRebuilder: Spark job that re-sorts a Lance table by a target column so downstream zonemap index bounds are tight per fragment (DFP benefit requires fact-side data clustering) - index/store_sales.sql: fact-side btree indexes on ss_sold_date_sk, ss_item_sk, ss_customer_sk, ss_store_sk, ss_promo_sk — the join keys DFP needs to operate on - TpcdsIndexBuilder adds store_sales to the table list so the above resource is applied by the standard indexing step - run-benchmark.sh forwards a DFP_MODE env var through to --dfp-mode End-to-end DFP activity is still blocked upstream by Lance's btree getZonemapStats returning "must be retrained" for indexes built with lance-core 6.0.0-beta.2 — see lance-format#475 which propagates index_details from distributed index creation and uses the safer describeIndices(criteria) overload. Once lance-format#475 merges this harness produces real pruning numbers.
1 parent 9f42049 commit 0f67fe2

9 files changed

Lines changed: 519 additions & 35 deletions

File tree

benchmark/scripts/run-benchmark.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ fi
8585
if [ -n "${QUERIES:-}" ]; then
8686
BENCHMARK_EXTRA_ARGS="${BENCHMARK_EXTRA_ARGS} --queries ${QUERIES}"
8787
fi
88+
if [ -n "${DFP_MODE:-}" ]; then
89+
BENCHMARK_EXTRA_ARGS="${BENCHMARK_EXTRA_ARGS} --dfp-mode ${DFP_MODE}"
90+
fi
8891

8992
${SPARK_SUBMIT} \
9093
--class org.lance.spark.benchmark.TpcdsBenchmarkRunner \

benchmark/src/main/java/org/lance/spark/benchmark/BenchmarkReporter.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,129 @@ public void printSummary() {
183183
}
184184

185185
System.out.printf("Queries passed: %d, partial/failed: %d%n", passCount, failCount);
186+
187+
printDfpComparison();
188+
}
189+
190+
/**
191+
* Emit a separate DFP on-vs-off table when the input contains both {@code dfp_mode="on"} and
192+
* {@code dfp_mode="off"} rows for the same query+format. Silent no-op otherwise so the regular
193+
* Parquet-vs-Lance run (no DFP sweep) doesn't grow an irrelevant extra section.
194+
*/
195+
private void printDfpComparison() {
196+
// Collect per-query medians split by dfpMode, restricted to the lance format since only
197+
// lance honors the flag. A row is comparable only when both "on" and "off" measurements
198+
// exist.
199+
Map<String, Long> medianOn = new LinkedHashMap<>();
200+
Map<String, Long> medianOff = new LinkedHashMap<>();
201+
Map<String, Long> fragmentsOn = new LinkedHashMap<>();
202+
Map<String, Long> fragmentsOff = new LinkedHashMap<>();
203+
204+
Map<String, List<Long>> timesOn = new LinkedHashMap<>();
205+
Map<String, List<Long>> timesOff = new LinkedHashMap<>();
206+
207+
for (BenchmarkResult r : results) {
208+
if (!r.isSuccess() || !"lance".equals(r.getFormat())) {
209+
continue;
210+
}
211+
Map<String, List<Long>> bucket = null;
212+
if (BenchmarkResult.DFP_ON.equals(r.getDfpMode())) {
213+
bucket = timesOn;
214+
} else if (BenchmarkResult.DFP_OFF.equals(r.getDfpMode())) {
215+
bucket = timesOff;
216+
}
217+
if (bucket != null) {
218+
bucket.computeIfAbsent(r.getQueryName(), k -> new ArrayList<>()).add(r.getElapsedMs());
219+
// Record the first observed fragmentsScanned per query+mode. The metric is plan-level
220+
// and identical across iterations of the same query, so there's no need to median it.
221+
QueryMetrics qm = r.getMetrics();
222+
if (qm != null && qm.getLanceFragmentsScanned() >= 0) {
223+
Map<String, Long> fragments =
224+
BenchmarkResult.DFP_ON.equals(r.getDfpMode()) ? fragmentsOn : fragmentsOff;
225+
fragments.putIfAbsent(r.getQueryName(), qm.getLanceFragmentsScanned());
226+
}
227+
}
228+
}
229+
230+
for (Map.Entry<String, List<Long>> e : timesOn.entrySet()) {
231+
List<Long> t = e.getValue();
232+
t.sort(Long::compareTo);
233+
medianOn.put(e.getKey(), t.get(t.size() / 2));
234+
}
235+
for (Map.Entry<String, List<Long>> e : timesOff.entrySet()) {
236+
List<Long> t = e.getValue();
237+
t.sort(Long::compareTo);
238+
medianOff.put(e.getKey(), t.get(t.size() / 2));
239+
}
240+
241+
// Only emit the section when we have paired on/off measurements on at least one query.
242+
boolean hasPair = false;
243+
for (String q : medianOn.keySet()) {
244+
if (medianOff.containsKey(q)) {
245+
hasPair = true;
246+
break;
247+
}
248+
}
249+
if (!hasPair) {
250+
return;
251+
}
252+
253+
System.out.println();
254+
System.out.println("=== DFP On-vs-Off Comparison (Lance only) ===");
255+
System.out.println();
256+
System.out.printf(
257+
"%-8s %10s %10s %8s %10s %10s %8s%n",
258+
"Query", "OFF(ms)", "ON(ms)", "Speedup", "Frags OFF", "Frags ON", "Pruned%");
259+
System.out.println("-".repeat(70));
260+
261+
List<Double> speedups = new ArrayList<>();
262+
List<Double> prunePcts = new ArrayList<>();
263+
int firedCount = 0;
264+
265+
for (String q : medianOn.keySet()) {
266+
if (!medianOff.containsKey(q)) {
267+
continue;
268+
}
269+
long on = medianOn.get(q);
270+
long off = medianOff.get(q);
271+
double speedup = off > 0 ? (double) off / on : 0.0;
272+
speedups.add(speedup);
273+
274+
Long fOn = fragmentsOn.get(q);
275+
Long fOff = fragmentsOff.get(q);
276+
String fOnStr = fOn == null || fOn < 0 ? "-" : String.valueOf(fOn);
277+
String fOffStr = fOff == null || fOff < 0 ? "-" : String.valueOf(fOff);
278+
String prunedPctStr = "-";
279+
if (fOn != null && fOff != null && fOff > 0 && fOn >= 0) {
280+
double pct = 100.0 * (fOff - fOn) / fOff;
281+
prunedPctStr = String.format("%.1f%%", pct);
282+
prunePcts.add(pct);
283+
if (fOn < fOff) {
284+
firedCount++;
285+
}
286+
}
287+
288+
System.out.printf(
289+
"%-8s %10d %10d %8.2fx %10s %10s %8s%n",
290+
q, off, on, speedup, fOffStr, fOnStr, prunedPctStr);
291+
}
292+
293+
System.out.println();
294+
if (!speedups.isEmpty()) {
295+
double logSum = 0;
296+
for (double s : speedups) {
297+
logSum += Math.log(s);
298+
}
299+
System.out.printf(
300+
"Geometric mean speedup (OFF/ON): %.2fx across %d queries%n",
301+
Math.exp(logSum / speedups.size()), speedups.size());
302+
}
303+
if (!prunePcts.isEmpty()) {
304+
double avg = prunePcts.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
305+
System.out.printf(
306+
"DFP fired on %d / %d queries; mean fragment reduction: %.1f%%%n",
307+
firedCount, prunePcts.size(), avg);
308+
}
186309
}
187310

188311
private QueryMetrics findMetricsForQuery(String queryName) {

benchmark/src/main/java/org/lance/spark/benchmark/BenchmarkResult.java

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,24 @@
1515

1616
public class BenchmarkResult {
1717

18+
/**
19+
* DFP toggle state during this run. For Lance: {@code "on"} or {@code "off"}. For non-Lance
20+
* scans the flag has no effect, so we record {@code "n/a"} to make cross-format joins on the
21+
* CSV unambiguous.
22+
*/
23+
public static final String DFP_ON = "on";
24+
25+
public static final String DFP_OFF = "off";
26+
public static final String DFP_NA = "n/a";
27+
1828
private final String queryName;
1929
private final String format;
2030
private final int iteration;
2131
private final long elapsedMs;
2232
private final boolean success;
2333
private final String errorMessage;
2434
private final QueryMetrics metrics;
35+
private final String dfpMode;
2536

2637
private BenchmarkResult(
2738
String queryName,
@@ -30,29 +41,56 @@ private BenchmarkResult(
3041
long elapsedMs,
3142
boolean success,
3243
String errorMessage,
33-
QueryMetrics metrics) {
44+
QueryMetrics metrics,
45+
String dfpMode) {
3446
this.queryName = queryName;
3547
this.format = format;
3648
this.iteration = iteration;
3749
this.elapsedMs = elapsedMs;
3850
this.success = success;
3951
this.errorMessage = errorMessage;
4052
this.metrics = metrics;
53+
this.dfpMode = dfpMode == null ? DFP_NA : dfpMode;
4154
}
4255

4356
public static BenchmarkResult success(
4457
String queryName, String format, int iteration, long elapsedMs) {
45-
return new BenchmarkResult(queryName, format, iteration, elapsedMs, true, null, null);
58+
return new BenchmarkResult(
59+
queryName, format, iteration, elapsedMs, true, null, null, DFP_NA);
4660
}
4761

4862
public static BenchmarkResult success(
4963
String queryName, String format, int iteration, long elapsedMs, QueryMetrics metrics) {
50-
return new BenchmarkResult(queryName, format, iteration, elapsedMs, true, null, metrics);
64+
return new BenchmarkResult(
65+
queryName, format, iteration, elapsedMs, true, null, metrics, DFP_NA);
66+
}
67+
68+
public static BenchmarkResult success(
69+
String queryName,
70+
String format,
71+
int iteration,
72+
long elapsedMs,
73+
QueryMetrics metrics,
74+
String dfpMode) {
75+
return new BenchmarkResult(
76+
queryName, format, iteration, elapsedMs, true, null, metrics, dfpMode);
5177
}
5278

5379
public static BenchmarkResult failure(
5480
String queryName, String format, int iteration, long elapsedMs, String errorMessage) {
55-
return new BenchmarkResult(queryName, format, iteration, elapsedMs, false, errorMessage, null);
81+
return new BenchmarkResult(
82+
queryName, format, iteration, elapsedMs, false, errorMessage, null, DFP_NA);
83+
}
84+
85+
public static BenchmarkResult failure(
86+
String queryName,
87+
String format,
88+
int iteration,
89+
long elapsedMs,
90+
String errorMessage,
91+
String dfpMode) {
92+
return new BenchmarkResult(
93+
queryName, format, iteration, elapsedMs, false, errorMessage, null, dfpMode);
5694
}
5795

5896
public String getQueryName() {
@@ -83,12 +121,17 @@ public QueryMetrics getMetrics() {
83121
return metrics;
84122
}
85123

124+
public String getDfpMode() {
125+
return dfpMode;
126+
}
127+
86128
public String toCsvLine() {
87129
String base =
88130
String.join(
89131
",",
90132
queryName,
91133
format,
134+
dfpMode,
92135
String.valueOf(iteration),
93136
String.valueOf(elapsedMs),
94137
String.valueOf(success),
@@ -100,7 +143,7 @@ public String toCsvLine() {
100143
}
101144

102145
public static String csvHeader() {
103-
return "query,format,iteration,elapsed_ms,success,error";
146+
return "query,format,dfp_mode,iteration,elapsed_ms,success,error";
104147
}
105148

106149
public static String csvHeaderWithMetrics() {
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.lance.spark.benchmark;
15+
16+
import org.apache.spark.sql.Dataset;
17+
import org.apache.spark.sql.Row;
18+
import org.apache.spark.sql.SaveMode;
19+
import org.apache.spark.sql.SparkSession;
20+
21+
/**
22+
* Re-sorts an existing Lance table by a given column so downstream zonemap indexes produce
23+
* tight per-fragment min/max bounds. A DFP run against an unclustered fact table has nothing
24+
* to prune because every fragment's zone spans the full join-key domain.
25+
*
26+
* <p>The rebuild is done out-of-place: the source table is read, sorted, and written to
27+
* {@code <table>_clustered.lance}, which the caller then swaps in (e.g. via {@code mv} in the
28+
* orchestrating shell script). Destination-in-place rewriting would require a delete-&-rewrite
29+
* inside Lance and is out of scope here.
30+
*
31+
* <p>Usage:
32+
* <pre>
33+
* spark-submit --class org.lance.spark.benchmark.DfpClusterRebuilder benchmark.jar \
34+
* --src /path/to/tpcds/lance/store_sales.lance \
35+
* --dst /path/to/tpcds/lance/store_sales_clustered.lance \
36+
* --sort-by ss_sold_date_sk
37+
* </pre>
38+
*/
39+
public class DfpClusterRebuilder {
40+
41+
public static void main(String[] args) {
42+
String src = null;
43+
String dst = null;
44+
String sortBy = null;
45+
46+
for (int i = 0; i < args.length; i++) {
47+
switch (args[i]) {
48+
case "--src":
49+
src = args[++i];
50+
break;
51+
case "--dst":
52+
dst = args[++i];
53+
break;
54+
case "--sort-by":
55+
sortBy = args[++i];
56+
break;
57+
default:
58+
System.err.println("Unknown argument: " + args[i]);
59+
printUsage();
60+
System.exit(1);
61+
}
62+
}
63+
64+
if (src == null || dst == null || sortBy == null) {
65+
System.err.println("Missing required arguments.");
66+
printUsage();
67+
System.exit(1);
68+
}
69+
70+
SparkSession spark =
71+
SparkSession.builder().appName("DFP Cluster Rebuilder: " + sortBy).getOrCreate();
72+
73+
try {
74+
System.out.println("=== DFP Cluster Rebuilder ===");
75+
System.out.println("Source: " + src);
76+
System.out.println("Dest: " + dst);
77+
System.out.println("Sort by: " + sortBy);
78+
long start = System.currentTimeMillis();
79+
80+
Dataset<Row> df = spark.read().format("lance").load(src);
81+
long rowCount = df.count();
82+
System.out.println("Rows: " + rowCount);
83+
84+
// sortWithinPartitions keeps fragment locality roughly intact; a full orderBy would
85+
// introduce a shuffle + reduce partitioning, which we don't want for the fixture.
86+
// For DFP's purposes what matters is intra-fragment clustering, which sortWithinPartitions
87+
// achieves given the default Spark partitioning = one fragment per partition on read.
88+
df.sortWithinPartitions(sortBy)
89+
.write()
90+
.mode(SaveMode.ErrorIfExists)
91+
.format("lance")
92+
.save(dst);
93+
94+
long elapsed = System.currentTimeMillis() - start;
95+
System.out.printf("Wrote %s in %d ms%n", dst, elapsed);
96+
} finally {
97+
spark.stop();
98+
}
99+
}
100+
101+
private static void printUsage() {
102+
System.err.println(
103+
"Usage: DfpClusterRebuilder --src <path> --dst <path> --sort-by <column>");
104+
}
105+
}

0 commit comments

Comments
 (0)