Skip to content

Commit febc3da

Browse files
feat: Add LLM token usage tracking to Application Insights
Implement comprehensive token usage tracking across all LLM call sites in ContentProcessor and ContentProcessorWorkflow, following the MACAE psl-token-usage branch pattern. Changes: - Add token_usage_utils.py with extract/emit helpers for both projects - Instrument MapHandler (ContentProcessor) with per-call token events - Instrument RAI, Summarize, GapAnalysis executors (Workflow) - Add summary and per-model token events in save_handler - Add 18 KQL queries for Azure Workbook visualization - Add unit tests (18 tests) for token_usage_utils Events emitted to Application Insights: - LLM_Agent_Token_Usage: per agent/step with model, process_id - LLM_Model_Token_Usage: per model deployment aggregation - LLM_Token_Usage_Summary: per document totals Ref: User Story #43251 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 1fbb362 commit febc3da

9 files changed

Lines changed: 1044 additions & 0 deletions

File tree

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// ============================================================
2+
// KQL Queries for LLM Token Usage Monitoring
3+
// Content Processing Solution Accelerator
4+
// Run these in Application Insights > Logs
5+
// ============================================================
6+
7+
// 1. Overall token usage summary (last 7 days)
8+
customEvents
9+
| where name == 'LLM_Token_Usage_Summary'
10+
| where timestamp > ago(7d)
11+
| extend input_tokens = toint(customDimensions['total_input_tokens'])
12+
| extend output_tokens = toint(customDimensions['total_output_tokens'])
13+
| extend total_tokens = toint(customDimensions['total_tokens'])
14+
| summarize
15+
TotalDocuments = count(),
16+
TotalInputTokens = sum(input_tokens),
17+
TotalOutputTokens = sum(output_tokens),
18+
TotalTokens = sum(total_tokens),
19+
AvgTokensPerDocument = round(avg(total_tokens), 0)
20+
21+
// 2. Token usage by pipeline step (agent)
22+
customEvents
23+
| where name == 'LLM_Agent_Token_Usage'
24+
| where timestamp > ago(7d)
25+
| extend agent = tostring(customDimensions['agent_name'])
26+
| extend input_tokens = toint(customDimensions['input_tokens'])
27+
| extend output_tokens = toint(customDimensions['output_tokens'])
28+
| extend total_tokens = toint(customDimensions['total_tokens'])
29+
| summarize
30+
InputTokens = sum(input_tokens),
31+
OutputTokens = sum(output_tokens),
32+
TotalTokens = sum(total_tokens),
33+
Invocations = count()
34+
by Step = agent
35+
| order by TotalTokens desc
36+
37+
// 3. Token usage over time (hourly)
38+
customEvents
39+
| where name == 'LLM_Token_Usage_Summary'
40+
| where timestamp > ago(7d)
41+
| extend input_tokens = toint(customDimensions['total_input_tokens'])
42+
| extend output_tokens = toint(customDimensions['total_output_tokens'])
43+
| summarize InputTokens = sum(input_tokens), OutputTokens = sum(output_tokens) by bin(timestamp, 1h)
44+
| order by timestamp asc
45+
| render areachart
46+
47+
// 4. Estimated cost (GPT-4o pricing: $2.50/1M input, $10.00/1M output)
48+
let input_price_per_million = 2.50;
49+
let output_price_per_million = 10.00;
50+
customEvents
51+
| where name == 'LLM_Token_Usage_Summary'
52+
| where timestamp > ago(30d)
53+
| extend input_tokens = toint(customDimensions['total_input_tokens'])
54+
| extend output_tokens = toint(customDimensions['total_output_tokens'])
55+
| summarize TotalInput = sum(input_tokens), TotalOutput = sum(output_tokens) by bin(timestamp, 1d)
56+
| extend InputCost = round(TotalInput * input_price_per_million / 1000000.0, 4)
57+
| extend OutputCost = round(TotalOutput * output_price_per_million / 1000000.0, 4)
58+
| extend TotalCost = InputCost + OutputCost
59+
| project Day = timestamp, TotalInput, TotalOutput, InputCost, OutputCost, TotalCost
60+
| order by Day desc
61+
62+
// 5. Top token consumers by document
63+
customEvents
64+
| where name == 'LLM_Token_Usage_Summary'
65+
| where timestamp > ago(7d)
66+
| extend total_tokens = toint(customDimensions['total_tokens'])
67+
| extend process_id = tostring(customDimensions['process_id'])
68+
| extend file_name = tostring(customDimensions['file_name'])
69+
| summarize TotalTokens = sum(total_tokens) by process_id, file_name
70+
| order by TotalTokens desc
71+
| take 20
72+
73+
// 6. Pipeline step token distribution (pie chart)
74+
customEvents
75+
| where name == 'LLM_Agent_Token_Usage'
76+
| where timestamp > ago(7d)
77+
| extend agent = tostring(customDimensions['agent_name'])
78+
| extend total_tokens = toint(customDimensions['total_tokens'])
79+
| summarize TotalTokens = sum(total_tokens) by agent
80+
| render piechart
81+
82+
// 7. Token usage percentiles per document
83+
customEvents
84+
| where name == 'LLM_Token_Usage_Summary'
85+
| where timestamp > ago(7d)
86+
| extend total_tokens = toint(customDimensions['total_tokens'])
87+
| summarize
88+
p50 = percentile(total_tokens, 50),
89+
p90 = percentile(total_tokens, 90),
90+
p95 = percentile(total_tokens, 95),
91+
p99 = percentile(total_tokens, 99),
92+
Max = max(total_tokens)
93+
94+
// 8. Token usage by step grouping (Extraction vs Analysis vs Safety)
95+
let StepGroupMapping = datatable(agent:string, StepGroup:string) [
96+
"MapHandler", "Extraction",
97+
"RAI", "Safety",
98+
"Summarize", "Analysis",
99+
"GapAnalysis", "Analysis"
100+
];
101+
customEvents
102+
| where name == 'LLM_Agent_Token_Usage'
103+
| where timestamp > ago(7d)
104+
| extend agent = tostring(customDimensions['agent_name'])
105+
| extend input_tokens = toint(customDimensions['input_tokens'])
106+
| extend output_tokens = toint(customDimensions['output_tokens'])
107+
| extend total_tokens = toint(customDimensions['total_tokens'])
108+
| lookup kind=leftouter StepGroupMapping on agent
109+
| extend StepGroup = iff(isempty(StepGroup), "Unknown", StepGroup)
110+
| summarize
111+
TotalRequests = count(),
112+
TotalInputTokens = sum(input_tokens),
113+
TotalOutputTokens = sum(output_tokens),
114+
TotalTokens = sum(total_tokens),
115+
AvgTokensPerRequest = round(avg(total_tokens), 0)
116+
by StepGroup
117+
| order by TotalTokens desc
118+
119+
// 9. Token usage by model deployment
120+
customEvents
121+
| where name == 'LLM_Model_Token_Usage'
122+
| where timestamp > ago(7d)
123+
| extend model = tostring(customDimensions['model_deployment_name'])
124+
| extend input_tokens = toint(customDimensions['input_tokens'])
125+
| extend output_tokens = toint(customDimensions['output_tokens'])
126+
| extend total_tokens = toint(customDimensions['total_tokens'])
127+
| summarize
128+
InputTokens = sum(input_tokens),
129+
OutputTokens = sum(output_tokens),
130+
TotalTokens = sum(total_tokens),
131+
Invocations = count()
132+
by Model = model
133+
| order by TotalTokens desc
134+
135+
// 10. Token usage by model over time (hourly)
136+
customEvents
137+
| where name == 'LLM_Model_Token_Usage'
138+
| where timestamp > ago(7d)
139+
| extend model = tostring(customDimensions['model_deployment_name'])
140+
| extend total_tokens = toint(customDimensions['total_tokens'])
141+
| summarize TotalTokens = sum(total_tokens) by bin(timestamp, 1h), model
142+
| order by timestamp asc
143+
| render areachart
144+
145+
// 11. Model token distribution (pie chart)
146+
customEvents
147+
| where name == 'LLM_Model_Token_Usage'
148+
| where timestamp > ago(7d)
149+
| extend model = tostring(customDimensions['model_deployment_name'])
150+
| extend total_tokens = toint(customDimensions['total_tokens'])
151+
| summarize TotalTokens = sum(total_tokens) by model
152+
| render piechart
153+
154+
// 12. Estimated cost by model (adjust pricing per model)
155+
let gpt4o_input = 2.50;
156+
let gpt4o_output = 10.00;
157+
let gpt4o_mini_input = 0.15;
158+
let gpt4o_mini_output = 0.60;
159+
customEvents
160+
| where name == 'LLM_Model_Token_Usage'
161+
| where timestamp > ago(30d)
162+
| extend model = tostring(customDimensions['model_deployment_name'])
163+
| extend input_tokens = toint(customDimensions['input_tokens'])
164+
| extend output_tokens = toint(customDimensions['output_tokens'])
165+
| summarize TotalInput = sum(input_tokens), TotalOutput = sum(output_tokens) by model
166+
| extend InputPrice = case(
167+
model has "mini", gpt4o_mini_input,
168+
gpt4o_input)
169+
| extend OutputPrice = case(
170+
model has "mini", gpt4o_mini_output,
171+
gpt4o_output)
172+
| extend InputCost = round(TotalInput * InputPrice / 1000000.0, 4)
173+
| extend OutputCost = round(TotalOutput * OutputPrice / 1000000.0, 4)
174+
| extend TotalCost = InputCost + OutputCost
175+
| project Model = model, TotalInput, TotalOutput, InputCost, OutputCost, TotalCost
176+
| order by TotalCost desc
177+
178+
// 13. Step-to-model mapping with token usage
179+
customEvents
180+
| where name == 'LLM_Agent_Token_Usage'
181+
| where timestamp > ago(7d)
182+
| extend agent = tostring(customDimensions['agent_name'])
183+
| extend model = tostring(customDimensions['model_deployment_name'])
184+
| extend input_tokens = toint(customDimensions['input_tokens'])
185+
| extend output_tokens = toint(customDimensions['output_tokens'])
186+
| extend total_tokens = toint(customDimensions['total_tokens'])
187+
| summarize
188+
InputTokens = sum(input_tokens),
189+
OutputTokens = sum(output_tokens),
190+
TotalTokens = sum(total_tokens),
191+
Invocations = count()
192+
by Step = agent, Model = model
193+
| order by TotalTokens desc
194+
195+
// 14. RAI agent specific token usage
196+
customEvents
197+
| where name == 'LLM_Agent_Token_Usage'
198+
| where timestamp > ago(7d)
199+
| extend agent = tostring(customDimensions['agent_name'])
200+
| where agent == "RAI"
201+
| extend input_tokens = toint(customDimensions['input_tokens'])
202+
| extend output_tokens = toint(customDimensions['output_tokens'])
203+
| extend total_tokens = toint(customDimensions['total_tokens'])
204+
| extend model = tostring(customDimensions['model_deployment_name'])
205+
| summarize
206+
InputTokens = sum(input_tokens),
207+
OutputTokens = sum(output_tokens),
208+
TotalTokens = sum(total_tokens),
209+
Invocations = count()
210+
by Model = model
211+
212+
// 15. OpenTelemetry auto-instrumented OpenAI calls (if available)
213+
dependencies
214+
| where name has "openai" or target has "openai"
215+
| where timestamp > ago(7d)
216+
| extend input_tokens = tolong(customDimensions["gen_ai.usage.input_tokens"])
217+
| extend output_tokens = tolong(customDimensions["gen_ai.usage.output_tokens"])
218+
| extend model = tostring(customDimensions["gen_ai.request.model"])
219+
| where isnotnull(input_tokens)
220+
| summarize
221+
Calls = count(),
222+
TotalInput = sum(input_tokens),
223+
TotalOutput = sum(output_tokens)
224+
by model
225+
| order by TotalInput desc
226+
227+
// ============================================================
228+
// Content Processing Specific Queries
229+
// ============================================================
230+
231+
// 16. Token usage by file type (PDF, DOCX, image, etc.)
232+
customEvents
233+
| where name == 'LLM_Token_Usage_Summary'
234+
| where timestamp > ago(7d)
235+
| extend total_tokens = toint(customDimensions['total_tokens'])
236+
| extend input_tokens = toint(customDimensions['total_input_tokens'])
237+
| extend output_tokens = toint(customDimensions['total_output_tokens'])
238+
| extend mime_type = tostring(customDimensions['file_mime_type'])
239+
| extend file_type = case(
240+
mime_type has "pdf", "PDF",
241+
mime_type has "image", "Image",
242+
mime_type has "word" or mime_type has "docx", "Word",
243+
mime_type has "excel" or mime_type has "xlsx", "Excel",
244+
mime_type has "text", "Text",
245+
"Other")
246+
| summarize
247+
Documents = count(),
248+
TotalInputTokens = sum(input_tokens),
249+
TotalOutputTokens = sum(output_tokens),
250+
TotalTokens = sum(total_tokens),
251+
AvgTokensPerDoc = round(avg(total_tokens), 0)
252+
by FileType = file_type
253+
| order by TotalTokens desc
254+
255+
// 17. Per-document token breakdown by step
256+
customEvents
257+
| where name == 'LLM_Agent_Token_Usage'
258+
| where timestamp > ago(7d)
259+
| extend agent = tostring(customDimensions['agent_name'])
260+
| extend process_id = tostring(customDimensions['process_id'])
261+
| extend input_tokens = toint(customDimensions['input_tokens'])
262+
| extend output_tokens = toint(customDimensions['output_tokens'])
263+
| extend total_tokens = toint(customDimensions['total_tokens'])
264+
| summarize
265+
InputTokens = sum(input_tokens),
266+
OutputTokens = sum(output_tokens),
267+
TotalTokens = sum(total_tokens)
268+
by process_id, Step = agent
269+
| order by process_id, TotalTokens desc
270+
271+
// 18. Daily processing volume with token costs
272+
customEvents
273+
| where name == 'LLM_Token_Usage_Summary'
274+
| where timestamp > ago(30d)
275+
| extend total_tokens = toint(customDimensions['total_tokens'])
276+
| extend file_name = tostring(customDimensions['file_name'])
277+
| summarize
278+
DocumentsProcessed = count(),
279+
TotalTokens = sum(total_tokens),
280+
AvgTokensPerDoc = round(avg(total_tokens), 0),
281+
MaxTokensPerDoc = max(total_tokens)
282+
by Day = bin(timestamp, 1d)
283+
| order by Day desc

src/ContentProcessor/src/libs/pipeline/handlers/map_handler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from libs.pipeline.entities.pipeline_step_result import StepResult
2929
from libs.pipeline.entities.schema import Schema
3030
from libs.pipeline.queue_handler_base import HandlerBase
31+
from libs.token_usage_utils import emit_agent_token_event, extract_token_usage
3132
from libs.utils.remote_schema_loader import load_schema_from_blob_json
3233

3334
logger = logging.getLogger(__name__)
@@ -263,6 +264,15 @@ async def execute(self, context: MessageContext) -> StepResult:
263264
options={"logprobs": True, "top_logprobs": 5},
264265
)
265266

267+
# Track token usage for this LLM call
268+
token_usage = extract_token_usage(gpt_response)
269+
emit_agent_token_event(
270+
agent_name="MapHandler",
271+
model_deployment_name=self.application_context.configuration.app_azure_openai_model,
272+
usage=token_usage,
273+
process_id=context.data_pipeline.pipeline_status.process_id,
274+
)
275+
266276
response_content = gpt_response.text # Json format string
267277

268278
cleaned_content = (

src/ContentProcessor/src/libs/pipeline/handlers/save_handler.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from libs.pipeline.entities.schema import Schema
2121
from libs.pipeline.handlers.logics.evaluate_handler.model import DataExtractionResult
2222
from libs.pipeline.queue_handler_base import HandlerBase
23+
from libs.token_usage_utils import emit_model_token_event, emit_summary_token_event
2324

2425

2526
class SaveHandler(HandlerBase):
@@ -168,6 +169,27 @@ def find_process_result(step_name: str):
168169
collection_name=self.application_context.configuration.app_cosmos_container_process,
169170
)
170171

172+
# Emit token usage summary and per-model events to Application Insights
173+
emit_summary_token_event(
174+
total_input_tokens=evaluated_result.prompt_tokens,
175+
total_output_tokens=evaluated_result.completion_tokens,
176+
total_tokens=evaluated_result.prompt_tokens + evaluated_result.completion_tokens,
177+
process_id=context.data_pipeline.pipeline_status.process_id,
178+
file_name=context.data_pipeline.get_source_files()[0].name,
179+
file_mime_type=context.data_pipeline.get_source_files()[0].mime_type,
180+
agent_count=1,
181+
model_count=1,
182+
)
183+
emit_model_token_event(
184+
model_deployment_name=self.application_context.configuration.app_azure_openai_model,
185+
usage={
186+
"input_tokens": evaluated_result.prompt_tokens,
187+
"output_tokens": evaluated_result.completion_tokens,
188+
"total_tokens": evaluated_result.prompt_tokens + evaluated_result.completion_tokens,
189+
},
190+
process_id=context.data_pipeline.pipeline_status.process_id,
191+
)
192+
171193
# save process_output to blob storage.
172194
processed_history = context.data_pipeline.add_file(
173195
file_name="step_outputs.json", artifact_type=ArtifactType.SavedContent

0 commit comments

Comments
 (0)