11"""GraphTracer implementation that builds GraphSpec during pipeline execution."""
22
33from datetime import datetime , timezone
4+ from typing import Any
45
56from typing_extensions import override
67
@@ -48,6 +49,7 @@ def __init__(
4849 self .error : ErrorSpec | None = None
4950 self .input_specs : list [IOSpec ] = input_specs or []
5051 self .output_specs : list [IOSpec ] = []
52+ self .execution_data : dict [str , Any ] = {}
5153
5254 def to_node_spec (self ) -> NodeSpec :
5355 """Convert to immutable NodeSpec."""
@@ -76,6 +78,7 @@ def to_node_spec(self) -> NodeSpec:
7678 node_io = node_io ,
7779 error = self .error ,
7880 metrics = self .metrics ,
81+ execution_data = self .execution_data ,
7982 )
8083
8184
@@ -113,6 +116,9 @@ def __init__(self) -> None:
113116 # The branch_producer_node_id is snapshotted at registration time, before register_controller_output
114117 # overrides _stuff_producer_map to point branch stuff codes to the controller node
115118 self ._parallel_combine_map : dict [str , tuple [str , list [tuple [str , str ]]]] = {}
119+ # Registries for pipe and concept data (keyed by pipe_ref and concept_ref)
120+ self ._pipe_registry : dict [str , dict [str , Any ]] = {}
121+ self ._concept_registry : dict [str , dict [str , Any ]] = {}
116122
117123 @property
118124 def is_active (self ) -> bool :
@@ -143,6 +149,8 @@ def setup(
143149 self ._batch_item_map = {}
144150 self ._batch_aggregate_map = {}
145151 self ._parallel_combine_map = {}
152+ self ._pipe_registry = {}
153+ self ._concept_registry = {}
146154
147155 return GraphContext (
148156 graph_id = graph_id ,
@@ -181,6 +189,8 @@ def teardown(self) -> GraphSpec | None:
181189 pipeline_ref = self ._pipeline_ref or PipelineRef (),
182190 nodes = nodes ,
183191 edges = self ._edges ,
192+ pipe_registry = dict (self ._pipe_registry ),
193+ concept_registry = dict (self ._concept_registry ),
184194 )
185195
186196 # Reset internal state
@@ -193,6 +203,8 @@ def teardown(self) -> GraphSpec | None:
193203 self ._batch_item_map = {}
194204 self ._batch_aggregate_map = {}
195205 self ._parallel_combine_map = {}
206+ self ._pipe_registry = {}
207+ self ._concept_registry = {}
196208
197209 return graph
198210
@@ -409,6 +421,8 @@ def on_pipe_start(
409421 node_kind : NodeKind ,
410422 started_at : datetime ,
411423 input_specs : list [IOSpec ] | None = None ,
424+ pipe_data : dict [str , Any ] | None = None ,
425+ concept_data : list [dict [str , Any ]] | None = None ,
412426 ) -> tuple [str , GraphContext ]:
413427 """Record the start of a pipe execution."""
414428 if not self ._is_active :
@@ -433,6 +447,18 @@ def on_pipe_start(
433447 )
434448 self ._nodes [node_id ] = node_data
435449
450+ # Accumulate pipe and concept registry data (deduplicated)
451+ if graph_context .data_inclusion .pipe_and_concept_registry :
452+ if pipe_data is not None :
453+ pipe_ref = f"{ pipe_data .get ('domain_code' , '' )} .{ pipe_data .get ('code' , '' )} "
454+ if pipe_ref not in self ._pipe_registry :
455+ self ._pipe_registry [pipe_ref ] = pipe_data
456+ if concept_data is not None :
457+ for concept_item in concept_data :
458+ concept_ref = f"{ concept_item .get ('domain_code' , '' )} .{ concept_item .get ('code' , '' )} "
459+ if concept_ref not in self ._concept_registry :
460+ self ._concept_registry [concept_ref ] = concept_item
461+
436462 # Add containment edge from parent if this is a child pipe
437463 if graph_context .parent_node_id is not None :
438464 self .add_edge (
@@ -449,6 +475,20 @@ def on_pipe_start(
449475
450476 return node_id , child_context
451477
478+ @override
479+ def register_execution_data (
480+ self ,
481+ node_id : str ,
482+ execution_data : dict [str , Any ],
483+ ) -> None :
484+ """Register execution metadata for a node."""
485+ if not self ._is_active :
486+ return
487+ node_data = self ._nodes .get (node_id )
488+ if node_data is None :
489+ return
490+ node_data .execution_data .update (execution_data )
491+
452492 @override
453493 def on_pipe_end_success (
454494 self ,
@@ -457,6 +497,7 @@ def on_pipe_end_success(
457497 output_preview : str | None = None ,
458498 metrics : dict [str , float ] | None = None ,
459499 output_spec : IOSpec | None = None ,
500+ output_concept_data : dict [str , Any ] | None = None ,
460501 ) -> None :
461502 """Record successful completion of a pipe execution."""
462503 if not self ._is_active :
@@ -472,6 +513,12 @@ def on_pipe_end_success(
472513 if metrics :
473514 node_data .metrics = metrics
474515
516+ # Accumulate output concept data (deduplicated)
517+ if output_concept_data is not None :
518+ concept_ref = f"{ output_concept_data .get ('domain_code' , '' )} .{ output_concept_data .get ('code' , '' )} "
519+ if concept_ref not in self ._concept_registry :
520+ self ._concept_registry [concept_ref ] = output_concept_data
521+
475522 # Store output spec and register in producer map for data flow tracking
476523 if output_spec is not None :
477524 # Skip pass-through outputs: if the output digest matches one of the node's
0 commit comments