Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b0f9403
feat(stream): introduce SourceTypeOTLP as a first-class source type
Matovidlo May 11, 2026
8825c67
feat(stream): add Signal() to recordctx and per-signal sink routing
Matovidlo May 11, 2026
0eafa5a
fix(stream): OTLP spec compliance — signal pass-through, error escala…
Matovidlo May 11, 2026
648a1b0
feat(stream): expose OTLPSource in Stream API
Matovidlo May 11, 2026
8c01341
test(stream): integration tests for OTLP source with real OTel payloads
Matovidlo May 11, 2026
ecfea9f
test(stream): add e2e tests for OTLP source creation and column mapping
Matovidlo May 11, 2026
71b4674
fix(stream): address CI lint and e2e failures in OTLP code
Matovidlo May 11, 2026
4e20889
fix(stream): complete e2e test fixture files for OTLP source tests
Matovidlo May 11, 2026
48d846f
fix(stream): fix go-jsonnet type compatibility in OTLP flatten code
Matovidlo May 12, 2026
1196dd2
fix(stream): suppress G115 gosec warnings for uint64->int64 casts in …
Matovidlo May 12, 2026
74c2e69
fix(stream): update OTLP flatten tests to match int64/[]any output types
Matovidlo May 12, 2026
2512fa9
feat(stream): expose allowedSignals in sink API for OTLP signal routing
Matovidlo May 12, 2026
848592a
fix(stream): fix gci import order in sink_response.go and add allowed…
Matovidlo May 12, 2026
9dd199d
feat(stream): expose otlp.baseUrl and otlp.secret, enum-validate allo…
Matovidlo May 15, 2026
00492d8
feat(stream): bearer-token auth + source-type enforcement on OTLP end…
Matovidlo May 15, 2026
5e0c3fc
fix(stream): make HTTP records bypass allowedSignals, filter TestSour…
Matovidlo May 15, 2026
0925e5d
fix(stream): saturate uint64→int64 in OTLP metric counts
Matovidlo May 15, 2026
d80f436
test(stream): tighten OTLP integration tests
Matovidlo May 15, 2026
3959fe1
chore(scripts): move stream-otlp-setup.sh into scripts/ and fix shell…
Matovidlo May 15, 2026
f166e52
feat(stream): document signal selector and expose OTLP fields on aggr…
Matovidlo May 15, 2026
f1d130c
fix(stream): authenticate before decode and strip Authorization from …
Matovidlo May 15, 2026
a8d6985
test(stream): clarify all-rejected escalation comment in PartialSucce…
Matovidlo May 15, 2026
c8b5a66
chore(scripts): restrict state-file perms and add scope_version to me…
Matovidlo May 15, 2026
f88c060
refactor(stream): named OTLPSignal type, shared SignalAccepted predic…
Matovidlo May 15, 2026
c046c5e
fix(stream): case-insensitive OTLP Content-Type and Authorization Bea…
Matovidlo May 15, 2026
a6f2aa4
test(stream): cover OTLPSource validation rules
Matovidlo May 15, 2026
667e537
docs(stream): tighten Goa examples, openapi signal enum, and stale co…
Matovidlo May 15, 2026
9dcac19
test(stream): pin OTLP content-type alias, case-insensitive media typ…
Matovidlo May 15, 2026
c3e1de4
chore(scripts): validate type/mapping when reusing existing source or…
Matovidlo May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 193 additions & 4 deletions api/stream/design.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,42 @@ var _ = Service("stream", func() {

Method("TestSource", func() {
Meta("openapi:summary", "Test source payload mapping")
Description("Tests configured mapping of the source and its sinks.")
Description("Tests configured mapping of the source and its sinks against an example request body.\n\n" +
"For HTTP sources the body is the raw payload that an HTTP client would send (treated as a single record).\n\n" +
"For OTLP sources the body must be a single **flattened OTLP record** — a JSON object with the same shape that " +
"the source produces internally for each log record, metric data point, or span. Attributes, resource, and scope " +
"are nested objects (not dotted keys); reference them in column mappings as `Body('attributes')['user.id']`, " +
"`Body('resource')['service.name']`, `Body('scope')['name']`, etc. " +
"Example flattened log record:\n" +
"```json\n" +
"{\n" +
" \"timestamp\": \"2024-01-15T10:30:00Z\",\n" +
" \"observed_timestamp\": \"2024-01-15T10:30:00Z\",\n" +
" \"severity_number\": 9,\n" +
" \"severity_text\": \"INFO\",\n" +
" \"body\": \"User logged in\",\n" +
" \"flags\": 0,\n" +
" \"attributes\": {\"user.id\": \"user-123\"},\n" +
" \"resource\": {\"service.name\": \"auth-service\"},\n" +
" \"scope\": {\"name\": \"github.com/my/auth\", \"version\": \"1.2.3\"}\n" +
"}\n" +
"```\n" +
"Do not send a raw OTLP protobuf or the multi-record envelope produced by an OTel SDK — the test endpoint " +
"intentionally evaluates one already-flattened record so the response is deterministic. " +
"For OTLP sources, the `signal` query parameter selects which signal type the request simulates for sink " +
"routing (`logs` by default); sinks whose `allowedSignals` filter rejects that signal are skipped in the result.")
Result(TestResult)
Payload(TestSourceRequest)
HTTP(func() {
POST("/branches/{branchId}/sources/{sourceId}/test")
// Re-declare the enum here so Swagger 2.0 (openapi.yaml) emits it on
// the parameter schema — Goa propagates named-type enums to OpenAPI 3
// automatically but drops them on the v2 parameter shape.
Param("signal", String, func() {
Description("OTLP signal type to simulate for sink routing.")
Enum("logs", "metrics", "traces")
Example("logs")
})
Meta("openapi:tag:test")
Response(StatusOK)
SourceNotFoundError()
Expand Down Expand Up @@ -955,15 +986,86 @@ var SourceResponse = func() {

var Source = Type("Source", func() {
SourceResponse()
// HTTP and OTLP source examples are mutually exclusive — a real response
// carries exactly one of the type-specific blocks. Providing two named
// examples keeps the OpenAPI documentation from suggesting the impossible
// "type: http with both http and otlp blocks" shape that Goa would otherwise
// auto-synthesize from the optional attributes.
Example("http_source", func() {
Description("HTTP source response shape.")
Value(Val{
"projectId": 1234,
Comment thread
Matovidlo marked this conversation as resolved.
"branchId": 5678,
"sourceId": "my-http-source",
"type": "http",
"name": "My HTTP Source",
"description": "",
"http": Val{
"url": "https://stream-in.keboola.com/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
},
"version": exampleVersion(),
"created": exampleCreated(),
})
})
Example("otlp_source", func() {
Description("OTLP source response shape.")
Value(Val{
"projectId": 1234,
"branchId": 5678,
"sourceId": "my-otlp-source",
"type": "otlp",
"name": "My OTLP Source",
"description": "",
"otlp": Val{
"url": "https://stream-in.keboola.com/otlp/1234/my-otlp-source/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
"baseUrl": "https://stream-in.keboola.com/otlp/1234/my-otlp-source",
"secret": "EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
},
"version": exampleVersion(),
"created": exampleCreated(),
})
})
})

// exampleVersion and exampleCreated build the required version/created blocks
// shared by every Source/AggregatedSource example so the OpenAPI documentation
// matches the schema (which marks both fields as required).
func exampleVersion() Val {
return Val{
"number": 1,
"hash": "f43e93acd97eceb3",
"description": "New source.",
"at": "2024-01-15T10:00:00.000Z",
"by": Val{
"type": "user",
"tokenId": "896455",
"tokenDesc": "john.green@company.com",
"userId": "578621",
"userName": "John Green",
},
}
}

func exampleCreated() Val {
return Val{
"at": "2024-01-15T10:00:00.000Z",
"by": Val{
"type": "user",
"tokenId": "896455",
"tokenDesc": "john.green@company.com",
"userId": "578621",
"userName": "John Green",
},
}
}

var Sources = Type("Sources", ArrayOf(Source), func() {
Description(fmt.Sprintf("List of sources, max %d sources per a branch.", source.MaxSourcesPerBranch))
})

var SourceType = Type("SourceType", String, func() {
Meta("struct:field:type", "= definition.SourceType", "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition")
Enum(definition.SourceTypeHTTP.String())
Enum(definition.SourceTypeHTTP.String(), definition.SourceTypeOTLP.String())
Example(definition.SourceTypeHTTP.String())
})

Expand All @@ -986,8 +1088,23 @@ var UpdateSourceRequest = Type("UpdateSourceRequest", func() {
SourceFields(OpUpdate)
})

// OTLPSignal is the named enum reused by allowedSignals on sinks and the
// signal query param on the TestSource endpoint. Naming the type means
// validation errors report a clean field path instead of the doubly-indexed
// `allowedSignals[0].allowedSignals[*]` Goa produces for an anonymous enum.
var OTLPSignal = Type("OTLPSignal", String, func() {
Description("OTLP signal type — one of logs, metrics, or traces.")
Enum("logs", "metrics", "traces")
Example("logs")
})

var TestSourceRequest = Type("TestSourceRequest", func() {
SourceKeyRequest()
Attribute("signal", OTLPSignal, func() {
Comment thread
Matovidlo marked this conversation as resolved.
Description(`OTLP signal type to simulate for sink routing. Only applies to OTLP sources — ignored for HTTP sources. ` +
`Defaults to "logs" if omitted. Sinks whose ` + "`allowedSignals`" + ` filter rejects this signal are skipped in the result.`)
Example("logs")
})
})

var TestResult = Type("TestResult", func() {
Expand Down Expand Up @@ -1074,11 +1191,14 @@ var SourceFields = func(op OperationType) {
Example("The source receives events from Github.")
})

// HTTP - sub-definition - read-only
// Type-specific details - read-only
if op == OpRead {
Attribute("http", HTTPSource, func() {
Description(fmt.Sprintf(`HTTP source details for "type" = "%s".`, definition.SourceTypeHTTP))
})
Attribute("otlp", OTLPSource, func() {
Description(fmt.Sprintf(`OTLP source details for "type" = "%s".`, definition.SourceTypeOTLP))
})
Comment thread
Matovidlo marked this conversation as resolved.
}

// Required fields
Expand All @@ -1098,11 +1218,35 @@ var HTTPSource = Type("HTTPSource", func() {
Description(fmt.Sprintf(`HTTP source details for "type" = "%s".`, definition.SourceTypeHTTP))
Attribute("url", String, func() {
Description("URL of the HTTP source. Contains secret used for authentication.")
Example("https://stream-in.keboola.com/G0lpTbz0vhakDicfoDQQ3BCzGYdW3qewd1D3eUbqETygHKGb")
Example("https://stream-in.keboola.com/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX")
})
Required("url")
})

// OTLP Source----------------------------------------------------------------------------------------------------------

var OTLPSource = Type("OTLPSource", func() {
Description(fmt.Sprintf(`OTLP/HTTP source details for "type" = "%s".`, definition.SourceTypeOTLP))
Attribute("url", String, func() {
Description("Endpoint URL with the secret embedded as the last path segment. " +
"Convenient for SDKs that authenticate by URL only. " +
"The OpenTelemetry SDK automatically appends /v1/logs, /v1/metrics, or /v1/traces based on the signal type — " +
"do not append a signal path yourself. Most SDK exporters reject or silently strip the suffix.")
Example("https://stream-in.keboola.com/otlp/123/my-source/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX")
})
Attribute("baseUrl", String, func() {
Description("Endpoint URL without the secret. Use this together with the `secret` field via the " +
"`Authorization: Bearer <secret>` header so the secret stays out of access/CDN/APM logs. " +
"The OpenTelemetry SDK appends /v1/logs, /v1/metrics, or /v1/traces automatically.")
Example("https://stream-in.keboola.com/otlp/123/my-source")
})
Attribute("secret", String, func() {
Description("48-character secret authenticating writes to this source. Send it as `Authorization: Bearer <secret>` to the `baseUrl`.")
Example("EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX")
})
Required("url", "baseUrl", "secret")
})

// Sink ----------------------------------------------------------------------------------------------------------------

var SinkResponse = func() {
Expand Down Expand Up @@ -1312,6 +1456,13 @@ var SinkFields = func(op OperationType) {
Example("The sink stores records to a table.")
})

Attribute("allowedSignals", ArrayOf(OTLPSignal), func() {
Description(`Restricts the sink to specific OTLP signal types. ` +
`Empty (default) accepts all signals. ` +
`Only relevant for OTLP sources; HTTP sources ignore this field.`)
Example([]string{"logs"})
})

// Table sub-definition
switch op {
case OpRead:
Expand Down Expand Up @@ -1573,6 +1724,44 @@ var AggregatedSource = Type("AggregatedSource", func() {
SourceResponse()
Attribute("sinks", AggregatedSinks)
Required("sinks")
// Mutually exclusive type-specific blocks: see the Source definition above
// for the rationale behind these explicit named examples.
Example("http_source", func() {
Description("HTTP source with aggregated sink statistics.")
Value(Val{
"projectId": 1234,
Comment thread
Matovidlo marked this conversation as resolved.
"branchId": 5678,
"sourceId": "my-http-source",
"type": "http",
"name": "My HTTP Source",
"description": "",
"http": Val{
"url": "https://stream-in.keboola.com/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
},
"version": exampleVersion(),
"created": exampleCreated(),
"sinks": []any{},
})
})
Example("otlp_source", func() {
Description("OTLP source with aggregated sink statistics.")
Value(Val{
"projectId": 1234,
"branchId": 5678,
"sourceId": "my-otlp-source",
"type": "otlp",
"name": "My OTLP Source",
"description": "",
"otlp": Val{
"url": "https://stream-in.keboola.com/otlp/1234/my-otlp-source/EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
"baseUrl": "https://stream-in.keboola.com/otlp/1234/my-otlp-source",
"secret": "EXAMPLE-SECRET-PLACEHOLDER-XXXXXXXXXXXXXXXXXXXXX",
},
"version": exampleVersion(),
"created": exampleCreated(),
"sinks": []any{},
})
})
})

var AggregatedSinks = Type("AggregatedSinks", ArrayOf(AggregatedSink))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading