diff --git a/containers/api-proxy/Dockerfile b/containers/api-proxy/Dockerfile index 1863c211..0c71ff1e 100644 --- a/containers/api-proxy/Dockerfile +++ b/containers/api-proxy/Dockerfile @@ -15,7 +15,9 @@ COPY package*.json ./ RUN npm ci --omit=dev # Copy application files -COPY server.js logging.js metrics.js rate-limiter.js token-tracker.js \ +COPY server.js logging.js metrics.js rate-limiter.js \ + token-tracker.js token-persistence.js token-parsers.js \ + token-tracker-http.js token-tracker-ws.js \ model-resolver.js proxy-utils.js anthropic-transforms.js \ proxy-request.js model-discovery.js management.js oidc-token-provider.js \ oidc-token-provider-base.js \ diff --git a/containers/api-proxy/token-parsers.js b/containers/api-proxy/token-parsers.js new file mode 100644 index 00000000..c13a6b6d --- /dev/null +++ b/containers/api-proxy/token-parsers.js @@ -0,0 +1,264 @@ +/** + * Token usage parsers for AWF API Proxy. + * + * Pure parsing and normalization functions — no I/O, no side effects. + * Covers SSE streaming, non-streaming JSON, decompression helpers, and + * usage normalization into a unified format. + */ + +'use strict'; + +const zlib = require('zlib'); + +/** + * Check if a response is SSE (Server-Sent Events) streaming. + */ +function isStreamingResponse(headers) { + const ct = headers['content-type'] || ''; + return ct.includes('text/event-stream'); +} + +/** + * Check if a response is gzip or deflate compressed. + */ +function isCompressedResponse(headers) { + const ce = (headers['content-encoding'] || '').toLowerCase(); + return ce === 'gzip' || ce === 'deflate' || ce === 'br'; +} + +/** + * Create a decompression transform stream based on content-encoding. + * Returns null if the encoding is not supported. + */ +function createDecompressor(headers) { + const ce = (headers['content-encoding'] || '').toLowerCase(); + if (ce === 'gzip') return zlib.createGunzip(); + if (ce === 'deflate') return zlib.createInflate(); + if (ce === 'br') return zlib.createBrotliDecompress(); + return null; +} + +/** + * Extract reasoning token count from provider usage payloads. + * + * Supports explicit `reasoning_tokens` and provider-specific nested fields. + * Priority order: top-level → completion_tokens_details → output_tokens_details. + */ +function extractReasoningTokens(usage) { + if (!usage || typeof usage !== 'object') return undefined; + if (typeof usage.reasoning_tokens === 'number') return usage.reasoning_tokens; + if (usage.completion_tokens_details && typeof usage.completion_tokens_details.reasoning_tokens === 'number') { + return usage.completion_tokens_details.reasoning_tokens; + } + if (usage.output_tokens_details && typeof usage.output_tokens_details.reasoning_tokens === 'number') { + return usage.output_tokens_details.reasoning_tokens; + } + return undefined; +} + +/** + * Extract token usage from a non-streaming JSON response body. + * + * Supports: + * - OpenAI/Copilot: { usage: { prompt_tokens, completion_tokens, total_tokens, prompt_tokens_details: { cached_tokens } } } + * - Anthropic: { usage: { input_tokens, output_tokens, cache_creation_input_tokens, cache_read_input_tokens } } + * + * Also extracts the model field if present. + * + * @param {Buffer} body - Response body + * @returns {{ usage: object|null, model: string|null }} + */ +function extractUsageFromJson(body) { + try { + const text = body.toString('utf8'); + const json = JSON.parse(text); + const usageSource = (json.usage && typeof json.usage === 'object') + ? json.usage + : ((json.response && json.response.usage && typeof json.response.usage === 'object') + ? json.response.usage + : null); + const result = { usage: null, model: json.model || (json.response && json.response.model) || null }; + + if (usageSource) { + const usage = {}; + let hasField = false; + // Anthropic fields + if (typeof usageSource.input_tokens === 'number') { + usage.input_tokens = usageSource.input_tokens; + hasField = true; + } + if (typeof usageSource.output_tokens === 'number') { + usage.output_tokens = usageSource.output_tokens; + hasField = true; + } + if (typeof usageSource.cache_creation_input_tokens === 'number') { + usage.cache_creation_input_tokens = usageSource.cache_creation_input_tokens; + hasField = true; + } + if (typeof usageSource.cache_read_input_tokens === 'number') { + usage.cache_read_input_tokens = usageSource.cache_read_input_tokens; + hasField = true; + } + // OpenAI/Copilot fields + if (typeof usageSource.prompt_tokens === 'number') { + usage.prompt_tokens = usageSource.prompt_tokens; + hasField = true; + } + if (typeof usageSource.completion_tokens === 'number') { + usage.completion_tokens = usageSource.completion_tokens; + hasField = true; + } + if (typeof usageSource.total_tokens === 'number') { + usage.total_tokens = usageSource.total_tokens; + hasField = true; + } + const reasoningTokens = extractReasoningTokens(usageSource); + if (typeof reasoningTokens === 'number') { + usage.reasoning_tokens = reasoningTokens; + hasField = true; + } + // OpenAI/Copilot nested cache fields (prompt_tokens_details.cached_tokens) + if (usageSource.prompt_tokens_details && typeof usageSource.prompt_tokens_details.cached_tokens === 'number') { + usage.cache_read_input_tokens = usageSource.prompt_tokens_details.cached_tokens; + hasField = true; + } + if (hasField) { + result.usage = usage; + } + } + + return result; + } catch { + return { usage: null, model: null }; + } +} + +/** + * Extract token usage from a single SSE data line. + * + * SSE format: "data: {json}\n\n" + * + * Anthropic streaming events with usage: + * - message_start: { type: "message_start", message: { usage: { input_tokens, cache_creation_input_tokens, cache_read_input_tokens } } } + * - message_delta: { type: "message_delta", usage: { output_tokens } } + * + * OpenAI/Copilot streaming events with usage: + * - Final chunk: { usage: { prompt_tokens, completion_tokens, total_tokens, prompt_tokens_details: { cached_tokens } } } + * + * @param {string} line - A single SSE data line (without "data: " prefix) + * @returns {{ usage: object|null, model: string|null }} + */ +function extractUsageFromSseLine(line) { + if (!line || line === '[DONE]') return { usage: null, model: null }; + + try { + const json = JSON.parse(line); + const result = { usage: null, model: json.model || null }; + + // Anthropic message_start: usage is inside message object + if (json.type === 'message_start' && json.message && json.message.usage) { + result.usage = {}; + const u = json.message.usage; + if (typeof u.input_tokens === 'number') result.usage.input_tokens = u.input_tokens; + if (typeof u.cache_creation_input_tokens === 'number') result.usage.cache_creation_input_tokens = u.cache_creation_input_tokens; + if (typeof u.cache_read_input_tokens === 'number') result.usage.cache_read_input_tokens = u.cache_read_input_tokens; + result.model = (json.message && json.message.model) || result.model; + return result; + } + + // Anthropic message_delta: usage at top level + if (json.type === 'message_delta' && json.usage) { + result.usage = {}; + if (typeof json.usage.output_tokens === 'number') result.usage.output_tokens = json.usage.output_tokens; + return result; + } + + // OpenAI Responses API: usage in response object on completion events + if ((json.type === 'response.completed' || json.type === 'response.done') + && json.response && json.response.usage && typeof json.response.usage === 'object') { + const u = json.response.usage; + result.usage = {}; + if (typeof u.input_tokens === 'number') result.usage.input_tokens = u.input_tokens; + if (typeof u.output_tokens === 'number') result.usage.output_tokens = u.output_tokens; + if (typeof u.total_tokens === 'number') result.usage.total_tokens = u.total_tokens; + const reasoningTokens = extractReasoningTokens(u); + if (typeof reasoningTokens === 'number') result.usage.reasoning_tokens = reasoningTokens; + if (u.prompt_tokens_details && typeof u.prompt_tokens_details.cached_tokens === 'number') { + result.usage.cache_read_input_tokens = u.prompt_tokens_details.cached_tokens; + } + result.model = json.response.model || result.model; + return result; + } + + // OpenAI/Copilot: usage at top level in final chunk + if (json.usage && typeof json.usage === 'object') { + result.usage = {}; + if (typeof json.usage.prompt_tokens === 'number') result.usage.prompt_tokens = json.usage.prompt_tokens; + if (typeof json.usage.completion_tokens === 'number') result.usage.completion_tokens = json.usage.completion_tokens; + if (typeof json.usage.total_tokens === 'number') result.usage.total_tokens = json.usage.total_tokens; + const reasoningTokens = extractReasoningTokens(json.usage); + if (typeof reasoningTokens === 'number') { + result.usage.reasoning_tokens = reasoningTokens; + } + // OpenAI/Copilot nested cache fields (prompt_tokens_details.cached_tokens) + if (json.usage.prompt_tokens_details && typeof json.usage.prompt_tokens_details.cached_tokens === 'number') { + result.usage.cache_read_input_tokens = json.usage.prompt_tokens_details.cached_tokens; + } + return result; + } + + return result; + } catch { + return { usage: null, model: null }; + } +} + +/** + * Extract all SSE data lines from a text chunk. + * Lines are prefixed with "data: " in the SSE protocol. + */ +function parseSseDataLines(text) { + const lines = []; + const parts = text.split('\n'); + for (const part of parts) { + const trimmed = part.trim(); + if (trimmed.startsWith('data: ')) { + lines.push(trimmed.slice(6)); + } else if (trimmed === 'data:') { + // empty data line + } + } + return lines; +} + +/** + * Normalize extracted usage into a unified format. + * + * Output fields: + * - input_tokens: number (from Anthropic input_tokens or OpenAI prompt_tokens) + * - output_tokens: number (from Anthropic output_tokens or OpenAI completion_tokens) + * - cache_read_tokens: number (from Anthropic cache_read_input_tokens or OpenAI prompt_tokens_details.cached_tokens) + * - cache_write_tokens: number (Anthropic cache_creation_input_tokens; not available in OpenAI format) + */ +function normalizeUsage(usage) { + if (!usage) return null; + + return { + input_tokens: usage.input_tokens ?? usage.prompt_tokens ?? 0, + output_tokens: usage.output_tokens ?? usage.completion_tokens ?? 0, + cache_read_tokens: usage.cache_read_input_tokens ?? 0, + cache_write_tokens: usage.cache_creation_input_tokens ?? 0, + reasoning_tokens: usage.reasoning_tokens ?? 0, + }; +} + +module.exports = { + isStreamingResponse, + isCompressedResponse, + createDecompressor, + extractReasoningTokens, + extractUsageFromJson, + extractUsageFromSseLine, + parseSseDataLines, + normalizeUsage, +}; diff --git a/containers/api-proxy/token-persistence.js b/containers/api-proxy/token-persistence.js new file mode 100644 index 00000000..962821f6 --- /dev/null +++ b/containers/api-proxy/token-persistence.js @@ -0,0 +1,187 @@ +/** + * Token usage persistence layer for AWF API Proxy. + * + * Manages the NDJSON token-usage log file: stream lifecycle, record + * validation, and disk writes. Also owns the optional diagnostics log + * (AWF_DEBUG_TOKENS=1). + */ + +'use strict'; + +const fs = require('fs'); +const path = require('path'); +const { logRequest } = require('./logging'); + +// Token usage log file path (inside the mounted log volume) +const TOKEN_LOG_DIR = process.env.AWF_TOKEN_LOG_DIR || '/var/log/api-proxy'; +const TOKEN_LOG_FILE = path.join(TOKEN_LOG_DIR, 'token-usage.jsonl'); +const DIAG_LOG_FILE = path.join(TOKEN_LOG_DIR, 'token-diag.log'); +const DIAG_ENABLED = process.env.AWF_DEBUG_TOKENS === '1'; + +// AWF version used to identify schema version in JSONL records. +// Set to the container image version at build time via ARG AWF_VERSION in the Dockerfile +// (baked in by the release workflow with --build-arg AWF_VERSION=). +// Falls back to "0.0.0-dev" for local/un-versioned builds. +const AWF_VERSION = process.env.AWF_VERSION; +if (!AWF_VERSION) { + // Log a warning (to stderr to avoid polluting stdout) when running without the env var. + // This can happen during local development or tests outside the container. + process.stderr.write('{"level":"warn","event":"awf_version_missing","message":"AWF_VERSION env var not set; _schema will use 0.0.0-dev"}\n'); +} +const TOKEN_USAGE_SCHEMA = `token-usage/v${AWF_VERSION || '0.0.0-dev'}`; + +let logStream = null; +let diagStream = null; + +/** + * Write a diagnostic line to the diagnostics log file. + * Only active when AWF_DEBUG_TOKENS=1 environment variable is set. + * Does not persist request/response payload data. + */ +function diag(msg, _data) { + if (!DIAG_ENABLED) return; + try { + if (!diagStream) { + fs.mkdirSync(TOKEN_LOG_DIR, { recursive: true }); + diagStream = fs.createWriteStream(DIAG_LOG_FILE, { flags: 'a' }); + diagStream.on('error', () => { diagStream = null; }); + } + const safeMsg = typeof msg === 'string' + ? msg.replace(/[^\w:-]/g, '_').slice(0, 64) + : 'DIAG'; + const line = `${new Date().toISOString()} ${safeMsg}\n`; + diagStream.write(line); + } catch { /* best-effort */ } +} + +/** + * Get or create the JSONL append stream for token usage logs. + * Uses a lazy singleton — created on first write. + */ +function getLogStream() { + if (logStream) return logStream; + try { + // Ensure directory exists + fs.mkdirSync(TOKEN_LOG_DIR, { recursive: true }); + logStream = fs.createWriteStream(TOKEN_LOG_FILE, { flags: 'a' }); + logStream.on('error', (err) => { + logRequest('warn', 'token_log_error', { error: err.message }); + logStream = null; + }); + return logStream; + } catch (err) { + logRequest('warn', 'token_log_init_error', { error: err.message }); + return null; + } +} + +/** + * Validate a token usage record against the token-usage schema contract. + * + * Checks that all required fields are present and have the expected types. + * Logs a warning and returns false if the record is non-conformant; does + * not throw, so a bad record is dropped rather than crashing the proxy. + * + * @param {object} record - The record to validate + * @returns {boolean} true if the record is valid, false otherwise + */ +function validateTokenUsageRecord(record) { + if (!record || typeof record !== 'object') { + logRequest('warn', 'token_record_schema_violation', { + field: 'record', + expected: 'object', + actual: record === null ? 'null' : typeof record, + }); + return false; + } + + const required = [ + ['_schema', 'string'], + ['timestamp', 'string'], + ['request_id', 'string'], + ['provider', 'string'], + ['model', 'string'], + ['path', 'string'], + ['status', 'number'], + ['streaming', 'boolean'], + ['input_tokens', 'number'], + ['output_tokens', 'number'], + ['cache_read_tokens', 'number'], + ['cache_write_tokens', 'number'], + ['duration_ms', 'number'], + ]; + + for (const [field, expectedType] of required) { + // eslint-disable-next-line valid-typeof + if (typeof record[field] !== expectedType) { + logRequest('warn', 'token_record_schema_violation', { + request_id: record.request_id, + field, + expected: expectedType, + actual: typeof record[field], + }); + return false; + } + } + + if (!/^token-usage\/v\d+\.\d+\.\d+(-\w+)?$/.test(record._schema)) { + logRequest('warn', 'token_record_schema_violation', { + request_id: record.request_id, + field: '_schema', + expected: 'token-usage/v', + actual: record._schema, + }); + return false; + } + + return true; +} + +/** + * Write a token usage record to the JSONL log file. + * Validates the record against the token-usage schema before writing. + * Handles backpressure by dropping writes when the stream buffer is full. + */ +function writeTokenUsage(record) { + if (!validateTokenUsageRecord(record)) return; + + const stream = getLogStream(); + if (stream && !stream.writableEnded) { + const ok = stream.write(JSON.stringify(record) + '\n'); + if (!ok) { + // Backpressure — stream buffer full. Drop this write rather than + // accumulating unbounded memory. The 'drain' event will unblock + // future writes naturally. + logRequest('warn', 'token_log_backpressure', { request_id: record.request_id }); + } + } +} + +/** + * Close the log stream (for graceful shutdown). + * Returns a Promise that resolves once the stream has flushed. + */ +function closeLogStream() { + return new Promise((resolve) => { + let pending = 0; + const check = () => { if (pending === 0) resolve(); }; + if (logStream) { + pending++; + logStream.end(() => { logStream = null; pending--; check(); }); + } + if (diagStream) { + pending++; + diagStream.end(() => { diagStream = null; pending--; check(); }); + } + if (pending === 0) resolve(); + }); +} + +module.exports = { + TOKEN_LOG_FILE, + TOKEN_USAGE_SCHEMA, + diag, + validateTokenUsageRecord, + writeTokenUsage, + closeLogStream, +}; diff --git a/containers/api-proxy/token-tracker-http.js b/containers/api-proxy/token-tracker-http.js new file mode 100644 index 00000000..72980ef0 --- /dev/null +++ b/containers/api-proxy/token-tracker-http.js @@ -0,0 +1,280 @@ +/** + * HTTP response token usage tracker for AWF API Proxy. + * + * Intercepts LLM API HTTP responses (both streaming SSE and non-streaming + * JSON) to extract token usage data without adding latency to the client. + * + * Architecture: + * proxyRes (LLM response) → res (client) + * ├─ on('data'): buffer/inspect chunks for usage extraction + * └─ on('end'): finalize parsing → log to file + metrics + * + * For non-streaming responses: buffer the JSON body (up to MAX_BUFFER_SIZE), + * then parse it on 'end' to extract usage fields. + * For streaming (SSE) responses: scan each chunk for usage events as they + * are received, accumulate usage from message_start / message_delta / final + * data events, and log the aggregated result on 'end'. + */ + +'use strict'; + +const { logRequest } = require('./logging'); +const { + isStreamingResponse, + isCompressedResponse, + createDecompressor, + parseSseDataLines, + extractUsageFromSseLine, + extractUsageFromJson, + normalizeUsage, +} = require('./token-parsers'); +const { writeTokenUsage, TOKEN_USAGE_SCHEMA, diag } = require('./token-persistence'); + +// Max response body to buffer for non-streaming usage extraction (5 MB). +// Responses larger than this are still forwarded but usage is not extracted. +const MAX_BUFFER_SIZE = 5 * 1024 * 1024; + +/** + * Attach token usage tracking to an upstream response. + * + * This function listens on the proxyRes 'data' and 'end' events to extract + * token usage. It does NOT modify the response stream — the caller still + * does proxyRes.pipe(res) as before. + * + * If the response is gzip/deflate compressed (common with Anthropic API), + * we decompress a copy of the data for parsing while the compressed bytes + * still flow to the client unchanged. + * + * @param {http.IncomingMessage} proxyRes - Upstream response + * @param {object} opts + * @param {string} opts.requestId - Request ID for correlation + * @param {string} opts.provider - Provider name (openai, anthropic, copilot, opencode) + * @param {string} opts.path - Request path + * @param {number} opts.startTime - Request start time (Date.now()) + * @param {object} opts.metrics - Metrics module reference + * @param {object|null} opts.billingInfo - Extracted billing/quota headers from response + * @param {string|null} opts.initiatorSent - X-Initiator value sent on the request + * @param {(normalizedUsage: object, model: string|null) => void} [opts.onUsage] - Optional callback invoked after normalized usage is extracted + */ +function trackTokenUsage(proxyRes, opts) { + const { requestId, provider, path: reqPath, startTime, metrics: metricsRef, billingInfo, initiatorSent, onUsage } = opts; + const streaming = isStreamingResponse(proxyRes.headers); + const contentType = proxyRes.headers['content-type'] || '(none)'; + const contentEncoding = proxyRes.headers['content-encoding'] || '(none)'; + const compressed = isCompressedResponse(proxyRes.headers); + + logRequest('debug', 'token_track_start', { + request_id: requestId, + provider, + path: reqPath, + streaming, + content_type: contentType, + content_encoding: contentEncoding, + status: proxyRes.statusCode, + }); + diag('HTTP_TRACK_START', { request_id: requestId, provider, path: reqPath, streaming, content_type: contentType, content_encoding: contentEncoding, status: proxyRes.statusCode }); + + // Accumulate response body for usage extraction + const chunks = []; + let totalBytes = 0; + let bufferedBytes = 0; + let overflow = false; + + // For streaming: accumulate usage across SSE events + let streamingUsage = {}; + let streamingModel = null; + let partialLine = ''; + + // If the response is compressed, create a decompressor. + // We feed raw chunks into it and listen on the decompressed output. + // The raw proxyRes still flows to the client unchanged via pipe(). + let decompressor = null; + if (compressed) { + decompressor = createDecompressor(proxyRes.headers); + if (decompressor) { + decompressor.on('error', (err) => { + diag('DECOMPRESS_ERROR', { request_id: requestId, error: err.message }); + }); + } + } + + // The source for text parsing: decompressor output (if compressed) or raw chunks + function handleDecodedChunk(text) { + if (streaming) { + const combined = partialLine + text; + const lastNewline = combined.lastIndexOf('\n'); + if (lastNewline >= 0) { + const complete = combined.slice(0, lastNewline); + partialLine = combined.slice(lastNewline + 1); + + const dataLines = parseSseDataLines(complete); + for (const line of dataLines) { + const { usage, model } = extractUsageFromSseLine(line); + if (model && !streamingModel) streamingModel = model; + if (usage) { + for (const [k, v] of Object.entries(usage)) { + streamingUsage[k] = v; + } + } + } + } else { + partialLine = combined; + } + } else if (!overflow) { + const chunkBuffer = Buffer.from(text, 'utf8'); + if (bufferedBytes + chunkBuffer.length > MAX_BUFFER_SIZE) { + const attemptedBytes = bufferedBytes + chunkBuffer.length; + overflow = true; + chunks.length = 0; + bufferedBytes = 0; + diag('HTTP_TRACK_BUFFER_OVERFLOW', { request_id: requestId, provider, buffered_bytes: attemptedBytes }); + return; + } + chunks.push(chunkBuffer); + bufferedBytes += chunkBuffer.length; + } + } + + if (decompressor) { + // Feed decompressed text to our parser + decompressor.on('data', (decompressedChunk) => { + handleDecodedChunk(decompressedChunk.toString('utf8')); + }); + + // Feed raw compressed bytes into the decompressor + proxyRes.on('data', (chunk) => { + totalBytes += chunk.length; + try { decompressor.write(chunk); } catch { /* ignore write errors */ } + }); + + proxyRes.on('end', () => { + try { decompressor.end(); } catch { /* ignore */ } + }); + + // Finalize on decompressor end + decompressor.on('end', () => { + finalizeTracking(); + }); + } else { + // No compression — parse raw chunks directly + proxyRes.on('data', (chunk) => { + totalBytes += chunk.length; + handleDecodedChunk(chunk.toString('utf8')); + }); + + proxyRes.on('end', () => { + finalizeTracking(); + }); + } + + function finalizeTracking() { + // Only process successful responses (2xx) + if (proxyRes.statusCode < 200 || proxyRes.statusCode >= 300) { + logRequest('debug', 'token_track_skip_status', { + request_id: requestId, + provider, + status: proxyRes.statusCode, + }); + diag('HTTP_TRACK_SKIP_STATUS', { request_id: requestId, provider, status: proxyRes.statusCode }); + return; + } + + const duration = Date.now() - startTime; + let usage = null; + let model = null; + + if (streaming) { + // Process any remaining partial line + if (partialLine.trim()) { + const dataLines = parseSseDataLines(partialLine); + for (const line of dataLines) { + const { usage: u, model: m } = extractUsageFromSseLine(line); + if (m && !streamingModel) streamingModel = m; + if (u) { + for (const [k, v] of Object.entries(u)) { + streamingUsage[k] = v; + } + } + } + } + + if (Object.keys(streamingUsage).length > 0) { + usage = streamingUsage; + model = streamingModel; + } + } else if (!overflow && chunks.length > 0) { + const body = Buffer.concat(chunks); + const result = extractUsageFromJson(body); + usage = result.usage; + model = result.model; + } + + logRequest('debug', 'token_track_end', { + request_id: requestId, + provider, + streaming, + total_bytes: totalBytes, + overflow, + has_usage: !!usage, + usage_keys: usage ? Object.keys(usage) : [], + model, + compressed, + }); + diag('HTTP_TRACK_END', { request_id: requestId, provider, streaming, total_bytes: totalBytes, overflow, has_usage: !!usage, usage_keys: usage ? Object.keys(usage) : [], model, compressed, content_encoding: contentEncoding }); + + const normalized = normalizeUsage(usage); + if (!normalized) return; + if (typeof onUsage === 'function') { + try { + onUsage(normalized, model || 'unknown'); + } catch { + // best-effort callback + } + } + + // Update metrics + if (metricsRef) { + metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); + metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); + } + + // Build log record + const record = { + _schema: TOKEN_USAGE_SCHEMA, + timestamp: new Date().toISOString(), + request_id: requestId, + provider, + model: model || 'unknown', + path: reqPath, + status: proxyRes.statusCode, + streaming, + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + duration_ms: duration, + response_bytes: totalBytes, + }; + + // Include billing/quota info when available (Copilot PRU tracking) + if (initiatorSent) record.x_initiator = initiatorSent; + if (billingInfo) record.billing = billingInfo; + + // Write to JSONL log file + writeTokenUsage(record); + + // Log summary to stdout + logRequest('info', 'token_usage', { + request_id: requestId, + provider, + model: model || 'unknown', + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + streaming, + }); + } +} + +module.exports = { trackTokenUsage }; diff --git a/containers/api-proxy/token-tracker-ws.js b/containers/api-proxy/token-tracker-ws.js new file mode 100644 index 00000000..e2734797 --- /dev/null +++ b/containers/api-proxy/token-tracker-ws.js @@ -0,0 +1,242 @@ +/** + * WebSocket token usage tracker for AWF API Proxy. + * + * Claude Code CLI uses WebSocket streaming to the Anthropic API. The + * api-proxy relays this as a raw socket pipe (tlsSocket ↔ clientSocket). + * This module adds a non-blocking 'data' listener on the upstream socket + * to parse WebSocket frames and extract token usage from JSON text messages. + */ + +'use strict'; + +const { logRequest } = require('./logging'); +const { extractUsageFromSseLine, normalizeUsage } = require('./token-parsers'); +const { writeTokenUsage, TOKEN_USAGE_SCHEMA, diag } = require('./token-persistence'); + +/** + * Parse WebSocket frames from a buffer (server→client direction, unmasked). + * + * Returns an object with: + * - messages: Array of decoded text frame payloads (strings) + * - consumed: Number of bytes consumed from the buffer + * + * Only handles non-fragmented text frames (FIN=1, opcode=1). + * Other frame types (binary, ping, pong, close, continuation) are consumed + * but their payloads are not returned. + * + * @param {Buffer} buf - Buffer containing WebSocket frame data + * @returns {{ messages: string[], consumed: number }} + */ +function parseWebSocketFrames(buf) { + const messages = []; + let pos = 0; + + while (pos + 2 <= buf.length) { + const firstByte = buf[pos]; + const secondByte = buf[pos + 1]; + const fin = (firstByte & 0x80) !== 0; + const opcode = firstByte & 0x0F; + const masked = (secondByte & 0x80) !== 0; + let payloadLength = secondByte & 0x7F; + let headerSize = 2; + + if (payloadLength === 126) { + if (pos + 4 > buf.length) break; + payloadLength = buf.readUInt16BE(pos + 2); + headerSize = 4; + } else if (payloadLength === 127) { + if (pos + 10 > buf.length) break; + payloadLength = Number(buf.readBigUInt64BE(pos + 2)); + headerSize = 10; + } + + if (masked) { + if (pos + headerSize + 4 > buf.length) break; + headerSize += 4; // masking key length + } + + const frameEnd = pos + headerSize + payloadLength; + if (frameEnd > buf.length) break; + + // Extract text frames (opcode 1) with FIN set + if (opcode === 1 && fin) { + const payloadStart = pos + headerSize; + if (masked) { + const maskKeyStart = payloadStart - 4; + const maskingKey = buf.slice(maskKeyStart, maskKeyStart + 4); + const maskedPayload = buf.slice(payloadStart, frameEnd); + const unmasked = Buffer.allocUnsafe(payloadLength); + for (let i = 0; i < payloadLength; i++) { + unmasked[i] = maskedPayload[i] ^ maskingKey[i % 4]; + } + messages.push(unmasked.toString('utf8')); + } else { + messages.push(buf.slice(payloadStart, frameEnd).toString('utf8')); + } + } + + pos = frameEnd; + } + + return { messages, consumed: pos }; +} + +/** + * Attach token usage tracking to a WebSocket upstream connection. + * + * Claude Code CLI uses WebSocket streaming to the Anthropic API. The + * api-proxy relays this as a raw socket pipe (tlsSocket ↔ clientSocket). + * This function adds a non-blocking 'data' listener on the upstream socket + * to parse WebSocket frames and extract token usage from JSON text messages. + * + * The upstream stream starts with an HTTP 101 response header, followed by + * WebSocket frames. This function skips the HTTP header before parsing frames. + * + * @param {import('tls').TLSSocket} upstreamSocket - Upstream TLS socket + * @param {object} opts + * @param {string} opts.requestId - Request ID for correlation + * @param {string} opts.provider - Provider name (anthropic, copilot, etc.) + * @param {string} opts.path - Request path + * @param {number} opts.startTime - Request start time (Date.now()) + * @param {object} opts.metrics - Metrics module reference + * @param {(normalizedUsage: object, model: string|null) => void} [opts.onUsage] - Optional callback invoked after normalized usage is extracted + */ +function trackWebSocketTokenUsage(upstreamSocket, opts) { + const { requestId, provider, path: reqPath, startTime, metrics: metricsRef, onUsage } = opts; + + logRequest('debug', 'ws_token_track_start', { + request_id: requestId, + provider, + path: reqPath, + }); + diag('WS_TRACK_START', { request_id: requestId, provider, path: reqPath }); + + let httpHeaderParsed = false; + let buffer = Buffer.alloc(0); + let totalBytes = 0; + let headerBytes = 0; + let streamingUsage = {}; + let streamingModel = null; + let finalized = false; + let frameCount = 0; + let textMessageCount = 0; + + // Max buffer to prevent unbounded memory growth (1 MB) + const MAX_WS_BUFFER = 1 * 1024 * 1024; + + upstreamSocket.on('data', (chunk) => { + totalBytes += chunk.length; + buffer = Buffer.concat([buffer, chunk]); + + // Safety: drop buffer if it grows too large (malformed frames) + if (buffer.length > MAX_WS_BUFFER) { + buffer = Buffer.alloc(0); + httpHeaderParsed = true; // skip header parsing + return; + } + + // Skip the HTTP 101 Switching Protocols response header + if (!httpHeaderParsed) { + const headerEnd = buffer.indexOf('\r\n\r\n'); + if (headerEnd === -1) return; // need more data for full header + headerBytes = headerEnd + 4; + buffer = buffer.slice(headerBytes); + httpHeaderParsed = true; + } + + // Parse any complete WebSocket frames + const { messages, consumed } = parseWebSocketFrames(buffer); + if (consumed > 0) { + buffer = buffer.slice(consumed); + } + frameCount += messages.length; + + for (const text of messages) { + textMessageCount++; + const { usage, model } = extractUsageFromSseLine(text); + if (model && !streamingModel) streamingModel = model; + if (usage) { + logRequest('debug', 'ws_token_usage_found', { + request_id: requestId, + provider, + usage_keys: Object.keys(usage), + model, + }); + for (const [k, v] of Object.entries(usage)) { + streamingUsage[k] = v; + } + } + } + }); + + function doFinalize() { + if (finalized) return; + finalized = true; + + logRequest('debug', 'ws_token_track_end', { + request_id: requestId, + provider, + total_bytes: totalBytes, + frame_count: frameCount, + text_message_count: textMessageCount, + has_usage: Object.keys(streamingUsage).length > 0, + usage_keys: Object.keys(streamingUsage), + model: streamingModel, + }); + diag('WS_TRACK_END', { request_id: requestId, provider, total_bytes: totalBytes, frame_count: frameCount, text_message_count: textMessageCount, has_usage: Object.keys(streamingUsage).length > 0, usage_keys: Object.keys(streamingUsage), model: streamingModel }); + + if (Object.keys(streamingUsage).length === 0) return; + + const duration = Date.now() - startTime; + const normalized = normalizeUsage(streamingUsage); + if (!normalized) return; + if (typeof onUsage === 'function') { + try { + onUsage(normalized, streamingModel || 'unknown'); + } catch { + // best-effort callback + } + } + + if (metricsRef) { + metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); + metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); + } + + const record = { + _schema: TOKEN_USAGE_SCHEMA, + timestamp: new Date().toISOString(), + request_id: requestId, + provider, + model: streamingModel || 'unknown', + path: reqPath, + status: 101, + streaming: true, + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + duration_ms: duration, + response_bytes: totalBytes - headerBytes, + }; + + writeTokenUsage(record); + + logRequest('info', 'token_usage', { + request_id: requestId, + provider, + model: streamingModel || 'unknown', + input_tokens: normalized.input_tokens, + output_tokens: normalized.output_tokens, + cache_read_tokens: normalized.cache_read_tokens, + cache_write_tokens: normalized.cache_write_tokens, + streaming: true, + transport: 'websocket', + }); + } + + upstreamSocket.on('close', doFinalize); + upstreamSocket.on('end', doFinalize); +} + +module.exports = { parseWebSocketFrames, trackWebSocketTokenUsage }; diff --git a/containers/api-proxy/token-tracker.js b/containers/api-proxy/token-tracker.js index a00e9cba..859f13d5 100644 --- a/containers/api-proxy/token-tracker.js +++ b/containers/api-proxy/token-tracker.js @@ -1,902 +1,31 @@ /** - * Token usage tracking for AWF API Proxy. + * Token usage tracking for AWF API Proxy — re-export facade. * - * Intercepts LLM API responses (both streaming SSE and non-streaming JSON) - * to extract token usage data without adding latency to the client. - * - * Architecture: - * proxyRes (LLM response) → res (client) - * ├─ on('data'): buffer/inspect chunks for usage extraction - * └─ on('end'): finalize parsing → log to file + metrics - * - * For non-streaming responses: buffer the JSON body (up to MAX_BUFFER_SIZE), - * then parse it on 'end' to extract usage fields. - * For streaming (SSE) responses: scan each chunk for usage events as they - * are received, accumulate usage from message_start / message_delta / final - * data events, and log the aggregated result on 'end'. - * - * Zero external dependencies — uses Node.js built-in streams and fs. + * Maintains the original public API while delegating to focused modules: + * - token-persistence.js : log stream lifecycle, record validation, disk writes + * - token-parsers.js : pure SSE/JSON parsing and usage normalization + * - token-tracker-http.js : HTTP response tracker (trackTokenUsage) + * - token-tracker-ws.js : WebSocket tracker (parseWebSocketFrames, trackWebSocketTokenUsage) */ 'use strict'; -const fs = require('fs'); -const path = require('path'); -const zlib = require('zlib'); -const { logRequest } = require('./logging'); - -// Max response body to buffer for non-streaming usage extraction (5 MB). -// Responses larger than this are still forwarded but usage is not extracted. -const MAX_BUFFER_SIZE = 5 * 1024 * 1024; - -// Token usage log file path (inside the mounted log volume) -const TOKEN_LOG_DIR = process.env.AWF_TOKEN_LOG_DIR || '/var/log/api-proxy'; -const TOKEN_LOG_FILE = path.join(TOKEN_LOG_DIR, 'token-usage.jsonl'); -const DIAG_LOG_FILE = path.join(TOKEN_LOG_DIR, 'token-diag.log'); -const DIAG_ENABLED = process.env.AWF_DEBUG_TOKENS === '1'; - -// AWF version used to identify schema version in JSONL records. -// Set to the container image version at build time via ARG AWF_VERSION in the Dockerfile -// (baked in by the release workflow with --build-arg AWF_VERSION=). -// Falls back to "0.0.0-dev" for local/un-versioned builds. -const AWF_VERSION = process.env.AWF_VERSION; -if (!AWF_VERSION) { - // Log a warning (to stderr to avoid polluting stdout) when running without the env var. - // This can happen during local development or tests outside the container. - process.stderr.write('{"level":"warn","event":"awf_version_missing","message":"AWF_VERSION env var not set; _schema will use 0.0.0-dev"}\n'); -} -const TOKEN_USAGE_SCHEMA = `token-usage/v${AWF_VERSION || '0.0.0-dev'}`; - -let logStream = null; -let diagStream = null; - -/** - * Write a diagnostic line to the diagnostics log file. - * Only active when AWF_DEBUG_TOKENS=1 environment variable is set. - * Data is sanitized to prevent writing raw network content to disk. - */ -function diag(msg, data) { - if (!DIAG_ENABLED) return; - try { - if (!diagStream) { - fs.mkdirSync(TOKEN_LOG_DIR, { recursive: true }); - diagStream = fs.createWriteStream(DIAG_LOG_FILE, { flags: 'a' }); - diagStream.on('error', () => { diagStream = null; }); - } - // Sanitize: only log known safe fields, omit raw response data - let safeData = data; - if (data && typeof data === 'object') { - const { raw_sample, ...rest } = data; - safeData = rest; - } - const line = `${new Date().toISOString()} ${msg}` + - (safeData ? ' ' + JSON.stringify(safeData) : '') + '\n'; - diagStream.write(line); - } catch { /* best-effort */ } -} - -/** - * Get or create the JSONL append stream for token usage logs. - * Uses a lazy singleton — created on first write. - */ -function getLogStream() { - if (logStream) return logStream; - try { - // Ensure directory exists - fs.mkdirSync(TOKEN_LOG_DIR, { recursive: true }); - logStream = fs.createWriteStream(TOKEN_LOG_FILE, { flags: 'a' }); - logStream.on('error', (err) => { - logRequest('warn', 'token_log_error', { error: err.message }); - logStream = null; - }); - return logStream; - } catch (err) { - logRequest('warn', 'token_log_init_error', { error: err.message }); - return null; - } -} - -/** - * Validate a token usage record against the token-usage schema contract. - * - * Checks that all required fields are present and have the expected types. - * Logs a warning and returns false if the record is non-conformant; does - * not throw, so a bad record is dropped rather than crashing the proxy. - * - * @param {object} record - The record to validate - * @returns {boolean} true if the record is valid, false otherwise - */ -function validateTokenUsageRecord(record) { - if (!record || typeof record !== 'object') { - logRequest('warn', 'token_record_schema_violation', { - field: 'record', - expected: 'object', - actual: record === null ? 'null' : typeof record, - }); - return false; - } - - const required = [ - ['_schema', 'string'], - ['timestamp', 'string'], - ['request_id', 'string'], - ['provider', 'string'], - ['model', 'string'], - ['path', 'string'], - ['status', 'number'], - ['streaming', 'boolean'], - ['input_tokens', 'number'], - ['output_tokens', 'number'], - ['cache_read_tokens', 'number'], - ['cache_write_tokens', 'number'], - ['duration_ms', 'number'], - ]; - - for (const [field, expectedType] of required) { - // eslint-disable-next-line valid-typeof - if (typeof record[field] !== expectedType) { - logRequest('warn', 'token_record_schema_violation', { - request_id: record.request_id, - field, - expected: expectedType, - actual: typeof record[field], - }); - return false; - } - } - - if (!/^token-usage\/v\d+\.\d+\.\d+(-\w+)?$/.test(record._schema)) { - logRequest('warn', 'token_record_schema_violation', { - request_id: record.request_id, - field: '_schema', - expected: 'token-usage/v', - actual: record._schema, - }); - return false; - } - - return true; -} - -/** - * Write a token usage record to the JSONL log file. - * Validates the record against the token-usage schema before writing. - * Handles backpressure by dropping writes when the stream buffer is full. - */ -function writeTokenUsage(record) { - if (!validateTokenUsageRecord(record)) return; - - const stream = getLogStream(); - if (stream && !stream.writableEnded) { - const ok = stream.write(JSON.stringify(record) + '\n'); - if (!ok) { - // Backpressure — stream buffer full. Drop this write rather than - // accumulating unbounded memory. The 'drain' event will unblock - // future writes naturally. - logRequest('warn', 'token_log_backpressure', { request_id: record.request_id }); - } - } -} - -/** - * Check if a response is SSE (Server-Sent Events) streaming. - */ -function isStreamingResponse(headers) { - const ct = headers['content-type'] || ''; - return ct.includes('text/event-stream'); -} - -/** - * Check if a response is gzip or deflate compressed. - */ -function isCompressedResponse(headers) { - const ce = (headers['content-encoding'] || '').toLowerCase(); - return ce === 'gzip' || ce === 'deflate' || ce === 'br'; -} - -/** - * Create a decompression transform stream based on content-encoding. - * Returns null if the encoding is not supported. - */ -function createDecompressor(headers) { - const ce = (headers['content-encoding'] || '').toLowerCase(); - if (ce === 'gzip') return zlib.createGunzip(); - if (ce === 'deflate') return zlib.createInflate(); - if (ce === 'br') return zlib.createBrotliDecompress(); - return null; -} - -/** - * Extract reasoning token count from provider usage payloads. - * - * Supports explicit `reasoning_tokens` and provider-specific nested fields. - * Priority order: top-level → completion_tokens_details → output_tokens_details. - */ -function extractReasoningTokens(usage) { - if (!usage || typeof usage !== 'object') return undefined; - if (typeof usage.reasoning_tokens === 'number') return usage.reasoning_tokens; - if (usage.completion_tokens_details && typeof usage.completion_tokens_details.reasoning_tokens === 'number') { - return usage.completion_tokens_details.reasoning_tokens; - } - if (usage.output_tokens_details && typeof usage.output_tokens_details.reasoning_tokens === 'number') { - return usage.output_tokens_details.reasoning_tokens; - } - return undefined; -} - -/** - * Extract token usage from a non-streaming JSON response body. - * - * Supports: - * - OpenAI/Copilot: { usage: { prompt_tokens, completion_tokens, total_tokens, prompt_tokens_details: { cached_tokens } } } - * - Anthropic: { usage: { input_tokens, output_tokens, cache_creation_input_tokens, cache_read_input_tokens } } - * - * Also extracts the model field if present. - * - * @param {Buffer} body - Response body - * @returns {{ usage: object|null, model: string|null }} - */ -function extractUsageFromJson(body) { - try { - const text = body.toString('utf8'); - const json = JSON.parse(text); - const usageSource = (json.usage && typeof json.usage === 'object') - ? json.usage - : ((json.response && json.response.usage && typeof json.response.usage === 'object') - ? json.response.usage - : null); - const result = { usage: null, model: json.model || (json.response && json.response.model) || null }; - - if (usageSource) { - const usage = {}; - let hasField = false; - // Anthropic fields - if (typeof usageSource.input_tokens === 'number') { - usage.input_tokens = usageSource.input_tokens; - hasField = true; - } - if (typeof usageSource.output_tokens === 'number') { - usage.output_tokens = usageSource.output_tokens; - hasField = true; - } - if (typeof usageSource.cache_creation_input_tokens === 'number') { - usage.cache_creation_input_tokens = usageSource.cache_creation_input_tokens; - hasField = true; - } - if (typeof usageSource.cache_read_input_tokens === 'number') { - usage.cache_read_input_tokens = usageSource.cache_read_input_tokens; - hasField = true; - } - // OpenAI/Copilot fields - if (typeof usageSource.prompt_tokens === 'number') { - usage.prompt_tokens = usageSource.prompt_tokens; - hasField = true; - } - if (typeof usageSource.completion_tokens === 'number') { - usage.completion_tokens = usageSource.completion_tokens; - hasField = true; - } - if (typeof usageSource.total_tokens === 'number') { - usage.total_tokens = usageSource.total_tokens; - hasField = true; - } - const reasoningTokens = extractReasoningTokens(usageSource); - if (typeof reasoningTokens === 'number') { - usage.reasoning_tokens = reasoningTokens; - hasField = true; - } - // OpenAI/Copilot nested cache fields (prompt_tokens_details.cached_tokens) - if (usageSource.prompt_tokens_details && typeof usageSource.prompt_tokens_details.cached_tokens === 'number') { - usage.cache_read_input_tokens = usageSource.prompt_tokens_details.cached_tokens; - hasField = true; - } - if (hasField) { - result.usage = usage; - } - } - - return result; - } catch { - return { usage: null, model: null }; - } -} - -/** - * Extract token usage from a single SSE data line. - * - * SSE format: "data: {json}\n\n" - * - * Anthropic streaming events with usage: - * - message_start: { type: "message_start", message: { usage: { input_tokens, cache_creation_input_tokens, cache_read_input_tokens } } } - * - message_delta: { type: "message_delta", usage: { output_tokens } } - * - * OpenAI/Copilot streaming events with usage: - * - Final chunk: { usage: { prompt_tokens, completion_tokens, total_tokens, prompt_tokens_details: { cached_tokens } } } - * - * @param {string} line - A single SSE data line (without "data: " prefix) - * @returns {{ usage: object|null, model: string|null }} - */ -function extractUsageFromSseLine(line) { - if (!line || line === '[DONE]') return { usage: null, model: null }; - - try { - const json = JSON.parse(line); - const result = { usage: null, model: json.model || null }; - - // Anthropic message_start: usage is inside message object - if (json.type === 'message_start' && json.message && json.message.usage) { - result.usage = {}; - const u = json.message.usage; - if (typeof u.input_tokens === 'number') result.usage.input_tokens = u.input_tokens; - if (typeof u.cache_creation_input_tokens === 'number') result.usage.cache_creation_input_tokens = u.cache_creation_input_tokens; - if (typeof u.cache_read_input_tokens === 'number') result.usage.cache_read_input_tokens = u.cache_read_input_tokens; - result.model = (json.message && json.message.model) || result.model; - return result; - } - - // Anthropic message_delta: usage at top level - if (json.type === 'message_delta' && json.usage) { - result.usage = {}; - if (typeof json.usage.output_tokens === 'number') result.usage.output_tokens = json.usage.output_tokens; - return result; - } - - // OpenAI Responses API: usage in response object on completion events - if ((json.type === 'response.completed' || json.type === 'response.done') - && json.response && json.response.usage && typeof json.response.usage === 'object') { - const u = json.response.usage; - result.usage = {}; - if (typeof u.input_tokens === 'number') result.usage.input_tokens = u.input_tokens; - if (typeof u.output_tokens === 'number') result.usage.output_tokens = u.output_tokens; - if (typeof u.total_tokens === 'number') result.usage.total_tokens = u.total_tokens; - const reasoningTokens = extractReasoningTokens(u); - if (typeof reasoningTokens === 'number') result.usage.reasoning_tokens = reasoningTokens; - if (u.prompt_tokens_details && typeof u.prompt_tokens_details.cached_tokens === 'number') { - result.usage.cache_read_input_tokens = u.prompt_tokens_details.cached_tokens; - } - result.model = json.response.model || result.model; - return result; - } - - // OpenAI/Copilot: usage at top level in final chunk - if (json.usage && typeof json.usage === 'object') { - result.usage = {}; - if (typeof json.usage.prompt_tokens === 'number') result.usage.prompt_tokens = json.usage.prompt_tokens; - if (typeof json.usage.completion_tokens === 'number') result.usage.completion_tokens = json.usage.completion_tokens; - if (typeof json.usage.total_tokens === 'number') result.usage.total_tokens = json.usage.total_tokens; - const reasoningTokens = extractReasoningTokens(json.usage); - if (typeof reasoningTokens === 'number') { - result.usage.reasoning_tokens = reasoningTokens; - } - // OpenAI/Copilot nested cache fields (prompt_tokens_details.cached_tokens) - if (json.usage.prompt_tokens_details && typeof json.usage.prompt_tokens_details.cached_tokens === 'number') { - result.usage.cache_read_input_tokens = json.usage.prompt_tokens_details.cached_tokens; - } - return result; - } - - return result; - } catch { - return { usage: null, model: null }; - } -} - -/** - * Extract all SSE data lines from a text chunk. - * Lines are prefixed with "data: " in the SSE protocol. - */ -function parseSseDataLines(text) { - const lines = []; - const parts = text.split('\n'); - for (const part of parts) { - const trimmed = part.trim(); - if (trimmed.startsWith('data: ')) { - lines.push(trimmed.slice(6)); - } else if (trimmed === 'data:') { - // empty data line - } - } - return lines; -} - -/** - * Normalize extracted usage into a unified format. - * - * Output fields: - * - input_tokens: number (from Anthropic input_tokens or OpenAI prompt_tokens) - * - output_tokens: number (from Anthropic output_tokens or OpenAI completion_tokens) - * - cache_read_tokens: number (from Anthropic cache_read_input_tokens or OpenAI prompt_tokens_details.cached_tokens) - * - cache_write_tokens: number (Anthropic cache_creation_input_tokens; not available in OpenAI format) - */ -function normalizeUsage(usage) { - if (!usage) return null; - - return { - input_tokens: usage.input_tokens ?? usage.prompt_tokens ?? 0, - output_tokens: usage.output_tokens ?? usage.completion_tokens ?? 0, - cache_read_tokens: usage.cache_read_input_tokens ?? 0, - cache_write_tokens: usage.cache_creation_input_tokens ?? 0, - reasoning_tokens: usage.reasoning_tokens ?? 0, - }; -} - -/** - * Attach token usage tracking to an upstream response. - * - * This function listens on the proxyRes 'data' and 'end' events to extract - * token usage. It does NOT modify the response stream — the caller still - * does proxyRes.pipe(res) as before. - * - * If the response is gzip/deflate compressed (common with Anthropic API), - * we decompress a copy of the data for parsing while the compressed bytes - * still flow to the client unchanged. - * - * @param {http.IncomingMessage} proxyRes - Upstream response - * @param {object} opts - * @param {string} opts.requestId - Request ID for correlation - * @param {string} opts.provider - Provider name (openai, anthropic, copilot, opencode) - * @param {string} opts.path - Request path - * @param {number} opts.startTime - Request start time (Date.now()) - * @param {object} opts.metrics - Metrics module reference - * @param {object|null} opts.billingInfo - Extracted billing/quota headers from response - * @param {string|null} opts.initiatorSent - X-Initiator value sent on the request - * @param {(normalizedUsage: object, model: string|null) => void} [opts.onUsage] - Optional callback invoked after normalized usage is extracted - */ -function trackTokenUsage(proxyRes, opts) { - const { requestId, provider, path: reqPath, startTime, metrics: metricsRef, billingInfo, initiatorSent, onUsage } = opts; - const streaming = isStreamingResponse(proxyRes.headers); - const contentType = proxyRes.headers['content-type'] || '(none)'; - const contentEncoding = proxyRes.headers['content-encoding'] || '(none)'; - const compressed = isCompressedResponse(proxyRes.headers); - - logRequest('debug', 'token_track_start', { - request_id: requestId, - provider, - path: reqPath, - streaming, - content_type: contentType, - content_encoding: contentEncoding, - status: proxyRes.statusCode, - }); - diag('HTTP_TRACK_START', { request_id: requestId, provider, path: reqPath, streaming, content_type: contentType, content_encoding: contentEncoding, status: proxyRes.statusCode }); - - // Accumulate response body for usage extraction - const chunks = []; - let totalBytes = 0; - let overflow = false; - - // For streaming: accumulate usage across SSE events - let streamingUsage = {}; - let streamingModel = null; - let partialLine = ''; - - // If the response is compressed, create a decompressor. - // We feed raw chunks into it and listen on the decompressed output. - // The raw proxyRes still flows to the client unchanged via pipe(). - let decompressor = null; - if (compressed) { - decompressor = createDecompressor(proxyRes.headers); - if (decompressor) { - decompressor.on('error', (err) => { - diag('DECOMPRESS_ERROR', { request_id: requestId, error: err.message }); - }); - } - } - - // The source for text parsing: decompressor output (if compressed) or raw chunks - function handleDecodedChunk(text) { - if (streaming) { - const combined = partialLine + text; - const lastNewline = combined.lastIndexOf('\n'); - if (lastNewline >= 0) { - const complete = combined.slice(0, lastNewline); - partialLine = combined.slice(lastNewline + 1); - - const dataLines = parseSseDataLines(complete); - for (const line of dataLines) { - const { usage, model } = extractUsageFromSseLine(line); - if (model && !streamingModel) streamingModel = model; - if (usage) { - for (const [k, v] of Object.entries(usage)) { - streamingUsage[k] = v; - } - } - } - } else { - partialLine = combined; - } - } else if (!overflow) { - chunks.push(Buffer.from(text, 'utf8')); - } - } - - if (decompressor) { - // Feed decompressed text to our parser - decompressor.on('data', (decompressedChunk) => { - handleDecodedChunk(decompressedChunk.toString('utf8')); - }); - - // Feed raw compressed bytes into the decompressor - proxyRes.on('data', (chunk) => { - totalBytes += chunk.length; - try { decompressor.write(chunk); } catch { /* ignore write errors */ } - }); - - proxyRes.on('end', () => { - try { decompressor.end(); } catch { /* ignore */ } - }); - - // Finalize on decompressor end - decompressor.on('end', () => { - finalizeTracking(); - }); - } else { - // No compression — parse raw chunks directly - proxyRes.on('data', (chunk) => { - totalBytes += chunk.length; - handleDecodedChunk(chunk.toString('utf8')); - }); - - proxyRes.on('end', () => { - finalizeTracking(); - }); - } - - function finalizeTracking() { - // Only process successful responses (2xx) - if (proxyRes.statusCode < 200 || proxyRes.statusCode >= 300) { - logRequest('debug', 'token_track_skip_status', { - request_id: requestId, - provider, - status: proxyRes.statusCode, - }); - diag('HTTP_TRACK_SKIP_STATUS', { request_id: requestId, provider, status: proxyRes.statusCode }); - return; - } - - const duration = Date.now() - startTime; - let usage = null; - let model = null; - - if (streaming) { - // Process any remaining partial line - if (partialLine.trim()) { - const dataLines = parseSseDataLines(partialLine); - for (const line of dataLines) { - const { usage: u, model: m } = extractUsageFromSseLine(line); - if (m && !streamingModel) streamingModel = m; - if (u) { - for (const [k, v] of Object.entries(u)) { - streamingUsage[k] = v; - } - } - } - } - - if (Object.keys(streamingUsage).length > 0) { - usage = streamingUsage; - model = streamingModel; - } - } else if (!overflow && chunks.length > 0) { - const body = Buffer.concat(chunks); - const result = extractUsageFromJson(body); - usage = result.usage; - model = result.model; - } - - logRequest('debug', 'token_track_end', { - request_id: requestId, - provider, - streaming, - total_bytes: totalBytes, - overflow, - has_usage: !!usage, - usage_keys: usage ? Object.keys(usage) : [], - model, - compressed, - }); - diag('HTTP_TRACK_END', { request_id: requestId, provider, streaming, total_bytes: totalBytes, overflow, has_usage: !!usage, usage_keys: usage ? Object.keys(usage) : [], model, compressed, content_encoding: contentEncoding }); - - const normalized = normalizeUsage(usage); - if (!normalized) return; - if (typeof onUsage === 'function') { - try { - onUsage(normalized, model || 'unknown'); - } catch { - // best-effort callback - } - } - - // Update metrics - if (metricsRef) { - metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); - metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); - } - - // Build log record - const record = { - _schema: TOKEN_USAGE_SCHEMA, - timestamp: new Date().toISOString(), - request_id: requestId, - provider, - model: model || 'unknown', - path: reqPath, - status: proxyRes.statusCode, - streaming, - input_tokens: normalized.input_tokens, - output_tokens: normalized.output_tokens, - cache_read_tokens: normalized.cache_read_tokens, - cache_write_tokens: normalized.cache_write_tokens, - duration_ms: duration, - response_bytes: totalBytes, - }; - - // Include billing/quota info when available (Copilot PRU tracking) - if (initiatorSent) record.x_initiator = initiatorSent; - if (billingInfo) record.billing = billingInfo; - - // Write to JSONL log file - writeTokenUsage(record); - - // Log summary to stdout - logRequest('info', 'token_usage', { - request_id: requestId, - provider, - model: model || 'unknown', - input_tokens: normalized.input_tokens, - output_tokens: normalized.output_tokens, - cache_read_tokens: normalized.cache_read_tokens, - cache_write_tokens: normalized.cache_write_tokens, - streaming, - }); - } -} - -/** - * Parse WebSocket frames from a buffer (server→client direction, unmasked). - * - * Returns an object with: - * - messages: Array of decoded text frame payloads (strings) - * - consumed: Number of bytes consumed from the buffer - * - * Only handles non-fragmented text frames (FIN=1, opcode=1). - * Other frame types (binary, ping, pong, close, continuation) are consumed - * but their payloads are not returned. - * - * @param {Buffer} buf - Buffer containing WebSocket frame data - * @returns {{ messages: string[], consumed: number }} - */ -function parseWebSocketFrames(buf) { - const messages = []; - let pos = 0; - - while (pos + 2 <= buf.length) { - const firstByte = buf[pos]; - const secondByte = buf[pos + 1]; - const fin = (firstByte & 0x80) !== 0; - const opcode = firstByte & 0x0F; - const masked = (secondByte & 0x80) !== 0; - let payloadLength = secondByte & 0x7F; - let headerSize = 2; - - if (payloadLength === 126) { - if (pos + 4 > buf.length) break; - payloadLength = buf.readUInt16BE(pos + 2); - headerSize = 4; - } else if (payloadLength === 127) { - if (pos + 10 > buf.length) break; - payloadLength = Number(buf.readBigUInt64BE(pos + 2)); - headerSize = 10; - } - - if (masked) { - if (pos + headerSize + 4 > buf.length) break; - headerSize += 4; // masking key length - } - - const frameEnd = pos + headerSize + payloadLength; - if (frameEnd > buf.length) break; - - // Extract text frames (opcode 1) with FIN set - if (opcode === 1 && fin) { - const payloadStart = pos + headerSize; - if (masked) { - const maskKeyStart = payloadStart - 4; - const maskingKey = buf.slice(maskKeyStart, maskKeyStart + 4); - const maskedPayload = buf.slice(payloadStart, frameEnd); - const unmasked = Buffer.allocUnsafe(payloadLength); - for (let i = 0; i < payloadLength; i++) { - unmasked[i] = maskedPayload[i] ^ maskingKey[i % 4]; - } - messages.push(unmasked.toString('utf8')); - } else { - messages.push(buf.slice(payloadStart, frameEnd).toString('utf8')); - } - } - - pos = frameEnd; - } - - return { messages, consumed: pos }; -} - -/** - * Attach token usage tracking to a WebSocket upstream connection. - * - * Claude Code CLI uses WebSocket streaming to the Anthropic API. The - * api-proxy relays this as a raw socket pipe (tlsSocket ↔ clientSocket). - * This function adds a non-blocking 'data' listener on the upstream socket - * to parse WebSocket frames and extract token usage from JSON text messages. - * - * The upstream stream starts with an HTTP 101 response header, followed by - * WebSocket frames. This function skips the HTTP header before parsing frames. - * - * @param {import('tls').TLSSocket} upstreamSocket - Upstream TLS socket - * @param {object} opts - * @param {string} opts.requestId - Request ID for correlation - * @param {string} opts.provider - Provider name (anthropic, copilot, etc.) - * @param {string} opts.path - Request path - * @param {number} opts.startTime - Request start time (Date.now()) - * @param {object} opts.metrics - Metrics module reference - * @param {(normalizedUsage: object, model: string|null) => void} [opts.onUsage] - Optional callback invoked after normalized usage is extracted - */ -function trackWebSocketTokenUsage(upstreamSocket, opts) { - const { requestId, provider, path: reqPath, startTime, metrics: metricsRef, onUsage } = opts; - - logRequest('debug', 'ws_token_track_start', { - request_id: requestId, - provider, - path: reqPath, - }); - diag('WS_TRACK_START', { request_id: requestId, provider, path: reqPath }); - - let httpHeaderParsed = false; - let buffer = Buffer.alloc(0); - let totalBytes = 0; - let headerBytes = 0; - let streamingUsage = {}; - let streamingModel = null; - let finalized = false; - let frameCount = 0; - let textMessageCount = 0; - - // Max buffer to prevent unbounded memory growth (1 MB) - const MAX_WS_BUFFER = 1 * 1024 * 1024; - - upstreamSocket.on('data', (chunk) => { - totalBytes += chunk.length; - buffer = Buffer.concat([buffer, chunk]); - - // Safety: drop buffer if it grows too large (malformed frames) - if (buffer.length > MAX_WS_BUFFER) { - buffer = Buffer.alloc(0); - httpHeaderParsed = true; // skip header parsing - return; - } - - // Skip the HTTP 101 Switching Protocols response header - if (!httpHeaderParsed) { - const headerEnd = buffer.indexOf('\r\n\r\n'); - if (headerEnd === -1) return; // need more data for full header - headerBytes = headerEnd + 4; - buffer = buffer.slice(headerBytes); - httpHeaderParsed = true; - } - - // Parse any complete WebSocket frames - const { messages, consumed } = parseWebSocketFrames(buffer); - if (consumed > 0) { - buffer = buffer.slice(consumed); - } - frameCount += messages.length; - - for (const text of messages) { - textMessageCount++; - const { usage, model } = extractUsageFromSseLine(text); - if (model && !streamingModel) streamingModel = model; - if (usage) { - logRequest('debug', 'ws_token_usage_found', { - request_id: requestId, - provider, - usage_keys: Object.keys(usage), - model, - }); - for (const [k, v] of Object.entries(usage)) { - streamingUsage[k] = v; - } - } - } - }); - - function doFinalize() { - if (finalized) return; - finalized = true; - - logRequest('debug', 'ws_token_track_end', { - request_id: requestId, - provider, - total_bytes: totalBytes, - frame_count: frameCount, - text_message_count: textMessageCount, - has_usage: Object.keys(streamingUsage).length > 0, - usage_keys: Object.keys(streamingUsage), - model: streamingModel, - }); - diag('WS_TRACK_END', { request_id: requestId, provider, total_bytes: totalBytes, frame_count: frameCount, text_message_count: textMessageCount, has_usage: Object.keys(streamingUsage).length > 0, usage_keys: Object.keys(streamingUsage), model: streamingModel }); - - if (Object.keys(streamingUsage).length === 0) return; - - const duration = Date.now() - startTime; - const normalized = normalizeUsage(streamingUsage); - if (!normalized) return; - if (typeof onUsage === 'function') { - try { - onUsage(normalized, streamingModel || 'unknown'); - } catch { - // best-effort callback - } - } - - if (metricsRef) { - metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); - metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); - } - - const record = { - _schema: TOKEN_USAGE_SCHEMA, - timestamp: new Date().toISOString(), - request_id: requestId, - provider, - model: streamingModel || 'unknown', - path: reqPath, - status: 101, - streaming: true, - input_tokens: normalized.input_tokens, - output_tokens: normalized.output_tokens, - cache_read_tokens: normalized.cache_read_tokens, - cache_write_tokens: normalized.cache_write_tokens, - duration_ms: duration, - response_bytes: totalBytes - headerBytes, - }; - - writeTokenUsage(record); - - logRequest('info', 'token_usage', { - request_id: requestId, - provider, - model: streamingModel || 'unknown', - input_tokens: normalized.input_tokens, - output_tokens: normalized.output_tokens, - cache_read_tokens: normalized.cache_read_tokens, - cache_write_tokens: normalized.cache_write_tokens, - streaming: true, - transport: 'websocket', - }); - } - - upstreamSocket.on('close', doFinalize); - upstreamSocket.on('end', doFinalize); -} - -/** - * Close the log stream (for graceful shutdown). - * Returns a Promise that resolves once the stream has flushed. - */ -function closeLogStream() { - return new Promise((resolve) => { - let pending = 0; - const check = () => { if (pending === 0) resolve(); }; - if (logStream) { - pending++; - logStream.end(() => { logStream = null; pending--; check(); }); - } - if (diagStream) { - pending++; - diagStream.end(() => { diagStream = null; pending--; check(); }); - } - if (pending === 0) resolve(); - }); -} +const { trackTokenUsage } = require('./token-tracker-http'); +const { parseWebSocketFrames, trackWebSocketTokenUsage } = require('./token-tracker-ws'); +const { + closeLogStream, + validateTokenUsageRecord, + writeTokenUsage, + TOKEN_LOG_FILE, +} = require('./token-persistence'); +const { + extractUsageFromJson, + extractUsageFromSseLine, + parseSseDataLines, + normalizeUsage, + isStreamingResponse, + isCompressedResponse, +} = require('./token-parsers'); module.exports = { trackTokenUsage,