feat(stream): add native OTLP/HTTP endpoints (logs, metrics, traces) #2595

Closed
jordanrburger wants to merge 2 commits into main from feat/stream-otlp-logs-phase1


@jordanrburger jordanrburger commented May 11, 2026

Summary

Adds a native OTLP/HTTP receiver to the Stream HTTP source so any OpenTelemetry SDK can export logs, metrics, and traces directly to Keboola — no Collector sidecar, no Docker, just an env var:

export OTEL_EXPORTER_OTLP_ENDPOINT=https://stream.keboola.com/otlp/<projectID>/<sourceID>/<secret>

The handler rides on the existing fasthttp server and dispatcher. It decodes the relevant OTLP request (protobuf or JSON, optional gzip), flattens the nested resource → scope → record tree into one *orderedmap.OrderedMap per record/data-point/span, and dispatches each through the standard pipeline via a new recordctx.Context implementation. The sink router, column renderer, and sinks are unchanged.
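The first two steps (picking the encoding and undoing optional gzip) can be sketched with the standard library alone. This is a minimal illustration, not the code in decode.go: the names `Encoding`, `detectEncoding`, `decompressIfGzip`, and `ErrUnsupportedEncoding` are assumptions made for the example.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"mime"
)

// Encoding is a hypothetical enum for the two OTLP/HTTP payload encodings.
type Encoding int

const (
	EncodingProtobuf Encoding = iota
	EncodingJSON
)

// ErrUnsupportedEncoding is a hypothetical error a handler could map to HTTP 415.
var ErrUnsupportedEncoding = errors.New("unsupported Content-Type")

// detectEncoding maps a Content-Type header to an Encoding.
// Media type parameters such as "; charset=utf-8" are ignored.
func detectEncoding(contentType string) (Encoding, error) {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return 0, ErrUnsupportedEncoding
	}
	switch mediaType {
	case "application/x-protobuf":
		return EncodingProtobuf, nil
	case "application/json":
		return EncodingJSON, nil
	default:
		return 0, ErrUnsupportedEncoding
	}
}

// decompressIfGzip returns body unchanged unless Content-Encoding is "gzip".
func decompressIfGzip(contentEncoding string, body []byte) ([]byte, error) {
	if contentEncoding != "gzip" {
		return body, nil
	}
	zr, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	// Build a gzip-compressed JSON payload to feed through both helpers.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(`{"resourceLogs":[]}`)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	enc, _ := detectEncoding("application/json; charset=utf-8")
	raw, _ := decompressIfGzip("gzip", buf.Bytes())
	fmt.Println(enc == EncodingJSON, string(raw)) // true {"resourceLogs":[]}
}
```

A production handler would presumably also cap the decompressed size; the sketch leaves that out for brevity.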

How it fits together

POST /otlp/<projectID>/<sourceID>/<secret>/v1/{logs|metrics|traces}
        │
        ├─ DetectEncoding (Content-Type)
        ├─ DecompressIfGzip (Content-Encoding)
        ├─ Decode{Logs|Metrics|Traces} → pdata
        ├─ Flatten{Logs|Metrics|Traces} → []FlatRecord    (1 request → N records)
        └─ DispatchRecords
              │ for each record:
              │   recordctx.FromOTLP(...)
              │   dispatcher.Dispatch(...)                  (existing pipeline)
              └─ Build{Logs|Metrics|Traces}Response          (OTLP partial_success / 5xx)
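To illustrate the "1 request → N records" fan-out, here is a small sketch of the flattening step for logs. It uses simplified stand-in structs and a plain `map[string]any` where the PR uses pdata types and `*orderedmap.OrderedMap`; the type names and the output keys (`resource`, `scope`, `body`, `severity_text`) are assumptions for the example, not the PR's actual schema.

```go
package main

import "fmt"

// Simplified stand-ins for the OTLP logs tree (resource → scope → record).
type LogRecord struct {
	Body     string
	Severity string
}

type ScopeLogs struct {
	ScopeName string
	Records   []LogRecord
}

type ResourceLogs struct {
	Attributes map[string]any
	Scopes     []ScopeLogs
}

// FlatRecord is one output row. Resource and scope context is copied onto
// every record so that each row is self-contained.
type FlatRecord map[string]any

// flattenLogs walks the three nesting levels and emits one row per log record.
func flattenLogs(resources []ResourceLogs) []FlatRecord {
	var out []FlatRecord
	for _, r := range resources {
		for _, s := range r.Scopes {
			for _, rec := range s.Records {
				out = append(out, FlatRecord{
					"resource":      map[string]any{"attributes": r.Attributes},
					"scope":         map[string]any{"name": s.ScopeName},
					"body":          rec.Body,
					"severity_text": rec.Severity,
				})
			}
		}
	}
	return out
}

func main() {
	req := []ResourceLogs{{
		Attributes: map[string]any{"service.name": "checkout"},
		Scopes: []ScopeLogs{
			{ScopeName: "a", Records: []LogRecord{{"started", "INFO"}, {"failed", "ERROR"}}},
			{ScopeName: "b", Records: []LogRecord{{"retrying", "WARN"}}},
		},
	}}
	rows := flattenLogs(req)
	fmt.Println(len(rows)) // 3: one row per log record
}
```

Each of the three rows would then be dispatched on its own, which is why the sinks and column renderer do not need to know anything about OTLP nesting.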

Signal coverage

| Signal | Route | One record per |
| --- | --- | --- |
| Logs | POST /v1/logs | LogRecord |
| Metrics | POST /v1/metrics | data point (any of gauge / sum / histogram / exponential_histogram / summary) |
| Traces | POST /v1/traces | Span (events and links nested under the span) |

Metric type-specific fields (value, count, bucket_counts, quantile_values, is_monotonic, aggregation_temporality, …) are emitted only when they apply — Path columns fall back to defaultValue for fields that don't apply to a given metric type.
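The "emit only when applicable" rule can be sketched as follows. The structs, the key names, and the `lookup` helper (which stands in for a Path column with a defaultValue) are assumptions for illustration; the real code works on pmetric data points and the existing column renderer.

```go
package main

import "fmt"

// HistogramPoint is a simplified stand-in for an OTLP histogram data point.
type HistogramPoint struct {
	Count        uint64
	Sum          *float64 // optional in OTLP, so modelled as a pointer here
	BucketCounts []uint64
}

// flattenGauge emits only the fields a gauge carries.
func flattenGauge(name string, value float64) map[string]any {
	return map[string]any{
		"metric_name": name,
		"metric_type": "gauge",
		"value":       value,
	}
}

// flattenHistogram emits count and bucket_counts always, sum only when set.
func flattenHistogram(name string, p HistogramPoint) map[string]any {
	rec := map[string]any{
		"metric_name":   name,
		"metric_type":   "histogram",
		"count":         p.Count,
		"bucket_counts": p.BucketCounts,
	}
	if p.Sum != nil {
		rec["sum"] = *p.Sum
	}
	return rec
}

// lookup mimics a path column: a missing key yields the default value.
func lookup(rec map[string]any, key string, defaultValue any) any {
	if v, ok := rec[key]; ok {
		return v
	}
	return defaultValue
}

func main() {
	gauge := flattenGauge("cpu.utilization", 0.42)
	hist := flattenHistogram("http.duration", HistogramPoint{Count: 3, BucketCounts: []uint64{1, 2}})

	fmt.Println(lookup(gauge, "value", nil)) // 0.42
	fmt.Println(lookup(gauge, "count", nil)) // <nil>: gauges have no count
	fmt.Println(lookup(hist, "sum", nil))    // <nil>: sum was not set
}
```

Leaving absent fields out of the record, instead of writing zero values, keeps "not applicable" distinguishable from a genuine 0 in the destination table.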

Response semantics (OTLP-conformant)

| Outcome | Status | Body |
| --- | --- | --- |
| All records dispatched OK | 200 | empty service response |
| Some records rejected | 200 | partial_success with rejected count (client should NOT retry) |
| All records rejected, retryable | 5xx / 429 | top-level error (client SHOULD retry the batch) |
| Malformed body / unsupported encoding | 400 / 415 | existing httpsource error shape |

Response Content-Type always matches the request (application/x-protobuf or application/json).
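The status decision above can be expressed as a small pure function over an aggregated dispatch result. The sketch below is one possible reading of those rules, not the code in dispatch.go or response.go: the field names, the `aggregate` helper, and the choice to report the first retryable status seen are all assumptions.

```go
package main

import (
	"fmt"
	"net/http"
)

// DispatchResult is a hypothetical aggregate over per-record outcomes.
type DispatchResult struct {
	Total       int
	Rejected    int
	Retryable   int // rejected records that failed with 429 or 5xx
	RetryStatus int // first retryable status seen, 0 if none
}

// Response is a hypothetical summary of what the handler would send back.
type Response struct {
	Status        int
	RejectedCount int  // reported via partial_success
	TopLevelError bool // true when the whole batch should be retried
}

// aggregate folds per-record HTTP-style statuses into a DispatchResult.
func aggregate(statuses []int) DispatchResult {
	var r DispatchResult
	for _, st := range statuses {
		r.Total++
		if st < 400 {
			continue
		}
		r.Rejected++
		if st == http.StatusTooManyRequests || st >= 500 {
			r.Retryable++
			if r.RetryStatus == 0 {
				r.RetryStatus = st
			}
		}
	}
	return r
}

// buildResponse escalates to a retryable error only when every record
// failed with a retryable status; otherwise it answers 200.
func buildResponse(r DispatchResult) Response {
	switch {
	case r.Rejected == 0:
		return Response{Status: http.StatusOK}
	case r.Retryable == r.Total:
		return Response{Status: r.RetryStatus, TopLevelError: true}
	default:
		return Response{Status: http.StatusOK, RejectedCount: r.Rejected}
	}
}

func main() {
	for _, batch := range [][]int{{200, 200}, {200, 400}, {503, 503}, {400, 503}} {
		resp := buildResponse(aggregate(batch))
		fmt.Println(batch, resp.Status, resp.RejectedCount, resp.TopLevelError)
	}
}
```

Under this reading, a batch that mixes retryable and non-retryable failures still gets a 200 with partial_success, so the client does not resend records that can never succeed.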

What's new

| File | Purpose |
| --- | --- |
| source/type/otlpsource/config.go | Phase-1 Config (Enabled flag) |
| source/type/otlpsource/decode.go | Content-Type detection, gzip decompress, plog/pmetric/ptrace proto+JSON unmarshal |
| source/type/otlpsource/flatten_common.go | attributesToMap, anyValueToInterface, formatTimestamp, makeScopeMap |
| source/type/otlpsource/flatten_logs.go | FlattenLogs: 1 request → N records |
| source/type/otlpsource/flatten_metrics.go | FlattenMetrics for all five metric types |
| source/type/otlpsource/flatten_traces.go | FlattenTraces with nested events + links |
| source/type/otlpsource/dispatch.go | Sequential dispatch loop, aggregated DispatchResult |
| source/type/otlpsource/response.go | BuildLogsResponse / BuildMetricsResponse / BuildTracesResponse with partial-success / 5xx escalation |
| source/type/otlpsource/handler.go | Signal-agnostic handle() parameterized by decode + response-build function values |
| mapping/recordctx/otlp.go | New Context wrapping a pre-flattened bodyMap |
| source/type/httpsource/httpsource.go | Route registration: POST/OPTIONS /otlp/.../v1/{logs,metrics,traces} |
| go.mod | pdata and proto/otlp promoted from indirect → direct (both were already pulled in transitively) |
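The "signal-agnostic handle() parameterized by function values" idea in handler.go can be sketched roughly as below. This is a toy under stated assumptions: the `signal` struct, its field names, the newline-based decoder, and the string response are invented for illustration and stand in for the real decode, flatten, and response-build functions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// FlatRecord is a stand-in for one flattened row.
type FlatRecord map[string]any

// signal bundles the per-signal behaviour as function values so that one
// handle() can serve logs, metrics, and traces alike.
type signal struct {
	name          string
	decode        func(body []byte) ([]FlatRecord, error) // decode + flatten
	buildResponse func(rejected int) string
}

// handle runs the shared pipeline: decode, dispatch each record, respond.
func handle(s signal, body []byte, dispatch func(FlatRecord) error) (string, error) {
	records, err := s.decode(body)
	if err != nil {
		return "", fmt.Errorf("%s: %w", s.name, err)
	}
	rejected := 0
	for _, rec := range records {
		if err := dispatch(rec); err != nil {
			rejected++
		}
	}
	return s.buildResponse(rejected), nil
}

// newToySignal builds a signal whose "decoder" treats each line as a record.
func newToySignal(name string) signal {
	return signal{
		name: name,
		decode: func(body []byte) ([]FlatRecord, error) {
			if len(body) == 0 {
				return nil, errors.New("empty body")
			}
			var out []FlatRecord
			for _, line := range strings.Split(string(body), "\n") {
				out = append(out, FlatRecord{"body": line})
			}
			return out, nil
		},
		buildResponse: func(rejected int) string {
			return fmt.Sprintf("%s: rejected=%d", name, rejected)
		},
	}
}

// rejectBad is a toy dispatcher that fails records whose body is "bad".
func rejectBad(r FlatRecord) error {
	if r["body"] == "bad" {
		return errors.New("rejected")
	}
	return nil
}

func main() {
	resp, err := handle(newToySignal("logs"), []byte("ok\nbad\nok"), rejectBad)
	fmt.Println(resp, err) // logs: rejected=1 <nil>
}
```

Adding a fourth signal in this shape means supplying two functions, with no new copy of the decode/dispatch/respond boilerplate.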

Tests

  • decode_test.go — encoding detection, gzip decompress, proto/JSON roundtrip for all three signals, invalid inputs
  • flatten_logs_test.go — combinatorial fan-out, all pcommon.Value types, optional ID omission
  • flatten_metrics_test.go — all 5 metric types, optional sum/min/max omission for histograms, quantile values for summary, mixed-type batches
  • flatten_traces_test.go — full span shape, parent-span-ID omission for root spans, events, links, combinatorial fan-out, empty events/links
  • recordctx/otlp_test.go — BodyMap pass-through, lazy BodyBytes, JSONValue, ReleaseBuffers

All green under -race.

Test plan

  • go build ./internal/pkg/service/stream/... passes
  • go vet clean on new packages
  • go test -race green for otlpsource and recordctx
  • task lint passes in CI
  • task tests passes in CI (existing TestHTTPSource requires UNIT_ETCD_ENDPOINT; not exercised locally)
  • End-to-end smoke test against a real source — set OTEL_EXPORTER_OTLP_ENDPOINT in an OTel SDK and confirm records land in storage

Out of scope / explicit follow-ups

  • Per-signal sink routing. The dispatcher walks by (projectID, sourceID), so all three signals from one endpoint land in the same sinks. To use the three different column schemas the design proposes, the user currently needs three separate sources. Adding signal-aware routing (or documenting the three-source pattern as the recommendation) is a product/UX decision.
  • OTLP google.rpc.Status error body for 4xx — currently we fall back to the existing httpsource error JSON shape. Works for clients that branch on status code; doesn't strictly conform to the OTLP error-response wire format.
  • Header-based auth as an alternative to URL-embedded secrets (the URL form inherits the same access-log exposure as /stream/...).
  • Pre-built source templates with the recommended column mappings for otel_logs / otel_metrics / otel_traces tables (Phase 3 in the design doc).
  • Integration test with a real dispatcher.Dispatcher + etcd, mirroring TestHTTPSource.
  • Parallel dispatch for high-volume batches — currently sequential; fine for typical OTel batches (<=100 records).

🤖 Generated with Claude Code

Adds an OTLP/HTTP receiver to the Stream HTTP source so any OpenTelemetry
SDK can export logs directly to Keboola without a Collector sidecar.

The OTLP handler rides on the existing fasthttp server and dispatcher.
It decodes ExportLogsServiceRequest (protobuf or JSON, optional gzip),
flattens the nested resource/scope/record tree into one ordered map per
log record, and dispatches each record through the standard pipeline
via a new recordctx.Context implementation. No changes to the sink
router, column renderer, or sinks.

The response is OTLP-conformant: matching Content-Type, partial_success
on per-record failures, and 5xx escalation only when every record fails
with a retryable status — so non-retryable rejections (4xx) are not
retried by the client.

Phase 1 ships logs only. Metrics and traces flattening, header-based
auth, and the OTLP Status proto error body are explicit follow-ups.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends the OTLP receiver with metrics and traces support. Same fan-out
pattern as logs: one OTLP request produces N FlatRecord rows dispatched
individually through the existing pipeline.

Metrics: all five OTLP metric types (gauge, sum, histogram,
exponential_histogram, summary) explode to one record per data point.
Type-specific fields (value, count, bucket_counts, quantile_values,
is_monotonic, aggregation_temporality, etc.) are emitted only when
applicable so the column renderer's defaultValue applies to fields the
metric type doesn't carry.

Traces: one record per span. Events and links are kept nested under the
span as ordered arrays — they have no meaning detached from their parent
span, so exploding them would create orphan rows.

Handler refactored to use a signal-agnostic pipeline (handle method
parameterized by decode and response-build function values), eliminating
~50 lines of duplicated boilerplate across HandleLogs / HandleMetrics /
HandleTraces.

Tests cover all five metric types, empty/optional field handling,
events and links, and combinatorial fan-out across resource/scope/span.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jordanrburger jordanrburger changed the title feat(stream): add native OTLP/HTTP /v1/logs endpoint (Phase 1) feat(stream): add native OTLP/HTTP endpoints (logs, metrics, traces) May 11, 2026
@Matovidlo
Contributor

closing in favor of #2598

@Matovidlo Matovidlo closed this May 11, 2026