@@ -773,15 +773,19 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
773773 on = EDGE_ID ,
774774 how = 'inner'
775775 )
776+ # Use Series instead of set() to avoid GPU->CPU transfers for cudf
776777 if direction == 'forward' :
777- goal_nodes = set ( valid_endpoint_edges_with_nodes [g2 ._destination ].tolist () )
778+ goal_node_series = valid_endpoint_edges_with_nodes [g2 ._destination ].drop_duplicates ( )
778779 elif direction == 'reverse' :
779- goal_nodes = set ( valid_endpoint_edges_with_nodes [g2 ._source ].tolist () )
780+ goal_node_series = valid_endpoint_edges_with_nodes [g2 ._source ].drop_duplicates ( )
780781 else :
781782 # Undirected: either endpoint could be a goal
782- goal_nodes = set (valid_endpoint_edges_with_nodes [g2 ._source ].tolist ()) | set (valid_endpoint_edges_with_nodes [g2 ._destination ].tolist ())
783+ goal_node_series = concat ([
784+ valid_endpoint_edges_with_nodes [g2 ._source ],
785+ valid_endpoint_edges_with_nodes [g2 ._destination ]
786+ ], ignore_index = True , sort = False ).drop_duplicates ()
783787
784- if goal_nodes :
788+ if len ( goal_node_series ) > 0 :
785789 # Backtrack from goal nodes to find all edges/nodes on valid paths
786790 # We need to traverse backwards through the edge records to find which edges lead to goals
787791 edge_records_with_endpoints = safe_merge (
@@ -791,12 +795,13 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
791795 how = 'inner'
792796 )
793797
794- # Build sets of valid nodes and edges by backtracking from goal nodes
795- valid_nodes = set (goal_nodes )
796- valid_edges = set ()
798+ # Build Series of valid nodes and edges by backtracking from goal nodes
799+ # Using Series + concat avoids GPU->CPU transfers for cudf
800+ valid_node_series = goal_node_series
801+ valid_edge_list = [] # Collect edge Series to concat at end
797802
798803 # Start with edges that lead TO goal nodes
799- current_targets = goal_nodes
804+ current_targets = goal_node_series
800805
801806 # Backtrack through hops from max edge hop down to 1
802807 # Use actual max edge hop, not max_reached_hop which may include extra traversal steps
@@ -810,27 +815,35 @@ def resolve_label_col(requested: Optional[str], df, default_base: str) -> Option
810815 if direction == 'forward' :
811816 # Forward: edges go src->dst, so dst should be in targets
812817 reaching_edges = hop_edges [hop_edges [g2 ._destination ].isin (current_targets )]
813- new_sources = set ( reaching_edges [g2 ._source ]. tolist ())
818+ new_source_series = reaching_edges [g2 ._source ]
814819 elif direction == 'reverse' :
815820 # Reverse: edges go dst->src conceptually, so src should be in targets
816821 reaching_edges = hop_edges [hop_edges [g2 ._source ].isin (current_targets )]
817- new_sources = set ( reaching_edges [g2 ._destination ]. tolist ())
822+ new_source_series = reaching_edges [g2 ._destination ]
818823 else :
819824 # Undirected: either endpoint could be in targets
820825 reaching_fwd = hop_edges [hop_edges [g2 ._destination ].isin (current_targets )]
821826 reaching_rev = hop_edges [hop_edges [g2 ._source ].isin (current_targets )]
822827 reaching_edges = concat ([reaching_fwd , reaching_rev ], ignore_index = True , sort = False ).drop_duplicates (subset = [EDGE_ID ])
823- new_sources = set (reaching_fwd [g2 ._source ].tolist ()) | set (reaching_rev [g2 ._destination ].tolist ())
828+ new_source_series = concat ([
829+ reaching_fwd [g2 ._source ],
830+ reaching_rev [g2 ._destination ]
831+ ], ignore_index = True , sort = False )
832+
833+ valid_edge_list .append (reaching_edges [EDGE_ID ])
834+ valid_node_series = concat ([valid_node_series , new_source_series ], ignore_index = True , sort = False )
835+ current_targets = new_source_series .drop_duplicates ()
836+
837+ # Deduplicate collected nodes and edges
838+ valid_node_series = valid_node_series .drop_duplicates ()
839+ valid_edge_series = concat (valid_edge_list , ignore_index = True , sort = False ).drop_duplicates () if valid_edge_list else goal_node_series [:0 ]
824840
825- valid_edges .update (reaching_edges [EDGE_ID ].tolist ())
826- valid_nodes .update (new_sources )
827- current_targets = new_sources
828841 # Filter records to only valid paths
829- edge_hop_records = edge_hop_records [edge_hop_records [EDGE_ID ].isin (valid_edges )]
830- node_hop_records = node_hop_records [node_hop_records [g2 ._node ].isin (valid_nodes )]
831- matches_edges = matches_edges [matches_edges [EDGE_ID ].isin (valid_edges )]
842+ edge_hop_records = edge_hop_records [edge_hop_records [EDGE_ID ].isin (valid_edge_series )]
843+ node_hop_records = node_hop_records [node_hop_records [g2 ._node ].isin (valid_node_series )]
844+ matches_edges = matches_edges [matches_edges [EDGE_ID ].isin (valid_edge_series )]
832845 if matches_nodes is not None :
833- matches_nodes = matches_nodes [matches_nodes [g2 ._node ].isin (valid_nodes )]
846+ matches_nodes = matches_nodes [matches_nodes [g2 ._node ].isin (valid_node_series )]
834847
835848 #hydrate edges
836849 if track_edge_hops and edge_hop_col is not None :
0 commit comments