Skip to content

AI-3190 feat(stream): OTLP as first-class source type with per-signal sink routing#2598

Open
Matovidlo wants to merge 29 commits into
feat/stream-otlp-logs-phase1from
feat/stream-otlp-source-type
Open

AI-3190 feat(stream): OTLP as first-class source type with per-signal sink routing#2598
Matovidlo wants to merge 29 commits into
feat/stream-otlp-logs-phase1from
feat/stream-otlp-source-type

Conversation

@Matovidlo
Copy link
Copy Markdown
Contributor

Summary

  • Introduces SourceTypeOTLP / OTLPSource as a distinct, first-class source type (parallel to SourceTypeHTTP) with its own secret-based URL, validation, and API representation
  • Adds AllowedSignals []string to sink definitions so individual sinks can be wired to only logs, metrics, or traces — a sink with an empty list accepts all signals (backwards compatible)
  • Adds Signal() string to recordctx.Context so the router can filter records without inspecting the body; HTTP records return ""
  • Fixes OTLP spec gaps: application/protobuf accepted as alias, Retry-After: 1 header on 429, proper HTTP status escalation for auth failures (4xx) and transient failures (5xx/429)
  • Extends the dispatcher's in-memory mirror to include OTLP sources alongside HTTP sources, so data is no longer lost on restart
  • Exposes the new source type through the Stream REST API (Goa DSL + regenerated artefacts)
  • Integration tests using real pdata marshalers covering CORS, error cases, all three signals, gzip encoding, application/protobuf alias, per-signal routing, and partial success

Test plan

  • go test ./internal/pkg/service/stream/source/type/otlpsource/... — all unit + integration tests pass
  • go test ./internal/pkg/service/stream/mapping/recordctx/...Signal() tests pass
  • Verify creating a source with type: otlp via API returns an OTLP URL
  • Verify a sink with allowedSignals: ["logs"] only receives log records
  • Verify wrong secret returns 404, disabled source returns 404, gzip payload decodes correctly

🤖 Generated with Claude Code

Matovidlo and others added 6 commits May 11, 2026 15:41
Add `SourceTypeOTLP` / `OTLPSource` alongside `SourceTypeHTTP` / `HTTPSource`
so OTLP sources are a distinct entity in the definition layer rather than
piggy-backing on the HTTP source type.  Key changes:

- `source_otlp.go`: `OTLPSource` struct, signal name constants
  (`OTLPSignalLogs/Metrics/Traces`), `FormatOTLPSourceURL`
- `source_base.go`: add `OTLP *OTLPSource` field with `required_if` validation
- `sink_base.go`: add `AllowedSignals []string` for per-signal routing
- `dispatcher.go`: mirror filter and secret extraction extended to OTLP sources
- `storage/test/source.go`: `NewOTLPSource` test factory

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- `recordctx.Context` gains `Signal() string` (returns "" for HTTP,
  "logs"/"metrics"/"traces" for OTLP records)
- `fasthttp.go` / `http.go`: stub returning ""
- `otlp.go`: carry signal through `FromOTLP`; add `TestOTLPContext_Signal`
- `router.go`: skip sinks whose `AllowedSignals` does not include the record's signal
- `collection.go`: store `allowedSignals` from the sink definition

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion, Retry-After

- `handler.go`: pass signal constants to `handle()`; add `Retry-After: 1`
  header on 429 responses
- `decode.go`: accept `application/protobuf` as alias for `application/x-protobuf`
- `dispatch.go`: pass signal to `recordctx.FromOTLP`; count records as
  rejected when sinks return non-2xx (not just routing errors)
- `response.go`: escalate to top-level HTTP status when ALL records are
  rejected (4xx for auth failures, 5xx/429 for transient failures)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- `design.go`: add `otlp` attribute and `OTLPSource` type to Goa DSL;
  register `SourceTypeOTLP` in the `SourceType` enum
- `source_request.go` / `source_response.go` / `aggregation.go`: handle
  the new type in create, update, and list mappers
- Regenerate all Goa/OpenAPI artefacts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add otlpsource_test.go covering:
- CORS preflight, unsupported content-type, bad project ID, wrong secret
- Source disable / re-enable with dispatcher mirror propagation check
- Empty batches for all three signals
- Logs via protobuf, JSON, gzip, and application/protobuf alias
- Metrics and traces via protobuf
- Per-signal sink routing (AllowedSignals filtering)
- Partial success / top-level error escalation when sinks fail

Test helpers use real pdata marshalers (plog, pmetric, ptrace) to build
payloads that match what actual OTel SDKs send.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- create-otlp-source: verifies OTLP source creation with type=otlp,
  checks the /otlp/{project}/{source}/{secret} URL in the response,
  and validates etcd contains secret + source type fields
- otlp-test-mapping: exercises the /test endpoint for OTLP sources;
  verifies jsonnet Body() expressions extract severity_text, body,
  timestamp, resource object fields (service.name) and nested
  attribute keys (user.id) from a flat JSON test payload; also
  confirms the correct 422 error when a required top-level field
  (resource) is absent from the test body

Adds FromOTLPTestRequest to recordctx — parses the /test endpoint
body as a flat JSON OTLP record and builds an otlpContext so that
the same column renderer used for real traffic can be exercised
without a protobuf payload.

Updates TestSource in the API service to detect SourceTypeOTLP and
dispatch through FromOTLPTestRequest instead of FromHTTP.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Matovidlo Matovidlo changed the base branch from feat/stream-otlp-logs-phase1 to main May 11, 2026 14:27
@Matovidlo Matovidlo changed the base branch from main to feat/stream-otlp-logs-phase1 May 11, 2026 14:28
Lint fixes (intrange, gci, staticcheck, unparam):
- Convert C-style for loops over pdata slice lengths to Go 1.22+
  integer range form (flatten_common, flatten_logs, flatten_metrics,
  flatten_traces and their tests)
- Fix gci import ordering in source_base.go, response.go,
  otlpsource_test.go (standard / default / keboola-as-code groups)
- Remove embedded RequestCtx selector in handler.go (c.RemoteIP()
  instead of c.RequestCtx.RemoteIP())
- Remove always-constant contentType param from doOTLPPost helper;
  inline "application/x-protobuf" directly

E2E test fixes:
- Add missing 001-create-source/expected-response.json for
  otlp-test-mapping (task creation response)
- Add missing 002-poll-source/expected-http-code for
  create-otlp-source
- Update create-005-invalid-type-400 expected message to include
  "otlp" in the list of valid source types

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Matovidlo and others added 6 commits May 11, 2026 17:26
Add missing expected-http-code and expected-response.json files for
poll steps in otlp-test-mapping:
- 002-poll-source: expected-http-code + full task response
- 003-create-sink: expected-response.json (task creation 202 body)
- 004-poll-sink:  expected-http-code + full task response

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Convert all uint32/uint64/typed-slice fields to go-jsonnet-compatible
types (int64/[]any) to prevent "Not a json type" errors at record write time.
Also add empty-batch source validation so wrong-secret requests get 404
instead of a free 200 on empty OTLP payloads.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…OTLP flatten

These conversions are intentional — OTLP counter values that exceed int64
max are not expected in practice, and the cast is required for go-jsonnet
type compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Align expected types with the go-jsonnet-compatible conversions introduced
in the previous commit (uint64→int64, []uint64→[]any, []float64→[]any).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add allowedSignals field to the Goa sink design so each sink can be
restricted to specific OTLP signal types (logs, metrics, traces).
Empty means accept all signals (existing behaviour preserved).

- api/stream/design.go: add allowedSignals attribute to SinkFields
- gen/stream/service.go: regenerated — AllowedSignals on Create/Update/Read
- mapper/sink_request.go: wire AllowedSignals on create and update
- mapper/sink_response.go: include AllowedSignals in response
- stream-otlp-setup.sh: pass signal filter per sink so each table
  receives only its own signal type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Signals mapper tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@jordanrburger
Copy link
Copy Markdown

@claude - Can you fix the unit tests on this one?

@Matovidlo
Copy link
Copy Markdown
Contributor Author

@jordanrburger it's ok there are flaky test

@Matovidlo Matovidlo marked this pull request as ready for review May 14, 2026 13:04
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds OTLP as a first-class Stream source type and introduces per-signal sink routing so OTLP logs, metrics, and traces can be routed independently while preserving HTTP-source compatibility.

Changes:

  • Adds OTLP source definitions, API/schema exposure, source URL formatting, dispatcher mirroring, and OTLP handler updates.
  • Adds allowedSignals to sinks and routes records based on recordctx.Context.Signal().
  • Adds OTLP flattening/test coverage updates, API fixtures, and a setup helper script.

Reviewed changes

Copilot reviewed 64 out of 69 changed files in this pull request and generated 18 comments.

Show a summary per file
File Description
api/stream/design.go Adds OTLP source and allowedSignals to Goa API design.
internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go Regenerates HTTP encoding for OTLP/allowedSignals fields.
internal/pkg/service/stream/api/gen/http/stream/server/types.go Regenerates HTTP request/response structs and validation.
internal/pkg/service/stream/api/gen/stream/service.go Regenerates service API types.
internal/pkg/service/stream/api/mapper/aggregation.go Maps OTLP URL for aggregated sources.
internal/pkg/service/stream/api/mapper/sink_allowed_signals_test.go Adds mapper tests for sink signal filters.
internal/pkg/service/stream/api/mapper/sink_request.go Maps allowedSignals from create/update payloads.
internal/pkg/service/stream/api/mapper/sink_response.go Exposes allowedSignals in sink responses.
internal/pkg/service/stream/api/mapper/source_request.go Creates/updates OTLP source entities.
internal/pkg/service/stream/api/mapper/source_response.go Exposes OTLP source URLs.
internal/pkg/service/stream/api/openapi/openapi.yaml Regenerates OpenAPI v2 schema.
internal/pkg/service/stream/api/openapi/openapi3.yaml Regenerates OpenAPI v3 schema.
internal/pkg/service/stream/api/service/source.go Handles OTLP source test requests as flat OTLP records.
internal/pkg/service/stream/definition/sink_base.go Adds AllowedSignals to sink definitions.
internal/pkg/service/stream/definition/source_base.go Adds OTLP-specific source config.
internal/pkg/service/stream/definition/source_otlp.go Defines OTLP source type, signals, and URL formatting.
internal/pkg/service/stream/mapping/recordctx/fasthttp.go Adds empty signal for HTTP records.
internal/pkg/service/stream/mapping/recordctx/http.go Adds empty signal for HTTP records.
internal/pkg/service/stream/mapping/recordctx/otlp.go Adds OTLP signal storage and test-request context creation.
internal/pkg/service/stream/mapping/recordctx/otlp_test.go Updates OTLP context tests for signal support.
internal/pkg/service/stream/mapping/recordctx/recordctx.go Adds Signal() to record context interface.
internal/pkg/service/stream/sink/router/collection.go Mirrors sink allowed-signal filters.
internal/pkg/service/stream/sink/router/router.go Applies per-signal sink routing.
internal/pkg/service/stream/source/dispatcher/dispatcher.go Mirrors OTLP sources in dispatcher lookup.
internal/pkg/service/stream/source/type/otlpsource/decode.go Accepts application/protobuf alias.
internal/pkg/service/stream/source/type/otlpsource/dispatch.go Passes OTLP signal into record context and tracks sink failures.
internal/pkg/service/stream/source/type/otlpsource/flatten_common.go Adds conversion helpers for flattened metric slices.
internal/pkg/service/stream/source/type/otlpsource/flatten_logs.go Adjusts flattened log numeric types.
internal/pkg/service/stream/source/type/otlpsource/flatten_logs_test.go Updates log flattening expectations.
internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go Adjusts flattened metric numeric/slice types.
internal/pkg/service/stream/source/type/otlpsource/flatten_metrics_test.go Updates metric flattening expectations.
internal/pkg/service/stream/source/type/otlpsource/flatten_traces.go Adjusts trace flattening loops and flags type.
internal/pkg/service/stream/source/type/otlpsource/flatten_traces_test.go Updates trace flattening loop style.
internal/pkg/service/stream/source/type/otlpsource/handler.go Uses signal constants, validates empty batches, adds Retry-After for 429.
internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go Adds OTLP integration-style tests.
internal/pkg/service/stream/source/type/otlpsource/response.go Escalates all-rejected batches to top-level status.
internal/pkg/service/stream/storage/test/source.go Adds OTLP source test factory.
stream-otlp-setup.sh Adds setup helper for OTLP source and sinks.
test/stream/api/source/create-005-invalid-type-400/001-create-source/expected-response.json Updates valid source type error expectation.
test/stream/api/source/create-otlp-source/001-create-source/expected-http-code Adds OTLP source create fixture.
test/stream/api/source/create-otlp-source/001-create-source/expected-response.json Adds OTLP source create expected task.
test/stream/api/source/create-otlp-source/001-create-source/request.json Adds OTLP source create request.
test/stream/api/source/create-otlp-source/002-poll-source/expected-http-code Adds OTLP source task polling fixture.
test/stream/api/source/create-otlp-source/002-poll-source/expected-response.json Adds OTLP source task polling response.
test/stream/api/source/create-otlp-source/002-poll-source/request.json Adds OTLP source task polling request.
test/stream/api/source/create-otlp-source/003-get-source/expected-http-code Adds OTLP source get fixture.
test/stream/api/source/create-otlp-source/003-get-source/expected-response.json Adds OTLP source get expected response.
test/stream/api/source/create-otlp-source/003-get-source/request.json Adds OTLP source get request.
test/stream/api/source/create-otlp-source/expected-etcd-kvs.txt Adds expected persisted OTLP source state.
test/stream/api/source/otlp-test-mapping/001-create-source/expected-http-code Adds OTLP mapping test source create code.
test/stream/api/source/otlp-test-mapping/001-create-source/expected-response.json Adds OTLP mapping test source create response.
test/stream/api/source/otlp-test-mapping/001-create-source/request.json Adds OTLP mapping test source create request.
test/stream/api/source/otlp-test-mapping/002-poll-source/expected-http-code Adds source poll expected code.
test/stream/api/source/otlp-test-mapping/002-poll-source/expected-response.json Adds source poll expected response.
test/stream/api/source/otlp-test-mapping/002-poll-source/request.json Adds source poll request.
test/stream/api/source/otlp-test-mapping/003-create-sink/expected-http-code Adds sink create expected code.
test/stream/api/source/otlp-test-mapping/003-create-sink/expected-response.json Adds sink create expected response.
test/stream/api/source/otlp-test-mapping/003-create-sink/request.json Adds sink with OTLP flat-record mappings.
test/stream/api/source/otlp-test-mapping/004-poll-sink/expected-http-code Adds sink poll expected code.
test/stream/api/source/otlp-test-mapping/004-poll-sink/expected-response.json Adds sink poll expected response.
test/stream/api/source/otlp-test-mapping/004-poll-sink/request.json Adds sink poll request.
test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/expected-http-code Adds successful mapping test expected code.
test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/expected-response.json Adds successful mapping expected response.
test/stream/api/source/otlp-test-mapping/005-test-mapping-logs/request.json Adds OTLP flat log test request.
test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/expected-http-code Adds failing mapping expected code.
test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/expected-response.json Adds missing-resource mapping error expectation.
test/stream/api/source/otlp-test-mapping/006-test-mapping-resource/request.json Adds invalid flat OTLP test request.
Files not reviewed (3)
  • internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
  • internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
  • internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
Comments suppressed due to low confidence (4)

internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go:129

  • pt.Count() is a uint64; converting it directly to int64 silently overflows for valid OTLP exponential-histogram counts greater than math.MaxInt64, corrupting the flattened record. Please handle overflow explicitly or use a lossless representation for unsigned counts.
    internal/pkg/service/stream/source/type/otlpsource/flatten_metrics.go:159
  • pt.Count() is a uint64; converting it directly to int64 silently overflows for valid OTLP summary counts greater than math.MaxInt64, corrupting the flattened record. Please handle overflow explicitly or use a lossless representation for unsigned counts.
    internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go:398
  • This test also relies on the final shutdown statements running. A failing assertion before these lines leaves the Stream components running; use t.Cleanup right after StartComponents succeeds so cleanup runs on failures too.
    internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go:451
  • If the assertions above fail, these shutdown calls are skipped and the source components can keep running after the test fails. Register cleanup with t.Cleanup immediately after startup instead of relying on reaching the end of the test.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread api/stream/design.go Outdated
Comment thread internal/pkg/service/stream/sink/router/router.go Outdated
Comment thread internal/pkg/service/stream/api/mapper/source_request.go
Comment thread api/stream/design.go Outdated
Comment thread internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go Outdated
Comment thread scripts/stream-otlp-setup.sh
Comment thread scripts/stream-otlp-setup.sh
Comment thread internal/pkg/service/stream/api/service/source.go
Comment thread internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go Outdated
@jordanrburger jordanrburger changed the title feat(stream): OTLP as first-class source type with per-signal sink routing AI-3190 feat(stream): OTLP as first-class source type with per-signal sink routing May 14, 2026
@linear
Copy link
Copy Markdown

linear Bot commented May 14, 2026

AI-3190

Copy link
Copy Markdown

@jordanrburger jordanrburger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewing through the lens of building the kbc-ui side of this on feat/stream-otlp-ui-phase1 (PR #5589 over there). Architecturally this is solid — OTLP as a first-class source type alongside HTTP, per-signal routing via allowedSignals, OTLP-spec-correct partial-success and retry semantics, full dispatcher mirror coverage so records aren't lost on restart, and decent integration tests. The pieces we hit in production while building the UI all checked out.

Five things I'd push for, prioritized — most importantly, the header-auth feature is half-implemented and the secret rotation / cascade delete story has gaps that will create real ops pain.

🔴 Push for before merge

1. Register the header-auth routes — handler is already ready

handler.go:parseAuthParams falls back to Authorization: Bearer <secret> if no <secret> URL param is present. But httpsource.go only registers the secret-in-URL routes — the secret-less routes are missing. So the bearer-token fallback in the handler is currently dead code.

This isn't a follow-up — the code is already there and not wired. Six lines in httpsource.go:

router.Options("/otlp/<projectID>/<sourceID>/v1/logs", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/logs", otlpHandler.HandleLogs)
router.Options("/otlp/<projectID>/<sourceID>/v1/metrics", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/metrics", otlpHandler.HandleMetrics)
router.Options("/otlp/<projectID>/<sourceID>/v1/traces", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/v1/traces", otlpHandler.HandleTraces)

Without this, the only auth mode is "secret in URL," which will leak into HTTP access logs, CDN logs, and APM URL attributes. We had to actively drop the header-auth tab from the kbc-ui PR because the canary returns 404 on these paths today.

2. Expose the secret separately on the source response

Today FormatOTLPSourceURL returns a single string with the secret embedded:

https://stream-in.../otlp/22/my-source/<48-char-secret>

For the UI to render a clean header-auth flow (once #1 lands), we need otlp.secret exposed separately — or an otlp.baseUrl field that omits the secret. The kbc-ui currently has to parse the URL with a regex to split the secret out, which works but is brittle. A small addition to the OTLPSource Goa type:

var OTLPSource = Type("OTLPSource", func() {
    Attribute("url", String, func() { ... })        // existing — full URL
    Attribute("baseUrl", String, func() { ... })    // new — without secret
    Attribute("secret", String, func() { ... })     // new — 48-char value
    Required("url", "baseUrl", "secret")
})

🟡 Push for as follow-ups (already tracked in Linear)

3. Cascade-delete option on source delete

DELETE /v1/branches/{branchId}/sources/{sourceId}?cascade=true to delete the source's sinks, tables, and bucket atomically. Without this, every deletion leaves orphans behind in Storage, and the recreate-with-same-name flow trips a generic 409 from the backend. Tracked as AI-3191 and assigned to you.

4. Secret rotation endpoint

POST /v1/branches/{branchId}/sources/{sourceId}/rotate-secret, optionally accepting gracePeriodSeconds so the old secret keeps working for N seconds while producers are reconfigured. Without this, leaked secrets force delete-and-recreate, breaking every cached producer config. Tracked as AI-3199.

5. Per-sink rejected-records counter in stats

PartialSuccess.RejectedLogRecords (and the metrics/traces equivalents) are surfaced to the producer in the OTLP response, but SinkStatisticsTotalResult only tracks recordsCount (imported/waiting). The UI has no way to show users "12K records rejected by sink mapping in the last hour." Add a cumulative rejected counter alongside recordsCount. Tracked as AI-3196.

🟢 Smaller things worth flagging

allowedSignals semantics on HTTP sinks

The field accepts any of "logs" | "metrics" | "traces" via validate:"dive,oneof=..." regardless of the parent source's type. The doc-comment says "HTTP sources ignore this field." Two suggestions:

  • (a) Add a validation that rejects allowedSignals on a sink whose parent source is HTTP-typed (clearer error than silent ignore).
  • (b) Or at least add a unit test pinning the silent-ignore behavior so it doesn't regress.

TestSource OTLP body format is undocumented

TestSource now accepts a "flattened OTLP record" body for OTLP sources, with the inline comment pointing at FlattenLogs/Metrics/Traces. The OpenAPI / TestSourcePayload description doesn't mention this — API callers (including future UI iterations of the payload tester) have to read the Go source to know what to send. Suggest expanding the TestSourcePayload description or adding an example flattened-record body in the Goa DSL.

Document the Body() jsonnet null-default behavior

Out of strict scope for this PR, but adjacent: the no-default form of Body('path') throws and silently rejects the entire record when a key is missing. We hit this hard choosing UI defaults — every Cowork event was getting dropped until we switched every template to Body('path', null). Worth a sentence in the OTLP user docs once they exist: "Use Body('path', null) for any optional field unless you want missing fields to drop the record."

FormatOTLPSourceURL — clarify the "base URL" semantics in the field description

The URL we return doesn't include the signal path — the OTel SDK appends /v1/logs etc. itself, per the OTLP/HTTP spec. Some downstream SDKs (we saw Cowork) require the literal base URL with no signal suffix and auto-strip if you try to append one. Not a code change — just expand the URL field's description in design.go:

Base endpoint URL for the OTLP source. Configure this as the endpoint in your OpenTelemetry SDK. The SDK automatically appends /v1/logs, /v1/metrics, or /v1/traces based on the signal type — do not append a signal path yourself. Most SDK exporters reject or silently strip the suffix.

OTLP/gRPC scope

OTLP/HTTP only. One line in the PR description saying gRPC is out of scope (for now / forever / planned for X) would head off reviewer questions.

stream-otlp-setup.sh

A 369-line bash script at the repo root that sets up a test OTLP source. Useful, but it's the only top-level shell script in the repo — feels out-of-place there. Consider scripts/ or dev/.


TL;DR for merge

Block on (cheap to fix in this PR):

  1. Register the 6 header-auth routes (handler already supports it).
  2. Add otlp.baseUrl + otlp.secret to the source response shape.

Track for next PRs (already filed):

Everything else is polish that can land later. Parent epic for full UI context: AI-3190.

Matovidlo and others added 6 commits May 15, 2026 09:42
…wedSignals

- OTLPSource response now carries baseUrl (without secret) and secret as
  separate fields alongside the existing combined url, so SDK callers can
  authenticate via Authorization: Bearer <secret> against baseUrl without
  string-splitting. Existing url is kept for back-compat.
- allowedSignals on sinks gains an item-level Enum(logs|metrics|traces) so
  arbitrary string values are rejected at the API boundary instead of being
  stored and silently dropping every record at the router.
- TestSource description now spells out the flattened-OTLP-record body shape
  expected for OTLP sources, with an example payload.
- OTLP url description warns that SDK exporters append /v1/<signal> themselves
  and must not be configured with a signal suffix.
- AggregatedSink response now includes AllowedSignals (previously dropped).
- Expected fixture updated for the new fields.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…points

- parseAuthParams falls back to Authorization: Bearer <secret> when no URL
  secret is supplied, so callers can authenticate without leaking the secret
  into access/CDN/APM logs.
- Six secret-less OTLP routes registered alongside the existing secret-in-URL
  shape: /otlp/<projectID>/<sourceID>/v1/{logs,metrics,traces}.
- Dispatcher.lookupSources now matches by expected source type as well as
  project/source/secret. Without this, an HTTP secret used on /otlp/... or
  an OTLP secret used on /stream/... would authenticate successfully against
  the wrong source.
- Mapper.UpdateSourceEntity clears the inactive type-specific block when a
  source switches type (HTTP↔OTLP). Previously the persisted entity could
  carry both blocks with stale secrets, leaving versions/etcd state
  inconsistent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ce by signal

- Router extracted into a sinkAcceptsSignal predicate. HTTP-source records
  arrive with an empty signal and now bypass the allowedSignals filter
  entirely, matching the documented contract "HTTP sources ignore this
  field". Previously such records were silently dropped if the sink had any
  allowedSignals set.
- TestSource filters sinks by the request's OTLP signal before rendering for
  OTLP sources, so a logs test payload no longer fails on metrics/traces-only
  sinks. Signal can be selected via ?signal=logs|metrics|traces; defaults to
  logs for back-compat.
- New unit test pins the routing predicate for HTTP and all OTLP signals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
OTLP histogram/exponential-histogram/summary counts and bucket counts are
defined as unsigned 64-bit. Direct int64 casts silently wrap any value above
math.MaxInt64 into a negative number, corrupting the flattened record.
Saturating at MaxInt64 keeps the value as close to the truth as the signed
encoding allows and removes the gosec G115 suppressions that were masking
the issue. Affected sites: histogram count, exponential histogram count and
zero_count, summary count, and the shared uint64SliceToAny helper used for
bucket counts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Dummy SinkController tracks per-sink WriteRecord call counts and supports
  a PipelineWriteHook for per-call outcomes. Lets tests assert which sinks
  actually received a record, and produce deterministic mixed accept/reject
  batches for partial_success coverage.
- TestOTLPSource_SignalRouting now asserts per-sink record counts across
  logs/metrics/traces dispatches (filtered sinks plus one unfiltered sink),
  instead of only checking that the endpoint returned 200.
- TestOTLPSource_PartialSuccess_Mixed exercises the actual partial_success
  path: a 4-record batch with a hook that fails every other write yields a
  HTTP 200 response with an OTLP body carrying rejectedLogRecords=2.
- Bearer-token auth cases (success per signal, wrong secret, missing/
  malformed Authorization, OPTIONS preflight on the secret-less route)
  cover the new header-auth routes.
- Process shutdown moved into t.Cleanup at all three startup sites so a
  failing assertion no longer leaks background components into later tests.
- TestPartialSuccess no longer leaks resp.Body.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… issues

- Relocate from repo root to scripts/ alongside the other tooling shell scripts.
- Fail fast on missing jq/curl instead of half-creating resources mid-run.
- Build the create-source payload with jq so SOURCE_NAME values containing
  quotes, backslashes, or newlines stay valid JSON.
- Cleanup mode polls the asynchronous DELETE task before removing the state
  file so a delayed deletion failure does not lose the source ID.
- Snowflake smoke-verification example uses plain quoted-identifier syntax,
  which works for the default in.c-otlp bucket name; the previous IDENTIFIER()
  expansion produced malformed SQL.
- Removed reference to the non-existent stream-otlp-send.sh; print an inline
  curl smoke-test instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 67 out of 72 changed files in this pull request and generated 10 comments.

Files not reviewed (3)
  • internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
  • internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
  • internal/pkg/service/stream/api/gen/stream/service.go: Language not supported
Comments suppressed due to low confidence (1)

internal/pkg/service/stream/source/type/otlpsource/flatten_traces.go:85

  • Span link flags are still stored with the OTLP unsigned integer type inside this loop, while span/log flags were converted to int64 to avoid go-jsonnet's unsupported unsigned values. A trace containing links can still fail mappings that read Body('links') or serialize the full body; convert l.Flags() the same way as span.Flags().

Comment thread internal/pkg/service/stream/api/mapper/aggregation.go Outdated
Comment thread internal/pkg/service/stream/mapping/recordctx/otlp.go Outdated
Comment thread api/stream/design.go Outdated
Comment thread internal/pkg/service/stream/mapping/recordctx/otlp.go Outdated
Comment thread internal/pkg/service/stream/source/type/otlpsource/decode.go
Comment thread internal/pkg/service/stream/source/type/otlpsource/handler.go Outdated
Comment thread scripts/stream-otlp-setup.sh
Comment thread scripts/stream-otlp-setup.sh
Comment thread internal/pkg/service/stream/source/type/otlpsource/handler.go Outdated
Comment thread internal/pkg/service/stream/source/type/otlpsource/otlpsource_test.go Outdated
Matovidlo and others added 4 commits May 15, 2026 10:12
…egated source

- TestSource gains a `signal` query parameter (logs|metrics|traces, enum-
  validated by Goa) so generated clients can discover and validate the
  selector. The handler reads it from the payload and rejects unknown values
  at the API boundary instead of silently falling back to logs on typos.
- TestSource description now lists every field FlattenLogs produces and
  notes the nested shape for attributes/resource/scope. The previous example
  used dotted keys (resource.service.name) which never match the actual
  flattened record and would mislead callers writing mappings against it.
- NewAggregationSource for OTLP populates baseUrl and secret alongside url,
  matching the non-aggregated source response. The list/aggregation endpoint
  previously returned an incomplete OTLPSource that broke header-auth
  clients.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…records

- The OTLP handler now calls ValidateSource immediately after parseAuthParams,
  before content-type detection, decompression, or decoding. Unauthenticated
  callers get a deterministic 404 instead of consuming decode CPU and
  receiving a 400/415 parse error.
- headersToOrderedMap drops the Authorization header before building the
  record context. Without this, a bearer secret supplied via the secret-less
  routes could reach any sink column rendered via Header() or a headers
  mapping and be persisted to storage.
- Unsupported-media-type error message now lists the application/protobuf
  alias accepted by the decoder.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ss test

The fixture's inline comment claimed the test exercised the 200+partial_success
path, but it actually drives the all-rejected case where shouldEscalateToError
returns true and the handler emits a top-level 5xx. Reworded so a future
reader does not confuse this with TestOTLPSource_PartialSuccess_Mixed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…trics sink

- save_state now creates STATE_FILE with mode 0600 (umask 077 + chmod) so the
  embedded OTLP_URL secret is not readable to other users on shared systems.
- Metrics sink mapping now includes a scope_version column alongside scope_name.
  Logs and traces sinks already carried it; metrics dropped the version even
  though the flattened record has it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 67 out of 72 changed files in this pull request and generated 7 comments.

Files not reviewed (3)
  • internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
  • internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
  • internal/pkg/service/stream/api/gen/stream/service.go: Language not supported

Comment thread api/stream/design.go
Comment thread internal/pkg/service/stream/api/service/source.go Outdated
Comment thread internal/pkg/service/stream/definition/source_otlp.go
Comment thread internal/pkg/service/stream/source/type/otlpsource/decode.go Outdated
Comment thread internal/pkg/service/stream/source/type/otlpsource/handler.go
Comment thread internal/pkg/service/stream/mapping/recordctx/otlp.go Outdated
Comment thread api/stream/design.go Outdated
Matovidlo and others added 3 commits May 15, 2026 11:07
…ate, per-type Source examples

- New named `OTLPSignal` Goa type wraps `String` with the Enum constraint and
  is reused by `allowedSignals` on sinks and the `signal` selector on
  TestSource. Validation errors now report a clean field path
  (`allowedSignals[0]` rather than the doubly-indexed
  `allowedSignals[0].allowedSignals[*]` that an anonymous inline enum
  generates).
- Source and AggregatedSource types declare two named examples — one for HTTP,
  one for OTLP — so OpenAPI no longer suggests the impossible "type: http with
  both http and otlp blocks" shape Goa would auto-synthesize from the optional
  attributes.
- Single source of truth for the per-signal sink filter: new
  `definition.SignalAccepted(allowedSignals, signal)` and `Sink.AcceptsSignal`
  method. The runtime router and the /test endpoint both route through it so
  the two paths cannot drift again.
- Mapper layer converts between the persisted entity's plain `[]string`
  AllowedSignals and the Goa-typed `[]api.OTLPSignal` via small
  signalsToStrings / stringsToSignals helpers. Tests updated accordingly.
- Aggregated OTLP source response now uses the same conversion for the
  AllowedSignals field on sinks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rer parsing

- DetectEncoding lower-cases the media type before matching, so HTTP-canonical
  variants like `Application/Protobuf` or `APPLICATION/JSON` are accepted
  instead of being rejected with 415.
- parseAuthParams matches the `Bearer ` scheme via strings.EqualFold so
  `authorization: bearer <secret>` (lowercase) and any other case combination
  is accepted, matching RFC 9110 auth-scheme semantics.
- FromOTLPTestRequest doc comment merged into a single paragraph; the previous
  version repeated the function summary.
- Replaced one Go 1.22+ idiom miss (`for i := 0; i < 4; i++`) with the
  `for i := range 4` form to satisfy the intrange linter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the missing analog of source_http_test.go for the OTLP source type:
missing block, empty block (secret required), short secret (must be 48 chars),
and a valid baseline. Catches future regressions in the OTLPSource validate
tags that were previously untested.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 68 out of 73 changed files in this pull request and generated 10 comments.

Files not reviewed (3)
  • internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go: Language not supported
  • internal/pkg/service/stream/api/gen/http/stream/server/types.go: Language not supported
  • internal/pkg/service/stream/api/gen/stream/service.go: Language not supported

Comment thread api/stream/design.go
Comment thread internal/pkg/service/stream/api/openapi/openapi.yaml
Comment thread internal/pkg/service/stream/api/mapper/sink_request.go Outdated
Comment thread scripts/stream-otlp-setup.sh Outdated
Comment thread scripts/stream-otlp-setup.sh Outdated
Comment thread api/stream/design.go
Comment thread api/stream/design.go
Comment thread internal/pkg/service/stream/source/dispatcher/dispatcher.go Outdated
Comment thread internal/pkg/service/stream/source/type/otlpsource/decode.go
Comment thread internal/pkg/service/stream/source/type/otlpsource/handler.go
Matovidlo and others added 3 commits May 15, 2026 11:36
…mments

- Source / AggregatedSource named examples now include the required `version`
  and `created` blocks (shared exampleVersion / exampleCreated helpers), so
  the generated OpenAPI examples match the schema's required-fields contract
  and the mapper output.
- TestSource HTTP block redeclares `signal` as a typed query param with an
  explicit Enum. Goa propagates named-type enums to OpenAPI 3 but drops them
  on the v2 parameter shape; the inline declaration makes the constraint
  visible in `openapi.yaml` too so generated v2 clients reject unknown values.
- sink_request.go comment on signalsToStrings now points readers at the
  reverse helper immediately below it (was incorrectly cross-referencing
  sink_response.go).
- dispatcher.go ValidateSource comment reflects the current call sites:
  every OTLP request validates before decoding, not just empty batches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…es and bearer scheme

- TestDetectEncoding now covers application/protobuf (OTLP spec alias) and
  mixed/upper-case media types (Application/Protobuf, APPLICATION/JSON, etc.)
  so the RFC 9110 case-insensitive contract cannot regress unnoticed.
- TestOTLPSource adds two bearer-auth integration cases: lowercase `bearer ` and
  mixed-case `BeArEr ` schemes, mirroring the case-insensitive prefix parsing
  in parseAuthParams.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… sinks

The 409 conflict paths used to reuse the first source or sink with a matching
name without checking whether it was the expected kind. A pre-existing HTTP
source named "OTLP Source" would have been saved as SOURCE_ID — and a later
cleanup run would have deleted that unrelated HTTP source. A pre-existing
sink with the same name could have routed records to the wrong table or
signal filter.

- Source 409: filter by `name AND type=="otlp"`. If a non-OTLP source matches
  the name, fail with a clear message listing the conflicting type.
- Sink 409: verify the existing sink's tableId and allowedSignals match what
  the script would have created. Refuse to reuse otherwise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Matovidlo Matovidlo requested a review from jordanrburger May 15, 2026 10:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants