Skip to content

Commit 21b2920

Browse files
authored
Add DuckDB resource-limit options to benchmark runner (rapidsai#22266)
Adds three CLI flags (`--duckdb-threads`, `--duckdb-memory-limit`, `--duckdb-temp-dir`) to the benchmark harness to tune DuckDB's resource usage. This is useful for avoiding out-of-memory (OOM) errors when running DuckDB as a baseline at large scale factors. The [DuckDB OOM guide](https://duckdb.org/docs/current/guides/troubleshooting/oom_errors#troubleshooting-out-of-memory-errors) recommends setting `memory_limit` to 50–60% of system RAM and reducing the number of threads to lower peak memory pressure. Also fixes JSON serialization of column-name mismatch details in `asserts.py`.

Authors:
- Matthew Murray (https://github.com/Matt711)

Approvers:
- Mads R. B. Kristensen (https://github.com/madsbk)

URL: rapidsai#22266
1 parent 7b18440 commit 21b2920

3 files changed

Lines changed: 95 additions & 6 deletions

File tree

python/cudf_polars/cudf_polars/experimental/benchmarks/asserts.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -177,8 +177,8 @@ def assert_tpch_result_equal(
177177
"expected_columns": right.columns,
178178
"result_columns": left.columns,
179179
"mismatched_columns": {
180-
"extra": extra,
181-
"missing": missing,
180+
"extra": sorted(extra),
181+
"missing": sorted(missing),
182182
},
183183
}
184184
raise ValidationError(message="Column names mismatch", details=detail)

python/cudf_polars/cudf_polars/experimental/benchmarks/utils_legacy.py

Lines changed: 45 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -395,6 +395,9 @@ class RunConfig:
395395
fallback_mode: str | None = None
396396
validation_method: ValidationMethod | None = None
397397
io_mode: Literal["cold", "lukewarm", "hot"] = "lukewarm"
398+
duckdb_threads: int | None = None
399+
duckdb_memory_limit: str | None = None
400+
duckdb_temp_dir: str | None = None
398401

399402
def __post_init__(self) -> None: # noqa: D105
400403
if self.gather_shuffle_stats and self.shuffle != "rapidsmpf":
@@ -531,6 +534,9 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
531534
fallback_mode=args.fallback_mode,
532535
validation_method=validation_method,
533536
io_mode=args.io_mode,
537+
duckdb_threads=args.duckdb_threads,
538+
duckdb_memory_limit=args.duckdb_memory_limit,
539+
duckdb_temp_dir=args.duckdb_temp_dir,
534540
)
535541

536542
def serialize(self, engine: pl.GPUEngine | None) -> dict:
@@ -1248,6 +1254,24 @@ def build_parser(num_queries: int = 22) -> argparse.ArgumentParser:
12481254
- raise : Raise an exception
12491255
- silent : Silently fall back to single partition"""),
12501256
)
1257+
parser.add_argument(
1258+
"--duckdb-threads",
1259+
type=int,
1260+
default=None,
1261+
help="Number of threads for DuckDB to use. Defaults to os.cpu_count().",
1262+
)
1263+
parser.add_argument(
1264+
"--duckdb-memory-limit",
1265+
type=str,
1266+
default=None,
1267+
help="DuckDB memory limit (e.g. '500GB'). If unset, DuckDB uses its default.",
1268+
)
1269+
parser.add_argument(
1270+
"--duckdb-temp-dir",
1271+
type=str,
1272+
default=None,
1273+
help="Directory for DuckDB to spill temporary data to disk.",
1274+
)
12511275

12521276
return parser
12531277

@@ -1532,6 +1556,7 @@ def run_polars_query(
15321556
run_config.dataset_path,
15331557
query_set=duckdb_queries_cls.name,
15341558
suffix=run_config.suffix,
1559+
run_config=run_config,
15351560
).with_columns(*casts)
15361561
else:
15371562
raise ValueError(f"Invalid baseline: {args.baseline}")
@@ -1938,13 +1963,28 @@ def inject(
19381963
]
19391964

19401965

1966+
def _make_duckdb_config(run_config: RunConfig | None) -> dict[str, Any]:
1967+
"""Build a DuckDB connection config dict from a RunConfig."""
1968+
config: dict[str, Any] = {
1969+
"threads": run_config.duckdb_threads
1970+
if (run_config and run_config.duckdb_threads is not None)
1971+
else os.cpu_count(),
1972+
}
1973+
if run_config and run_config.duckdb_memory_limit is not None:
1974+
config["memory_limit"] = run_config.duckdb_memory_limit
1975+
if run_config and run_config.duckdb_temp_dir is not None:
1976+
config["temp_directory"] = run_config.duckdb_temp_dir
1977+
return config
1978+
1979+
19411980
def print_duckdb_plan(
19421981
q_id: int,
19431982
sql: str,
19441983
dataset_path: Path,
19451984
suffix: str,
19461985
query_set: str,
19471986
args: argparse.Namespace,
1987+
run_config: RunConfig | None = None,
19481988
) -> None:
19491989
"""Print DuckDB query plan using EXPLAIN."""
19501990
if duckdb is None:
@@ -1955,7 +1995,7 @@ def print_duckdb_plan(
19551995
else:
19561996
tbl_names = PDSH_TABLE_NAMES
19571997

1958-
with duckdb.connect() as conn:
1998+
with duckdb.connect(config=_make_duckdb_config(run_config)) as conn:
19591999
for name in tbl_names:
19602000
pattern = f"{dataset_path}/{name}{suffix}"
19612001
conn.execute(
@@ -1983,6 +2023,7 @@ def execute_duckdb_query(
19832023
*,
19842024
suffix: str = ".parquet",
19852025
query_set: str = "pdsh",
2026+
run_config: RunConfig | None = None,
19862027
) -> pl.DataFrame:
19872028
"""Execute a query with DuckDB."""
19882029
if duckdb is None:
@@ -1991,7 +2032,7 @@ def execute_duckdb_query(
19912032
tbl_names = PDSDS_TABLE_NAMES
19922033
else:
19932034
tbl_names = PDSH_TABLE_NAMES
1994-
with duckdb.connect() as conn:
2035+
with duckdb.connect(config=_make_duckdb_config(run_config)) as conn:
19952036
for name in tbl_names:
19962037
pattern = f"{dataset_path}/{name}{suffix}"
19972038
conn.execute(
@@ -2023,6 +2064,7 @@ def run_duckdb(duckdb_queries_cls: Any, args: argparse.Namespace) -> None:
20232064
suffix=run_config.suffix,
20242065
query_set=duckdb_queries_cls.name,
20252066
args=args,
2067+
run_config=run_config,
20262068
)
20272069

20282070
print(f"DuckDB Executing: {q_id}")
@@ -2037,6 +2079,7 @@ def run_duckdb(duckdb_queries_cls: Any, args: argparse.Namespace) -> None:
20372079
run_config.dataset_path,
20382080
suffix=run_config.suffix,
20392081
query_set=duckdb_queries_cls.name,
2082+
run_config=run_config,
20402083
)
20412084
t1 = time.time()
20422085
record = SuccessRecord(query=q_id, iteration=i, duration=t1 - t0)

python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py

Lines changed: 48 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -409,6 +409,11 @@ class RunConfig:
409409
# Validation
410410
validation_method: ValidationMethod | None = None
411411

412+
# DuckDB configuration
413+
duckdb_threads: int | None = None
414+
duckdb_memory_limit: str | None = None
415+
duckdb_temp_dir: str | None = None
416+
412417
# Metadata / output (populated at runtime)
413418
n_workers: int = 1
414419
extra_info: dict[str, Any] = dataclasses.field(default_factory=dict)
@@ -534,6 +539,9 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
534539
num_gpus=args.num_gpus,
535540
validation_method=validation_method,
536541
extra_info=args.extra_info,
542+
duckdb_threads=args.duckdb_threads,
543+
duckdb_memory_limit=args.duckdb_memory_limit,
544+
duckdb_temp_dir=args.duckdb_temp_dir,
537545
)
538546

539547
def serialize(self, engine: pl.GPUEngine | None) -> dict:
@@ -928,6 +936,7 @@ def run_polars_query(
928936
run_config.dataset_path,
929937
query_set=duckdb_queries_cls.name,
930938
suffix=run_config.suffix,
939+
run_config=run_config,
931940
).with_columns(*casts)
932941
else:
933942
raise ValueError(f"Invalid baseline: {args.baseline}")
@@ -1423,13 +1432,28 @@ def sort_key(x: dict) -> tuple[int, int]:
14231432
]
14241433

14251434

1435+
def _make_duckdb_config(run_config: RunConfig | None) -> dict[str, Any]:
1436+
"""Build a DuckDB connection config dict from a RunConfig."""
1437+
config: dict[str, Any] = {
1438+
"threads": run_config.duckdb_threads
1439+
if (run_config and run_config.duckdb_threads is not None)
1440+
else os.cpu_count(),
1441+
}
1442+
if run_config and run_config.duckdb_memory_limit is not None:
1443+
config["memory_limit"] = run_config.duckdb_memory_limit
1444+
if run_config and run_config.duckdb_temp_dir is not None:
1445+
config["temp_directory"] = run_config.duckdb_temp_dir
1446+
return config
1447+
1448+
14261449
def print_duckdb_plan(
14271450
q_id: int,
14281451
sql: str,
14291452
dataset_path: Path,
14301453
suffix: str,
14311454
query_set: str,
14321455
args: argparse.Namespace,
1456+
run_config: RunConfig | None = None,
14331457
) -> None:
14341458
"""Print DuckDB query plan using EXPLAIN."""
14351459
if duckdb is None:
@@ -1440,7 +1464,7 @@ def print_duckdb_plan(
14401464
else:
14411465
tbl_names = PDSH_TABLE_NAMES
14421466

1443-
with duckdb.connect() as conn:
1467+
with duckdb.connect(config=_make_duckdb_config(run_config)) as conn:
14441468
for name in tbl_names:
14451469
pattern = (Path(dataset_path) / name).as_posix() + suffix
14461470
conn.execute(
@@ -1468,6 +1492,7 @@ def execute_duckdb_query(
14681492
*,
14691493
suffix: str = ".parquet",
14701494
query_set: str = "pdsh",
1495+
run_config: RunConfig | None = None,
14711496
) -> pl.DataFrame:
14721497
"""Execute a query with DuckDB."""
14731498
if duckdb is None:
@@ -1476,7 +1501,7 @@ def execute_duckdb_query(
14761501
tbl_names = PDSDS_TABLE_NAMES
14771502
else:
14781503
tbl_names = PDSH_TABLE_NAMES
1479-
with duckdb.connect() as conn:
1504+
with duckdb.connect(config=_make_duckdb_config(run_config)) as conn:
14801505
for name in tbl_names:
14811506
pattern = (Path(dataset_path) / name).as_posix() + suffix
14821507
conn.execute(
@@ -1508,6 +1533,7 @@ def run_duckdb(duckdb_queries_cls: Any, args: argparse.Namespace) -> None:
15081533
suffix=run_config.suffix,
15091534
query_set=duckdb_queries_cls.name,
15101535
args=args,
1536+
run_config=run_config,
15111537
)
15121538

15131539
print(f"DuckDB Executing: {q_id}")
@@ -1522,6 +1548,7 @@ def run_duckdb(duckdb_queries_cls: Any, args: argparse.Namespace) -> None:
15221548
run_config.dataset_path,
15231549
suffix=run_config.suffix,
15241550
query_set=duckdb_queries_cls.name,
1551+
run_config=run_config,
15251552
)
15261553
t1 = time.time()
15271554
record = SuccessRecord(query=q_id, iteration=i, duration=t1 - t0)
@@ -1846,6 +1873,25 @@ def build_parser(num_queries: int = 22) -> argparse.ArgumentParser:
18461873
help="Extra information to add to the output file (must be JSON-serializable).",
18471874
)
18481875

1876+
parser.add_argument(
1877+
"--duckdb-threads",
1878+
type=int,
1879+
default=None,
1880+
help="Number of threads for DuckDB to use. Defaults to os.cpu_count().",
1881+
)
1882+
parser.add_argument(
1883+
"--duckdb-memory-limit",
1884+
type=str,
1885+
default=None,
1886+
help="DuckDB memory limit (e.g. '500GB'). If unset, DuckDB uses its default.",
1887+
)
1888+
parser.add_argument(
1889+
"--duckdb-temp-dir",
1890+
type=str,
1891+
default=None,
1892+
help="Directory for DuckDB to spill temporary data to disk.",
1893+
)
1894+
18491895
StreamingOptions._add_cli_args(parser)
18501896

18511897
# Trap legacy flags so we can emit clear errors.

0 commit comments

Comments
 (0)