This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathinterchange.py
More file actions
155 lines (120 loc) · 5.1 KB
/
interchange.py
File metadata and controls
155 lines (120 loc) · 5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import dataclasses
import functools
from typing import Any, Dict, Iterable, Optional, Sequence, TYPE_CHECKING
from bigframes.core import blocks
import bigframes.enums
if TYPE_CHECKING:
import bigframes.dataframe
@dataclasses.dataclass(frozen=True)
class InterchangeColumn:
    """Column implementation for the dataframe interchange protocol.

    Every protocol accessor delegates to the pyarrow interchange column
    produced by materializing the parent ``InterchangeDataFrame``.
    """

    # Parent interchange dataframe that owns this column.
    _dataframe: InterchangeDataFrame
    # Zero-based position of this column within the parent dataframe.
    _pos: int

    @functools.cached_property
    def _arrow_column(self):
        # Conservatively downloads the whole underlying dataframe.
        # This is much better if multiple columns end up being used,
        # but does incur a lot of overhead otherwise.
        # NOTE: cached_property (instead of @functools.cache on a method)
        # keeps the cached value on this instance rather than in a class-level
        # cache keyed on self, which would pin every instance (and its
        # downloaded data) alive for the process lifetime (ruff B019).
        # cached_property writes directly into __dict__, so it also works on
        # frozen dataclasses.
        return self._dataframe._arrow_dataframe().get_column(self._pos)

    def size(self) -> int:
        """Return the number of elements in the column."""
        return self._arrow_column.size()

    @property
    def offset(self) -> int:
        """Offset of the first element within the underlying buffers."""
        return self._arrow_column.offset

    @property
    def dtype(self):
        """Interchange-protocol dtype description of the column."""
        return self._arrow_column.dtype

    @property
    def describe_categorical(self):
        # Categorical columns are not supported by this implementation.
        raise TypeError(f"Column type {self.dtype} is not categorical")

    @property
    def describe_null(self):
        """Describe how nulls are represented, per the interchange protocol."""
        return self._arrow_column.describe_null

    @property
    def null_count(self):
        """Number of null elements, if known."""
        return self._arrow_column.null_count

    @property
    def metadata(self) -> Dict[str, Any]:
        """Implementation-specific column metadata."""
        return self._arrow_column.metadata

    def num_chunks(self) -> int:
        """Return the number of chunks the column consists of."""
        return self._arrow_column.num_chunks()

    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
        """Iterate over the column's chunks, optionally subdividing them."""
        return self._arrow_column.get_chunks(n_chunks=n_chunks)

    def get_buffers(self):
        """Return the underlying buffers backing the column."""
        return self._arrow_column.get_buffers()
@dataclasses.dataclass(frozen=True)
class InterchangeDataFrame:
    """
    Implements the dataframe interchange format.

    Mostly implemented by downloading the result to pyarrow, and using the
    pyarrow interchange implementation.
    """

    # Underlying block of data backing this dataframe.
    _value: blocks.Block
    version: int = 0  # version of the protocol

    def __dataframe__(
        self, nan_as_null: bool = False, allow_copy: bool = True
    ) -> InterchangeDataFrame:
        """Protocol entry point; this object already is an interchange dataframe."""
        return self

    @classmethod
    def _from_bigframes(cls, df: bigframes.dataframe.DataFrame):
        """Create an interchange dataframe wrapping a bigframes DataFrame."""
        # The interchange protocol requires string column names.
        block = df._block.with_column_labels(
            [str(label) for label in df._block.column_labels]
        )
        return cls(block)

    # In future, could potentially rely on executor to refetch batches
    # efficiently with caching, but safest for now to just request a single
    # execution and save the whole table.
    @functools.cached_property
    def _cached_arrow_dataframe(self):
        # cached_property stores the result on this instance (works on frozen
        # dataclasses) instead of @functools.cache's class-level cache, which
        # would pin every instance alive (ruff B019) and require the block to
        # be hashable.
        arrow_table, _ = self._value.reset_index(
            replacement=bigframes.enums.DefaultIndexKind.NULL
        ).to_arrow(allow_large_results=False)
        return arrow_table.__dataframe__()

    def _arrow_dataframe(self):
        # Kept as a callable method (not a property): InterchangeColumn
        # invokes self._dataframe._arrow_dataframe().
        return self._cached_arrow_dataframe

    @property
    def metadata(self):
        # Allows round-trip without materialization.
        return {"bigframes.block": self._value}

    def num_columns(self) -> int:
        """
        Return the number of columns in the DataFrame.
        """
        return len(self._value.value_columns)

    def num_rows(self) -> Optional[int]:
        """Return the number of rows in the DataFrame."""
        return self._value.shape[0]

    def num_chunks(self) -> int:
        """Return the number of chunks the materialized dataframe has."""
        return self._arrow_dataframe().num_chunks()

    def column_names(self) -> Iterable[str]:
        """Return the column labels, in order."""
        return list(self._value.column_labels)

    def get_column(self, i: int) -> InterchangeColumn:
        """Return the column at position ``i``."""
        return InterchangeColumn(self, i)

    # For single column getters, we download the whole dataframe still.
    # This is inefficient in some cases, but more efficient in others.
    def get_column_by_name(self, name: str) -> InterchangeColumn:
        """Return the column with label ``name``.

        Raises:
            KeyError: if no column has that exact label.
        """
        col_id = self._value.resolve_label_exact(name)
        if col_id is None:
            # Explicit raise rather than assert: asserts are stripped under -O.
            raise KeyError(f"Column not found: {name}")
        pos = self._value.value_columns.index(col_id)
        return InterchangeColumn(self, pos)

    def get_columns(self) -> Iterable[InterchangeColumn]:
        """Return all columns, in order."""
        return [InterchangeColumn(self, i) for i in range(self.num_columns())]

    def select_columns(self, indices: Sequence[int]) -> InterchangeDataFrame:
        """Return a new interchange dataframe with the columns at ``indices``."""
        col_ids = [self._value.value_columns[i] for i in indices]
        new_value = self._value.select_columns(col_ids)
        return InterchangeDataFrame(new_value)

    def select_columns_by_name(self, names: Sequence[str]) -> InterchangeDataFrame:
        """Return a new interchange dataframe with the columns labeled ``names``.

        Raises:
            KeyError: if any name does not resolve to a column.
        """
        col_ids = [self._value.resolve_label_exact(name) for name in names]
        if any(col_id is None for col_id in col_ids):
            # Explicit raise rather than assert: asserts are stripped under -O.
            missing = [n for n, c in zip(names, col_ids) if c is None]
            raise KeyError(f"Columns not found: {missing}")
        new_value = self._value.select_columns(col_ids)  # type: ignore
        return InterchangeDataFrame(new_value)

    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
        """Iterate over chunks of the materialized dataframe."""
        return self._arrow_dataframe().get_chunks(n_chunks)