2222from fast_agent .batch .template import DEFAULT_ROW_TEMPLATE , render_row_template
2323from fast_agent .batch .traces import BatchTraceOptions , BatchTraceRecorder
2424from fast_agent .cli .runtime .request_builders import resolve_default_instruction
25- from fast_agent .constants import FAST_AGENT_TIMING
26- from fast_agent .llm .request_params import RequestParams
25+ from fast_agent .constants import FAST_AGENT_TIMING , FAST_AGENT_USAGE
26+ from fast_agent .llm .request_params import BatchRequestContext , RequestParams
2727from fast_agent .llm .structured_schema import (
2828 StructuredSchemaSource ,
2929 load_json_schema_file ,
@@ -120,21 +120,38 @@ def _identity_for_candidate(candidate: RowCandidate, id_field: str | None) -> tu
120120 return str (row [id_field ]), None
121121
122122
123- def _extract_timing (response : Any ) -> dict [str , Any ] | None :
123+ def _extract_json_channel (response : Any , channel_name : str ) -> dict [str , Any ] | None :
124124 channels = response .channels
125125 if not isinstance (channels , Mapping ):
126126 return None
127- timing_blocks = channels .get (FAST_AGENT_TIMING )
128- if not timing_blocks :
127+ blocks = channels .get (channel_name )
128+ if not blocks :
129129 return None
130- timing_text = get_text (timing_blocks [0 ])
131- if not timing_text :
130+ text = get_text (blocks [0 ])
131+ if not text :
132132 return None
133133 try :
134- timing = json .loads (timing_text )
134+ payload = json .loads (text )
135135 except json .JSONDecodeError :
136136 return None
137- return timing if isinstance (timing , dict ) else None
137+ return payload if isinstance (payload , dict ) else None
138+
139+
def _extract_timing(response: Any) -> dict[str, Any] | None:
    """Return the JSON object found on the ``FAST_AGENT_TIMING`` channel.

    Delegates to ``_extract_json_channel``; the result is ``None`` whenever
    that helper cannot produce a JSON object for the channel.
    """
    return _extract_json_channel(response, FAST_AGENT_TIMING)
142+
143+
def _extract_usage(response: Any) -> dict[str, Any] | None:
    """Return the usage payload found on the ``FAST_AGENT_USAGE`` channel.

    Returns:
        ``None`` when no JSON object can be read from the channel. When the
        payload has neither a ``"turn"`` nor a ``"raw_usage"`` key it is
        returned unchanged. Otherwise the result is narrowed to just those
        two keys, dropping any whose value is missing or ``None``.
    """
    usage = _extract_json_channel(response, FAST_AGENT_USAGE)
    if usage is None:
        return None
    # Payloads that carry neither known key are passed through as-is.
    if "turn" not in usage and "raw_usage" not in usage:
        return usage
    # Keep only the two known keys and omit keys whose value is None.
    return {
        key: value
        for key in ("turn", "raw_usage")
        if (value := usage.get(key)) is not None
    }
138155
139156
140157def _write_optional_failure (
@@ -152,6 +169,7 @@ def _write_optional_telemetry(
152169 row_number : int ,
153170 ok : bool ,
154171 timing : dict [str , Any ] | None ,
172+ usage : dict [str , Any ] | None = None ,
155173) -> None :
156174 if telemetry_handle is None :
157175 return
@@ -162,7 +180,7 @@ def _write_optional_telemetry(
162180 "row_number" : row_number ,
163181 "ok" : ok ,
164182 "timing" : timing or {},
165- "usage" : {},
183+ "usage" : usage or {},
166184 },
167185 )
168186
@@ -350,8 +368,13 @@ async def run_structured_batch(options: StructuredBatchOptions) -> dict[str, Any
350368 worker ,
351369 rendered = rendered ,
352370 schema_source = schema_source ,
371+ batch_context = BatchRequestContext (
372+ row_number = candidate .row_number ,
373+ identity = identity ,
374+ ),
353375 )
354376 timing = _extract_timing (response )
377+ usage = _extract_usage (response )
355378 summary .add_timing (timing )
356379 if parsed is None :
357380 record = error_envelope (
@@ -372,6 +395,7 @@ async def run_structured_batch(options: StructuredBatchOptions) -> dict[str, Any
372395 row_number = candidate .row_number ,
373396 ok = False ,
374397 timing = timing ,
398+ usage = usage ,
375399 )
376400 summary .processed_rows += 1
377401 summary .failed_rows += 1
@@ -399,6 +423,7 @@ async def run_structured_batch(options: StructuredBatchOptions) -> dict[str, Any
399423 row_number = candidate .row_number ,
400424 ok = True ,
401425 timing = timing ,
426+ usage = usage ,
402427 )
403428 summary .processed_rows += 1
404429 if trace_recorder is not None :
@@ -516,8 +541,9 @@ async def _row_call(
516541 * ,
517542 rendered : str ,
518543 schema_source : SchemaSource | None ,
544+ batch_context : BatchRequestContext ,
519545) -> tuple [Any | None , Any ]:
520- request_params = RequestParams (use_history = False )
546+ request_params = RequestParams (use_history = False , batch_context = batch_context )
521547 if schema_source is None :
522548 response = await worker .generate (rendered , request_params )
523549 return response .last_text () or "" , response
0 commit comments