Skip to content

Commit de9d765

Browse files
author
miranov25
committed
feat(AliasDataFrame): Phase 9b - Arrow scatter integration
Add PyArrow-accelerated subframe value extraction:
- New _extract_subframe_values_arrow() using pc.take()
- Handles missing keys via null masking (pc.if_else)
- Priority: Arrow → Numba → NumPy with graceful fallback
- New arrow_info property and use_arrow parameter

Performance: Parity with Numba (speedup deferred to Phase 9e zero-copy)
Tests: 706 passed (18 new scatter tests)
Requires: PyArrow >= 14.0.0
Reviewed-by: Claude (Architect), GPT, Gemini
1 parent d72465c commit de9d765

1 file changed

Lines changed: 138 additions & 4 deletions

File tree

UTILS/dfextensions/AliasDataFrame/AliasDataFrame.py

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@
3232
get_numba_info = lambda: {'available': False, 'version': None}
3333
linearize_multi_column_keys_pair = None
3434

35+
# PyArrow acceleration (optional) - Phase 9
36+
try:
37+
import pyarrow as pa
38+
import pyarrow.compute as pc
39+
PYARROW_AVAILABLE = True
40+
except ImportError:
41+
PYARROW_AVAILABLE = False
42+
pa = None
43+
pc = None
44+
3545
# =============================================================================
3646
# SECTION 0: Schema & Metadata Constants
3747
# =============================================================================
@@ -726,7 +736,7 @@ class AliasDataFrame:
726736
Phase 4: Uses unified _schema dict as single source of truth.
727737
"""
728738

729-
def __init__(self, df, schema_id=None, use_numba=None):
739+
def __init__(self, df, schema_id=None, use_numba=None, use_arrow=None):
730740
"""
731741
Initialize AliasDataFrame with unified schema structure.
732742
@@ -835,6 +845,13 @@ def __init__(self, df, schema_id=None, use_numba=None):
835845
self._use_numba = NUMBA_AVAILABLE
836846
else:
837847
self._use_numba = use_numba and NUMBA_AVAILABLE
848+
849+
# Phase 9: PyArrow acceleration configuration
850+
# Auto-detect if not specified: use PyArrow when available
851+
if use_arrow is None:
852+
self._use_arrow = PYARROW_AVAILABLE
853+
else:
854+
self._use_arrow = use_arrow and PYARROW_AVAILABLE
838855

839856
# =========================================================================
840857
# SECTION 1: Core DataFrame Operations & Schema Properties
@@ -905,6 +922,33 @@ def numba_info(self):
905922
info['min_rows'] = NUMBA_MIN_ROWS
906923
return info
907924

925+
@property
def arrow_info(self):
    """
    Report the PyArrow acceleration status for this instance.

    Returns
    -------
    dict
        Keys, in order:
        - available: module-level PYARROW_AVAILABLE flag
        - enabled: this instance's ``_use_arrow`` setting
        - version: ``pa.__version__`` when PyArrow is available, else None
        - min_rows: row-count threshold; the Numba threshold
          (NUMBA_MIN_ROWS) is reused for PyArrow

    Example
    -------
    >>> adf.arrow_info
    {'available': True, 'enabled': True, 'version': '14.0.2', 'min_rows': 10000}
    """
    # ``pa`` is None when the import failed, so only touch it when available.
    arrow_version = pa.__version__ if PYARROW_AVAILABLE else None
    return {
        'available': PYARROW_AVAILABLE,
        'enabled': self._use_arrow,
        'version': arrow_version,
        'min_rows': NUMBA_MIN_ROWS,  # Reuse same threshold
    }
951+
908952
# =========================================================================
909953
# Phase 4: Backward Compatibility Properties
910954
# =========================================================================
@@ -1907,12 +1951,85 @@ def _compute_join_indices(self, sf_name, index_cols):
19071951

19081952
return indices, missing_mask
19091953

1954+
def _extract_subframe_values_arrow(self, sf_name, sf_col, indices, missing_mask):
    """
    Extract subframe column values using PyArrow take() - gather operation.

    Phase 9b: gathers values from the subframe column at the precomputed
    join indices via ``pc.take()``.

    Parameters
    ----------
    sf_name : str
        Subframe name
    sf_col : str
        Column (or alias) name to extract from the subframe
    indices : np.ndarray[int64]
        Row indices into subframe (-1 for missing keys)
    missing_mask : np.ndarray[bool]
        Mask indicating missing keys (True where index == -1)

    Returns
    -------
    np.ndarray
        Extracted values with NaN for missing keys (before fill config)

    Raises
    ------
    KeyError
        If ``sf_col`` is neither a column nor an alias of the subframe.

    Notes
    -----
    This is a GATHER operation: for each row i in main DataFrame,
    we fetch subframe[indices[i]]. Missing keys (indices[i] == -1)
    result in NaN values.

    When every key is missing the result is built directly, without
    calling Arrow. This also covers a zero-row subframe, where the
    "-1 -> 0" index substitution below would otherwise make take() fail
    with an out-of-bounds index.
    """
    sub_adf = self.get_subframe(sf_name)
    sub_df = sub_adf.df

    # Materialize subframe alias if needed
    if sf_col not in sub_df.columns:
        if sf_col not in sub_adf.aliases:
            raise KeyError(f"Subframe '{sf_name}' does not contain column or alias '{sf_col}'")
        sub_adf.materialize_alias(sf_col)
        sub_df = sub_adf.df

    sub_values = sub_df[sf_col].to_numpy()

    # Evaluate the mask once; it is consulted several times below.
    has_missing = bool(missing_mask.any())

    # All keys missing: nothing to gather. Keep a floating source dtype,
    # otherwise use float64 so NaN is representable.
    if has_missing and missing_mask.all():
        if np.issubdtype(sub_values.dtype, np.floating):
            nan_dtype = sub_values.dtype
        else:
            nan_dtype = np.float64
        return np.full(len(indices), np.nan, dtype=nan_dtype)

    # Convert subframe column to Arrow array
    sub_arr = pa.array(sub_values)

    # Handle missing keys (-1 indices):
    #   1. Replace -1 with 0 so take() doesn't fail its bounds check
    #   2. Take values
    #   3. Replace values at missing positions with null
    safe_indices = np.where(indices >= 0, indices, 0)
    indices_arr = pa.array(safe_indices)

    # Perform the gather operation
    taken = pc.take(sub_arr, indices_arr)

    # Apply null mask for missing keys
    if has_missing:
        null_scalar = pa.scalar(None, type=taken.type)
        mask_arr = pa.array(~missing_mask)  # True = keep value, False = null
        taken = pc.if_else(mask_arr, taken, null_scalar)

    # Convert to numpy - nulls become NaN for float types
    result = taken.to_numpy(zero_copy_only=False)

    # Ensure proper dtype for NaN handling: a non-float result cannot hold
    # NaN, so upcast and mark the missing positions explicitly.
    if has_missing and not np.issubdtype(result.dtype, np.floating):
        result = result.astype(np.float64)
        result[missing_mask] = np.nan

    return result
2024+
19102025
def _extract_subframe_values_cached(self, sf_name, sf_col, indices, missing_mask):
19112026
"""
19122027
Extract subframe column values using cached indices.
19132028
1914-
Uses Numba JIT-compiled scatter (Phase 8a) when available,
1915-
falls back to NumPy advanced indexing.
2029+
Uses acceleration in order of preference:
2030+
1. PyArrow take() (Phase 9b) - best for large arrays
2031+
2. Numba JIT scatter (Phase 8a) - good for repeated operations
2032+
3. NumPy advanced indexing - fallback
19162033
19172034
Parameters
19182035
----------
@@ -1930,6 +2047,24 @@ def _extract_subframe_values_cached(self, sf_name, sf_col, indices, missing_mask
19302047
np.ndarray
19312048
Extracted values with fill config applied
19322049
"""
2050+
n = len(indices)
2051+
2052+
# Phase 9b: Try PyArrow path first (fastest for large arrays)
2053+
if (self._use_arrow and PYARROW_AVAILABLE and n >= NUMBA_MIN_ROWS):
2054+
try:
2055+
values = self._extract_subframe_values_arrow(sf_name, sf_col, indices, missing_mask)
2056+
values = self._apply_fill_config(sf_name, values, missing_mask, n)
2057+
return values
2058+
except Exception as e:
2059+
if not hasattr(self, '_arrow_scatter_warned'):
2060+
warnings.warn(
2061+
f"Arrow scatter failed for {sf_name}.{sf_col}, "
2062+
f"falling back to NumPy/Numba: {e}",
2063+
RuntimeWarning
2064+
)
2065+
self._arrow_scatter_warned = True
2066+
2067+
# Numba/NumPy fallback path
19332068
sub_adf = self.get_subframe(sf_name)
19342069
sub_df = sub_adf.df
19352070

@@ -1942,7 +2077,6 @@ def _extract_subframe_values_cached(self, sf_name, sf_col, indices, missing_mask
19422077
raise KeyError(f"Subframe '{sf_name}' does not contain column or alias '{sf_col}'")
19432078

19442079
sub_values = sub_df[sf_col].to_numpy()
1945-
n = len(indices)
19462080

19472081
# Pre-fill with NaN to safely handle missing keys
19482082
# Must upcast non-float dtypes to allow NaN representation

0 commit comments

Comments
 (0)