66from langchain_core .outputs import LLMResult
77
88class PipelineMonitorCallback (BaseCallbackHandler ):
9+ """Callback handler for monitoring pipeline execution and auditing events.
10+
11+ Integrates with OpenTelemetry for metrics and EventLogger for audit trails.
12+ """
13+
914 def __init__ (self , presenter : ConsolePresenter ):
1015 from nl2sql .common .settings import settings
1116 from nl2sql .common .metrics import configure_metrics
1217
13- # One-time setup of metrics based on settings
1418 configure_metrics (
1519 exporter_type = settings .observability_exporter ,
1620 otlp_endpoint = settings .otlp_endpoint
@@ -20,29 +24,31 @@ def __init__(self, presenter: ConsolePresenter):
2024 self .tokens = TokenHandler (self .node_handler .node_metrics )
2125
2226 def on_chain_start (self , serialized : Dict [str , Any ], inputs : Dict [str , Any ], ** kwargs : Any ) -> Any :
27+ """Called when a chain starts."""
2328 node_name = kwargs .get ("metadata" , {}).get ("langgraph_node" )
2429 run_id = str (kwargs .get ("run_id" ))
2530 parent_run_id = kwargs .get ("parent_run_id" )
2631 parent_run_id = str (parent_run_id ) if parent_run_id else None
2732 self .node_handler .on_chain_start (run_id , parent_run_id , node_name , inputs )
2833
2934 def on_chain_end (self , outputs : Dict [str , Any ], ** kwargs : Any ) -> Any :
35+ """Called when a chain ends."""
3036 run_id = str (kwargs .get ("run_id" ))
3137 self .node_handler .on_chain_end (run_id )
3238
3339 def on_chain_error (self , error : BaseException , ** kwargs : Any ) -> Any :
40+ """Called when a chain errors."""
3441 run_id = str (kwargs .get ("run_id" ))
3542 self .node_handler .on_chain_error (run_id , error )
3643
3744 def on_llm_end (self , response : LLMResult , ** kwargs : Any ) -> Any :
45+ """Called when an LLM ends."""
3846 from nl2sql .common .event_logger import event_logger
3947 from nl2sql .common .logger import _trace_id_ctx , _tenant_id_ctx
4048
4149 tags = kwargs .get ("tags" , [])
4250 agent_name = next ((t for t in tags if not t .startswith ("seq:" ) and not t .startswith ("langsmith:" )), "unknown" )
4351
44- # Capture audit event
45- # Assuming single generation per call for simplicity in audit log
4652 text_output = ""
4753 model_name = "unknown"
4854 token_usage = {}
@@ -58,11 +64,10 @@ def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
5864 audit_payload = {
5965 "agent" : agent_name ,
6066 "model" : model_name ,
61- "response_snippet" : text_output [:1000 ], # Trucate for sanity, full content maybe too big?
67+ "response_snippet" : text_output [:1000 ],
6268 "token_usage" : token_usage
6369 }
6470
65- # Pull context directly from vars since we are in the same thread execution context
6671 event_logger .log_event (
6772 event_type = "llm_interaction" ,
6873 payload = audit_payload ,
0 commit comments