-
Notifications
You must be signed in to change notification settings - Fork 119
Expand file tree
/
Copy pathDataFrameUtils.py
More file actions
329 lines (244 loc) · 9.05 KB
/
DataFrameUtils.py
File metadata and controls
329 lines (244 loc) · 9.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
from __future__ import annotations
from typing import (
Protocol,
runtime_checkable,
Iterable,
List,
Any,
TYPE_CHECKING,
)
import numpy as np
from TM1py.Utils.Utils import (
CaseAndSpaceInsensitiveTuplesDict
)
# Only import pandas/polars for type hints (not at runtime)
if TYPE_CHECKING:
import pandas as pd
import polars as pl
def _require_pandas():
try:
import pandas as pd
return pd
except ImportError:
raise ImportError("Pandas is required but not installed.")
def _require_polars():
try:
import polars as pl
return pl
except ImportError:
raise ImportError("Polars is required but not installed.")
@runtime_checkable
class DataFrameLike(Protocol):
    """
    A backend-agnostic interface for DataFrame-like objects
    (pandas, polars, or others).

    Implementations (e.g. ``PandasFrame``, ``PolarsFrame``) wrap a concrete
    DataFrame and expose the operations needed by the cellset builder.
    """

    @property
    def columns(self) -> Iterable[str]:
        """Return the column labels of the frame."""
        ...

    @columns.setter
    def columns(self, new_columns: Iterable[str]) -> None:
        """Replace all column labels at once."""
        ...

    def iter_rows(self, columns: Iterable[str] = None) -> Iterable[tuple]:
        """Iterate rows as plain tuples, optionally restricted to *columns*."""
        ...

    def reset_index(self) -> DataFrameLike:
        """Move index levels into regular columns (no-op where unsupported)."""
        ...

    def is_numeric_column(self, col: str) -> bool:
        """Return True if column *col* holds a numeric dtype."""
        ...

    def filter_rows(self, mask: Iterable[bool]) -> DataFrameLike:
        """Return a new frame containing only rows where *mask* is True."""
        ...

    def concat(self, others: List[DataFrameLike]) -> DataFrameLike:
        """Vertically concatenate this frame with *others*."""
        ...

    def get_column_values(self, col: str) -> Iterable[Any]:
        """Return the values of column *col*."""
        ...

    def copy(self) -> "DataFrameLike":
        """Return a copy that can be mutated without affecting the original."""
        ...

    def aggregate_duplicate_intersections(
            self,
            dimension_headers: Iterable[str],
            value_header: str
    ) -> DataFrameLike:
        """Sum numeric values whose normalized dimension tuples coincide."""
        ...

    def __getitem__(self, key) -> DataFrameLike:
        """Select columns: a list of names yields a frame, a single name a column."""
        ...

    def __setitem__(self, key, value) -> DataFrameLike:
        """Assign *value* to column *key*."""
        ...
class PandasFrame:
    """DataFrameLike adapter around a ``pd.DataFrame``.

    Wraps a pandas DataFrame and exposes the backend-agnostic interface
    declared by ``DataFrameLike``.
    """

    def __init__(self, df: "pd.DataFrame"):
        # Import lazily so pandas is only required when this backend is used
        # (same exception type and message as the module-level helper).
        try:
            import pandas as pd
        except ImportError:
            raise ImportError("Pandas is required but not installed.")
        self._pd = pd
        self.df = df

    @property
    def columns(self):
        """Column labels of the wrapped frame."""
        return self.df.columns

    @columns.setter
    def columns(self, new_columns):
        self.df.columns = new_columns

    def copy(self) -> "PandasFrame":
        """Return a deep copy so mutations do not leak to the original."""
        return PandasFrame(self.df.copy(deep=True))

    def iter_rows(self, columns: Iterable[str] = None):
        """Iterate rows as plain tuples, optionally restricted to *columns*."""
        if columns is None:
            return self.df.itertuples(index=False, name=None)
        return self.df[list(columns)].itertuples(index=False, name=None)

    def reset_index(self) -> "PandasFrame":
        """Flatten a MultiIndex into regular columns; no-op otherwise."""
        if isinstance(self.df.index, self._pd.MultiIndex):
            return PandasFrame(self.df.reset_index())
        return self

    def is_numeric_column(self, col: str) -> bool:
        """Return True if column *col* has a numeric dtype."""
        return self._pd.api.types.is_numeric_dtype(self.df[col])

    def filter_rows(self, mask):
        """Return a new frame with only the rows where *mask* is True."""
        return PandasFrame(self.df[mask])

    def concat(self, others):
        """Vertically concatenate this frame with other PandasFrames."""
        dfs = [self.df] + [o.df for o in others]
        return PandasFrame(self._pd.concat(dfs, ignore_index=True))

    def get_column_values(self, col: str):
        """Return the column's values as a numpy array."""
        return self.df[col].values

    def aggregate_duplicate_intersections(
            self,
            dimension_headers: Iterable[str],
            value_header: str
    ) -> "PandasFrame":
        """Sum numeric values that share the same (normalized) intersection.

        Dimension labels are lower-cased and stripped of spaces before
        grouping. Non-numeric values are kept as-is, not aggregated.

        :param dimension_headers: columns forming the intersection key
        :param value_header: column holding the cell value
        :return: a new PandasFrame with duplicates aggregated
        """
        # BUGFIX: work on a copy — the previous implementation normalized the
        # dimension columns in place and so mutated the caller's DataFrame
        # (the polars backend never mutates; this restores consistency).
        df = self.df.copy()
        for col in dimension_headers:
            df[col] = df[col].astype(str).str.lower().str.replace(" ", "")
        if self.is_numeric_column(value_header):
            grouped = (
                df.groupby([*dimension_headers])[value_header].sum().reset_index()
            )
            return PandasFrame(grouped)
        # Mixed dtypes: aggregate the numeric rows, pass strings through.
        filter_mask = df[value_header].apply(np.isreal)
        df_n = df[filter_mask]
        df_s = df[~filter_mask]
        if not df_n.empty:
            df_n = (
                df_n.groupby([*dimension_headers])[value_header].sum().reset_index()
            )
        combined = self._pd.concat([df_n, df_s], ignore_index=True)
        return PandasFrame(combined)

    def __getitem__(self, key):
        """List key -> wrapped frame of those columns; str key -> raw Series."""
        if isinstance(key, list):
            # DataFrame result -> wrap again
            return PandasFrame(self.df[key])
        if isinstance(key, str):
            # Single column -> Series
            return self.df[key]
        raise TypeError(f"Unsupported key type: {type(key)}")

    def __setitem__(self, key, value):
        """Assign *value* to column *key* on the wrapped frame."""
        if not isinstance(key, str):
            raise TypeError("Column name must be a string")
        self.df[key] = value
class PolarsFrame:
    """DataFrameLike adapter around a ``pl.DataFrame``.

    Wraps a polars DataFrame and exposes the backend-agnostic interface
    declared by ``DataFrameLike``. All operations return new frames
    (polars is immutable).
    """

    def __init__(self, df: "pl.DataFrame"):
        # Import lazily so polars is only required when this backend is used
        # (same exception type and message as the module-level helper).
        try:
            import polars as pl
        except ImportError:
            raise ImportError("Polars is required but not installed.")
        self._pl = pl
        self.df = df

    @property
    def columns(self):
        """Column labels of the wrapped frame."""
        return self.df.columns

    @columns.setter
    def columns(self, new_columns):
        # Polars has no column assignment: rename all columns at once.
        if len(new_columns) != len(self.df.columns):
            raise ValueError("Number of new columns must match existing columns")
        self.df = self.df.rename(dict(zip(self.df.columns, new_columns)))

    def iter_rows(self, columns: Iterable[str] = None):
        """Iterate rows as plain tuples, optionally restricted to *columns*."""
        # BUGFIX: compare against None (not truthiness) so an explicit empty
        # column list selects zero columns, matching the pandas backend.
        df_to_iter = self.df if columns is None else self.df.select(list(columns))
        return df_to_iter.iter_rows()

    def reset_index(self) -> "PolarsFrame":
        """No-op: polars frames have no index."""
        return self

    def is_numeric_column(self, col: str) -> bool:
        """Return True if column *col* has an integer or float dtype."""
        pl = self._pl
        return self.df[col].dtype in (
            pl.Int8, pl.Int16, pl.Int32, pl.Int64,
            pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64,
            pl.Float32, pl.Float64,
        )

    def filter_rows(self, mask):
        """Return a new frame with only the rows where *mask* is True."""
        # BUGFIX: was `self._pl.pl.Series(...)` which raised AttributeError —
        # `self._pl` is already the polars module.
        mask_series = self._pl.Series("mask", mask)
        return PolarsFrame(self.df.filter(mask_series))

    def concat(self, others):
        """Vertically concatenate this frame with other PolarsFrames."""
        dfs = [self.df] + [o.df for o in others]
        return PolarsFrame(self._pl.concat(dfs))

    def get_column_values(self, col: str):
        """Return the column's values as a Python list."""
        return self.df[col].to_list()

    def aggregate_duplicate_intersections(
            self,
            dimension_headers: Iterable[str],
            value_header: str
    ) -> "PolarsFrame":
        """Sum numeric values that share the same (normalized) intersection.

        Dimension labels are lower-cased and stripped of spaces before
        grouping. Non-numeric values are kept as-is, not aggregated.

        :param dimension_headers: columns forming the intersection key
        :param value_header: column holding the cell value
        :return: a new PolarsFrame with duplicates aggregated
        """
        pl = self._pl
        df = self.df
        for col in dimension_headers:
            # BUGFIX: use replace_all — polars `str.replace` only replaces the
            # first occurrence, while the pandas backend removes every space.
            df = df.with_columns(
                pl.col(col).cast(pl.Utf8).str.to_lowercase().str.replace_all(" ", "").alias(col)
            )
        if self.is_numeric_column(value_header):
            grouped = (
                df.group_by(list(dimension_headers))
                .agg(pl.col(value_header).sum().alias(value_header))
                .select([*dimension_headers, value_header])
            )
            return PolarsFrame(grouped)
        # Mixed dtypes: cast to float where possible, aggregate the numeric
        # rows, and pass the non-castable rows through untouched.
        casted = df.with_columns(pl.col(value_header).cast(pl.Float64, strict=False).alias("__value_float__"))
        numeric_mask = pl.col("__value_float__").is_not_null()
        df_n = casted.filter(numeric_mask).select([*dimension_headers, "__value_float__"])
        if df_n.height > 0:
            df_n = df_n.rename({"__value_float__": value_header})
            df_n = (
                df_n.group_by(list(dimension_headers))
                .agg(pl.col(value_header).sum().alias(value_header))
                .select([*dimension_headers, value_header])
            )
        df_s = casted.filter(~numeric_mask).select([*dimension_headers, value_header])
        if df_n.height > 0 and df_s.height > 0:
            df = pl.concat([df_n, df_s], how="vertical")
        elif df_n.height > 0:
            df = df_n
        else:
            df = df_s
        return PolarsFrame(df)

    def __getitem__(self, key):
        """List key -> wrapped frame of those columns; str key -> raw Series."""
        if isinstance(key, list):
            return PolarsFrame(self.df.select(key))
        if isinstance(key, str):
            return self.df[key]
        raise TypeError(f"Unsupported key type: {type(key)}")

    def __setitem__(self, key, value):
        """
        Assign scalar value (str, int, float, bool) to a column:
            data[key] = value
        """
        if not isinstance(key, str):
            raise TypeError("Column name must be a string")
        # Allow string or numeric types
        if not isinstance(value, (str, int, float, bool)):
            raise TypeError("Only scalar string or numeric values are supported")
        self.df = self.df.with_columns(self._pl.lit(value).alias(key))
def convert_to_dataframe_like(df: Any) -> DataFrameLike:
    """Wrap a concrete pandas or polars DataFrame in its adapter.

    :param df: a ``pd.DataFrame`` or ``pl.DataFrame``
    :return: the matching ``PandasFrame`` / ``PolarsFrame`` wrapper
    :raises TypeError: if *df* is not a recognized DataFrame type
    """
    try:
        import pandas as pd
    except ImportError:
        pd = None
    if pd is not None and isinstance(df, pd.DataFrame):
        return PandasFrame(df)
    try:
        import polars as pl
    except ImportError:
        pl = None
    if pl is not None and isinstance(df, pl.DataFrame):
        return PolarsFrame(df)
    raise TypeError(f"Unsupported dataframe type: {type(df)}")
def build_cellset_from_dataframe(
        df: "DataFrameLike",
        sum_numeric_duplicates: bool = True
) -> "CaseAndSpaceInsensitiveTuplesDict":
    """Convert a DataFrame-like object into a TM1 cellset mapping.

    The last column is taken as the cell value; every preceding column is a
    dimension element name, and together they form the intersection key.

    :param df: backend-agnostic frame (see ``DataFrameLike``)
    :param sum_numeric_duplicates: aggregate numeric values of duplicate
        intersections before building the cellset
    :return: mapping of element-name tuples to cell values
    """
    # Flatten any MultiIndex into columns (no-op for the polars backend).
    frame = df.reset_index()
    headers = list(frame.columns)
    dimension_headers, value_header = headers[:-1], headers[-1]
    if sum_numeric_duplicates:
        frame = frame.aggregate_duplicate_intersections(dimension_headers, value_header)
    row_keys = frame.iter_rows(columns=dimension_headers)
    cell_values = frame.get_column_values(value_header)
    return CaseAndSpaceInsensitiveTuplesDict(dict(zip(row_keys, cell_values)))