Skip to content

Commit d5591a0

Browse files
author
miranov25
committed
Phase 5.3: Runtime composite key generation for RDataFrame friend joins
- Add runtime composite key generation for subframes with >2 index columns
- Uses TMemFile + SetFile approach (validated by 11 isolation tests)
- Main tree: SetFile for __adf_key__ branch only (memory efficient)
- Friend tree: Clone to TMemFile + add __adf_key__ branch + BuildIndex
- setup_tree_with_friends() now returns 3-tuple (tree, file, composite_key_info)
- Add safety checks: friend size limit (1M), int64 overflow detection
- 861 tests passing
1 parent fb97c54 commit d5591a0

3 files changed

Lines changed: 1587 additions & 22 deletions

File tree

UTILS/dfextensions/AliasDataFrame/AliasDataFrameRDF.py

Lines changed: 217 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -784,12 +784,15 @@ def _validate_leaf_deps(ordered: List[str], all_aliases: Dict, tree) -> None:
784784
def setup_tree_with_friends(
785785
filename: str,
786786
treename: str,
787-
schema: Dict = None
788-
) -> Tuple[Any, Any]:
787+
schema: Dict = None,
788+
adf = None
789+
) -> Tuple[Any, Any, Dict]:
789790
"""
790791
Load tree with subframes as indexed friends.
791792
Python equivalent of LoadADFTree() from AliasDataFrameTree.C.
792793
794+
Supports runtime composite key generation for subframes with >2 index columns.
795+
793796
Parameters
794797
----------
795798
filename : str
@@ -798,69 +801,244 @@ def setup_tree_with_friends(
798801
Main tree name
799802
schema : dict, optional
800803
Schema with subframe index definitions
804+
adf : AliasDataFrame, optional
805+
Forwarded to _generate_runtime_composite_key(). Currently not read there: max values are taken from TTree.GetMaximum() on the main and friend trees.
801806
802807
Returns
803808
-------
804809
tuple
805-
(tree, file_handle) - keep file_handle alive!
810+
(tree, file_handle, composite_key_info)
811+
composite_key_info contains metadata about generated composite keys
812+
and '_memfiles' list that must be kept alive for RDF usage
806813
"""
807814
import ROOT
815+
import warnings
808816

809817
f = ROOT.TFile.Open(filename)
810818
if not f or f.IsZombie():
811-
raise IOError(f"Cannot open file: {filename}")
819+
raise OSError(f"Cannot open file: {filename}")
812820

813821
tree = f.Get(treename)
814822
if not tree:
815823
raise ValueError(f"Tree '{treename}' not found in {filename}")
816824

817825
subframes = schema.get('subframes', {}) if schema else {}
826+
composite_key_info = {} # Track runtime-generated composite keys
827+
_memfile_storage = [] # Keep references to prevent garbage collection
818828

819829
for sf_name, sf_info in subframes.items():
820830
sf_tree_name = f"{treename}__subframe__{sf_name}"
821831
sf_tree = f.Get(sf_tree_name)
822832

823833
if not sf_tree:
824-
print(f"Warning: Subframe '{sf_name}' not found")
834+
warnings.warn(f"Subframe tree '{sf_name}' not found in file", UserWarning)
825835
continue
826836

827837
# Get index columns - schema uses 'index' key
828838
index_cols = sf_info.get('index', sf_info.get('index_columns', []))
839+
if isinstance(index_cols, str):
840+
index_cols = [index_cols]
829841

830842
if len(index_cols) == 0:
831-
print(f"Warning: Subframe '{sf_name}' has no index columns")
843+
warnings.warn(f"Subframe '{sf_name}' has no index columns", UserWarning)
832844
continue
833845
elif len(index_cols) == 1:
834846
sf_tree.BuildIndex(index_cols[0])
847+
tree.AddFriend(sf_tree, sf_name)
835848
elif len(index_cols) == 2:
836849
sf_tree.BuildIndex(index_cols[0], index_cols[1])
850+
tree.AddFriend(sf_tree, sf_name)
837851
else:
838-
# Composite index - need __adf_key__ column
839-
key_branch = f"__adf_key_{sf_name}__"
852+
# >2 keys: Need composite key
853+
key_branch = get_composite_key_column_name(sf_name)
854+
840855
if sf_tree.GetBranch(key_branch):
856+
# Composite key branch exists in file - use it
841857
sf_tree.BuildIndex(key_branch)
858+
composite_key_info[sf_name] = {
859+
'method': 'from_file',
860+
'branch': key_branch,
861+
'n_keys': len(index_cols),
862+
}
863+
tree.AddFriend(sf_tree, sf_name)
842864
else:
843-
print(f"Warning: {sf_name} has {len(index_cols)} keys but no composite key branch")
844-
continue
865+
# Runtime composite key generation using Clone + SetFile approach
866+
result = _generate_runtime_composite_key(
867+
tree, sf_tree, sf_name, index_cols, adf
868+
)
869+
870+
if result is None:
871+
# Generation failed - skip this subframe
872+
continue
873+
874+
friend_clone, memfile_main, memfile_friend, runtime_info = result
875+
876+
# Keep memfiles alive!
877+
_memfile_storage.extend([memfile_main, memfile_friend])
878+
879+
# Use the cloned friend tree (with composite key) instead of original
880+
tree.AddFriend(friend_clone, sf_name)
881+
composite_key_info[sf_name] = runtime_info
882+
883+
method_str = composite_key_info.get(sf_name, {}).get('method', 'builtin')
884+
print(f" Added friend: {sf_name} ({sf_tree.GetEntries()} entries) [index: {method_str}]")
885+
886+
# Store memfiles at top level to keep them alive
887+
composite_key_info['_memfiles'] = _memfile_storage
888+
889+
return tree, f, composite_key_info
890+
891+
892+
def _generate_runtime_composite_key(main_tree, friend_tree, sf_name, index_cols, adf=None,
893+
max_friend_clone_entries=1_000_000):
894+
"""
895+
Generate composite key at runtime for >2 index columns.
896+
897+
Strategy (validated by TMemFile tests):
898+
- Main tree: SetFile for __adf_key__ branch only (memory efficient)
899+
- Friend tree: Full clone to TMemFile + add __adf_key__ branch
900+
901+
Parameters
902+
----------
903+
main_tree : ROOT.TTree
904+
Main tree (read-only)
905+
friend_tree : ROOT.TTree
906+
Friend tree (subframe, will be cloned)
907+
sf_name : str
908+
Subframe name
909+
index_cols : list of str
910+
Index column names
911+
adf : AliasDataFrame, optional
912+
Currently unused in this function (reserved for more accurate max values); max values are taken from GetMaximum() on both trees.
913+
max_friend_clone_entries : int
914+
Maximum friend tree entries before refusing to clone (memory protection)
915+
916+
Returns
917+
-------
918+
tuple
919+
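(friend_clone, memfile_main, memfile_friend, info_dict), or None when the max value of an index column cannot be determined (a UserWarning is issued and the caller skips the subframe).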
920+
Caller must keep memfiles alive!
845921
846-
tree.AddFriend(sf_tree, sf_name)
847-
print(f" Added friend: {sf_name} ({sf_tree.GetEntries()} entries)")
922+
Raises
923+
------
924+
NotImplementedError
925+
If friend tree too large or key space overflows int64
926+
"""
927+
import ROOT
928+
import numpy as np
929+
import warnings
848930

849-
return tree, f
931+
key_branch_name = get_composite_key_column_name(sf_name)
932+
933+
# Check friend tree size
934+
n_friend_entries = int(friend_tree.GetEntries())
935+
if n_friend_entries > max_friend_clone_entries:
936+
raise NotImplementedError(
937+
f"Friend tree '{sf_name}' too large for runtime clone "
938+
f"({n_friend_entries:,} entries > {max_friend_clone_entries:,}). "
939+
f"Use export_tree(composite_keys='auto') to pre-compute."
940+
)
941+
942+
# Get max values for each key column from both trees
943+
max_values = []
944+
for col in index_cols:
945+
main_max = main_tree.GetMaximum(col)
946+
friend_max = friend_tree.GetMaximum(col)
947+
948+
if main_max == -1e308 or friend_max == -1e308:
949+
warnings.warn(
950+
f"Cannot determine max value for '{col}' in subframe '{sf_name}'. "
951+
f"Skipping subframe.",
952+
UserWarning
953+
)
954+
return None
955+
956+
col_max = int(max(main_max, friend_max)) + 1
957+
max_values.append(col_max)
958+
959+
# Check if dense linearization is safe (int64 overflow)
960+
is_safe, compact_range = check_dense_overflow(max_values)
961+
962+
if not is_safe:
963+
raise NotImplementedError(
964+
f"Runtime composite key for '{sf_name}' exceeds int64 limit "
965+
f"(key space {compact_range:.2e}). "
966+
f"Use export_tree(composite_keys='auto') to pre-compute with sparse mapping."
967+
)
968+
969+
# === MAIN TREE: SetFile for key branch ===
970+
memfile_main = ROOT.TMemFile(f"main_key_{sf_name}", "RECREATE")
971+
972+
main_ckey = np.array([0], dtype=np.int64)
973+
main_key_branch = main_tree.Branch(key_branch_name, main_ckey, f"{key_branch_name}/L")
974+
main_key_branch.SetFile(memfile_main)
975+
976+
n_main_entries = int(main_tree.GetEntries())
977+
for i in range(n_main_entries):
978+
main_tree.GetEntry(i)
979+
# Compute linearized key: k0 + k1*max0 + k2*max0*max1 + ...
980+
key_val = 0
981+
multiplier = 1
982+
for j, col in enumerate(index_cols):
983+
col_val = int(getattr(main_tree, col))
984+
key_val += col_val * multiplier
985+
multiplier *= max_values[j]
986+
main_ckey[0] = key_val
987+
main_key_branch.Fill()
988+
989+
# === FRIEND TREE: Clone to TMemFile ===
990+
memfile_friend = ROOT.TMemFile(f"friend_{sf_name}", "RECREATE")
991+
memfile_friend.cd()
992+
993+
friend_clone = friend_tree.CloneTree(-1, "fast")
994+
friend_clone.SetDirectory(memfile_friend)
995+
996+
# Add composite key branch to clone
997+
friend_ckey = np.array([0], dtype=np.int64)
998+
friend_key_branch = friend_clone.Branch(key_branch_name, friend_ckey, f"{key_branch_name}/L")
999+
1000+
for i in range(n_friend_entries):
1001+
friend_clone.GetEntry(i)
1002+
key_val = 0
1003+
multiplier = 1
1004+
for j, col in enumerate(index_cols):
1005+
col_val = int(getattr(friend_clone, col))
1006+
key_val += col_val * multiplier
1007+
multiplier *= max_values[j]
1008+
friend_ckey[0] = key_val
1009+
friend_key_branch.Fill()
1010+
1011+
# Build index on friend clone
1012+
friend_clone.BuildIndex(key_branch_name)
1013+
1014+
info = {
1015+
'method': 'runtime_dense',
1016+
'branch': key_branch_name,
1017+
'n_keys': len(index_cols),
1018+
'max_values': max_values,
1019+
'compact_range': compact_range,
1020+
'friend_entries_cloned': n_friend_entries,
1021+
}
1022+
1023+
return friend_clone, memfile_main, memfile_friend, info
8501024

8511025

8521026
# =============================================================================
8531027
# Modular RDataFrame API
8541028
# =============================================================================
8551029

856-
def setup_rdf_with_friends(adf, filename, treename="tree"):
1030+
def setup_rdf_with_friends(adf, filename, treename="tree", return_composite_info=False):
8571031
"""
8581032
Create RDataFrame with friend trees from AliasDataFrame schema.
8591033
8601034
This is the primary entry point for interactive Python workflows.
8611035
Returns both the RDataFrame and file handle - the file handle MUST
8621036
be kept alive as long as the RDataFrame is in use.
8631037
1038+
Supports runtime composite key generation for subframes with >2 index columns.
1039+
When a subframe has >2 keys and no pre-computed composite key branch,
1040+
a dense linearized composite key branch is generated at runtime and used for BuildIndex.
1041+
8641042
Parameters
8651043
----------
8661044
adf : AliasDataFrame
@@ -869,13 +1047,19 @@ def setup_rdf_with_friends(adf, filename, treename="tree"):
8691047
Path to ROOT file
8701048
treename : str
8711049
Name of main tree (default: "tree")
1050+
return_composite_info : bool
1051+
If True, return (rdf, file_handle, composite_key_info)
1052+
If False (default), return (rdf, file_handle) for backward compatibility
8721053
8731054
Returns
8741055
-------
8751056
rdf : ROOT.RDataFrame
8761057
RDataFrame with friend trees attached
8771058
file_handle : ROOT.TFile
8781059
Open file handle - MUST be kept alive while using rdf
1060+
composite_key_info : dict, optional
1061+
Only returned if return_composite_info=True.
1062+
Contains metadata about runtime-generated composite keys.
8791063
8801064
Examples
8811065
--------
@@ -884,11 +1068,20 @@ def setup_rdf_with_friends(adf, filename, treename="tree"):
8841068
>>> result = rdf.Mean("dyC2").GetValue()
8851069
>>> # f must stay in scope until all actions complete
8861070
1071+
# To get composite key info for debugging:
1072+
>>> rdf, f, info = setup_rdf_with_friends(adf, "data.root", return_composite_info=True)
1073+
>>> print(info) # {'DTrack0': {'method': 'runtime_dense', 'branch': '...', 'n_keys': 3, ...}, '_memfiles': [...]}
1074+
8871075
Notes
8881076
-----
8891077
The file handle must remain in scope for the lifetime of the RDataFrame
8901078
due to ROOT's lazy evaluation. Letting it go out of scope will cause
8911079
segmentation faults when RDataFrame actions are triggered.
1080+
1081+
For subframes with >2 index columns:
1082+
- If __adf_key_<sf_name>__ branch exists: Uses it
1083+
- If missing and dense is safe: Generates linearization at runtime
1084+
- If sparse required: Raises NotImplementedError with guidance
8921085
"""
8931086
try:
8941087
import ROOT
@@ -898,14 +1091,21 @@ def setup_rdf_with_friends(adf, filename, treename="tree"):
8981091
"Install with: conda install -c conda-forge root"
8991092
)
9001093

901-
# Use existing setup_tree_with_friends
1094+
# Use existing setup_tree_with_friends (now with adf for max value hints)
9021095
schema = adf.schema if hasattr(adf, 'schema') else adf.export_schema()
903-
tree, file_handle = setup_tree_with_friends(filename, treename, schema)
1096+
tree, file_handle, composite_key_info = setup_tree_with_friends(
1097+
filename, treename, schema, adf=adf
1098+
)
9041099

9051100
# Create RDataFrame
9061101
rdf = ROOT.RDataFrame(tree)
9071102

908-
return rdf, file_handle
1103+
if return_composite_info:
1104+
return rdf, file_handle, composite_key_info
1105+
else:
1106+
# Attach memfiles to file handle to keep them alive
1107+
file_handle._adf_memfiles = composite_key_info.get('_memfiles', [])
1108+
return rdf, file_handle
9091109

9101110

9111111
def setup_chain_with_friends(adf, file_patterns, treename="tree"):

0 commit comments

Comments
 (0)