Skip to content

Commit 70e9a6a

Browse files
authored
fix(sql/duckdb): stream lazy-polars output via pl(lazy=True) (#9648)
1 parent 10e3a18 commit 70e9a6a

2 files changed

Lines changed: 106 additions & 1 deletion

File tree

marimo/_sql/engines/duckdb.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,22 @@ def to_polars() -> pl.DataFrame:
9191
# instead of relation.pl() so that pyarrow is not required.
9292
return pl.DataFrame(relation)
9393

94+
def to_lazy_polars() -> pl.LazyFrame:
95+
# `lazy=True` requires DuckDB >= 1.4 and pyarrow. Fall back to the
96+
# Arrow PyCapsule path on older DuckDB or when pyarrow is missing.
97+
# batch_size of 100k bounds peak memory at ~10x less than DuckDB's
98+
# 1M default while keeping per-batch overhead negligible.
99+
try:
100+
return relation.pl(batch_size=100_000, lazy=True)
101+
except (TypeError, ImportError, ModuleNotFoundError):
102+
return to_polars().lazy()
103+
94104
return convert_to_output(
95105
sql_output_format=sql_output_format,
96106
to_polars=to_polars,
97107
to_pandas=lambda: relation.df(),
98108
to_native=lambda: relation,
109+
to_lazy_polars=to_lazy_polars,
99110
)
100111

101112
@staticmethod

tests/_sql/test_duckdb.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import sys
66
from copy import deepcopy
7-
from typing import TYPE_CHECKING
7+
from typing import TYPE_CHECKING, Any
88
from unittest import mock
99

1010
import pytest
@@ -345,3 +345,97 @@ def test_duckdb_engine_polars_no_pyarrow(
345345
result.collect() if expected_type_name == "LazyFrame" else result
346346
)
347347
assert len(materialized) == 4
348+
349+
350+
class _RelationProxy:
351+
# _duckdb.DuckDBPyRelation is a pybind class and rejects attribute
352+
# assignment, so we wrap it to intercept .pl().
353+
def __init__(self, relation: Any, pl_override: Any) -> None:
354+
self._relation = relation
355+
self._pl_override = pl_override
356+
357+
def pl(self, *args: Any, **kwargs: Any) -> Any:
358+
return self._pl_override(self._relation, *args, **kwargs)
359+
360+
def __getattr__(self, name: str) -> Any:
361+
return getattr(self._relation, name)
362+
363+
364+
def _run_with_pl_spy(
365+
duckdb_connection: duckdb.DuckDBPyConnection,
366+
pl_impl: Any,
367+
) -> tuple[Any, list[dict]]:
368+
"""Execute a lazy-polars query with `pl_impl` wrapping the real `pl()`."""
369+
from marimo._sql.engines import duckdb as duckdb_engine_mod
370+
371+
pl_calls: list[dict] = []
372+
real_wrapped_sql = duckdb_engine_mod.wrapped_sql
373+
374+
def spy(relation: Any, *args: Any, **kwargs: Any) -> Any:
375+
pl_calls.append(kwargs)
376+
return pl_impl(relation, *args, **kwargs)
377+
378+
def spy_wrapped_sql(query: str, connection: Any) -> Any:
379+
return _RelationProxy(real_wrapped_sql(query, connection), spy)
380+
381+
with (
382+
mock.patch.object(
383+
DuckDBEngine, "sql_output_format", return_value="lazy-polars"
384+
),
385+
mock.patch.object(
386+
duckdb_engine_mod, "wrapped_sql", side_effect=spy_wrapped_sql
387+
),
388+
):
389+
engine = DuckDBEngine(
390+
duckdb_connection, engine_name=VariableName("test_duckdb")
391+
)
392+
result = engine.execute("SELECT * FROM test ORDER BY id")
393+
394+
return result, pl_calls
395+
396+
397+
@pytest.mark.skipif(
398+
not HAS_DUCKDB or not HAS_POLARS,
399+
reason="DuckDB and Polars not installed",
400+
)
401+
def test_duckdb_engine_lazy_polars_uses_streaming(
402+
duckdb_connection: duckdb.DuckDBPyConnection,
403+
) -> None:
404+
# Regression test for #9639: lazy-polars output must stream via
405+
# pl(lazy=True), not eagerly materialize then .lazy().
406+
import polars as pl
407+
408+
def pl_impl(relation: Any, *args: Any, **kwargs: Any) -> Any:
409+
return relation.pl(*args, **kwargs)
410+
411+
result, pl_calls = _run_with_pl_spy(duckdb_connection, pl_impl)
412+
413+
assert isinstance(result, pl.LazyFrame)
414+
assert len(result.collect()) == 4
415+
assert pl_calls == [{"batch_size": 100_000, "lazy": True}]
416+
417+
418+
@pytest.mark.skipif(
419+
not HAS_DUCKDB or not HAS_POLARS,
420+
reason="DuckDB and Polars not installed",
421+
)
422+
def test_duckdb_engine_lazy_polars_falls_back_on_older_duckdb(
423+
duckdb_connection: duckdb.DuckDBPyConnection,
424+
) -> None:
425+
# Regression test for #9639: DuckDB <1.4 rejects the `lazy` kwarg, and
426+
# `pl(lazy=True)` also fails without pyarrow. Both must fall back to the
427+
# Arrow PyCapsule path.
428+
import polars as pl
429+
430+
def pl_impl(relation: Any, *args: Any, **kwargs: Any) -> Any:
431+
if "lazy" in kwargs:
432+
raise TypeError("pl() got an unexpected keyword argument 'lazy'")
433+
return relation.pl(*args, **kwargs)
434+
435+
result, pl_calls = _run_with_pl_spy(duckdb_connection, pl_impl)
436+
437+
assert isinstance(result, pl.LazyFrame)
438+
assert len(result.collect()) == 4
439+
# Only the first call (lazy=True, raises) reaches `pl`; the fallback uses
440+
# `to_polars()` (Arrow PyCapsule) and never touches `relation.pl()`.
441+
assert pl_calls == [{"batch_size": 100_000, "lazy": True}]

0 commit comments

Comments
 (0)