AI-3190 feat(stream): OTLP as first-class source type with per-signal sink routing#2598
AI-3190 feat(stream): OTLP as first-class source type with per-signal sink routing#2598Matovidlo wants to merge 29 commits into
Conversation
Add `SourceTypeOTLP` / `OTLPSource` alongside `SourceTypeHTTP` / `HTTPSource` so OTLP sources are a distinct entity in the definition layer rather than piggy-backing on the HTTP source type. Key changes: - `source_otlp.go`: `OTLPSource` struct, signal name constants (`OTLPSignalLogs/Metrics/Traces`), `FormatOTLPSourceURL` - `source_base.go`: add `OTLP *OTLPSource` field with `required_if` validation - `sink_base.go`: add `AllowedSignals []string` for per-signal routing - `dispatcher.go`: mirror filter and secret extraction extended to OTLP sources - `storage/test/source.go`: `NewOTLPSource` test factory Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- `recordctx.Context` gains `Signal() string` (returns "" for HTTP, "logs"/"metrics"/"traces" for OTLP records) - `fasthttp.go` / `http.go`: stub returning "" - `otlp.go`: carry signal through `FromOTLP`; add `TestOTLPContext_Signal` - `router.go`: skip sinks whose `AllowedSignals` does not include the record's signal - `collection.go`: store `allowedSignals` from the sink definition Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion, Retry-After - `handler.go`: pass signal constants to `handle()`; add `Retry-After: 1` header on 429 responses - `decode.go`: accept `application/protobuf` as alias for `application/x-protobuf` - `dispatch.go`: pass signal to `recordctx.FromOTLP`; count records as rejected when sinks return non-2xx (not just routing errors) - `response.go`: escalate to top-level HTTP status when ALL records are rejected (4xx for auth failures, 5xx/429 for transient failures) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- `design.go`: add `otlp` attribute and `OTLPSource` type to Goa DSL; register `SourceTypeOTLP` in the `SourceType` enum - `source_request.go` / `source_response.go` / `aggregation.go`: handle the new type in create, update, and list mappers - Regenerate all Goa/OpenAPI artefacts Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add otlpsource_test.go covering: - CORS preflight, unsupported content-type, bad project ID, wrong secret - Source disable / re-enable with dispatcher mirror propagation check - Empty batches for all three signals - Logs via protobuf, JSON, gzip, and application/protobuf alias - Metrics and traces via protobuf - Per-signal sink routing (AllowedSignals filtering) - Partial success / top-level error escalation when sinks fail Test helpers use real pdata marshalers (plog, pmetric, ptrace) to build payloads that match what actual OTel SDKs send. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- create-otlp-source: verifies OTLP source creation with type=otlp,
checks the /otlp/{project}/{source}/{secret} URL in the response,
and validates etcd contains secret + source type fields
- otlp-test-mapping: exercises the /test endpoint for OTLP sources;
verifies jsonnet Body() expressions extract severity_text, body,
timestamp, resource object fields (service.name) and nested
attribute keys (user.id) from a flat JSON test payload; also
confirms the correct 422 error when a required top-level field
(resource) is absent from the test body
Adds FromOTLPTestRequest to recordctx — parses the /test endpoint
body as a flat JSON OTLP record and builds an otlpContext so that
the same column renderer used for real traffic can be exercised
without a protobuf payload.
Updates TestSource in the API service to detect SourceTypeOTLP and
dispatch through FromOTLPTestRequest instead of FromHTTP.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Lint fixes (intrange, gci, staticcheck, unparam): - Convert C-style for loops over pdata slice lengths to Go 1.22+ integer range form (flatten_common, flatten_logs, flatten_metrics, flatten_traces and their tests) - Fix gci import ordering in source_base.go, response.go, otlpsource_test.go (standard / default / keboola-as-code groups) - Remove embedded RequestCtx selector in handler.go (c.RemoteIP() instead of c.RequestCtx.RemoteIP()) - Remove always-constant contentType param from doOTLPPost helper; inline "application/x-protobuf" directly E2E test fixes: - Add missing 001-create-source/expected-response.json for otlp-test-mapping (task creation response) - Add missing 002-poll-source/expected-http-code for create-otlp-source - Update create-005-invalid-type-400 expected message to include "otlp" in the list of valid source types Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add missing expected-http-code and expected-response.json files for poll steps in otlp-test-mapping: - 002-poll-source: expected-http-code + full task response - 003-create-sink: expected-response.json (task creation 202 body) - 004-poll-sink: expected-http-code + full task response Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Convert all uint32/uint64/typed-slice fields to go-jsonnet-compatible types (int64/[]any) to prevent "Not a json type" errors at record write time. Also add empty-batch source validation so wrong-secret requests get 404 instead of a free 200 on empty OTLP payloads. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…OTLP flatten These conversions are intentional — OTLP counter values that exceed int64 max are not expected in practice, and the cast is required for go-jsonnet type compatibility. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Align expected types with the go-jsonnet-compatible conversions introduced in the previous commit (uint64→int64, []uint64→[]any, []float64→[]any). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add allowedSignals field to the Goa sink design so each sink can be restricted to specific OTLP signal types (logs, metrics, traces). Empty means accept all signals (existing behaviour preserved). - api/stream/design.go: add allowedSignals attribute to SinkFields - gen/stream/service.go: regenerated — AllowedSignals on Create/Update/Read - mapper/sink_request.go: wire AllowedSignals on create and update - mapper/sink_response.go: include AllowedSignals in response - stream-otlp-setup.sh: pass signal filter per sink so each table receives only its own signal type Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Signals mapper tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@claude - Can you fix the unit tests on this one? |
|
@jordanrburger it's ok there are flaky test |
There was a problem hiding this comment.
Pull request overview
Adds OTLP as a first-class Stream source type and introduces per-signal sink routing so OTLP logs, metrics, and traces can be routed independently while preserving HTTP-source compatibility.
Changes:
- Adds OTLP source definitions, API/schema exposure, source URL formatting, dispatcher mirroring, and OTLP handler updates.
- Adds
allowedSignalsto sinks and routes records based onrecordctx.Context.Signal(). - Adds OTLP flattening/test coverage updates, API fixtures, and a setup helper script.
Reviewed changes
Copilot reviewed 64 out of 69 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
| api/stream/design.go | Adds OTLP source and allowedSignals to Goa API design. |
| internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go | Regenerates HTTP encoding for OTLP/allowedSignals fields. |
| internal/pkg/service/stream/api/gen/http/stream/server/types.go | Regenerates HTTP request/response structs and validation. |
| internal/pkg/service/stream/api/gen/stream/service.go | Regenerates service API types. |
| internal/pkg/service/stream/api/mapper/aggregation.go | Maps OTLP URL for aggregated sources. |
| internal/pkg/service/stream/api/mapper/sink_allowed_signals_test.go | Adds mapper tests for sink signal filters. |
| internal/pkg/service/stream/api/mapper/sink_request.go | Maps allowedSignals from create/update payloads. |
| internal/pkg/service/stream/api/mapper/sink_response.go | Exposes allowedSignals in sink responses. |
| internal/pkg/service/stream/api/mapper/source_request.go | Creates/updates OTLP source entities. |
| internal/pkg/service/stream/api/mapper/source_response.go | Exposes OTLP source URLs. |
| internal/pkg/service/stream/api/openapi/openapi.yaml | Regenerates OpenAPI v2 schema. |
| internal/pkg/service/stream/api/openapi/openapi3.yaml | Regenerates OpenAPI v3 schema. |
| internal/pkg/service/stream/api/service/source.go | Handles OTLP source test requests as flat OTLP records. |
| internal/pkg/service/stream/definition/sink_base.go | Adds AllowedSignals to sink definitions. |
| internal/pkg/service/stream/definition/source_base.go | Adds OTLP-specific source config. |
| internal/pkg/service/stream/definition/source_otlp.go | Defines OTLP source type, signals, and URL formatting. |
| internal/pkg/service/stream/mapping/recordctx/fasthttp.go | Adds empty signal for HTTP records. |
| internal/pkg/service/stream/mapping/recordctx/http.go | Adds empty signal for HTTP records. |
| internal/pkg/service/stream/mapping/recordctx/otlp.go | Adds OTLP signal storage and test-request context creation. |
| internal/pkg/service/stream/mapping/recordctx/otlp_test.go | Updates OTLP context tests for signal support. |
| internal/pkg/service/stream/mapping/recordctx/recordctx.go | Adds Signal() to record context interface. |
| internal/pkg/service/stream/sink/router/collection.go | Mirrors sink allowed-signal filters. |
| internal/pkg/service/stream/sink/router/router.go | Applies per-signal sink routing. |
| internal/pkg/service/stream/source/dispatcher/dispatcher.go | Mirrors OTLP sources in dispatcher lookup. |
| internal/pkg/service/stream/source/type/otlpsource/decode.go | Accepts application/protobuf alias. |
| internal/pkg/service/stream/source/type/otlpsource/dispatch.go | Passes OTLP signal into record context and tracks sink failures. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_common.go | Adds conversion helpers for flattened metric slices. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_logs.go | Adjusts flattened log numeric types. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_logs_test.go | Updates log flattening expectations. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go | Adjusts flattened metric numeric/slice types. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_metrics_test.go | Updates metric flattening expectations. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_traces.go | Adjusts trace flattening loops and flags type. |
| internal/pkg/service/stream/source/type/otlpsource/flatten_traces_test.go | Updates trace flattening loop style. |
| internal/pkg/service/stream/source/type/otlpsource/handler.go | Uses signal constants, validates empty batches, adds Retry-After for 429. |
| internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go | Adds OTLP integration-style tests. |
| internal/pkg/service/stream/source/type/otlpsource/response.go | Escalates all-rejected batches to top-level status. |
| internal/pkg/service/stream/storage/test/source.go | Adds OTLP source test factory. |
| stream-otlp-setup.sh | Adds setup helper for OTLP source and sinks. |
| test/stream/api/source/create-005-invalid-type-400/001-create-source/expected-response.json | Updates valid source type error expectation. |
| test/stream/api/source/create-otlp-source/001-create-source/expected-http-code | Adds OTLP source create fixture. |
| test/stream/api/source/create-otlp-source/001-create-source/expected-response.json | Adds OTLP source create expected task. |
| test/stream/api/source/create-otlp-source/001-create-source/request.json | Adds OTLP source create request. |
| test/stream/api/source/create-otlp-source/002-poll-source/expected-http-code | Adds OTLP source task polling fixture. |
| test/stream/api/source/create-otlp-source/002-poll-source/expected-response.json | Adds OTLP source task polling response. |
| test/stream/api/source/create-otlp-source/002-poll-source/request.json | Adds OTLP source task polling request. |
| test/stream/api/source/create-otlp-source/003-get-source/expected-http-code | Adds OTLP source get fixture. |
| test/stream/api/source/create-otlp-source/003-get-source/expected-response.json | Adds OTLP source get expected response. |
| test/stream/api/source/create-otlp-source/003-get-source/request.json | Adds OTLP source get request. |
| test/stream/api/source/create-otlp-source/expected-etcd-kvs.txt | Adds expected persisted OTLP source state. |
| test/stream/api/source/otlp-test-mapping/001-create-source/expected-http-code | Adds OTLP mapping test source create code. |
| test/stream/api/source/otlp-test-mapping/001-create-source/expected-response.json | Adds OTLP mapping test source create response. |
| test/stream/api/source/otlp-test-mapping/001-create-source/request.json | Adds OTLP mapping test source create request. |
| test/stream/api/source/otlp-test-mapping/002-poll-source/expected-http-code | Adds source poll expected code. |
| test/stream/api/source/otlp-test-mapping/002-poll-source/expected-response.json | Adds source poll expected response. |
| test/stream/api/source/otlp-test-mapping/002-poll-source/request.json | Adds source poll request. |
| test/stream/api/source/otlp-test-mapping/003-create-sink/expected-http-code | Adds sink create expected code. |
| test/stream/api/source/otlp-test-mapping/003-create-sink/expected-response.json | Adds sink create expected response. |
| test/stream/api/source/otlp-test-mapping/003-create-sink/request.json | Adds sink with OTLP flat-record mappings. |
| test/stream/api/source/otlp-test-mapping/004-poll-sink/expected-http-code | Adds sink poll expected code. |
| test/stream/api/source/otlp-test-mapping/004-poll-sink/expected-response.json | Adds sink poll expected response. |
| test/stream/api/source/otlp-test-mapping/004-poll-sink/request.json | Adds sink poll request. |
| test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/expected-http-code | Adds successful mapping test expected code. |
| test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/expected-response.json | Adds successful mapping expected response. |
| test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/request.json | Adds OTLP flat log test request. |
| test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/expected-http-code | Adds failing mapping expected code. |
| test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/expected-response.json | Adds missing-resource mapping error expectation. |
| test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/request.json | Adds invalid flat OTLP test request. |
Files not reviewed (3)
- internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
- internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
- internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
Comments suppressed due to low confidence (4)
internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go:129
pt.Count()is auint64; converting it directly toint64silently overflows for valid OTLP exponential-histogram counts greater thanmath.MaxInt64, corrupting the flattened record. Please handle overflow explicitly or use a lossless representation for unsigned counts.
internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go:159pt.Count()is auint64; converting it directly toint64silently overflows for valid OTLP summary counts greater thanmath.MaxInt64, corrupting the flattened record. Please handle overflow explicitly or use a lossless representation for unsigned counts.
internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go:398- This test also relies on the final shutdown statements running. A failing assertion before these lines leaves the Stream components running; use
t.Cleanupright afterStartComponentssucceeds so cleanup runs on failures too.
internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go:451 - If the assertions above fail, these shutdown calls are skipped and the source components can keep running after the test fails. Register cleanup with
t.Cleanupimmediately after startup instead of relying on reaching the end of the test.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
jordanrburger
left a comment
There was a problem hiding this comment.
Reviewing through the lens of building the kbc-ui side of this on feat/stream-otlp-ui-phase1 (PR #5589 over there). Architecturally this is solid — OTLP as a first-class source type alongside HTTP, per-signal routing via allowedSignals, OTLP-spec-correct partial-success and retry semantics, full dispatcher mirror coverage so records aren't lost on restart, and decent integration tests. The pieces we hit in production while building the UI all checked out.
Five things I'd push for, prioritized — most importantly, the header-auth feature is half-implemented and the secret rotation / cascade delete story has gaps that will create real ops pain.
🔴 Push for before merge
1. Register the header-auth routes — handler is already ready
handler.go:parseAuthParams falls back to Authorization: Bearer <secret> if no <secret> URL param is present. But httpsource.go only registers the secret-in-URL routes — the secret-less routes are missing. So the bearer-token fallback in the handler is currently dead code.
This isn't a follow-up — the code is already there and not wired. Six lines in httpsource.go:
router.Options("/otlp/<projectID>/<sourceID>/v1/logs", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/logs", otlpHandler.HandleLogs)
router.Options("/otlp/<projectID>/<sourceID>/v1/metrics", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/metrics", otlpHandler.HandleMetrics)
router.Options("/otlp/<projectID>/<sourceID>/v1/traces", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/traces", otlpHandler.HandleTraces)Without this, the only auth mode is "secret in URL," which will leak into HTTP access logs, CDN logs, and APM URL attributes. We had to actively drop the header-auth tab from the kbc-ui PR because the canary returns 404 on these paths today.
2. Expose the secret separately on the source response
Today FormatOTLPSourceURL returns a single string with the secret embedded:
https://stream-in.../otlp/22/my-source/<48-char-secret>
For the UI to render a clean header-auth flow (once #1 lands), we need otlp.secret exposed separately — or an otlp.baseUrl field that omits the secret. The kbc-ui currently has to parse the URL with a regex to split the secret out, which works but is brittle. A small addition to the OTLPSource Goa type:
var OTLPSource = Type("OTLPSource", func() {
Attribute("url", String, func() { ... }) // existing — full URL
Attribute("baseUrl", String, func() { ... }) // new — without secret
Attribute("secret", String, func() { ... }) // new — 48-char value
Required("url", "baseUrl", "secret")
})🟡 Push for as follow-ups (already tracked in Linear)
3. Cascade-delete option on source delete
DELETE /v1/branches/{branchId}/sources/{sourceId}?cascade=true to delete the source's sinks, tables, and bucket atomically. Without this, every deletion leaves orphans behind in Storage, and the recreate-with-same-name flow trips a generic 409 from the backend. Tracked as AI-3191 and assigned to you.
4. Secret rotation endpoint
POST /v1/branches/{branchId}/sources/{sourceId}/rotate-secret, optionally accepting gracePeriodSeconds so the old secret keeps working for N seconds while producers are reconfigured. Without this, leaked secrets force delete-and-recreate, breaking every cached producer config. Tracked as AI-3199.
5. Per-sink rejected-records counter in stats
PartialSuccess.RejectedLogRecords (and the metrics/traces equivalents) are surfaced to the producer in the OTLP response, but SinkStatisticsTotalResult only tracks recordsCount (imported/waiting). The UI has no way to show users "12K records rejected by sink mapping in the last hour." Add a cumulative rejected counter alongside recordsCount. Tracked as AI-3196.
🟢 Smaller things worth flagging
allowedSignals semantics on HTTP sinks
The field accepts any of "logs" | "metrics" | "traces" via validate:"dive,oneof=..." regardless of the parent source's type. The doc-comment says "HTTP sources ignore this field." Two suggestions:
- (a) Add a validation that rejects
allowedSignalson a sink whose parent source is HTTP-typed (clearer error than silent ignore). - (b) Or at least add a unit test pinning the silent-ignore behavior so it doesn't regress.
TestSource OTLP body format is undocumented
TestSource now accepts a "flattened OTLP record" body for OTLP sources, with the inline comment pointing at FlattenLogs/Metrics/Traces. The OpenAPI / TestSourcePayload description doesn't mention this — API callers (including future UI iterations of the payload tester) have to read the Go source to know what to send. Suggest expanding the TestSourcePayload description or adding an example flattened-record body in the Goa DSL.
Document the Body() jsonnet null-default behavior
Out of strict scope for this PR, but adjacent: the no-default form of Body('path') throws and silently rejects the entire record when a key is missing. We hit this hard choosing UI defaults — every Cowork event was getting dropped until we switched every template to Body('path', null). Worth a sentence in the OTLP user docs once they exist: "Use Body('path', null) for any optional field unless you want missing fields to drop the record."
FormatOTLPSourceURL — clarify the "base URL" semantics in the field description
The URL we return doesn't include the signal path — the OTel SDK appends /v1/logs etc. itself, per the OTLP/HTTP spec. Some downstream SDKs (we saw Cowork) require the literal base URL with no signal suffix and auto-strip if you try to append one. Not a code change — just expand the URL field's description in design.go:
Base endpoint URL for the OTLP source. Configure this as the endpoint in your OpenTelemetry SDK. The SDK automatically appends
/v1/logs,/v1/metrics, or/v1/tracesbased on the signal type — do not append a signal path yourself. Most SDK exporters reject or silently strip the suffix.
OTLP/gRPC scope
OTLP/HTTP only. One line in the PR description saying gRPC is out of scope (for now / forever / planned for X) would head off reviewer questions.
stream-otlp-setup.sh
A 369-line bash script at the repo root that sets up a test OTLP source. Useful, but it's the only top-level shell script in the repo — feels out-of-place there. Consider scripts/ or dev/.
TL;DR for merge
Block on (cheap to fix in this PR):
- Register the 6 header-auth routes (handler already supports it).
- Add
otlp.baseUrl+otlp.secretto the source response shape.
Track for next PRs (already filed):
Everything else is polish that can land later. Parent epic for full UI context: AI-3190.
…wedSignals - OTLPSource response now carries baseUrl (without secret) and secret as separate fields alongside the existing combined url, so SDK callers can authenticate via Authorization: Bearer <secret> against baseUrl without string-splitting. Existing url is kept for back-compat. - allowedSignals on sinks gains an item-level Enum(logs|metrics|traces) so arbitrary string values are rejected at the API boundary instead of being stored and silently dropping every record at the router. - TestSource description now spells out the flattened-OTLP-record body shape expected for OTLP sources, with an example payload. - OTLP url description warns that SDK exporters append /v1/<signal> themselves and must not be configured with a signal suffix. - AggregatedSink response now includes AllowedSignals (previously dropped). - Expected fixture updated for the new fields. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…points
- parseAuthParams falls back to Authorization: Bearer <secret> when no URL
secret is supplied, so callers can authenticate without leaking the secret
into access/CDN/APM logs.
- Six secret-less OTLP routes registered alongside the existing secret-in-URL
shape: /otlp/<projectID>/<sourceID>/v1/{logs,metrics,traces}.
- Dispatcher.lookupSources now matches by expected source type as well as
project/source/secret. Without this, an HTTP secret used on /otlp/... or
an OTLP secret used on /stream/... would authenticate successfully against
the wrong source.
- Mapper.UpdateSourceEntity clears the inactive type-specific block when a
source switches type (HTTP↔OTLP). Previously the persisted entity could
carry both blocks with stale secrets, leaving versions/etcd state
inconsistent.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ce by signal - Router extracted into a sinkAcceptsSignal predicate. HTTP-source records arrive with an empty signal and now bypass the allowedSignals filter entirely, matching the documented contract "HTTP sources ignore this field". Previously such records were silently dropped if the sink had any allowedSignals set. - TestSource filters sinks by the request's OTLP signal before rendering for OTLP sources, so a logs test payload no longer fails on metrics/traces-only sinks. Signal can be selected via ?signal=logs|metrics|traces; defaults to logs for back-compat. - New unit test pins the routing predicate for HTTP and all OTLP signals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
OTLP histogram/exponential-histogram/summary counts and bucket counts are defined as unsigned 64-bit. Direct int64 casts silently wrap any value above math.MaxInt64 into a negative number, corrupting the flattened record. Saturating at MaxInt64 keeps the value as close to the truth as the signed encoding allows and removes the gosec G115 suppressions that were masking the issue. Affected sites: histogram count, exponential histogram count and zero_count, summary count, and the shared uint64SliceToAny helper used for bucket counts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Dummy SinkController tracks per-sink WriteRecord call counts and supports a PipelineWriteHook for per-call outcomes. Lets tests assert which sinks actually received a record, and produce deterministic mixed accept/reject batches for partial_success coverage. - TestOTLPSource_SignalRouting now asserts per-sink record counts across logs/metrics/traces dispatches (filtered sinks plus one unfiltered sink), instead of only checking that the endpoint returned 200. - TestOTLPSource_PartialSuccess_Mixed exercises the actual partial_success path: a 4-record batch with a hook that fails every other write yields a HTTP 200 response with an OTLP body carrying rejectedLogRecords=2. - Bearer-token auth cases (success per signal, wrong secret, missing/ malformed Authorization, OPTIONS preflight on the secret-less route) cover the new header-auth routes. - Process shutdown moved into t.Cleanup at all three startup sites so a failing assertion no longer leaks background components into later tests. - TestPartialSuccess no longer leaks resp.Body. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… issues - Relocate from repo root to scripts/ alongside the other tooling shell scripts. - Fail fast on missing jq/curl instead of half-creating resources mid-run. - Build the create-source payload with jq so SOURCE_NAME values containing quotes, backslashes, or newlines stay valid JSON. - Cleanup mode polls the asynchronous DELETE task before removing the state file so a delayed deletion failure does not lose the source ID. - Snowflake smoke-verification example uses plain quoted-identifier syntax, which works for the default in.c-otlp bucket name; the previous IDENTIFIER() expansion produced malformed SQL. - Removed reference to the non-existent stream-otlp-send.sh; print an inline curl smoke-test instead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 67 out of 72 changed files in this pull request and generated 10 comments.
Files not reviewed (3)
- internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
- internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
- internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
Comments suppressed due to low confidence (1)
internal/pkg/service/stream/source/type/otlpsource/flatten_traces.go:85
- Span link flags are still stored with the OTLP unsigned integer type inside this loop, while span/log flags were converted to
int64to avoid go-jsonnet's unsupported unsigned values. A trace containing links can still fail mappings that readBody('links')or serialize the full body; convertl.Flags()the same way asspan.Flags().
…egated source - TestSource gains a `signal` query parameter (logs|metrics|traces, enum- validated by Goa) so generated clients can discover and validate the selector. The handler reads it from the payload and rejects unknown values at the API boundary instead of silently falling back to logs on typos. - TestSource description now lists every field FlattenLogs produces and notes the nested shape for attributes/resource/scope. The previous example used dotted keys (resource.service.name) which never match the actual flattened record and would mislead callers writing mappings against it. - NewAggregationSource for OTLP populates baseUrl and secret alongside url, matching the non-aggregated source response. The list/aggregation endpoint previously returned an incomplete OTLPSource that broke header-auth clients. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…records - The OTLP handler now calls ValidateSource immediately after parseAuthParams, before content-type detection, decompression, or decoding. Unauthenticated callers get a deterministic 404 instead of consuming decode CPU and receiving a 400/415 parse error. - headersToOrderedMap drops the Authorization header before building the record context. Without this, a bearer secret supplied via the secret-less routes could reach any sink column rendered via Header() or a headers mapping and be persisted to storage. - Unsupported-media-type error message now lists the application/protobuf alias accepted by the decoder. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ss test The fixture's inline comment claimed the test exercised the 200+partial_success path, but it actually drives the all-rejected case where shouldEscalateToError returns true and the handler emits a top-level 5xx. Reworded so a future reader does not confuse this with TestOTLPSource_PartialSuccess_Mixed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…trics sink - save_state now creates STATE_FILE with mode 0600 (umask 077 + chmod) so the embedded OTLP_URL secret is not readable to other users on shared systems. - Metrics sink mapping now includes a scope_version column alongside scope_name. Logs and traces sinks already carried it; metrics dropped the version even though the flattened record has it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 67 out of 72 changed files in this pull request and generated 7 comments.
Files not reviewed (3)
- internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
- internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
- internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
…ate, per-type Source examples - New named `OTLPSignal` Goa type wraps `String` with the Enum constraint and is reused by `allowedSignals` on sinks and the `signal` selector on TestSource. Validation errors now report a clean field path (`allowedSignals[0]` rather than the doubly-indexed `allowedSignals[0].allowedSignals[*]` that an anonymous inline enum generates). - Source and AggregatedSource types declare two named examples — one for HTTP, one for OTLP — so OpenAPI no longer suggests the impossible "type: http with both http and otlp blocks" shape Goa would auto-synthesize from the optional attributes. - Single source of truth for the per-signal sink filter: new `definition.SignalAccepted(allowedSignals, signal)` and `Sink.AcceptsSignal` method. The runtime router and the /test endpoint both route through it so the two paths cannot drift again. - Mapper layer converts between the persisted entity's plain `[]string` AllowedSignals and the Goa-typed `[]api.OTLPSignal` via small signalsToStrings / stringsToSignals helpers. Tests updated accordingly. - Aggregated OTLP source response now uses the same conversion for the AllowedSignals field on sinks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rer parsing - DetectEncoding lower-cases the media type before matching, so HTTP-canonical variants like `Application/Protobuf` or `APPLICATION/JSON` are accepted instead of being rejected with 415. - parseAuthParams matches the `Bearer ` scheme via strings.EqualFold so `authorization: bearer <secret>` (lowercase) and any other case combination is accepted, matching RFC 9110 auth-scheme semantics. - FromOTLPTestRequest doc comment merged into a single paragraph; the previous version repeated the function summary. - Replaced one Go 1.22+ idiom miss (`for i := 0; i < 4; i++`) with the `for i := range 4` form to satisfy the intrange linter. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the missing analog of source_http_test.go for the OTLP source type: missing block, empty block (secret required), short secret (must be 48 chars), and a valid baseline. Catches future regressions in the OTLPSource validate tags that were previously untested. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 68 out of 73 changed files in this pull request and generated 10 comments.
Files not reviewed (3)
- internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
- internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
- internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
…mments - Source / AggregatedSource named examples now include the required `version` and `created` blocks (shared exampleVersion / exampleCreated helpers), so the generated OpenAPI examples match the schema's required-fields contract and the mapper output. - TestSource HTTP block redeclares `signal` as a typed query param with an explicit Enum. Goa propagates named-type enums to OpenAPI 3 but drops them on the v2 parameter shape; the inline declaration makes the constraint visible in `openapi.yaml` too so generated v2 clients reject unknown values. - sink_request.go comment on signalsToStrings now points readers at the reverse helper immediately below it (was incorrectly cross-referencing sink_response.go). - dispatcher.go ValidateSource comment reflects the current call sites: every OTLP request validates before decoding, not just empty batches. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…es and bearer scheme - TestDetectEncoding now covers application/protobuf (OTLP spec alias) and mixed/upper-case media types (Application/Protobuf, APPLICATION/JSON, etc.) so the RFC 9110 case-insensitive contract cannot regress unnoticed. - TestOTLPSource adds two bearer-auth integration cases: lowercase `bearer ` and mixed-case `BeArEr ` schemes, mirroring the case-insensitive prefix parsing in parseAuthParams. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… sinks The 409 conflict paths used to reuse the first source or sink with a matching name without checking whether it was the expected kind. A pre-existing HTTP source named "OTLP Source" would have been saved as SOURCE_ID — and a later cleanup run would have deleted that unrelated HTTP source. A pre-existing sink with the same name could have routed records to the wrong table or signal filter. - Source 409: filter by `name AND type=="otlp"`. If a non-OTLP source matches the name, fail with a clear message listing the conflicting type. - Sink 409: verify the existing sink's tableId and allowedSignals match what the script would have created. Refuse to reuse otherwise. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
SourceTypeOTLP/OTLPSourceas a distinct, first-class source type (parallel toSourceTypeHTTP) with its own secret-based URL, validation, and API representationAllowedSignals []stringto sink definitions so individual sinks can be wired to onlylogs,metrics, ortraces— a sink with an empty list accepts all signals (backwards compatible)Signal() stringtorecordctx.Contextso the router can filter records without inspecting the body; HTTP records return""application/protobufaccepted as alias,Retry-After: 1header on 429, proper HTTP status escalation for auth failures (4xx) and transient failures (5xx/429)application/protobufalias, per-signal routing, and partial successTest plan
go test ./internal/pkg/service/stream/source/type/otlpsource/...— all unit + integration tests passgo test ./internal/pkg/service/stream/mapping/recordctx/...—Signal()tests passtype: otlpvia API returns an OTLP URLallowedSignals: ["logs"]only receives log records🤖 Generated with Claude Code