Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit a6542a1

Browse files
feat: Experimental datafusion backend
1 parent 6b0509b commit a6542a1

File tree

14 files changed

+813
-0
lines changed

14 files changed

+813
-0
lines changed

bigframes/_config/bigquery_options.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(
9696
Tuple[str, requests.adapters.BaseAdapter]
9797
] = (),
9898
enable_polars_execution: bool = False,
99+
enable_datafusion_execution: bool = False,
99100
):
100101
self._credentials = credentials
101102
self._project = project
@@ -119,6 +120,10 @@ def __init__(
119120
bigframes._importing.import_polars()
120121
self._enable_polars_execution = enable_polars_execution
121122

123+
if enable_datafusion_execution:
124+
bigframes._importing.import_datafusion()
125+
self._enable_datafusion_execution = enable_datafusion_execution
126+
122127
@property
123128
def application_name(self) -> Optional[str]:
124129
"""The application name to amend to the user-agent sent to Google APIs.
@@ -503,3 +508,29 @@ def enable_polars_execution(self, value: bool):
503508
warnings.warn(msg, category=bfe.PreviewWarning)
504509
bigframes._importing.import_polars()
505510
self._enable_polars_execution = value
511+
512+
@property
513+
def enable_datafusion_execution(self) -> bool:
514+
"""If True, will use datafusion to execute some simple query plans locally.
515+
516+
**Examples:**
517+
518+
>>> import bigframes.pandas as bpd
519+
>>> bpd.options.bigquery.enable_datafusion_execution = True # doctest: +SKIP
520+
521+
"""
522+
return self._enable_datafusion_execution
523+
524+
@enable_datafusion_execution.setter
525+
def enable_datafusion_execution(self, value: bool):
526+
if self._session_started and self._enable_datafusion_execution != value:
527+
raise ValueError(
528+
SESSION_STARTED_MESSAGE.format(attribute="enable_datafusion_execution")
529+
)
530+
if value is True:
531+
msg = bfe.format_message(
532+
"DataFusion execution is an experimental feature, and may not be stable. Must have datafusion installed."
533+
)
534+
warnings.warn(msg, category=bfe.PreviewWarning)
535+
bigframes._importing.import_datafusion()
536+
self._enable_datafusion_execution = value

bigframes/_importing.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,17 @@ def import_polars() -> ModuleType:
3333
f"Imported polars version is likely below the minimum version: {POLARS_MIN_VERSION}"
3434
)
3535
return polars_module
36+
37+
38+
DATAFUSION_MIN_VERSION = version.Version("52.0.0")
39+
40+
41+
def import_datafusion() -> ModuleType:
42+
datafusion_module = importlib.import_module("datafusion")
43+
# Add any version checks if necessary, for now just check it imports
44+
df_version = version.Version(datafusion_module.__version__)
45+
if df_version < DATAFUSION_MIN_VERSION:
46+
raise ImportError(
47+
f"Imported datafusion version {df_version} is below the minimum version: {DATAFUSION_MIN_VERSION}"
48+
)
49+
return datafusion_module
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Compiler for BigFrames expression to Apache DataFusion expression.
16+
17+
Make sure to import all datafusion implementations here so that they get registered.
18+
"""
19+
from __future__ import annotations
20+
21+
import warnings
22+
23+
import bigframes.core.compile.datafusion.operations.comparison_ops # noqa: F401
24+
25+
# The ops imports appear first so that the implementations can be registered.
26+
import bigframes.core.compile.datafusion.operations.generic_ops # noqa: F401
27+
import bigframes.core.compile.datafusion.operations.numeric_ops # noqa: F401
28+
29+
try:
30+
import bigframes._importing
31+
32+
bigframes._importing.import_datafusion()
33+
34+
from bigframes.core.compile.datafusion.compiler import DataFusionCompiler
35+
36+
__all__ = ["DataFusionCompiler"]
37+
except Exception as exc:
38+
msg = f"DataFusion compiler not available as there was an exception importing datafusion. Details: {str(exc)}"
39+
warnings.warn(msg)
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import dataclasses
18+
import functools
19+
from typing import Type, TYPE_CHECKING
20+
21+
import pandas as pd
22+
import pyarrow as pa
23+
24+
import bigframes.core
25+
from bigframes.core import agg_expressions, nodes
26+
import bigframes.core.expression as ex
27+
import bigframes.dtypes
28+
import bigframes.operations as ops
29+
30+
datafusion_installed = True
31+
if TYPE_CHECKING:
32+
import datafusion
33+
else:
34+
try:
35+
import bigframes._importing
36+
37+
datafusion = bigframes._importing.import_datafusion()
38+
except Exception:
39+
datafusion_installed = False
40+
41+
42+
def register_op(op: Type):
43+
"""Register a compilation from BigFrames to DataFusion.
44+
45+
This decorator can be used, even if DataFusion is not installed.
46+
47+
Args:
48+
op: The type of the operator the wrapped function compiles.
49+
"""
50+
51+
def decorator(func):
52+
if datafusion_installed:
53+
return DataFusionExpressionCompiler.compile_op.register(op)(func) # type: ignore
54+
else:
55+
return func
56+
57+
return decorator
58+
59+
60+
if datafusion_installed:
61+
_DTYPE_MAPPING = {
62+
bigframes.dtypes.INT_DTYPE: pa.int64(),
63+
bigframes.dtypes.FLOAT_DTYPE: pa.float64(),
64+
bigframes.dtypes.BOOL_DTYPE: pa.bool_(),
65+
bigframes.dtypes.STRING_DTYPE: pa.string(),
66+
# For now, map numeric to double or decimal if supported
67+
bigframes.dtypes.NUMERIC_DTYPE: pa.decimal128(38, 9),
68+
bigframes.dtypes.BIGNUMERIC_DTYPE: pa.decimal256(76, 38),
69+
bigframes.dtypes.BYTES_DTYPE: pa.binary(),
70+
bigframes.dtypes.DATE_DTYPE: pa.date32(),
71+
bigframes.dtypes.DATETIME_DTYPE: pa.timestamp("us"),
72+
bigframes.dtypes.TIMESTAMP_DTYPE: pa.timestamp("us", tz="UTC"),
73+
bigframes.dtypes.TIME_DTYPE: pa.time64("us"),
74+
bigframes.dtypes.TIMEDELTA_DTYPE: pa.duration("us"),
75+
bigframes.dtypes.GEO_DTYPE: pa.string(),
76+
bigframes.dtypes.JSON_DTYPE: pa.string(),
77+
}
78+
79+
def _bigframes_dtype_to_arrow_dtype(
80+
dtype: bigframes.dtypes.ExpressionType,
81+
) -> pa.DataType:
82+
if dtype is None:
83+
return pa.null()
84+
# TODO: Add struct and array handling if needed
85+
return _DTYPE_MAPPING[dtype]
86+
87+
@dataclasses.dataclass(frozen=True)
88+
class DataFusionExpressionCompiler:
89+
"""
90+
Compiler for converting bigframes expressions to datafusion expressions.
91+
"""
92+
93+
@functools.singledispatchmethod
94+
def compile_expression(self, expression: ex.Expression) -> datafusion.Expr:
95+
raise NotImplementedError(f"Cannot compile expression: {expression}")
96+
97+
@compile_expression.register
98+
def _(
99+
self,
100+
expression: ex.ScalarConstantExpression,
101+
) -> datafusion.Expr:
102+
value = expression.value
103+
if not isinstance(value, float) and pd.isna(value): # type: ignore
104+
value = None
105+
if expression.dtype is None:
106+
return datafusion.lit(None)
107+
108+
# DataFusion lit handles standard types
109+
return datafusion.lit(value)
110+
111+
@compile_expression.register
112+
def _(
113+
self,
114+
expression: ex.DerefOp,
115+
) -> datafusion.Expr:
116+
return datafusion.col(expression.id.sql)
117+
118+
@compile_expression.register
119+
def _(
120+
self,
121+
expression: ex.ResolvedDerefOp,
122+
) -> datafusion.Expr:
123+
return datafusion.col(expression.id.sql)
124+
125+
@compile_expression.register
126+
def _(
127+
self,
128+
expression: ex.OpExpression,
129+
) -> datafusion.Expr:
130+
op = expression.op
131+
args = tuple(map(self.compile_expression, expression.inputs))
132+
return self.compile_op(op, *args)
133+
134+
@functools.singledispatchmethod
135+
def compile_op(
136+
self, op: ops.ScalarOp, *args: datafusion.Expr
137+
) -> datafusion.Expr:
138+
raise NotImplementedError(f"DataFusion compiler hasn't implemented {op}")
139+
140+
# Add basic ops here, others via register_op
141+
# df expressions overload operators usually
142+
143+
@dataclasses.dataclass(frozen=True)
144+
class DataFusionAggregateCompiler:
145+
scalar_compiler = DataFusionExpressionCompiler()
146+
147+
def compile_agg_expr(self, expr: agg_expressions.Aggregation):
148+
# Skeleton for now
149+
raise NotImplementedError("Aggregate compilation not implemented")
150+
151+
@dataclasses.dataclass(frozen=True)
152+
class DataFusionCompiler:
153+
"""
154+
Compiles BigFrameNode to DataFusion DataFrame.
155+
"""
156+
157+
expr_compiler = DataFusionExpressionCompiler()
158+
agg_compiler = DataFusionAggregateCompiler()
159+
160+
def compile(self, plan: nodes.BigFrameNode) -> datafusion.DataFrame:
161+
if not datafusion_installed:
162+
raise ValueError(
163+
"DataFusion is not installed, cannot compile to datafusion engine."
164+
)
165+
166+
from bigframes.core.compile.datafusion import lowering
167+
168+
node = lowering.lower_ops_to_datafusion(plan)
169+
return self.compile_node(node)
170+
171+
@functools.singledispatchmethod
172+
def compile_node(self, node: nodes.BigFrameNode) -> datafusion.DataFrame:
173+
raise ValueError(f"Can't compile unrecognized node: {node}")
174+
175+
@compile_node.register
176+
def compile_readlocal(self, node: nodes.ReadLocalNode):
177+
# Need SessionContext, maybe pass it in or create one
178+
ctx = datafusion.SessionContext()
179+
df = ctx.from_arrow(node.local_data_source.data)
180+
181+
cols_to_read = {
182+
scan_item.source_id: scan_item.id.sql
183+
for scan_item in node.scan_list.items
184+
}
185+
# Rename columns
186+
# DataFusion select can take list of expressions
187+
exprs = [
188+
datafusion.col(orig).alias(new) for orig, new in cols_to_read.items()
189+
]
190+
df = df.select(*exprs)
191+
192+
if node.offsets_col:
193+
# DataFusion has row_number()?
194+
# But ReadLocalNode usually has small data, could just use arrow offsets if needed
195+
# For now, let's just make it error if offsets_col is requested and see
196+
raise NotImplementedError(
197+
"offsets_col in ReadLocalNode not supported yet for DataFusion"
198+
)
199+
return df
200+
201+
@compile_node.register
202+
def compile_filter(self, node: nodes.FilterNode):
203+
return self.compile_node(node.child).filter(
204+
self.expr_compiler.compile_expression(node.predicate)
205+
)
206+
207+
@compile_node.register
208+
def compile_selection(self, node: nodes.SelectionNode):
209+
df = self.compile_node(node.child)
210+
exprs = [
211+
datafusion.col(orig.id.sql).alias(new.sql)
212+
for orig, new in node.input_output_pairs
213+
]
214+
return df.select(*exprs)
215+
216+
@compile_node.register
217+
def compile_projection(self, node: nodes.ProjectionNode):
218+
df = self.compile_node(node.child)
219+
new_cols = []
220+
for proj_expr, name in node.assignments:
221+
# bind_schema_fields might be needed
222+
bound_expr = ex.bind_schema_fields(proj_expr, node.child.field_by_id)
223+
new_col = self.expr_compiler.compile_expression(bound_expr).alias(
224+
name.sql
225+
)
226+
new_cols.append(new_col)
227+
228+
# with_columns takes dict or list of aliases?
229+
# DF DataFrame has with_column
230+
for col in new_cols:
231+
# df = df.with_column(col) # wait, with_column usually takes name and expr
232+
# let's see df.select(*existing, new)
233+
pass
234+
# Better to use select with existing columns + new columns
235+
new_names = [name.sql for _, name in node.assignments]
236+
filtered_existing = [
237+
datafusion.col(c) for c in df.schema().names if c not in new_names
238+
]
239+
return df.select(*(filtered_existing + new_cols))
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from bigframes.core import bigframe_node
16+
17+
18+
def lower_ops_to_datafusion(
19+
root: bigframe_node.BigFrameNode,
20+
) -> bigframe_node.BigFrameNode:
21+
"""Lower operations for DataFusion execution."""
22+
# Skeleton implementation, returns node as-is
23+
return root
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""DataFusion implementations for BigFrames operations."""
16+
from __future__ import annotations

0 commit comments

Comments
 (0)