diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..682aa28 Binary files /dev/null and b/.DS_Store differ diff --git a/applications/.DS_Store b/applications/.DS_Store new file mode 100644 index 0000000..579a37f Binary files /dev/null and b/applications/.DS_Store differ diff --git a/applications/kong-ai-plugin/.DS_Store b/applications/kong-ai-plugin/.DS_Store new file mode 100644 index 0000000..22528fa Binary files /dev/null and b/applications/kong-ai-plugin/.DS_Store differ diff --git a/applications/kong-ai-plugin/Readme.md b/applications/kong-ai-plugin/Readme.md new file mode 100644 index 0000000..9d60ac6 --- /dev/null +++ b/applications/kong-ai-plugin/Readme.md @@ -0,0 +1,387 @@ +# Kong AI Tracing Plugin + +A high-performance **Kong Gateway plugin** for comprehensive AI observability and distributed tracing. Automatically captures, enriches, and exports AI request/response data to [Langfuse](https://langfuse.com) for full-stack LLM monitoring. + +--- + +## 🚀 Features + +### 🔍 Auto-Detection & Compatibility +- **OpenAI Compatible APIs**: `/v1/chat/completions`, `/v1/completions`, `/v1/embeddings` +- **vLLM Endpoints**: `/generate`, `/v1/completions` +- **Custom AI Providers**: Extensible detection framework + +### 📊 Comprehensive Telemetry +- **User & Session Context**: `user_id`, `session_id`, `chat_id`, `organization_id` +- **Performance Metrics**: Latency, throughput, token-level timing +- **Token Analytics**: Prompt tokens, completion tokens, total usage +- **Content Tracing**: Full conversation history, input/output content +- **Generation Parameters**: Temperature, max_tokens, top_p, frequency_penalty + +### ⚡ Production Ready +- **Non-Blocking Architecture**: Async timers for Langfuse export +- **Error Resilience**: Graceful degradation on external service failures +- **High Performance**: Minimal overhead on request processing +- **Structured Logging**: Kong-native logging with JSON formatting + +--- + +## 🛠 Installation + +### Method 1: 
Docker Deployment (Recommended) + +```yaml +# docker-compose.yml +version: '3.8' + +services: + kong: + image: kong:3.4 + environment: + KONG_PLUGINS: bundled,ai-tracing + KONG_LUA_PACKAGE_PATH: /usr/local/kong/plugins/?.lua;; + KONG_DATABASE: postgres + KONG_PG_HOST: postgres + KONG_PG_USER: kong + KONG_PG_PASSWORD: kong + volumes: + - ./plugins/ai-tracing:/usr/local/kong/plugins/ai-tracing + ports: + - "8000:8000" + - "8001:8001" +``` + +### Method 2: Traditional Installation + +```bash +# Install via LuaRocks +luarocks install kong-plugin-ai-tracing + +# Or build from source +git clone https://github.com/your-org/kong-ai-tracing.git +cd kong-ai-tracing +luarocks make rockspec/kong-plugin-ai-tracing-1.0.0-1.rockspec +``` + +### Enable Plugin + +```bash +# Add to kong.conf +plugins = bundled,ai-tracing + +# Or via environment variable +export KONG_PLUGINS=bundled,ai-tracing +``` + +--- + +## ⚙️ Configuration + +### Plugin Configuration via Admin API + +```bash +curl -X POST http://localhost:8001/services/your-ai-service/plugins \ + -H "Content-Type: application/json" \ + -d '{ + "name": "ai-tracing", + "config": { + "langfuse_enabled": true, + "langfuse_public_key": "pk-lf-xxxxxxxxxxxx", + "langfuse_secret_key": "sk-lf-xxxxxxxxxxxx", + "langfuse_endpoint": "https://cloud.langfuse.com/api/public/ingestion", + "langfuse_timeout": 5000, + "environment": "production", + "log_level": "info" + } + }' +``` + +### Configuration Reference + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `langfuse_enabled` | `boolean` | `false` | Enable/disable Langfuse integration | +| `langfuse_public_key` | `string` | Required | Langfuse public key | +| `langfuse_secret_key` | `string` | Required | Langfuse secret key | +| `langfuse_endpoint` | `string` | `https://cloud.langfuse.com/api/public/ingestion` | Langfuse API endpoint | +| `langfuse_timeout` | `number` | `5000` | HTTP timeout in milliseconds | +| `environment` | `string` | 
`production` | Deployment environment tag | +| `log_level` | `string` | `info` | Log level (`debug`, `info`, `warn`, `error`) | + +--- + +## 📡 Usage + +### Basic AI Request + +```bash +curl -X POST http://kong-gateway:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "X-User-Id: user-12345" \ + -H "X-Session-Id: session-abcde" \ + -d '{ + "model": "gpt-4", + "messages": [ + {"role": "user", "content": "Explain quantum computing in simple terms"} + ], + "temperature": 0.7, + "max_tokens": 500 + }' +``` + +### Advanced Request with Metadata + +```bash +curl -X POST http://kong-gateway:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -H "X-User-Id: user-12345" \ + -H "X-Chat-Id: chat-67890" \ + -H "X-Organization-Id: org-acme" \ + -d '{ + "model": "gpt-4", + "messages": [ + {"role": "system", + "content": "You are a helpful assistant." + } + ], + "temperature": 0.8, + "max_tokens": 100, + "stream": false, + "metadata": { + "user_id": "user-12345", + "chat_id": "chat-67890", + "session_id": "session-abcde", + "project_id": "project-xyz", + "features": { + "web_search": true, + "image_generation": false } , + "variables": { + "user_tier": "premium", + "language": "en"}, + }' + +``` + +### Supported Headers + +| Header | Description | Example | +|--------|-------------|---------| +| `X-User-Id` | Unique user identifier | `user-12345` | +| `X-Session-Id` | Session identifier | `session-abcde` | +| `X-Chat-Id` | Chat/conversation ID | `chat-67890` | +| `X-Message-Id` | Individual message ID | `msg-54321` | +| `X-Organization-Id` | Organization context | `org-acme` | +| `X-Project-Id` | Project identifier | `project-xyz` | + +--- + +## 📊 Data Model + +### Trace Structure in Langfuse + +```json +{ + "trace": { + "id": "trace-12345", + "name": "/v1/chat/completions", + "userId": "user-12345", + "sessionId": "session-abcde", + "metadata": { + "provider": "openai_compatible", + "model": "gpt-4", + "status_code": 200, + 
"total_duration_ms": 1250, + "time_per_token_ms": 12.5, + "throughput_tokens_per_second": 80.0 + } + }, + "observations": [ + { + "type": "input", + "name": "prompt", + "input": [...messages array...], + "metadata": { + "prompt_tokens": 150, + "temperature": 0.7, + "max_tokens": 500, + "message_count": 3 + } + }, + { + "type": "output", + "name": "completion", + "output": "The capital of France is Paris...", + "metadata": { + "completion_tokens": 25, + "finish_reason": "stop", + "total_tokens": 175, + "response_id": "chatcmpl-abc123" + } + } + ] +} +``` + +### Collected Metrics + +#### Performance Metrics +- **Total Duration**: End-to-end request processing time +- **Time per Token**: Average latency per generated token +- **Throughput**: Tokens processed per second +- **Time to First Token**: Stream response latency (when available) + +#### Token Analytics +- **Prompt Tokens**: Input token count +- **Completion Tokens**: Output token count +- **Total Tokens**: Sum of prompt and completion tokens +- **Token Efficiency**: Completion tokens per prompt token + +#### Content Metrics +- **Prompt Length**: Character count of input +- **Response Length**: Character count of output +- **Message Count**: Number of messages in conversation +- **Finish Reason**: Generation termination reason + +--- + +## 🔧 Advanced Configuration + +### Multiple Environment Setup + +```bash +# Development +curl -X POST http://localhost:8001/services/ai-service-dev/plugins \ + --data "name=ai-tracing" \ + --data "config.langfuse_enabled=true" \ + --data "config.langfuse_public_key=pk-lf-dev-xxx" \ + --data "config.langfuse_secret_key=sk-lf-dev-xxx" \ + --data "config.environment=development" + +# Production +curl -X POST http://localhost:8001/services/ai-service-prod/plugins \ + --data "name=ai-tracing" \ + --data "config.langfuse_enabled=true" \ + --data "config.langfuse_public_key=pk-lf-prod-xxx" \ + --data "config.langfuse_secret_key=sk-lf-prod-xxx" \ + --data "config.environment=production" 
+``` + +### Custom AI Provider Detection + +Extend the detection logic in `handler.lua`: + +```lua +local function detect_ai_provider(path, headers) + if path:find("/v1/chat/completions") then + return "openai_compatible" + elseif path:find("/anthropic") then + return "anthropic" + elseif path:find("/cohere") then + return "cohere" + else + return "custom_provider" + end +end +``` + +--- + +## 🐛 Troubleshooting + +### Enable Debug Logging + +```bash +curl -X PATCH http://localhost:8001/plugins/PLUGIN_ID \ + --data "config.log_level=debug" +``` + +### Common Issues + +#### No Traces in Langfuse +1. Verify Langfuse credentials are correct +2. Check Kong logs for API errors +3. Confirm Langfuse endpoint is accessible from Kong +4. Validate request matches AI provider patterns + +#### Missing User/Session Data +1. Ensure headers are properly set in requests +2. Check metadata field in request body +3. Verify header names match expected format + +#### Performance Impact +1. Monitor Kong metrics during load +2. Check async timer performance +3. Verify Langfuse timeout settings + +### Health Check Endpoint + +```bash +# Verify plugin is active +curl http://localhost:8001/services/ai-service/plugins | jq '.data[] | select(.name=="ai-tracing")' + +# Check Kong logs for plugin activity +docker-compose logs kong | grep "AI Tracing" +``` + +--- + +## 📈 Monitoring & Analytics + +### Key Performance Indicators + +- **Request Volume**: AI calls per minute +- **Average Latency**: End-to-end response time +- **Token Usage**: Cost and efficiency metrics +- **Error Rate**: Failed vs successful requests +- **User Engagement**: Active users and sessions + +### Langfuse Dashboard Setup + +1. **Traces View**: Filter by user, model, or time range +2. **Metrics Dashboard**: Token usage and latency trends +3. **User Analytics**: Usage patterns and behavior +4. 
**Cost Analysis**: Token consumption by project + +--- + +## 🔒 Security Considerations + +- **API Keys**: Store Langfuse credentials securely +- **Data Privacy**: Review exported data for PII compliance +- **Access Control**: Limit plugin configuration access +- **Network Security**: Ensure Kong-Langfuse communication is secure + +--- + +### Development Setup + +```bash +git clone https://github.com/Ramtinboreili/kong-ai-tracing.git +cd kong-ai-tracing + +# Test with local Kong instance +kong migrations bootstrap +kong start + +# Run tests +busted spec/ +``` + +--- + +## 📄 License + +Apache License © 2025 Ramtin Boreili + +--- + +## 🆘 Support + +- 📧 **Email**: ramtin.bor7hp@gmail.com +- 🐛 **Issues**: [GitHub Issues](https://github.com/Ramtinboreili/kong-ai-tracing/issues) + +--- + +**Maintained by Ramtin Boreili** +*DevOps Engineer & Observability Specialist* +[GitHub](https://github.com/Ramtinboreili) • [LinkedIn](https://linkedin.com/in/ramtinboreili) \ No newline at end of file diff --git a/applications/kong-ai-plugin/kong-plugin.yml b/applications/kong-ai-plugin/kong-plugin.yml new file mode 100644 index 0000000..c188b1e --- /dev/null +++ b/applications/kong-ai-plugin/kong-plugin.yml @@ -0,0 +1,14 @@ +name: kong-langfuse-tracing +description: > + A Kong plugin for AI observability that is Langfuse compatible . 
+version: 1.0.0 +priority: 1000 +source_url: https://github.com/Ramtinboreili/kong-langfuse-tracing +license: MIT +maintainers: + - name: Ramtin + email: ramtin.bor7hp@gmail.com +categories: + - analytics + - monitoring + - ai-observability diff --git a/applications/kong-ai-plugin/kong/plugins/ai-tracing/handler.lua b/applications/kong-ai-plugin/kong/plugins/ai-tracing/handler.lua new file mode 100644 index 0000000..4bae67b --- /dev/null +++ b/applications/kong-ai-plugin/kong/plugins/ai-tracing/handler.lua @@ -0,0 +1,788 @@ +local AiTracingHandler = {} + +AiTracingHandler.PRIORITY = 1000 +AiTracingHandler.VERSION = "1.0.0" + +-- Check if path should be ignored +local function should_ignore_path(path) + local ignore_patterns = { + "/v1/models", + "/health", + "/metrics", + "/status" + } + + for _, pattern in ipairs(ignore_patterns) do + if path:find(pattern, 1, true) then -- true for plain match + return true + end + end + + return false +end + +-- Extract user and session metadata from headers and body +local function extract_user_session_metadata(headers, request_body) + local cjson = require("cjson") + + -- Extract user info from multiple possible sources + local user_id = headers["x-user-id"] or + headers["user-id"] or + headers["x-user-email"] or + headers["user-email"] or + headers["x-auth-user"] or + headers["authorization-user"] or + "anonymous_" .. ngx.md5(ngx.var.remote_addr) + + -- Remove "anonymous_" prefix if we found actual user data + if user_id ~= "anonymous_" .. ngx.md5(ngx.var.remote_addr) then + -- It's a real user identifier + else + -- It's anonymous, use a shorter format + user_id = "anonymous" + end + + local metadata = { + user_id = user_id, + user_email = headers["x-user-email"] or headers["user-email"], + session_id = headers["x-session-id"] or headers["session-id"] or ngx.md5(ngx.var.remote_addr .. 
ngx.now()), + organization_id = headers["openai-organization"] or headers["x-organization-id"], + project_id = headers["x-project-id"] or headers["project-id"], + chat_id = nil, + message_id = nil, + filter_ids = {}, + tool_ids = nil, + tool_servers = {}, + files = nil, + variables = {}, + model_info = {}, + features = {}, + params = {}, + input = nil, + output = nil + } + + -- Parse request body for additional metadata + if request_body then + local ok, req_data = pcall(cjson.decode, request_body) + if ok and req_data then + -- Extract direct input/output if available + metadata.input = req_data.input or metadata.input + metadata.output = req_data.output or metadata.output + + -- Extract from metadata field + if req_data.metadata then + metadata.user_id = req_data.metadata.user_id or metadata.user_id + metadata.user_email = req_data.metadata.user_email or metadata.user_email + metadata.chat_id = req_data.metadata.chat_id + metadata.message_id = req_data.metadata.message_id + metadata.session_id = req_data.metadata.session_id or metadata.session_id + metadata.filter_ids = req_data.metadata.filter_ids or {} + metadata.tool_ids = req_data.metadata.tool_ids + metadata.tool_servers = req_data.metadata.tool_servers or {} + metadata.files = req_data.metadata.files + metadata.variables = req_data.metadata.variables or {} + metadata.model_info = req_data.metadata.model or {} + metadata.features = req_data.metadata.features or {} + metadata.params = req_data.metadata.params or {} + metadata.direct = req_data.metadata.direct or false + metadata.input = req_data.metadata.input or metadata.input + metadata.output = req_data.metadata.output or metadata.output + end + + -- Extract user info from request body + if req_data.user then + if type(req_data.user) == "string" then + metadata.user_id = req_data.user + elseif type(req_data.user) == "table" then + metadata.user_id = req_data.user.id or req_data.user.user_id or metadata.user_id + metadata.user_email = req_data.user.email or 
req_data.user.user_email or metadata.user_email + end + end + + -- Extract from features field + if req_data.features then + metadata.features = req_data.features + end + + -- Extract model capabilities if available + if metadata.model_info and metadata.model_info.info and metadata.model_info.info.meta then + metadata.capabilities = metadata.model_info.info.meta.capabilities or {} + end + + -- Extract all available fields from request body for comprehensive tracing + metadata.raw_request_body = req_data + end + end + + return metadata +end + +-- Detect AI provider +local function detect_ai_provider(path, headers) + if path:find("/v1/chat/completions") or path:find("/v1/completions") or path:find("/v1/embeddings") then + return "openai_compatible" + elseif path:find("/generate") or path:find("/v1/completions") then + return "vllm" + else + return "unknown" + end +end + +-- Convert messages to plain text for display +local function messages_to_plain_text(messages) + if not messages or type(messages) ~= "table" then + return "" + end + + local text = "" + for _, msg in ipairs(messages) do + if msg.role and msg.content then + text = text .. msg.role .. ": " .. msg.content .. 
"\n" + end + end + return text +end + +-- Extract complete AI metadata from request/response +local function extract_ai_metadata(request_body, response_body) + local cjson = require("cjson") + local metadata = { + -- Basic model info + model = "unknown", + stream = false, + + -- Content info + messages = {}, + input = "", + output = "", + prompt_length = 0, + response_length = 0, + + -- Token usage + usage = {}, + total_tokens = 0, + prompt_tokens = 0, + completion_tokens = 0, + + -- Response details + finish_reason = nil, + response_id = nil, + created = nil, + object = nil, + + -- Generation parameters + temperature = nil, + max_tokens = nil, + top_p = nil, + frequency_penalty = nil, + presence_penalty = nil, + + -- Features and capabilities + features = {}, + capabilities = {}, + + -- Raw data for comprehensive tracing + raw_request = nil, + raw_response = nil + } + + -- Parse request body + if request_body then + local ok, req_data = pcall(cjson.decode, request_body) + if ok and req_data then + -- Store raw request for comprehensive tracing + metadata.raw_request = req_data + + -- Basic model and stream info + metadata.model = req_data.model or "unknown" + metadata.stream = req_data.stream or false + + -- Extract messages + if req_data.messages and type(req_data.messages) == "table" then + metadata.messages = req_data.messages + + -- Build input text from messages (plain text) + metadata.input = messages_to_plain_text(req_data.messages) + for _, msg in ipairs(req_data.messages) do + if msg.content then + metadata.prompt_length = metadata.prompt_length + #msg.content + end + end + elseif req_data.prompt then + metadata.input = req_data.prompt + metadata.prompt_length = #req_data.prompt + elseif req_data.input then + metadata.input = req_data.input + metadata.prompt_length = #req_data.input + end + + -- Extract generation parameters + metadata.temperature = req_data.temperature + metadata.max_tokens = req_data.max_tokens + metadata.top_p = req_data.top_p + 
metadata.frequency_penalty = req_data.frequency_penalty + metadata.presence_penalty = req_data.presence_penalty + + -- Extract features + if req_data.features then + metadata.features = req_data.features + end + + -- Extract metadata field + if req_data.metadata then + -- Extract capabilities from model info + if req_data.metadata.model and req_data.metadata.model.info and req_data.metadata.model.info.meta then + metadata.capabilities = req_data.metadata.model.info.meta.capabilities or {} + end + + -- Extract features from metadata + if req_data.metadata.features then + metadata.features = req_data.metadata.features + end + end + else + -- If JSON parsing fails, store as raw text + metadata.input = request_body + metadata.prompt_length = #request_body + end + end + + -- Parse response body + if response_body then + local ok, res_data = pcall(cjson.decode, response_body) + if ok and res_data then + -- Store raw response for comprehensive tracing + metadata.raw_response = res_data + + metadata.model = metadata.model or res_data.model or "unknown" + + -- Extract usage data + if res_data.usage then + metadata.usage = res_data.usage + metadata.total_tokens = res_data.usage.total_tokens or 0 + metadata.prompt_tokens = res_data.usage.prompt_tokens or 0 + metadata.completion_tokens = res_data.usage.completion_tokens or 0 + end + + -- Extract output and finish reason + if res_data.choices and res_data.choices[1] then + local choice = res_data.choices[1] + + -- Extract finish reason + metadata.finish_reason = choice.finish_reason or choice.stop_reason + + -- Extract output content (plain text) + if choice.message then + if choice.message.content then + metadata.output = choice.message.content + metadata.response_length = #choice.message.content + end + elseif choice.text then + metadata.output = choice.text + metadata.response_length = #choice.text + end + elseif res_data.output then + metadata.output = res_data.output + metadata.response_length = #res_data.output + end + + 
-- Extract other response data + metadata.response_id = res_data.id + metadata.created = res_data.created + metadata.object = res_data.object + else + -- If JSON parsing fails, store as raw text + metadata.output = response_body + metadata.response_length = #response_body + end + end + + return metadata +end + +-- Calculate latency metrics +local function calculate_latency_metrics(start_time, end_time, total_tokens) + local total_duration = end_time - start_time + local total_tokens_num = total_tokens or 0 + + return { + total_latency_ms = total_duration * 1000, + time_per_token = total_tokens_num > 0 and (total_duration * 1000 / total_tokens_num) or 0, + throughput_tokens_per_second = total_duration > 0 and (total_tokens_num / total_duration) or 0 + } +end + +-- Send complete trace to Langfuse +local function send_to_langfuse_async(conf, trace_data) + local http = require("resty.http") + local cjson = require("cjson") + + if not conf.langfuse_enabled then + kong.log.debug("Langfuse disabled, skipping trace export") + return + end + + -- Calculate latency metrics + local latency_metrics = calculate_latency_metrics( + trace_data.start_time, + trace_data.start_time + trace_data.duration, + trace_data.total_tokens + ) + + -- Prepare Langfuse payload with proper batch structure + local langfuse_payload = { batch = {} } + + -- Main trace with input and output as plain text + local trace_body = { + id = trace_data.trace_id, + name = trace_data.name, + userId = trace_data.user_id, + sessionId = trace_data.session_id, + input = trace_data.input, -- Plain text + output = trace_data.output, -- Plain text + metadata = { + -- Basic request info + provider = trace_data.provider, + model = trace_data.model, + method = trace_data.method, + path = trace_data.path, + status_code = trace_data.status_code, + + -- Latency metrics + total_duration_ms = trace_data.duration * 1000, + time_per_token_ms = latency_metrics.time_per_token, + throughput_tokens_per_second = 
latency_metrics.throughput_tokens_per_second, + + -- Token usage + total_tokens = trace_data.total_tokens, + prompt_tokens = trace_data.prompt_tokens, + completion_tokens = trace_data.completion_tokens, + prompt_length = trace_data.prompt_length, + response_length = trace_data.response_length, + + -- Generation parameters + temperature = trace_data.temperature, + max_tokens = trace_data.max_tokens, + top_p = trace_data.top_p, + frequency_penalty = trace_data.frequency_penalty, + presence_penalty = trace_data.presence_penalty, + stream = trace_data.stream, + finish_reason = trace_data.finish_reason, + + -- User and session context + user_id = trace_data.user_id, + user_email = trace_data.user_email, + chat_id = trace_data.chat_id, + message_id = trace_data.message_id, + session_id = trace_data.session_id, + organization_id = trace_data.organization_id, + project_id = trace_data.project_id, + + -- Filter and tool info + filter_ids = trace_data.filter_ids, + tool_ids = trace_data.tool_ids, + tool_servers = trace_data.tool_servers, + files = trace_data.files, + direct = trace_data.direct, + + -- Features and capabilities + features = trace_data.features, + capabilities = trace_data.capabilities, + variables = trace_data.variables, + params = trace_data.params, + + -- Model info + model_info = trace_data.model_info, + + -- Response info + response_id = trace_data.response_id, + created_timestamp = trace_data.created, + object = trace_data.object, + + -- Raw data availability + has_raw_request = trace_data.raw_request ~= nil, + has_raw_response = trace_data.raw_response ~= nil, + + -- Message info + message_count = #trace_data.messages, + has_messages = #trace_data.messages > 0 + }, + tags = { + "kong", + "ai-gateway", + trace_data.provider, + trace_data.model, + trace_data.user_id ~= "anonymous" and "authenticated" or "anonymous", + trace_data.chat_id or "no-chat" + } + } + + -- Add raw request data to metadata if available + if trace_data.raw_request then + 
trace_body.metadata.raw_request = trace_data.raw_request + end + + -- Add raw response data to metadata if available + if trace_data.raw_response then + trace_body.metadata.raw_response = trace_data.raw_response + end + + table.insert(langfuse_payload.batch, { + id = trace_data.trace_id .. "-trace", + type = "trace-create", + timestamp = os.date("!%Y-%m-%dT%H:%M:%S.000Z", trace_data.start_time), + body = trace_body + }) + + -- Add detailed input observation (prompt with full messages as JSON for detailed view) + if #trace_data.messages > 0 then + table.insert(langfuse_payload.batch, { + id = trace_data.trace_id .. "-input", + type = "observation-create", + timestamp = os.date("!%Y-%m-%dT%H:%M:%S.000Z", trace_data.start_time), + body = { + id = trace_data.trace_id .. "-input", + traceId = trace_data.trace_id, + type = "GENERATION", + name = "prompt", + input = trace_data.messages, -- Full messages as JSON for detailed analysis + metadata = { + prompt_length = trace_data.prompt_length, + prompt_tokens = trace_data.prompt_tokens, + model = trace_data.model, + temperature = trace_data.temperature, + max_tokens = trace_data.max_tokens, + message_count = #trace_data.messages, + stream = trace_data.stream, + features = trace_data.features, + capabilities = trace_data.capabilities + } + } + }) + elseif trace_data.input and trace_data.input ~= "" then + table.insert(langfuse_payload.batch, { + id = trace_data.trace_id .. "-input", + type = "observation-create", + timestamp = os.date("!%Y-%m-%dT%H:%M:%S.000Z", trace_data.start_time), + body = { + id = trace_data.trace_id .. 
"-input", + traceId = trace_data.trace_id, + type = "GENERATION", + name = "prompt", + input = trace_data.input, -- Plain text input + metadata = { + prompt_length = trace_data.prompt_length, + prompt_tokens = trace_data.prompt_tokens, + model = trace_data.model, + temperature = trace_data.temperature, + max_tokens = trace_data.max_tokens + } + } + }) + end + + -- Add detailed output observation (completion as plain text) + if trace_data.output and trace_data.output ~= "" then + table.insert(langfuse_payload.batch, { + id = trace_data.trace_id .. "-output", + type = "observation-create", + timestamp = os.date("!%Y-%m-%dT%H:%M:%S.000Z", trace_data.start_time + trace_data.duration), + body = { + id = trace_data.trace_id .. "-output", + traceId = trace_data.trace_id, + type = "GENERATION", + name = "completion", + output = trace_data.output, -- Plain text output + metadata = { + response_length = trace_data.response_length, + completion_tokens = trace_data.completion_tokens, + finish_reason = trace_data.finish_reason, + total_tokens = trace_data.total_tokens, + latency_ms = trace_data.duration * 1000, + time_per_token_ms = latency_metrics.time_per_token, + throughput_tokens_per_second = latency_metrics.throughput_tokens_per_second, + response_id = trace_data.response_id, + created_timestamp = trace_data.created + } + } + }) + end + + -- Add user properties for better grouping in Langfuse + if trace_data.user_id and trace_data.user_id ~= "anonymous" then + local user_data = { + id = trace_data.user_id + } + + if trace_data.user_email then + user_data.email = trace_data.user_email + end + + langfuse_payload.batch[1].body.user = user_data + end + + local endpoint = conf.langfuse_endpoint + if not endpoint then + endpoint = "https://cloud.langfuse.com/api/public/ingestion" + end + + -- Create HTTP client in timer context + local httpc = http.new() + httpc:set_timeout(conf.langfuse_timeout) + + local res, err = httpc:request_uri(endpoint, { + method = "POST", + headers = { 
+ ["Content-Type"] = "application/json", + ["Authorization"] = "Basic " .. ngx.encode_base64(conf.langfuse_public_key .. ":" .. conf.langfuse_secret_key) + }, + body = cjson.encode(langfuse_payload) + }) + + if not res then + kong.log.err("Failed to send trace to Langfuse: ", err) + return false + end + + if res.status >= 400 then + kong.log.err("Langfuse API error: ", res.status, " - ", res.body) + return false + end + + kong.log.debug("Complete trace with all metadata sent to Langfuse successfully") + return true +end + +-- Timer callback function +local function langfuse_timer_handler(premature, conf_data, trace_data_copy) + if premature then + kong.log.debug("Timer premature") + return + end + + local ok, err = pcall(send_to_langfuse_async, conf_data, trace_data_copy) + if not ok then + kong.log.err("Error in Langfuse timer handler: ", err) + end +end + +-- Plugin phases +function AiTracingHandler:access(conf) + kong.log.debug("AI Tracing - Access phase") + + local path = ngx.var.uri + + -- Check if path should be ignored + if should_ignore_path(path) then + kong.log.debug("AI Tracing - Ignoring path: ", path) + ngx.ctx.ai_tracing_ignored = true + return + end + + local headers = ngx.req.get_headers() + + -- Capture request body early for metadata extraction + ngx.req.read_body() + local body_data = ngx.req.get_body_data() + + -- Extract metadata from headers and request body + local user_session_metadata = extract_user_session_metadata(headers, body_data) + + ngx.ctx.ai_tracing = { + start_time = ngx.now(), + trace_id = ngx.var.request_id or ngx.md5(ngx.var.remote_addr .. 
ngx.now()),
+        name = path,
+        method = ngx.req.get_method(),
+        path = path,
+        provider = detect_ai_provider(path, headers),
+        request_body = body_data,
+
+        -- User and session context
+        user_id = user_session_metadata.user_id,
+        user_email = user_session_metadata.user_email,
+        chat_id = user_session_metadata.chat_id,
+        message_id = user_session_metadata.message_id,
+        session_id = user_session_metadata.session_id,
+        organization_id = user_session_metadata.organization_id,
+        project_id = user_session_metadata.project_id,
+
+        -- Additional metadata
+        filter_ids = user_session_metadata.filter_ids,
+        tool_ids = user_session_metadata.tool_ids,
+        tool_servers = user_session_metadata.tool_servers,
+        files = user_session_metadata.files,
+        variables = user_session_metadata.variables,
+        model_info = user_session_metadata.model_info,
+        features = user_session_metadata.features,
+        capabilities = user_session_metadata.capabilities,
+        params = user_session_metadata.params,
+        direct = user_session_metadata.direct,
+        input = user_session_metadata.input,
+        output = user_session_metadata.output,
+        raw_request_body = user_session_metadata.raw_request_body
+    }
+
+    kong.service.request.set_header("X-AI-Tracing", "enabled")
+end
+
+-- Header filter phase. Currently only short-circuits for paths that the
+-- access phase flagged as ignored; kept as an extension point.
+function AiTracingHandler:header_filter(conf)
+    kong.log.debug("AI Tracing - Header Filter phase")
+
+    if ngx.ctx.ai_tracing_ignored then
+        return
+    end
+end
+
+-- Body filter phase: captures the upstream response body.
+-- Chunks are appended to a table and joined once at EOF instead of being
+-- concatenated string-by-string, which is O(n^2) for streamed (SSE/chunked)
+-- AI responses. The joined string is published as response_body so the log
+-- phase keeps seeing the same type as before.
+function AiTracingHandler:body_filter(conf)
+    kong.log.debug("AI Tracing - Body Filter phase")
+
+    if ngx.ctx.ai_tracing_ignored or not ngx.ctx.ai_tracing then
+        return
+    end
+
+    local chunk, eof = ngx.arg[1], ngx.arg[2]
+
+    local chunks = ngx.ctx.ai_tracing.response_chunks
+    if not chunks then
+        chunks = {}
+        ngx.ctx.ai_tracing.response_chunks = chunks
+    end
+
+    if chunk and #chunk > 0 then
+        chunks[#chunks + 1] = chunk
+    end
+
+    if eof then
+        ngx.ctx.ai_tracing.response_body = table.concat(chunks)
+    end
+end
+
+-- Log phase: finalizes the trace (duration, status), enriches it with AI
+-- metadata extracted from the request/response bodies, and exports it to
+-- Langfuse asynchronously. ngx.timer.at is used because the log phase
+-- cannot perform cosocket I/O directly; the trace data is deep-copied field
+-- by field since ngx.ctx is not safely shareable with a detached timer.
+function AiTracingHandler:log(conf)
+    kong.log.debug("AI Tracing - Log phase")
+
+    if ngx.ctx.ai_tracing_ignored then
+        kong.log.debug("AI Tracing - Ignored path, skipping trace")
+        return
+    end
+
+    if not ngx.ctx.ai_tracing then
+        return
+    end
+
+    local trace_data = ngx.ctx.ai_tracing
+    trace_data.duration = ngx.now() - trace_data.start_time
+    trace_data.status_code = ngx.status
+
+    -- Fall back to joining whatever chunks were collected if body_filter
+    -- never observed EOF (e.g. the client aborted mid-stream).
+    if not trace_data.response_body and trace_data.response_chunks then
+        trace_data.response_body = table.concat(trace_data.response_chunks)
+    end
+
+    -- Extract AI metadata
+    local ai_metadata = extract_ai_metadata(trace_data.request_body, trace_data.response_body)
+
+    -- Copy all metadata to trace_data
+    for key, value in pairs(ai_metadata) do
+        trace_data[key] = value
+    end
+
+    -- Send to Langfuse if enabled (async)
+    if conf.langfuse_enabled then
+        -- Create a complete copy of trace data for timer
+        local trace_data_copy = {
+            -- Basic trace info
+            trace_id = trace_data.trace_id,
+            name = trace_data.name,
+            method = trace_data.method,
+            path = trace_data.path,
+            status_code = trace_data.status_code,
+            duration = trace_data.duration,
+            start_time = trace_data.start_time,
+            provider = trace_data.provider,
+
+            -- User and session context
+            user_id = trace_data.user_id,
+            user_email = trace_data.user_email,
+            chat_id = trace_data.chat_id,
+            message_id = trace_data.message_id,
+            session_id = trace_data.session_id,
+            organization_id = trace_data.organization_id,
+            project_id = trace_data.project_id,
+
+            -- Model and generation info
+            model = trace_data.model,
+            stream = trace_data.stream,
+            finish_reason = trace_data.finish_reason,
+
+            -- Token usage
+            total_tokens = trace_data.total_tokens,
+            prompt_tokens = trace_data.prompt_tokens,
+            completion_tokens = trace_data.completion_tokens,
+            prompt_length = trace_data.prompt_length,
+            response_length = trace_data.response_length,
+
+            -- Input/Output
+            input = trace_data.input,
+            output = trace_data.output,
+            messages = trace_data.messages,
+
+            -- Generation parameters
+            temperature = trace_data.temperature,
+            max_tokens = trace_data.max_tokens,
+            top_p = trace_data.top_p,
+            frequency_penalty = trace_data.frequency_penalty,
+            presence_penalty = trace_data.presence_penalty,
+
+            -- Additional context
+            filter_ids = trace_data.filter_ids,
+            tool_ids = trace_data.tool_ids,
+            tool_servers = trace_data.tool_servers,
+            files = trace_data.files,
+            variables = trace_data.variables,
+            model_info = trace_data.model_info,
+            features = trace_data.features,
+            capabilities = trace_data.capabilities,
+            params = trace_data.params,
+            direct = trace_data.direct,
+
+            -- Response info
+            response_id = trace_data.response_id,
+            created = trace_data.created,
+            object = trace_data.object,
+            usage = trace_data.usage,
+
+            -- Raw data for comprehensive tracing
+            raw_request = trace_data.raw_request,
+            raw_response = trace_data.raw_response
+        }
+
+        -- Use timer to send async; extra args are delivered to the handler
+        -- after the `premature` flag.
+        local ok, err = ngx.timer.at(0, langfuse_timer_handler, conf, trace_data_copy)
+
+        if not ok then
+            kong.log.err("Failed to create async timer for Langfuse: ", err)
+        else
+            kong.log.debug("Async Langfuse timer created successfully")
+        end
+    end
+
+    -- Log to Kong. All length/count fields are nil-guarded: `messages` and
+    -- `filter_ids` may be absent from the extracted metadata, and applying
+    -- `#` to nil would raise in the log phase and kill the trace. (tool_count
+    -- was already guarded; message_count/filter_count now match.)
+    kong.log.info("📊 AI Request Trace - Complete Metadata", {
+        trace_id = trace_data.trace_id,
+        user_id = trace_data.user_id,
+        user_email = trace_data.user_email,
+        chat_id = trace_data.chat_id,
+        session_id = trace_data.session_id,
+        model = trace_data.model,
+        duration_ms = trace_data.duration * 1000,
+        status = trace_data.status_code,
+
+        -- Token metrics
+        total_tokens = trace_data.total_tokens,
+        prompt_tokens = trace_data.prompt_tokens,
+        completion_tokens = trace_data.completion_tokens,
+
+        -- Message metrics
+        message_count = trace_data.messages and #trace_data.messages or 0,
+        prompt_length = trace_data.prompt_length,
+        response_length = trace_data.response_length,
+
+        -- Additional context
+        has_features = trace_data.features and next(trace_data.features) ~= nil,
+        has_capabilities = trace_data.capabilities and next(trace_data.capabilities) ~= nil,
+        has_variables = trace_data.variables and next(trace_data.variables) ~= nil,
+        filter_count = trace_data.filter_ids and #trace_data.filter_ids or 0,
+        tool_count = trace_data.tool_ids and #trace_data.tool_ids or 0,
+        has_raw_request = trace_data.raw_request ~= nil,
+        has_raw_response = trace_data.raw_response ~= nil
+    })
+end
+
+return AiTracingHandler
\ No newline at end of file
diff --git a/applications/kong-ai-plugin/kong/plugins/ai-tracing/schema.lua b/applications/kong-ai-plugin/kong/plugins/ai-tracing/schema.lua
new file mode 100644
index 0000000..3ccc746
--- /dev/null
+++ b/applications/kong-ai-plugin/kong/plugins/ai-tracing/schema.lua
@@ -0,0 +1,18 @@
+return {
+  name = "ai-tracing",
+  fields = {
+    {
+      config = {
+        type = "record",
+        fields = {
+          { enabled = { type = "boolean", default = true } },
+          { langfuse_enabled = { type = "boolean", default = false } },
+          { langfuse_endpoint = { type = "string", default = "https://cloud.langfuse.com/api/traces" } },
+          { langfuse_public_key = { type = "string" } },
+          { langfuse_secret_key = { type = "string" } },
+          { langfuse_timeout = { type = "number", default = 5000 } },
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/applications/kong-ai-plugin/rockspec/kong-plugin-ai-tracing-1.0.0-1.rockspec b/applications/kong-ai-plugin/rockspec/kong-plugin-ai-tracing-1.0.0-1.rockspec
new file mode 100644
index 0000000..3e7ddf5
--- /dev/null
+++ b/applications/kong-ai-plugin/rockspec/kong-plugin-ai-tracing-1.0.0-1.rockspec
@@ -0,0 +1,21 @@
+package = "kong-plugin-ai-tracing"
+version = "1.0.0-1"
+source = {
+  url = "git://github.com/Ramtinboreili/kong-langfuse-tracing"
+}
+description = {
+  summary = "A Kong plugin that provides AI request tracing and observability by exporting traces to Langfuse.",
+  homepage = "https://github.com/Ramtinboreili/kong-langfuse-tracing",
+  license = "MIT"
+}
+dependencies = {
+  "lua-resty-http >= 0.15",
+  "lua-cjson >= 2.1.0"
+}
+build = {
+  type = "builtin",
+  modules = {
+    ["kong.plugins.ai-tracing.handler"] = "kong/plugins/ai-tracing/handler.lua",
+    ["kong.plugins.ai-tracing.schema"] = "kong/plugins/ai-tracing/schema.lua"
+  }
+}