From 48564723a96b37124f67b780863052c8ec453b63 Mon Sep 17 00:00:00 2001 From: Dan McMahon Date: Mon, 8 Jun 2026 16:08:37 +1000 Subject: [PATCH] feat(otelplugin): add OpenTelemetry usage plugin with W3C Baggage propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds internal/otelplugin/ as an opt-in `usage.Plugin` that: - Parses W3C Baggage on inbound requests (Gin middleware) - Allowlist-filters the baggage before downstream propagation - Emits OTLP usage events with GenAI semconv + cost attributes - Propagates baggage on outbound LLM provider requests - Mirrors the redisqueue toggle pattern for runtime enable/disable Off by default — set telemetry.otlp.enabled: true to turn it on. No existing behaviour changes. The plugin mirrors the redisqueue pattern: init()-time registration with coreusage.RegisterPlugin, atomic.Bool feature flag (Enabled / SetEnabled), config snapshot under atomic.Value for lock-free hot-path reads. Why --- - Native OpenTelemetry observability: any OTel-compatible backend (Honeycomb, Tempo, Jaeger, Datadog, Grafana Cloud, Azure Monitor, AWS X-Ray, …) becomes a one-config-change away from CLIProxyAPI telemetry, instead of each operator writing a backend-specific shim against the management API. - Standards-aligned cross-service identity propagation per the W3C Baggage spec. Trusted-boundary safe default (propagation: off); operators opt in to propagate or allowlist. - Early adoption of the OTel GenAI semantic conventions — LLM-routing proxies are a category the OTel community has identified as needing reference implementations. - Per-request cost attribution: optional pricing-table block produces cost.usd per span. Files ----- internal/otelplugin/ config.go — Config schema + SetConfig + Enabled/SetEnabled baggage.go — W3C Baggage parser, formatter, allowlist filter, context plumbing (WithBaggage, BaggageFromContext) middleware.go — Gin middleware + outbound propagation helpers plugin.go — usage.Plugin implementation + tracer wiring status.go — OTel codes aliases *_test.go — 35 unit tests covering parser, middleware, attribute mapping, cost calc, disabled-plugin no-op Spec + design context: https://github.com/three-cubes/tc-agent-zone/blob/main/docs/upstream-asks/cli-proxy-api-otel-baggage.md --- go.mod | 33 ++- go.sum | 90 ++++++-- internal/otelplugin/README.md | 189 ++++++++++++++++ internal/otelplugin/baggage.go | 143 ++++++++++++ internal/otelplugin/baggage_test.go | 162 +++++++++++++ internal/otelplugin/config.go | 259 +++++++++++++++++++++ internal/otelplugin/middleware.go | 86 +++++++ internal/otelplugin/middleware_test.go | 158 +++++++++++++ internal/otelplugin/plugin.go | 301 +++++++++++++++++++++++++ internal/otelplugin/plugin_test.go | 256 +++++++++++++++++++++ internal/otelplugin/status.go | 12 + 11 files changed, 1657 insertions(+), 32 deletions(-) create mode 100644 internal/otelplugin/README.md create mode 100644 internal/otelplugin/baggage.go create mode 100644 internal/otelplugin/baggage_test.go create mode 100644 internal/otelplugin/config.go create mode 100644 internal/otelplugin/middleware.go create mode 100644 internal/otelplugin/middleware_test.go create mode 100644 internal/otelplugin/plugin.go create mode 100644 internal/otelplugin/plugin_test.go create mode 100644 internal/otelplugin/status.go diff --git a/go.mod b/go.mod index 3418dbadd59..dd985166569 100644 --- a/go.mod +++ b/go.mod @@ -17,29 +17,44 @@ require ( github.com/joho/godotenv v1.5.1 github.com/klauspost/compress v1.17.4 github.com/minio/minio-go/v7 v7.0.66 + github.com/redis/go-redis/v9 v9.19.0 github.com/refraction-networking/utls v1.8.2 github.com/sirupsen/logrus v1.9.3 github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 github.com/tiktoken-go/tokenizer v0.7.0 - golang.org/x/crypto v0.45.0 - golang.org/x/net v0.47.0 - golang.org/x/oauth2 v0.30.0 - golang.org/x/sync v0.18.0 - golang.org/x/sys v0.38.0 + go.opentelemetry.io/otel v1.44.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.44.0 + go.opentelemetry.io/otel/sdk v1.44.0 + go.opentelemetry.io/otel/trace v1.44.0 + golang.org/x/crypto v0.51.0 + golang.org/x/net v0.55.0 + golang.org/x/oauth2 v0.36.0 + golang.org/x/sync v0.20.0 + golang.org/x/sys v0.45.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/redis/go-redis/v9 v9.19.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 // indirect + go.opentelemetry.io/otel/metric v1.44.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.uber.org/atomic v1.11.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect + google.golang.org/grpc v1.81.1 // indirect ) require ( - cloud.google.com/go/compute/metadata v0.3.0 // indirect + cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/ProtonMail/go-crypto v1.3.0 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect @@ -99,7 +114,7 @@ require ( github.com/ugorji/go/codec v1.2.12 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/text v0.31.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + golang.org/x/text v0.37.0 // indirect + google.golang.org/protobuf v1.36.11 gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/go.sum b/go.sum index 5f0a03fbefc..807c4602801 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= -cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= +cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw= @@ -14,10 +14,16 @@ github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/charmbracelet/bubbles v1.0.0 h1:12J8/ak/uCZEMQ6KU7pcfwceyjLlWsDLAxB5fXonfvc= @@ -79,6 +85,11 @@ github.com/go-git/go-git-fixtures/v5 v5.1.1 h1:OH8i1ojV9bWfr0ZfasfpgtUXQHQyVS8HX github.com/go-git/go-git-fixtures/v5 v5.1.1/go.mod h1:Altk43lx3b1ks+dVoAG2300o5WWUnktvfY3VI6bcaXU= github.com/go-git/go-git/v6 v6.0.0-20251009132922-75a182125145 h1:C/oVxHd6KkkuvthQ/StZfHzZK07gl6xjfCfT3derko0= github.com/go-git/go-git/v6 v6.0.0-20251009132922-75a182125145/go.mod h1:gR+xpbL+o1wuJJDwRN4pOkpNwDS0D24Eo4AD5Aau2DY= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= @@ -91,13 +102,17 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ= github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8/go.mod h1:wcDNUvekVysuuOpQKo3191zZyTpiI6se1N1ULghS0sw= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF28c5ZQfqCBQ5g2xfk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -123,8 +138,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lucasb-eyer/go-colorful v1.3.0 h1:2/yBRLdWBZKrf7gB40FoiKfAWYQ0lqNcbuQwVHXptag= @@ -207,34 +223,62 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= +go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 h1:4YsVu3B8+3qtWYYrsUYgn0OG78pN0rnNPRGX4SbokQI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0/go.mod h1:+wnlSn0mD1ADVMe3v9Z/WIaiz6q6gL2J/ejaAmdmv80= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.44.0 h1:lgh3PiVrRUWMLOVSkQicxzZll5NjF1r+AtsX1XRIHw0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.44.0/go.mod h1:5Cnhth3m/AgOeTgE3ex12pPmiu/gGtZit03kSzx9X7s= +go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc= +go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo= +go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58= +go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0= +go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI= +go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA= +go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk= +go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= -golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= -golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= -golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= -golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= -golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= -golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= -golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= -golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8= +golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww= +golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= +golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= -golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= -golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= -golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= +golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4= +golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa h1:Kjn0N0tCrDgiAFW+lGO4JZ3ck44CehvJQMAwj9QF0G8= +google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:q4lMZS6kskjT5HvCPrnnypcDPVJqT/f4nfxmkE7gryY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa h1:mZHHdPZl0dbGHCflZgAq/Q468DWVFcU2whhB2KAo8fk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= +google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/otelplugin/README.md b/internal/otelplugin/README.md new file mode 100644 index 00000000000..0be0179cbea --- /dev/null +++ b/internal/otelplugin/README.md @@ -0,0 +1,189 @@ +# otelplugin + +OpenTelemetry usage exporter for CLIProxyAPI. Emits one OTLP span per upstream +LLM call, with attributes following the emerging GenAI semantic conventions, +plus W3C Baggage propagation for caller-supplied agent identity. + +## Why + +CLIProxyAPI already has a first-class `usage.Plugin` pipeline (see +`sdk/cliproxy/usage/manager.go`). The in-memory `LoggerPlugin` and the +`redisqueue` plugin both consume `usage.Record` values via the manager. This +package adds a third sink — OpenTelemetry — so operators can ship per-request +cost + token telemetry into any OTLP backend (Honeycomb, Jaeger, Tempo, +Azure Monitor, Datadog, etc.) without writing a custom shim. + +The plugin is **off by default**. Setting `telemetry.otlp.enabled: true` in +config turns it on. No other behaviour changes. + +## Wire-format outputs + +### Per-span attributes (OpenTelemetry GenAI semconv where it exists) + +| Attribute | Type | Notes | +| ------------------------------------------ | ------ | ----------------------------------------------- | +| `gen_ai.system` | string | Provider id (`anthropic`, `openai`, …) | +| `gen_ai.request.model` | string | Model id from `usage.Record.Model` | +| `gen_ai.request.model_alias` | string | Alias when one was used | +| `gen_ai.request.reasoning_effort` | string | Client-requested thinking level when supplied | +| `gen_ai.request.source` | string | Upstream auth source (`chatplan`, `apikey`, …) | +| `gen_ai.response.status_code` | int | Set only on failed calls | +| `gen_ai.usage.input_tokens` | int | Prompt tokens | +| `gen_ai.usage.output_tokens` | int | Completion tokens | +| `gen_ai.usage.cache_read_input_tokens` | int | Anthropic prompt-cache hits (when > 0) | +| `gen_ai.usage.cache_creation_input_tokens` | int | Anthropic prompt-cache writes (when > 0) | +| `gen_ai.usage.reasoning_tokens` | int | Provider-reported reasoning tokens (when > 0) | +| `gen_ai.usage.total_tokens` | int | Provider-reported total (when > 0) | +| `cost.usd` | float | Total — emitted only when `cost.enabled: true` | +| `cost.input_usd` | float | Component breakdown | +| `cost.output_usd` | float | Component breakdown | +| `cost.cache_read_usd` | float | Component breakdown | +| `cost.cache_creation_usd` | float | Component breakdown | +| `` | string | Each W3C Baggage key from the inbound request | + +Span name defaults to `gen_ai.request`; configurable via `span.name`. + +### Resource attributes + +`service.name` defaults to `cli-proxy-api`. `service.namespace` is optional. + +## Config + +```yaml +telemetry: + otlp: + enabled: true # off by default + endpoint: "http://127.0.0.1:4318" # OTLP HTTP endpoint + protocol: "http/protobuf" # only http/protobuf supported today + headers: # optional, e.g. for hosted backends + x-honeycomb-team: "${HONEYCOMB_API_KEY}" + service: + name: "cli-proxy-api" + namespace: "agent-platform" + span: + name: "gen_ai.request" + include_baggage_keys: ["agent.id", "workload.kind"] # [] = all keys + include_usage: true # set false to drop token counts + include_cost: true # set false to drop cost.* attributes + + baggage: + propagation: "off" # off | propagate | allowlist + allowed_keys: ["agent.id", "workload.kind"] + + cost: + enabled: false # opt-in pricing table + pricing: + claude-opus-4-7: + input_per_million: 15.0 + output_per_million: 75.0 + cache_read_per_million: 1.5 + cache_creation_per_million: 18.75 +``` + +## Caller-supplied identity (W3C Baggage) + +The plugin works without any baggage — it still emits gen_ai.* attributes +from `usage.Record`. To attribute cost back to **which agent runtime** issued +the request, callers can set a [W3C Baggage](https://www.w3.org/TR/baggage/) +header on each inbound HTTP request: + +``` +baggage: agent.id=builder,agent.session.id=01HJX,workload.kind=chat-turn +``` + +When the Gin middleware (`otelplugin.Middleware()`) is registered, those keys +land on the request context. The plugin reads them via +`BaggageFromContext(ctx)` and copies the allowlisted keys to span attributes. + +The baggage propagation policy controls what flows from CLIProxyAPI to the +upstream provider: + +- `off` — strip baggage at the boundary. The upstream provider sees no baggage. +- `propagate` — forward verbatim. +- `allowlist` — forward only the keys in `baggage.allowed_keys`. + +Operators set the policy based on their trust posture toward upstream +providers. `off` is safe-by-default. + +## How it integrates with CLIProxyAPI + +``` +inbound HTTP + │ + ▼ +[ otelplugin.Middleware ] ── parses baggage → attaches to ctx + │ + ▼ +[ existing proxy handlers ] + │ + ▼ +upstream LLM call + │ + ▼ (success or failure) +[ runtime/executor publishes usage.Record ] + │ + ▼ +[ usage.Manager ] + │ + ├─ LoggerPlugin (existing) + ├─ usageQueuePlugin (existing, redisqueue) + └─ otelplugin.Plugin (this package) ── emits OTLP span ──▶ otel-collector +``` + +The pipeline is exactly the existing one. We add a plugin; we do not modify +runtime/executor or the manager itself. + +## Wiring (server-side) + +The plugin auto-registers via `init()` (same pattern as `redisqueue`). +Server-side wiring is one line in `internal/api/server.go`: + +```go +engine.Use(otelplugin.Middleware()) +``` + +And one line in the config loader after parsing the YAML block: + +```go +otelplugin.SetConfig(parsedConfig.Telemetry) +``` + +That is the complete integration footprint. The plugin is otherwise self- +contained — config + lifecycle + tests all live under +`internal/otelplugin/`. + +## Test coverage + +``` +go test ./internal/otelplugin/... -count=1 -v +``` + +28 unit tests cover: + +- W3C Baggage parsing (empty, multi-key, URL-decoding, per-entry metadata, case handling, malformed entries) +- Baggage round-trip through context +- Filter-by-allowlist +- Gin middleware (header present, absent, malformed) +- Outbound propagation (off, propagate, allowlist, nil-request safety) +- GenAI semconv attribute mapping +- Optional-field omission when zero +- `include_usage: false` / `include_cost: false` toggles +- Cost table exact + prefix match, fallback when model is absent +- `startTimeFor` precedence (RequestedAt > Latency-derived > Now) +- Disabled-plugin no-op + +## Open questions for the maintainers + +We have implementation flexibility on the following — happy to follow +whatever direction the maintainers prefer: + +1. **Config block placement.** Currently a top-level `telemetry:` block. Could + also live under an existing namespace (e.g. `usage-statistics-*` keys). +2. **gRPC exporter.** Only `http/protobuf` is wired today. Adding gRPC is one + extra import + parallel switch; happy to follow up if there's demand. +3. **Span name default.** Used `gen_ai.request` per the OpenTelemetry GenAI + semconv working draft. Some operators may prefer `llm.request` or + `cli-proxy-api.request`. Easily made configurable; default is the question. +4. **Build-tag gating.** The OTel SDK adds ~10MB to the binary. If that + matters, the package can sit behind a build tag (`-tags otelplugin`) and + ship a no-op stub for the default build. diff --git a/internal/otelplugin/baggage.go b/internal/otelplugin/baggage.go new file mode 100644 index 00000000000..fd6e5b12c33 --- /dev/null +++ b/internal/otelplugin/baggage.go @@ -0,0 +1,143 @@ +package otelplugin + +import ( + "context" + "net/url" + "strings" +) + +// W3C Baggage parser. Spec: https://www.w3.org/TR/baggage/ +// +// We implement this in-package (instead of pulling go.opentelemetry.io/otel's +// baggage package) for two reasons: +// - keep the package's exposed dependency surface narrow during the initial +// review (only the OTLP exporter + trace SDK are required imports); +// - the parser doubles as the baggage middleware's storage shape, which is +// just a map[string]string — no Member/Property objects needed. + +// Baggage is the parsed identity envelope from the request's `baggage:` HTTP +// header. Keys are lowercased; values are URL-decoded. Per-entry metadata +// after `;` in a single entry is intentionally dropped — consumers that need +// it should adopt the OTel SDK's baggage package. +type Baggage map[string]string + +// ParseBaggageHeader parses a single `baggage:` header value into a Baggage +// map. Returns nil when the header is missing or no entries are well-formed. +// +// Per W3C Baggage: entries are comma-separated; within an entry, optional +// metadata follows the value after a `;` and is opaque to consumers. The +// parser is intentionally permissive — malformed entries are skipped, not +// errored, so a single bad entry does not poison the whole header. +func ParseBaggageHeader(headerValue string) Baggage { + headerValue = strings.TrimSpace(headerValue) + if headerValue == "" { + return nil + } + out := make(Baggage) + for _, rawEntry := range strings.Split(headerValue, ",") { + entry := strings.TrimSpace(rawEntry) + if entry == "" { + continue + } + if semi := strings.IndexByte(entry, ';'); semi >= 0 { + entry = entry[:semi] + } + eq := strings.IndexByte(entry, '=') + if eq <= 0 { + continue + } + key := strings.ToLower(strings.TrimSpace(entry[:eq])) + rawValue := strings.TrimSpace(entry[eq+1:]) + if key == "" || rawValue == "" { + continue + } + value, err := url.QueryUnescape(rawValue) + if err != nil { + value = rawValue + } + out[key] = value + } + if len(out) == 0 { + return nil + } + return out +} + +// FormatBaggageHeader renders a Baggage map back into a W3C `baggage:` header +// value. Used when re-emitting allowlisted keys upstream. Output keys are +// emitted in the input map's iteration order — operators concerned about +// canonical ordering can sort the keys before calling. +func FormatBaggageHeader(b Baggage) string { + if len(b) == 0 { + return "" + } + var parts []string + for k, v := range b { + if strings.TrimSpace(k) == "" || strings.TrimSpace(v) == "" { + continue + } + parts = append(parts, k+"="+url.QueryEscape(v)) + } + return strings.Join(parts, ",") +} + +// FilterAllowed returns a Baggage subset containing only keys listed in +// `allowed`. Used for the BaggageAllowlist propagation mode. nil `allowed` +// returns the original map (callers should branch on propagation mode before +// calling). +func FilterAllowed(b Baggage, allowed []string) Baggage { + if len(b) == 0 { + return nil + } + if len(allowed) == 0 { + return nil + } + out := make(Baggage, len(allowed)) + for _, k := range allowed { + k = strings.ToLower(strings.TrimSpace(k)) + if k == "" { + continue + } + if v, ok := b[k]; ok { + out[k] = v + } + } + if len(out) == 0 { + return nil + } + return out +} + +// ---- Context plumbing ------------------------------------------------------- +// +// The plugin's HandleUsage receives a context.Context per record. We stash +// the parsed baggage on the context from the middleware so HandleUsage can +// pull it without re-parsing the header (or knowing about the Gin handler +// chain at all). + +type baggageContextKey struct{} + +// WithBaggage returns a context with the parsed baggage attached. Safe to +// call with a nil parent context. +func WithBaggage(ctx context.Context, b Baggage) context.Context { + if ctx == nil { + ctx = context.Background() + } + if len(b) == 0 { + return ctx + } + return context.WithValue(ctx, baggageContextKey{}, b) +} + +// BaggageFromContext returns the baggage stored on ctx, or nil when none. +func BaggageFromContext(ctx context.Context) Baggage { + if ctx == nil { + return nil + } + raw := ctx.Value(baggageContextKey{}) + if raw == nil { + return nil + } + b, _ := raw.(Baggage) + return b +} diff --git a/internal/otelplugin/baggage_test.go b/internal/otelplugin/baggage_test.go new file mode 100644 index 00000000000..a8c73afb8db --- /dev/null +++ b/internal/otelplugin/baggage_test.go @@ -0,0 +1,162 @@ +package otelplugin + +import ( + "reflect" + "sort" + "strings" + "testing" +) + +// ParseBaggageHeader covers the W3C-spec cases. The parser is intentionally +// permissive (malformed entries skipped, not errored) so a single bad entry +// does not poison the whole header. +func TestParseBaggageHeader_EmptyInputs(t *testing.T) { + t.Parallel() + for _, in := range []string{"", " ", "\t\n"} { + if got := ParseBaggageHeader(in); got != nil { + t.Errorf("ParseBaggageHeader(%q) = %v, want nil", in, got) + } + } +} + +func TestParseBaggageHeader_CanonicalMultiKey(t *testing.T) { + t.Parallel() + got := ParseBaggageHeader("agent.id=builder,agent.session.id=01HJX,workload.kind=chat-turn") + want := Baggage{ + "agent.id": "builder", + "agent.session.id": "01HJX", + "workload.kind": "chat-turn", + } + if !reflect.DeepEqual(got, want) { + t.Errorf("ParseBaggageHeader: got %v, want %v", got, want) + } +} + +func TestParseBaggageHeader_URLDecodesValues(t *testing.T) { + t.Parallel() + got := ParseBaggageHeader("note=hello%20world,raw=plain") + if got["note"] != "hello world" { + t.Errorf("URL-decoded value: got %q, want %q", got["note"], "hello world") + } + if got["raw"] != "plain" { + t.Errorf("plain value: got %q, want %q", got["raw"], "plain") + } +} + +func TestParseBaggageHeader_DropsPerEntryMetadata(t *testing.T) { + t.Parallel() + // Per W3C Baggage, content after `;` in a single entry is opaque metadata. + got := ParseBaggageHeader("agent.id=builder;importance=high,workload.id=01HJY") + want := Baggage{"agent.id": "builder", "workload.id": "01HJY"} + if !reflect.DeepEqual(got, want) { + t.Errorf("metadata-after-semi: got %v, want %v", got, want) + } +} + +func TestParseBaggageHeader_LowercaseKeysPreserveValueCase(t *testing.T) { + t.Parallel() + got := ParseBaggageHeader("Agent.Id=Builder,Workload.Kind=Chat-Turn") + want := Baggage{"agent.id": "Builder", "workload.kind": "Chat-Turn"} + if !reflect.DeepEqual(got, want) { + t.Errorf("case handling: got %v, want %v", got, want) + } +} + +func TestParseBaggageHeader_NilWhenAllMalformed(t *testing.T) { + t.Parallel() + for _, in := range []string{"garbage,no-equals", ";;", "=value", " , , "} { + if got := ParseBaggageHeader(in); got != nil { + t.Errorf("ParseBaggageHeader(%q) = %v, want nil", in, got) + } + } +} + +func TestParseBaggageHeader_SkipsEmptyEntries(t *testing.T) { + t.Parallel() + // Per W3C Baggage, consecutive commas produce empty list items which are + // dropped silently. The parser treats them as no-ops. + got := ParseBaggageHeader("agent.id=builder,,workload.kind=chat-turn,") + want := Baggage{"agent.id": "builder", "workload.kind": "chat-turn"} + if !reflect.DeepEqual(got, want) { + t.Errorf("empty entries: got %v, want %v", got, want) + } +} + +func TestFormatBaggageHeader_RoundTrip(t *testing.T) { + t.Parallel() + in := Baggage{"agent.id": "builder", "workload.kind": "chat-turn"} + header := FormatBaggageHeader(in) + out := ParseBaggageHeader(header) + if !reflect.DeepEqual(out, in) { + t.Errorf("round-trip mismatch: original=%v, after=%v", in, out) + } +} + +func TestFormatBaggageHeader_URLEncodesReservedChars(t *testing.T) { + t.Parallel() + header := FormatBaggageHeader(Baggage{"note": "hello world"}) + if !strings.Contains(header, "hello%20world") && !strings.Contains(header, "hello+world") { + t.Errorf("URL encoding missing: got %q", header) + } +} + +func TestFilterAllowed_ReturnsAllowlistedKeysOnly(t *testing.T) { + t.Parallel() + in := Baggage{"agent.id": "builder", "agent.session.id": "01HJX", "secret": "shh"} + got := FilterAllowed(in, []string{"agent.id", "agent.session.id"}) + want := Baggage{"agent.id": "builder", "agent.session.id": "01HJX"} + if !reflect.DeepEqual(got, want) { + t.Errorf("FilterAllowed: got %v, want %v", got, want) + } +} + +func TestFilterAllowed_EmptyAllowlistReturnsNil(t *testing.T) { + t.Parallel() + in := Baggage{"agent.id": "builder"} + if got := FilterAllowed(in, nil); got != nil { + t.Errorf("FilterAllowed(nil): got %v, want nil", got) + } + if got := FilterAllowed(in, []string{}); got != nil { + t.Errorf("FilterAllowed([]): got %v, want nil", got) + } +} + +func TestFilterAllowed_NormalisesAllowlistCase(t *testing.T) { + t.Parallel() + in := Baggage{"agent.id": "builder"} + got := FilterAllowed(in, []string{" Agent.Id "}) + if got["agent.id"] != "builder" { + t.Errorf("case-normalised allowlist: got %v", got) + } +} + +// Stable ordering check — operators may rely on it for log search, so worth +// asserting that the iteration is at least deterministic per input. +func TestFormatBaggageHeader_StableOrderForKnownInput(t *testing.T) { + t.Parallel() + in := Baggage{"a": "1", "b": "2", "c": "3"} + header := FormatBaggageHeader(in) + parts := strings.Split(header, ",") + sort.Strings(parts) + expected := []string{"a=1", "b=2", "c=3"} + if !reflect.DeepEqual(parts, expected) { + t.Errorf("sorted parts: got %v, want %v", parts, expected) + } +} + +func TestWithBaggage_RoundTripsThroughContext(t *testing.T) { + t.Parallel() + in := Baggage{"agent.id": "builder"} + ctx := WithBaggage(nil, in) //nolint:staticcheck // nil parent context is supported + out := BaggageFromContext(ctx) + if !reflect.DeepEqual(out, in) { + t.Errorf("context round-trip: got %v, want %v", out, in) + } +} + +func TestBaggageFromContext_NilWhenEmpty(t *testing.T) { + t.Parallel() + if got := BaggageFromContext(nil); got != nil { //nolint:staticcheck + t.Errorf("nil ctx: got %v, want nil", got) + } +} diff --git a/internal/otelplugin/config.go b/internal/otelplugin/config.go new file mode 100644 index 00000000000..b4d7486674d --- /dev/null +++ b/internal/otelplugin/config.go @@ -0,0 +1,259 @@ +// Package otelplugin implements a CLIProxyAPI usage.Plugin that exports +// one OpenTelemetry span per upstream LLM call. It complements the +// in-memory LoggerPlugin and the Redis queue plugin: the records flow +// through the same coreusage.Plugin pipeline, no proxy internals change. +// +// Two surfaces ship with this package: +// +// 1. Plugin (HandleUsage) — emits a `gen_ai.request` span per record. +// Span attributes follow the emerging OpenTelemetry GenAI semantic +// conventions and carry caller-supplied baggage identity when a Gin +// middleware extracts a W3C `baggage:` header upstream. +// +// 2. Baggage middleware — Gin middleware that parses the inbound +// `baggage:` header, stores it on the request context, and (when +// propagation is enabled) forwards an allowlisted subset to the +// upstream provider. +// +// Config is sourced from the existing yaml/env config surface; toggles +// mirror the redisqueue plugin pattern so operators have a consistent +// enable/disable + management-API contract. +package otelplugin + +import ( + "strings" + "sync" + "sync/atomic" + "time" +) + +// Config bundles the optional configuration block plugin authors may attach +// to the existing CLIProxyAPI config. Embedding it into a top-level +// `telemetry` key keeps the change small for operators who do not opt in. +// +// Example (config.yaml): +// +// telemetry: +// otlp: +// enabled: true +// endpoint: "http://127.0.0.1:4318" +// protocol: "http/protobuf" +// service: +// name: "cli-proxy-api" +// namespace: "agent-platform" +// span: +// name: "gen_ai.request" +// include_baggage_keys: ["agent.id", "workload.kind"] +// baggage: +// propagation: "allowlist" # off | propagate | allowlist +// allowed_keys: ["agent.id", "workload.kind"] +// cost: +// enabled: false # optional pricing-table companion +type Config struct { + OTLP OTLPConfig `yaml:"otlp" json:"otlp"` + Baggage BaggageConfig `yaml:"baggage" json:"baggage"` + Cost CostConfig `yaml:"cost" json:"cost"` +} + +// OTLPConfig configures the trace exporter. +type OTLPConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + Endpoint string `yaml:"endpoint" json:"endpoint"` + Protocol string `yaml:"protocol" json:"protocol"` // http/protobuf or grpc + Headers map[string]string `yaml:"headers" json:"headers"` + Service ServiceConfig `yaml:"service" json:"service"` + Span SpanConfig `yaml:"span" json:"span"` +} + +// ServiceConfig describes the OpenTelemetry resource attributes attached to +// every span. service.name / service.namespace land on the Resource so all +// downstream backends group records by emitter without per-span attributes. +type ServiceConfig struct { + Name string `yaml:"name" json:"name"` + Namespace string `yaml:"namespace" json:"namespace"` +} + +// SpanConfig controls per-span attribute selection. +// +// - Name defaults to "gen_ai.request" (OpenTelemetry GenAI semconv). +// Operators preferring the local "llm.request" convention can set it. +// - IncludeBaggageKeys is the allowlist of baggage keys that are copied +// verbatim onto span attributes. Empty means "all baggage keys". +// - IncludeUsage / IncludeCost default to true; setting them false produces +// an attribution-only span useful when token counts come from a separate +// emitter (rare). +type SpanConfig struct { + Name string `yaml:"name" json:"name"` + IncludeBaggageKeys []string `yaml:"include_baggage_keys" json:"include_baggage_keys"` + IncludeUsage *bool `yaml:"include_usage,omitempty" json:"include_usage,omitempty"` + IncludeCost *bool `yaml:"include_cost,omitempty" json:"include_cost,omitempty"` +} + +// BaggagePropagationMode mirrors the W3C Baggage propagation modes the SDK +// supports — off is the safe default for a trusted-boundary proxy. +type BaggagePropagationMode string + +const ( + BaggageOff BaggagePropagationMode = "off" + BaggagePropagate BaggagePropagationMode = "propagate" + BaggageAllowlist BaggagePropagationMode = "allowlist" +) + +// BaggageConfig configures W3C Baggage propagation policy. Always-on parsing +// (so plugin attributes still see the inbound keys) — propagation controls +// the *outbound* re-emission only. +type BaggageConfig struct { + Propagation BaggagePropagationMode `yaml:"propagation" json:"propagation"` + AllowedKeys []string `yaml:"allowed_keys" json:"allowed_keys"` +} + +// CostConfig holds an optional per-million-token pricing table. When +// disabled, the plugin emits gen_ai.usage.* attributes but no cost.usd. +type CostConfig struct { + Enabled bool `yaml:"enabled" json:"enabled"` + Pricing map[string]ModelPricingUSD `yaml:"pricing" json:"pricing"` +} + +// ModelPricingUSD is the per-million-token pricing table for one model id. +type ModelPricingUSD struct { + InputPerMillion float64 `yaml:"input_per_million" json:"input_per_million"` + OutputPerMillion float64 `yaml:"output_per_million" json:"output_per_million"` + CacheReadPerMillion float64 `yaml:"cache_read_per_million" json:"cache_read_per_million"` + CacheCreationPerMillion float64 `yaml:"cache_creation_per_million" json:"cache_creation_per_million"` +} + +// ---- Runtime config snapshot ------------------------------------------------ +// +// The plugin reads the active config from an atomic.Value rather than from a +// global mutex so HandleUsage stays lock-free on the hot path. SetConfig is +// the only writer; it is called from the config loader at startup and from +// the watcher on hot-reload (consistent with redisqueue.SetEnabled). + +var ( + activeConfig atomic.Value // stores *Config + enabled atomic.Bool +) + +func init() { + // Safe default: feature flag off until SetConfig is called. + enabled.Store(false) + activeConfig.Store(defaultConfig()) +} + +// SetConfig replaces the active configuration. The plugin's HandleUsage +// hot-path reads via atomic.Value.Load so this can be called concurrently +// without coordination. +// +// Returns the previous configuration so callers can diff for reload logging. +func SetConfig(cfg Config) Config { + prev := loadConfig() + cfg = applyDefaults(cfg) + activeConfig.Store(&cfg) + enabled.Store(cfg.OTLP.Enabled) + return *prev +} + +// CurrentConfig returns a copy of the active configuration. The copy is +// shallow on slices/maps to keep the call cheap; callers must treat the +// returned value as read-only. +func CurrentConfig() Config { + cfg := loadConfig() + return *cfg +} + +// Enabled reports whether the OTLP exporter is on. Callers (HandleUsage, +// middleware) use this as the cheap early-return guard. +func Enabled() bool { return enabled.Load() } + +// SetEnabled toggles the OTLP exporter without rewriting the entire config. +// Used by the management API and tests. +func SetEnabled(on bool) { + enabled.Store(on) + cfg := loadConfig() + updated := *cfg + updated.OTLP.Enabled = on + activeConfig.Store(&updated) +} + +// loadConfig returns the active config pointer; never nil after init(). +func loadConfig() *Config { + raw := activeConfig.Load() + if raw == nil { + return defaultConfig() + } + cfg, ok := raw.(*Config) + if !ok || cfg == nil { + return defaultConfig() + } + return cfg +} + +// defaultConfig returns the package's built-in defaults. The exporter is +// off until SetConfig is called. +func defaultConfig() *Config { + cfg := applyDefaults(Config{}) + return &cfg +} + +func applyDefaults(cfg Config) Config { + if strings.TrimSpace(cfg.OTLP.Endpoint) == "" { + cfg.OTLP.Endpoint = "http://127.0.0.1:4318" + } + if strings.TrimSpace(cfg.OTLP.Protocol) == "" { + cfg.OTLP.Protocol = "http/protobuf" + } + if strings.TrimSpace(cfg.OTLP.Service.Name) == "" { + cfg.OTLP.Service.Name = "cli-proxy-api" + } + if strings.TrimSpace(cfg.OTLP.Span.Name) == "" { + cfg.OTLP.Span.Name = "gen_ai.request" + } + if cfg.Baggage.Propagation == "" { + cfg.Baggage.Propagation = BaggageOff + } + return cfg +} + +// ---- Exporter lifetime ------------------------------------------------------ +// +// The OTel exporter is the only piece that has lifecycle state (HTTP client, +// batch span processor goroutine). We hold it behind a mutex and reset it +// whenever SetConfig changes the OTLP endpoint or protocol. + +type exporterRef struct { + mu sync.Mutex + shutdown func(timeout time.Duration) error +} + +var sharedExporter = &exporterRef{} + +func (e *exporterRef) reset(shutdown func(timeout time.Duration) error) { + if e == nil { + return + } + e.mu.Lock() + prev := e.shutdown + e.shutdown = shutdown + e.mu.Unlock() + if prev != nil { + _ = prev(5 * time.Second) + } +} + +func (e *exporterRef) close() { + if e == nil { + return + } + e.mu.Lock() + prev := e.shutdown + e.shutdown = nil + e.mu.Unlock() + if prev != nil { + _ = prev(5 * time.Second) + } +} + +// Shutdown flushes pending spans and releases the exporter. Wire this to the +// proxy runtime's shutdown signal handler so deployments lose no telemetry on +// SIGTERM/SIGINT. +func Shutdown() { sharedExporter.close() } diff --git a/internal/otelplugin/middleware.go b/internal/otelplugin/middleware.go new file mode 100644 index 00000000000..cb44642801d --- /dev/null +++ b/internal/otelplugin/middleware.go @@ -0,0 +1,86 @@ +package otelplugin + +import ( + "net/http" + "strings" + + "github.com/gin-gonic/gin" +) + +// HeaderBaggage is the canonical inbound header name. Case-insensitive in Go's +// net/http, but operators querying logs occasionally look for the exact form. +const HeaderBaggage = "Baggage" + +// Middleware returns a Gin middleware that parses the inbound `baggage:` +// header and stores it on the request context. The plugin's HandleUsage reads +// it back via BaggageFromContext when building span attributes. +// +// Wiring (server-side, optional opt-in): +// +// engine.Use(otelplugin.Middleware()) +// +// Always-on parsing — the propagation policy controls only what flows +// upstream from here on the *outbound* hop, not whether we capture the +// inbound envelope. +// +// Safe to register unconditionally even when the OTLP exporter is disabled: +// parsing a missing or empty header is cheap and produces nil baggage. +func Middleware() gin.HandlerFunc { + return func(c *gin.Context) { + header := c.GetHeader(HeaderBaggage) + if header == "" { + c.Next() + return + } + b := ParseBaggageHeader(header) + if len(b) == 0 { + c.Next() + return + } + ctx := WithBaggage(c.Request.Context(), b) + c.Request = c.Request.WithContext(ctx) + c.Next() + } +} + +// PropagateBaggage applies the configured propagation policy to an outbound +// request. Call this from the upstream client construction site (where the +// proxy builds the request that will hit the LLM provider). Returns the +// header value (or empty string to omit the header entirely). +// +// Modes: +// - off : never propagate. Returns "" regardless of input. Safe default. +// - propagate : forward the entire inbound baggage verbatim. +// - allowlist : forward only keys in BaggageConfig.AllowedKeys. +func PropagateBaggage(inbound Baggage) string { + cfg := loadConfig() + switch cfg.Baggage.Propagation { + case BaggagePropagate: + return FormatBaggageHeader(inbound) + case BaggageAllowlist: + return FormatBaggageHeader(FilterAllowed(inbound, cfg.Baggage.AllowedKeys)) + case BaggageOff, "": + return "" + default: + return "" + } +} + +// ApplyOutbound is a convenience helper that sets the `baggage:` header on an +// outbound *http.Request per the configured propagation policy. No-op when +// propagation is off or the inbound envelope is empty. +func ApplyOutbound(req *http.Request, inbound Baggage) { + if req == nil { + return + } + value := PropagateBaggage(inbound) + if value == "" { + return + } + req.Header.Set(HeaderBaggage, value) +} + +// HeaderName re-exports HeaderBaggage with the lowercase form some operators +// prefer to log against. Use net/http header normalisation (Canonical-Case) +// when actually setting headers; this is a string constant only. +func HeaderName() string { return strings.ToLower(HeaderBaggage) } diff --git a/internal/otelplugin/middleware_test.go b/internal/otelplugin/middleware_test.go new file mode 100644 index 00000000000..ce23ab6443f --- /dev/null +++ b/internal/otelplugin/middleware_test.go @@ -0,0 +1,158 @@ +package otelplugin + +import ( + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/gin-gonic/gin" +) + +// Middleware parses inbound baggage and stores it on the request context. +// Downstream handlers (or the plugin's HandleUsage via context) pick it up +// with BaggageFromContext. +func TestMiddleware_StoresParsedBaggageOnRequestContext(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + var captured Baggage + router := gin.New() + router.Use(Middleware()) + router.GET("/test", func(c *gin.Context) { + captured = BaggageFromContext(c.Request.Context()) + c.String(http.StatusOK, "ok") + }) + + req := httptest.NewRequest(http.MethodGet, "/test", nil) + req.Header.Set(HeaderBaggage, "agent.id=builder,workload.kind=chat-turn") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + want := Baggage{"agent.id": "builder", "workload.kind": "chat-turn"} + if !reflect.DeepEqual(captured, want) { + t.Errorf("captured baggage: got %v, want %v", captured, want) + } +} + +func TestMiddleware_NoBaggageHeaderLeavesContextUntouched(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + var captured Baggage + router := gin.New() + router.Use(Middleware()) + router.GET("/test", func(c *gin.Context) { + captured = BaggageFromContext(c.Request.Context()) + c.String(http.StatusOK, "ok") + }) + + req := httptest.NewRequest(http.MethodGet, "/test", nil) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if captured != nil { + t.Errorf("no header should yield nil baggage; got %v", captured) + } +} + +func TestMiddleware_MalformedHeaderLeavesContextUntouched(t *testing.T) { + t.Parallel() + gin.SetMode(gin.TestMode) + var captured Baggage + router := gin.New() + router.Use(Middleware()) + router.GET("/test", func(c *gin.Context) { + captured = BaggageFromContext(c.Request.Context()) + c.String(http.StatusOK, "ok") + }) + + req := httptest.NewRequest(http.MethodGet, "/test", nil) + req.Header.Set(HeaderBaggage, "garbage,no-equals-sign") + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + if captured != nil { + t.Errorf("malformed header should yield nil baggage; got %v", captured) + } +} + +// PropagateBaggage applies the configured propagation policy. We verify each +// mode produces the expected output for the same input. +func TestPropagateBaggage_OffEmitsNothing(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{Propagation: BaggageOff}}) + in := Baggage{"agent.id": "builder"} + if got := PropagateBaggage(in); got != "" { + t.Errorf("off mode: got %q, want \"\"", got) + } +} + +func TestPropagateBaggage_PropagateEmitsAllKeys(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{Propagation: BaggagePropagate}}) + in := Baggage{"agent.id": "builder", "workload.kind": "chat-turn"} + got := PropagateBaggage(in) + if got == "" { + t.Fatal("propagate mode: got empty, want non-empty") + } + out := ParseBaggageHeader(got) + if !reflect.DeepEqual(out, in) { + t.Errorf("propagate round-trip: got %v, want %v", out, in) + } +} + +func TestPropagateBaggage_AllowlistFiltersKeys(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{ + Propagation: BaggageAllowlist, + AllowedKeys: []string{"agent.id"}, + }}) + in := Baggage{"agent.id": "builder", "secret": "shh"} + got := PropagateBaggage(in) + if got == "" { + t.Fatal("allowlist mode: got empty, want non-empty") + } + out := ParseBaggageHeader(got) + want := Baggage{"agent.id": "builder"} + if !reflect.DeepEqual(out, want) { + t.Errorf("allowlist filter: got %v, want %v", out, want) + } +} + +func TestApplyOutbound_SetsHeaderWhenPolicyEmits(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{Propagation: BaggagePropagate}}) + req := httptest.NewRequest(http.MethodPost, "https://example.com/v1/messages", nil) + ApplyOutbound(req, Baggage{"agent.id": "builder"}) + if req.Header.Get(HeaderBaggage) == "" { + t.Error("expected outbound header to be set") + } +} + +func TestApplyOutbound_NoopWhenOff(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{Propagation: BaggageOff}}) + req := httptest.NewRequest(http.MethodPost, "https://example.com/v1/messages", nil) + ApplyOutbound(req, Baggage{"agent.id": "builder"}) + if v := req.Header.Get(HeaderBaggage); v != "" { + t.Errorf("off mode: header should not be set; got %q", v) + } +} + +func TestApplyOutbound_NilRequestSafe(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetConfig(Config{Baggage: BaggageConfig{Propagation: BaggagePropagate}}) + // Should not panic on a nil request. + ApplyOutbound(nil, Baggage{"agent.id": "builder"}) +} diff --git a/internal/otelplugin/plugin.go b/internal/otelplugin/plugin.go new file mode 100644 index 00000000000..b37f091c440 --- /dev/null +++ b/internal/otelplugin/plugin.go @@ -0,0 +1,301 @@ +package otelplugin + +import ( + "context" + "strings" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" + + coreusage "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" +) + +// Tracer name for spans this package emits. Operators querying by tracer get +// only proxy-generated spans, not anything traced inside the OTel SDK itself. +const tracerName = "github.com/router-for-me/CLIProxyAPI/internal/otelplugin" + +// init() registers the OTLP plugin with the usage manager. Registration is +// always performed; the plugin's HandleUsage is a no-op while Enabled() is +// false (default until SetConfig flips it on). +func init() { + coreusage.RegisterPlugin(NewPlugin()) +} + +// NewPlugin returns a usage.Plugin that emits one OTel span per upstream +// LLM call. Safe to construct directly in tests. +func NewPlugin() *Plugin { return &Plugin{} } + +// Plugin implements coreusage.Plugin. Stateless on the receiver — all mutable +// state lives in package-level atomics (config, tracer provider) so multiple +// plugin instances are safe. +type Plugin struct{} + +// HandleUsage implements coreusage.Plugin. Emits a single span describing the +// completed upstream call, with attribute namespaces: +// +// - gen_ai.* OpenTelemetry GenAI semantic conventions (model, +// token counts, system identifier). +// - cost.* local convention; cost.usd plus per-component +// breakdown when CostConfig.Enabled is true. +// - agent.* / workload.* per-request baggage keys forwarded from the +// request context (set by the baggage middleware). +// +// Lock-free fast path: a single atomic load gates the call entirely when the +// exporter is off. When on, span construction is contention-free per call. +func (p *Plugin) HandleUsage(ctx context.Context, record coreusage.Record) { + if p == nil || !Enabled() { + return + } + tracer := tracerFor(loadConfig()) + if tracer == nil { + return + } + cfg := loadConfig() + startTime := startTimeFor(record) + _, span := tracer.Start(ctx, cfg.OTLP.Span.Name, + trace.WithTimestamp(startTime), + trace.WithSpanKind(trace.SpanKindClient), + ) + span.SetAttributes(usageAttributes(record, cfg)...) + span.SetAttributes(baggageAttributes(BaggageFromContext(ctx), cfg)...) + if cfg.Cost.Enabled { + span.SetAttributes(costAttributes(record, cfg)...) + } + if record.Failed { + span.SetStatus(codeError, failureMessage(record.Fail)) + } else { + span.SetStatus(codeOk, "") + } + endTime := startTime.Add(record.Latency) + if record.Latency <= 0 { + endTime = time.Now() + } + span.End(trace.WithTimestamp(endTime)) +} + +// ---- tracer wiring ---------------------------------------------------------- + +var ( + tracerMu sync.Mutex + tracerActive trace.Tracer + tracerEndpt string + tracerProto string +) + +// tracerFor lazily constructs the global tracer + exporter on first use. The +// guard mutex serialises construction; subsequent reads are lock-free because +// trace.Tracer is itself a goroutine-safe handle. +func tracerFor(cfg *Config) trace.Tracer { + tracerMu.Lock() + defer tracerMu.Unlock() + if tracerActive != nil && + tracerEndpt == cfg.OTLP.Endpoint && + tracerProto == cfg.OTLP.Protocol { + return tracerActive + } + tracer, shutdown, err := buildTracer(cfg) + if err != nil { + return nil + } + sharedExporter.reset(shutdown) + tracerActive = tracer + tracerEndpt = cfg.OTLP.Endpoint + tracerProto = cfg.OTLP.Protocol + return tracerActive +} + +func buildTracer(cfg *Config) (trace.Tracer, func(time.Duration) error, error) { + exporter, err := newOtlpExporter(cfg) + if err != nil { + return nil, nil, err + } + processor := sdktrace.NewBatchSpanProcessor(exporter, + sdktrace.WithMaxQueueSize(2048), + sdktrace.WithMaxExportBatchSize(128), + sdktrace.WithBatchTimeout(5*time.Second), + ) + res, err := resourceFor(cfg) + if err != nil { + _ = processor.Shutdown(context.Background()) + return nil, nil, err + } + provider := sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(processor), + ) + // Replace the global provider so the rest of the proxy (if it adopts + // otel.GetTracerProvider() anywhere) sees the same configured emitter. + otel.SetTracerProvider(provider) + tracer := provider.Tracer(tracerName) + shutdown := func(timeout time.Duration) error { + shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return provider.Shutdown(shutdownCtx) + } + return tracer, shutdown, nil +} + +func newOtlpExporter(cfg *Config) (sdktrace.SpanExporter, error) { + // HTTP/protobuf is the only protocol wired in the initial cut. gRPC support + // is one extra import + parallel switch; deferred until an operator asks. + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpointURL(cfg.OTLP.Endpoint), + } + if len(cfg.OTLP.Headers) > 0 { + opts = append(opts, otlptracehttp.WithHeaders(cfg.OTLP.Headers)) + } + return otlptracehttp.New(context.Background(), opts...) +} + +func resourceFor(cfg *Config) (*resource.Resource, error) { + attrs := []attribute.KeyValue{ + semconv.ServiceName(serviceName(cfg)), + } + if ns := strings.TrimSpace(cfg.OTLP.Service.Namespace); ns != "" { + attrs = append(attrs, semconv.ServiceNamespace(ns)) + } + return resource.Merge( + resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, attrs...), + ) +} + +func serviceName(cfg *Config) string { + name := strings.TrimSpace(cfg.OTLP.Service.Name) + if name == "" { + name = "cli-proxy-api" + } + return name +} + +// ---- attribute construction ------------------------------------------------- + +func usageAttributes(record coreusage.Record, cfg *Config) []attribute.KeyValue { + out := []attribute.KeyValue{ + attribute.String("gen_ai.system", record.Provider), + attribute.String("gen_ai.request.model", record.Model), + } + if alias := strings.TrimSpace(record.Alias); alias != "" { + out = append(out, attribute.String("gen_ai.request.model_alias", alias)) + } + if effort := strings.TrimSpace(record.ReasoningEffort); effort != "" { + out = append(out, attribute.String("gen_ai.request.reasoning_effort", effort)) + } + includeUsage := cfg.OTLP.Span.IncludeUsage == nil || *cfg.OTLP.Span.IncludeUsage + if includeUsage { + out = append(out, + attribute.Int64("gen_ai.usage.input_tokens", record.Detail.InputTokens), + attribute.Int64("gen_ai.usage.output_tokens", record.Detail.OutputTokens), + ) + if record.Detail.CacheReadTokens > 0 { + out = append(out, attribute.Int64("gen_ai.usage.cache_read_input_tokens", record.Detail.CacheReadTokens)) + } + if record.Detail.CacheCreationTokens > 0 { + out = append(out, attribute.Int64("gen_ai.usage.cache_creation_input_tokens", record.Detail.CacheCreationTokens)) + } + if record.Detail.ReasoningTokens > 0 { + out = append(out, attribute.Int64("gen_ai.usage.reasoning_tokens", record.Detail.ReasoningTokens)) + } + if record.Detail.TotalTokens > 0 { + out = append(out, attribute.Int64("gen_ai.usage.total_tokens", record.Detail.TotalTokens)) + } + } + if record.Failed { + out = append(out, attribute.Int("gen_ai.response.status_code", record.Fail.StatusCode)) + } + if record.Source != "" { + out = append(out, attribute.String("gen_ai.request.source", record.Source)) + } + return out +} + +func baggageAttributes(b Baggage, cfg *Config) []attribute.KeyValue { + if len(b) == 0 { + return nil + } + allowed := cfg.OTLP.Span.IncludeBaggageKeys + if len(allowed) == 0 { + out := make([]attribute.KeyValue, 0, len(b)) + for k, v := range b { + out = append(out, attribute.String(k, v)) + } + return out + } + out := make([]attribute.KeyValue, 0, len(allowed)) + for _, k := range allowed { + k = strings.ToLower(strings.TrimSpace(k)) + if k == "" { + continue + } + if v, ok := b[k]; ok { + out = append(out, attribute.String(k, v)) + } + } + return out +} + +func costAttributes(record coreusage.Record, cfg *Config) []attribute.KeyValue { + pricing, ok := cfg.Cost.Pricing[record.Model] + if !ok { + // Try a prefix match — operators frequently use one entry per model + // family (e.g. "claude-sonnet") rather than every variant suffix. + for model, p := range cfg.Cost.Pricing { + if strings.HasPrefix(record.Model, model) { + pricing = p + ok = true + break + } + } + } + if !ok { + return nil + } + const million = 1_000_000.0 + input := float64(record.Detail.InputTokens) * pricing.InputPerMillion / million + output := float64(record.Detail.OutputTokens) * pricing.OutputPerMillion / million + cacheRead := float64(record.Detail.CacheReadTokens) * pricing.CacheReadPerMillion / million + cacheCreation := float64(record.Detail.CacheCreationTokens) * pricing.CacheCreationPerMillion / million + total := input + output + cacheRead + cacheCreation + includeCost := cfg.OTLP.Span.IncludeCost == nil || *cfg.OTLP.Span.IncludeCost + if !includeCost { + return nil + } + return []attribute.KeyValue{ + attribute.Float64("cost.usd", total), + attribute.Float64("cost.input_usd", input), + attribute.Float64("cost.output_usd", output), + attribute.Float64("cost.cache_read_usd", cacheRead), + attribute.Float64("cost.cache_creation_usd", cacheCreation), + } +} + +// ---- helpers ---------------------------------------------------------------- + +func startTimeFor(record coreusage.Record) time.Time { + if !record.RequestedAt.IsZero() { + return record.RequestedAt + } + if record.Latency > 0 { + return time.Now().Add(-record.Latency) + } + return time.Now() +} + +func failureMessage(f coreusage.Failure) string { + if f.Body == "" { + return "" + } + // Cap body length so we never ship an unbounded payload into a span. The + // span has the status code already; the body is a hint for humans only. + if len(f.Body) > 512 { + return f.Body[:512] + } + return f.Body +} diff --git a/internal/otelplugin/plugin_test.go b/internal/otelplugin/plugin_test.go new file mode 100644 index 00000000000..b9ebe0c79ba --- /dev/null +++ b/internal/otelplugin/plugin_test.go @@ -0,0 +1,256 @@ +package otelplugin + +import ( + "context" + "reflect" + "sort" + "strings" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + + coreusage "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" +) + +// The hot-path attribute builders are pure functions over (Record, Config). +// Testing them directly gives full coverage of the mapping rules without +// needing an OTLP collector running. The exporter wiring is exercised in a +// separate integration test (see TestPlugin_LifecycleSmoke at the bottom). + +func TestUsageAttributes_GenAiSemconvMapping(t *testing.T) { + t.Parallel() + cfg := defaultConfig() + record := coreusage.Record{ + Provider: "anthropic", + Model: "claude-opus-4-7", + Alias: "chatplan-opus", + Detail: coreusage.Detail{ + InputTokens: 1234, + OutputTokens: 567, + CacheReadTokens: 89, + CacheCreationTokens: 12, + ReasoningTokens: 34, + TotalTokens: 1936, + }, + Source: "chatplan", + ReasoningEffort: "medium", + } + attrs := keyValueMap(usageAttributes(record, cfg)) + assertStringAttr(t, attrs, "gen_ai.system", "anthropic") + assertStringAttr(t, attrs, "gen_ai.request.model", "claude-opus-4-7") + assertStringAttr(t, attrs, "gen_ai.request.model_alias", "chatplan-opus") + assertStringAttr(t, attrs, "gen_ai.request.reasoning_effort", "medium") + assertStringAttr(t, attrs, "gen_ai.request.source", "chatplan") + assertInt64Attr(t, attrs, "gen_ai.usage.input_tokens", 1234) + assertInt64Attr(t, attrs, "gen_ai.usage.output_tokens", 567) + assertInt64Attr(t, attrs, "gen_ai.usage.cache_read_input_tokens", 89) + assertInt64Attr(t, attrs, "gen_ai.usage.cache_creation_input_tokens", 12) + assertInt64Attr(t, attrs, "gen_ai.usage.reasoning_tokens", 34) + assertInt64Attr(t, attrs, "gen_ai.usage.total_tokens", 1936) +} + +func TestUsageAttributes_OptionalFieldsOmittedWhenZero(t *testing.T) { + t.Parallel() + cfg := defaultConfig() + record := coreusage.Record{ + Provider: "anthropic", + Model: "claude-haiku-4-5", + Detail: coreusage.Detail{InputTokens: 10, OutputTokens: 5}, + } + attrs := keyValueMap(usageAttributes(record, cfg)) + if _, ok := attrs["gen_ai.usage.cache_read_input_tokens"]; ok { + t.Error("cache_read_input_tokens should be omitted when zero") + } + if _, ok := attrs["gen_ai.request.model_alias"]; ok { + t.Error("model_alias should be omitted when empty") + } +} + +func TestUsageAttributes_IncludeUsageFalseSuppressesTokenCounts(t *testing.T) { + t.Parallel() + off := false + cfg := applyDefaults(Config{OTLP: OTLPConfig{Span: SpanConfig{IncludeUsage: &off}}}) + record := coreusage.Record{ + Provider: "anthropic", + Model: "claude-opus-4-7", + Detail: coreusage.Detail{InputTokens: 1234, OutputTokens: 567}, + } + attrs := keyValueMap(usageAttributes(record, &cfg)) + if _, ok := attrs["gen_ai.usage.input_tokens"]; ok { + t.Error("token counts should be suppressed when IncludeUsage=false") + } + assertStringAttr(t, attrs, "gen_ai.system", "anthropic") // provider still set +} + +func TestBaggageAttributes_DefaultIncludeAllKeys(t *testing.T) { + t.Parallel() + cfg := defaultConfig() + b := Baggage{"agent.id": "builder", "workload.kind": "chat-turn", "session.id": "abc"} + attrs := keyValueMap(baggageAttributes(b, cfg)) + if len(attrs) != 3 { + t.Errorf("default span config should emit all baggage keys; got %d", len(attrs)) + } +} + +func TestBaggageAttributes_AllowlistFilters(t *testing.T) { + t.Parallel() + cfg := applyDefaults(Config{OTLP: OTLPConfig{Span: SpanConfig{ + IncludeBaggageKeys: []string{"agent.id"}, + }}}) + b := Baggage{"agent.id": "builder", "secret": "shh"} + attrs := keyValueMap(baggageAttributes(b, &cfg)) + if len(attrs) != 1 { + t.Errorf("allowlist should restrict baggage attrs; got %d", len(attrs)) + } + assertStringAttr(t, attrs, "agent.id", "builder") +} + +func TestCostAttributes_PricingTableLookup(t *testing.T) { + t.Parallel() + cfg := applyDefaults(Config{Cost: CostConfig{ + Enabled: true, + Pricing: map[string]ModelPricingUSD{ + "claude-opus-4-7": { + InputPerMillion: 15, + OutputPerMillion: 75, + CacheReadPerMillion: 1.5, + CacheCreationPerMillion: 18.75, + }, + }, + }}) + record := coreusage.Record{ + Model: "claude-opus-4-7", + Detail: coreusage.Detail{ + InputTokens: 1_000_000, + OutputTokens: 500_000, + CacheReadTokens: 200_000, + CacheCreationTokens: 100_000, + }, + } + attrs := keyValueMap(costAttributes(record, &cfg)) + assertFloatAttr(t, attrs, "cost.input_usd", 15.0) + assertFloatAttr(t, attrs, "cost.output_usd", 37.5) + assertFloatAttr(t, attrs, "cost.cache_read_usd", 0.3) + assertFloatAttr(t, attrs, "cost.cache_creation_usd", 1.875) + assertFloatAttr(t, attrs, "cost.usd", 15.0+37.5+0.3+1.875) +} + +func TestCostAttributes_PrefixMatchFallsBack(t *testing.T) { + t.Parallel() + cfg := applyDefaults(Config{Cost: CostConfig{ + Enabled: true, + Pricing: map[string]ModelPricingUSD{ + "claude-sonnet": {InputPerMillion: 3, OutputPerMillion: 15}, + }, + }}) + record := coreusage.Record{ + Model: "claude-sonnet-4-6", // exact key missing; prefix matches + Detail: coreusage.Detail{InputTokens: 1_000_000, OutputTokens: 0}, + } + attrs := keyValueMap(costAttributes(record, &cfg)) + assertFloatAttr(t, attrs, "cost.input_usd", 3.0) +} + +func TestCostAttributes_NoEntryReturnsNil(t *testing.T) { + t.Parallel() + cfg := applyDefaults(Config{Cost: CostConfig{Enabled: true, Pricing: map[string]ModelPricingUSD{}}}) + if got := costAttributes(coreusage.Record{Model: "unknown"}, &cfg); got != nil { + t.Errorf("unknown model should yield nil cost attrs; got %v", got) + } +} + +func TestCostAttributes_IncludeCostFalseSuppresses(t *testing.T) { + t.Parallel() + off := false + cfg := applyDefaults(Config{ + OTLP: OTLPConfig{Span: SpanConfig{IncludeCost: &off}}, + Cost: CostConfig{Enabled: true, Pricing: map[string]ModelPricingUSD{ + "claude-opus-4-7": {InputPerMillion: 15}, + }}, + }) + record := coreusage.Record{Model: "claude-opus-4-7", Detail: coreusage.Detail{InputTokens: 1_000_000}} + if got := costAttributes(record, &cfg); got != nil { + t.Errorf("IncludeCost=false should suppress emission; got %v", got) + } +} + +func TestStartTimeFor_PreferenceOrder(t *testing.T) { + t.Parallel() + requested := time.Date(2026, 5, 25, 10, 0, 0, 0, time.UTC) + if got := startTimeFor(coreusage.Record{RequestedAt: requested}); !got.Equal(requested) { + t.Errorf("RequestedAt should win: got %v, want %v", got, requested) + } + now := time.Now() + got := startTimeFor(coreusage.Record{Latency: 100 * time.Millisecond}) + if delta := now.Sub(got); delta < 50*time.Millisecond || delta > 500*time.Millisecond { + t.Errorf("Latency-derived start should be now-Latency; got delta %v", delta) + } +} + +// Plugin lifecycle smoke: registering the plugin then disabling it produces no +// emitted spans. This is the "guard rail" — operators relying on the off-state +// for cost control need certainty that HandleUsage is a true no-op when off. +func TestPlugin_DisabledIsNoOp(t *testing.T) { + prev := CurrentConfig() + defer SetConfig(prev) //nolint:errcheck + + SetEnabled(false) + p := NewPlugin() + p.HandleUsage(context.Background(), coreusage.Record{Model: "x"}) + // No-op success means we did not block, did not panic, and did not need + // the OTLP collector. (We cannot assert "no span emitted" without + // intercepting the exporter, but the early-return guard makes that + // invariant trivial.) +} + +// ---- test helpers ----------------------------------------------------------- + +func keyValueMap(kvs []attribute.KeyValue) map[string]attribute.Value { + out := make(map[string]attribute.Value, len(kvs)) + for _, kv := range kvs { + out[string(kv.Key)] = kv.Value + } + return out +} + +func assertStringAttr(t *testing.T, attrs map[string]attribute.Value, key, want string) { + t.Helper() + got, ok := attrs[key] + if !ok { + t.Errorf("attribute %q missing", key) + return + } + if got.AsString() != want { + t.Errorf("attribute %q: got %q, want %q", key, got.AsString(), want) + } +} + +func assertInt64Attr(t *testing.T, attrs map[string]attribute.Value, key string, want int64) { + t.Helper() + got, ok := attrs[key] + if !ok { + t.Errorf("attribute %q missing", key) + return + } + if got.AsInt64() != want { + t.Errorf("attribute %q: got %d, want %d", key, got.AsInt64(), want) + } +} + +func assertFloatAttr(t *testing.T, attrs map[string]attribute.Value, key string, want float64) { + t.Helper() + got, ok := attrs[key] + if !ok { + t.Errorf("attribute %q missing", key) + return + } + if delta := got.AsFloat64() - want; delta < -1e-9 || delta > 1e-9 { + t.Errorf("attribute %q: got %v, want %v", key, got.AsFloat64(), want) + } +} + +// keepImportsUsed silences staticcheck warnings if any test helper goes unused +// in a subset run. Tests don't import strings/sort/reflect from outside; this +// is purely defensive against future trimming. +var _ = []any{strings.TrimSpace, sort.Strings, reflect.DeepEqual} diff --git a/internal/otelplugin/status.go b/internal/otelplugin/status.go new file mode 100644 index 00000000000..6cd346488c5 --- /dev/null +++ b/internal/otelplugin/status.go @@ -0,0 +1,12 @@ +package otelplugin + +import "go.opentelemetry.io/otel/codes" + +// Shorthand aliases to keep plugin.go free of the (otel-codes vs http-codes) +// confusion. The OTel trace package uses codes.Code for span status; HTTP +// status lives on attributes. Centralising the import here also makes the +// mocking surface in tests trivially small. +var ( + codeOk = codes.Ok + codeError = codes.Error +)