feat(stream): add native OTLP/HTTP endpoints (logs, metrics, traces) #2595
Closed
jordanrburger wants to merge 2 commits into
Conversation
Adds an OTLP/HTTP receiver to the Stream HTTP source so any OpenTelemetry SDK can export logs directly to Keboola without a Collector sidecar.

The OTLP handler rides on the existing fasthttp server and dispatcher. It decodes `ExportLogsServiceRequest` (protobuf or JSON, optional gzip), flattens the nested resource/scope/record tree into one ordered map per log record, and dispatches each record through the standard pipeline via a new `recordctx.Context` implementation. No changes to the sink router, column renderer, or sinks.

The response is OTLP-conformant: matching Content-Type, `partial_success` on per-record failures, and 5xx escalation only when every record fails with a retryable status — so non-retryable rejections (4xx) are not retried by the client.

Phase 1 ships logs only. Metrics and traces flattening, header-based auth, and the OTLP `Status` proto error body are explicit follow-ups.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
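As a rough illustration of the fan-out, here is a minimal hypothetical sketch. The real handler works on pdata types and emits `*orderedmap.OrderedMap` rows for deterministic column order; plain maps and stand-in structs are used here for brevity:

```go
package main

// Hypothetical, simplified stand-ins for the OTLP pdata structures; the
// real handler decodes ExportLogsServiceRequest and emits ordered maps.
type LogRecord struct {
	Body  string
	Attrs map[string]any
}

type ScopeLogs struct {
	ScopeName string
	Records   []LogRecord
}

type ResourceLogs struct {
	ResourceAttrs map[string]any
	Scopes        []ScopeLogs
}

// FlattenLogs fans one nested request tree out into one flat row per log
// record, copying resource and scope context onto every row.
func FlattenLogs(req []ResourceLogs) []map[string]any {
	var rows []map[string]any
	for _, rl := range req {
		for _, sl := range rl.Scopes {
			for _, rec := range sl.Records {
				row := map[string]any{
					"scope.name": sl.ScopeName,
					"body":       rec.Body,
				}
				for k, v := range rl.ResourceAttrs {
					row["resource."+k] = v
				}
				for k, v := range rec.Attrs {
					row["attributes."+k] = v
				}
				rows = append(rows, row)
			}
		}
	}
	return rows
}
```

Each row then flows through the standard dispatch pipeline exactly as if it had arrived as a single HTTP record.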
Extends the OTLP receiver with metrics and traces support. Same fan-out pattern as logs: one OTLP request produces N `FlatRecord` rows dispatched individually through the existing pipeline.

Metrics: all five OTLP metric types (gauge, sum, histogram, exponential_histogram, summary) explode to one record per data point. Type-specific fields (`value`, `count`, `bucket_counts`, `quantile_values`, `is_monotonic`, `aggregation_temporality`, etc.) are emitted only when applicable, so the column renderer's `defaultValue` applies to fields the metric type doesn't carry.

Traces: one record per span. Events and links are kept nested under the span as ordered arrays — they have no meaning detached from their parent span, so exploding them would create orphan rows.

Handler refactored to use a signal-agnostic pipeline (`handle` method parameterized by decode and response-build function values), eliminating ~50 lines of duplicated boilerplate across `HandleLogs` / `HandleMetrics` / `HandleTraces`.

Tests cover all five metric types, empty/optional field handling, events and links, and combinatorial fan-out across resource/scope/span.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
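The emit-only-when-applicable rule for metric fields can be sketched with simplified stand-in types (hypothetical; the real code works on pdata data points and covers all five types):

```go
package main

// Illustrative sketch (not the PR's actual code) of "emit only when
// applicable": each metric type writes only the fields it carries, so
// absent keys later fall back to the column renderer's defaultValue.
type DataPoint struct {
	Type         string // "gauge", "sum", "histogram", ...
	Value        float64
	Count        uint64
	BucketCounts []uint64
	IsMonotonic  bool
}

func flattenDataPoint(dp DataPoint) map[string]any {
	row := map[string]any{"type": dp.Type}
	switch dp.Type {
	case "gauge":
		row["value"] = dp.Value
	case "sum":
		row["value"] = dp.Value
		row["is_monotonic"] = dp.IsMonotonic
	case "histogram":
		// Histogram data points carry counts, not a single value.
		row["count"] = dp.Count
		row["bucket_counts"] = dp.BucketCounts
	}
	return row
}
```

A gauge row never carries `count` or `bucket_counts`, and a histogram row never carries `value`, so the downstream column defaults apply cleanly.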
Contributor: closing in favor of #2598
Summary
Adds a native OTLP/HTTP receiver to the Stream HTTP source so any OpenTelemetry SDK can export logs, metrics, and traces directly to Keboola — no Collector sidecar, no Docker, just an env var:
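For example, a hypothetical SDK configuration might look like this; the host and source segment are placeholders, since the route shape the PR registers is `/otlp/.../v1/{logs,metrics,traces}`:

```shell
# Hypothetical example — host and OTLP source segment are placeholders.
# Any OTel SDK picks these standard variables up automatically.
export OTEL_EXPORTER_OTLP_PROTOCOL="http/protobuf"
export OTEL_EXPORTER_OTLP_ENDPOINT="https://stream.example.com/otlp/SOURCE"
```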
The handler rides on the existing fasthttp server and dispatcher. It decodes the relevant OTLP request (protobuf or JSON, optional gzip), flattens the nested resource → scope → record tree into one `*orderedmap.OrderedMap` per record/data-point/span, and dispatches each through the standard pipeline via a new `recordctx.Context` implementation. The sink router, column renderer, and sinks are unchanged.

How it fits together
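The flow — decode, flatten, dispatch each row, build an OTLP response — can be sketched as the signal-agnostic `handle` pipeline the PR describes; every name and signature below is illustrative, not the actual code:

```go
package main

// FlatRecord stands in for one flattened row (one log record, data
// point, or span).
type FlatRecord map[string]any

// DispatchResult tallies per-record outcomes for the response builder.
type DispatchResult struct {
	Accepted int
	Rejected int
}

// handle decodes the payload into flat records, dispatches each record
// individually, and delegates response rendering to buildResp. The
// logs/metrics/traces entry points differ only in the two function
// values they pass in.
func handle(
	body []byte,
	decode func([]byte) ([]FlatRecord, error),
	dispatch func(FlatRecord) error,
	buildResp func(DispatchResult) []byte,
) ([]byte, error) {
	records, err := decode(body)
	if err != nil {
		return nil, err // malformed payload -> caller maps to 400/415
	}
	var result DispatchResult
	for _, rec := range records {
		if err := dispatch(rec); err != nil {
			result.Rejected++
		} else {
			result.Accepted++
		}
	}
	return buildResp(result), nil
}
```

Parameterizing by function values (rather than an interface per signal) keeps the shared body in one place while each signal supplies only its decode and response shapes.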
Signal coverage
- `POST /v1/logs`
- `POST /v1/metrics`
- `POST /v1/traces`

Metric type-specific fields (`value`, `count`, `bucket_counts`, `quantile_values`, `is_monotonic`, `aggregation_temporality`, …) are emitted only when they apply — Path columns fall back to `defaultValue` for fields that don't apply to a given metric type.

Response semantics (OTLP-conformant)
| Outcome | Status |
| --- | --- |
| All records accepted | `200` |
| Some records rejected | `200` with `partial_success` and rejected count (client should NOT retry) |
| Every record fails with a retryable status | `5xx`/`429` |
| Malformed or unsupported request | `400`/`415` |
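The escalation rule can be sketched as a small status-mapping function (illustrative only; the real logic lives in the response builders):

```go
package main

// tally summarizes per-record dispatch outcomes for one OTLP request.
type tally struct {
	accepted          int
	rejected          int
	rejectedRetryable int
}

// statusFor applies the response rules: 200 when everything is accepted,
// a retryable 5xx only when every record failed with a retryable error,
// and otherwise 200 with partial_success (the client must not retry).
func statusFor(t tally) int {
	switch {
	case t.rejected == 0:
		return 200 // full success
	case t.accepted == 0 && t.rejectedRetryable == t.rejected:
		return 503 // every record failed retryably -> client may retry
	default:
		return 200 // partial_success; retrying would duplicate accepted rows
	}
}
```

Keeping partial failures at 200 matters because OTLP clients retry the entire batch on 5xx, which would re-ingest the records that were already accepted.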
The response `Content-Type` always matches the request (`application/x-protobuf` or `application/json`).

What's new
- `source/type/otlpsource/config.go` — `Enabled` flag
- `source/type/otlpsource/decode.go`
- `source/type/otlpsource/flatten_common.go` — `attributesToMap`, `anyValueToInterface`, `formatTimestamp`, `makeScopeMap`
- `source/type/otlpsource/flatten_logs.go` — `FlattenLogs`: 1 request → N records
- `source/type/otlpsource/flatten_metrics.go` — `FlattenMetrics` for all five metric types
- `source/type/otlpsource/flatten_traces.go` — `FlattenTraces` with nested events + links
- `source/type/otlpsource/dispatch.go` — `DispatchResult`
- `source/type/otlpsource/response.go` — `BuildLogsResponse` / `BuildMetricsResponse` / `BuildTracesResponse` with partial-success / 5xx escalation
- `source/type/otlpsource/handler.go` — `handle()` parameterized by decode + response-build function values
- `mapping/recordctx/otlp.go` — `Context` wrapping a pre-flattened body `Map`
- `source/type/httpsource/httpsource.go` — `POST`/`OPTIONS` `/otlp/.../v1/{logs,metrics,traces}`
- `go.mod` — `pdata` and `proto/otlp` promoted from indirect → direct (both were already pulled in transitively)

Tests
- `decode_test.go` — encoding detection, gzip decompress, proto/JSON roundtrip for all three signals, invalid inputs
- `flatten_logs_test.go` — combinatorial fan-out, all `pcommon.Value` types, optional ID omission
- `flatten_metrics_test.go` — all 5 metric types, optional sum/min/max omission for histograms, quantile values for summary, mixed-type batches
- `flatten_traces_test.go` — full span shape, parent-span-ID omission for root spans, events, links, combinatorial fan-out, empty events/links
- `recordctx/otlp_test.go` — `BodyMap` pass-through, lazy `BodyBytes`, `JSONValue`, `ReleaseBuffers`

All green under `-race`.

Test plan
- `go build ./internal/pkg/service/stream/...` passes
- `go vet` clean on new packages
- `go test -race` green for `otlpsource` and `recordctx`
- `task lint` passes in CI
- `task tests` passes in CI (existing `TestHTTPSource` requires `UNIT_ETCD_ENDPOINT`; not exercised locally)
- Point `OTEL_EXPORTER_OTLP_ENDPOINT` in an OTel SDK and confirm records land in storage

Out of scope / explicit follow-ups
- Signal-aware routing: all three endpoints route on `(projectID, sourceID)`, so all three signals from one endpoint land in the same sinks. To use the three different column schemas the design proposes, the user currently needs three separate sources. Adding signal-aware routing (or documenting the three-source pattern as the recommendation) is a product/UX decision.
- `google.rpc.Status` error body for 4xx — currently we fall back to the existing httpsource error JSON shape. Works for clients that branch on status code; doesn't strictly conform to the OTLP error-response wire format.
- Header-based auth (the existing `/stream/...` path scheme is unchanged for now).
- `otel_logs`/`otel_metrics`/`otel_traces` tables (Phase 3 in the design doc).
- Integration test with a real `dispatcher.Dispatcher` + etcd, mirroring `TestHTTPSource`.

🤖 Generated with Claude Code