Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,11 @@ require (
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/collector/component v1.51.1-0.20260205185216-81bc641f26c0 // indirect
go.opentelemetry.io/collector/pdata v1.51.1-0.20260205185216-81bc641f26c0 // indirect
go.opentelemetry.io/collector/pdata v1.51.1-0.20260205185216-81bc641f26c0
go.opentelemetry.io/contrib/detectors/gcp v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0
gocloud.dev v0.45.0 // indirect
golang.org/x/image v0.39.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
Expand Down
169 changes: 169 additions & 0 deletions internal/pkg/service/stream/mapping/recordctx/otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package recordctx

import (
"context"
"net"
"net/http"
"sort"
"strings"
"sync"
"time"

"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/valyala/fastjson"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type otlpContext struct {
ctx context.Context
timestamp time.Time
clientIP net.IP
headers *orderedmap.OrderedMap
bodyMap *orderedmap.OrderedMap

lock sync.Mutex
headersString *string
bodyBytes []byte
bodyBytesErr error
jsonValue *fastjson.Value
jsonValueErr error
}

// FromOTLP builds a Context from a single pre-flattened OTLP record body.
//
// timestamp is the request arrival time — the OTLP record's own timestamp
// stays inside bodyMap under "timestamp" so the column renderer can promote
// it to a dedicated column independently of the datetime column.
//
// headers is the original HTTP request headers map (pass through), since the
// OTLP transport rides on HTTP and downstream column mappings may extract
// values like User-Agent.
func FromOTLP(
ctx context.Context,
timestamp time.Time,
clientIP net.IP,
headers *orderedmap.OrderedMap,
bodyMap *orderedmap.OrderedMap,
) Context {
return &otlpContext{
ctx: ctx,
timestamp: timestamp,
clientIP: clientIP,
headers: headers,
bodyMap: bodyMap,
}
}

func (c *otlpContext) Ctx() context.Context {
return c.ctx
}

func (c *otlpContext) Timestamp() time.Time {
return c.timestamp
}

func (c *otlpContext) ClientIP() net.IP {
return c.clientIP
}

func (c *otlpContext) HeadersString() string {
c.lock.Lock()
defer c.lock.Unlock()

if c.headersString != nil {
return *c.headersString
}

var s string
if c.headers == nil {
s = ""
} else {
keys := c.headers.Keys()
lines := make([]string, 0, len(keys))
for _, k := range keys {
v, _ := c.headers.Get(k)
if str, ok := v.(string); ok {
lines = append(lines, http.CanonicalHeaderKey(k)+": "+str+"\n")
}
}
sort.Strings(lines)
s = strings.Join(lines, "")
}
c.headersString = &s
return s
}

func (c *otlpContext) HeadersMap() *orderedmap.OrderedMap {
if c.headers == nil {
return orderedmap.New()
}
return c.headers
}

func (c *otlpContext) ReleaseBuffers() {
c.lock.Lock()
defer c.lock.Unlock()
c.bodyBytes = nil
c.jsonValue = nil
}

func (c *otlpContext) BodyBytes() ([]byte, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.bodyBytesWithoutLock()
}

func (c *otlpContext) BodyLength() int {
b, err := c.BodyBytes()
if err != nil {
return 0
}
return len(b)
}

func (c *otlpContext) BodyMap() (*orderedmap.OrderedMap, error) {
return c.bodyMap, nil
}

func (c *otlpContext) JSONValue(parserPool *fastjson.ParserPool) (*fastjson.Value, error) {
c.lock.Lock()
defer c.lock.Unlock()

if c.jsonValue != nil || c.jsonValueErr != nil {
return c.jsonValue, c.jsonValueErr
}

body, err := c.bodyBytesWithoutLock()
if err != nil {
c.jsonValueErr = err
return nil, err
}

parser := parserPool.Get()
defer parserPool.Put(parser)

if v, err := parser.ParseBytes(body); err != nil {
c.jsonValueErr = errors.PrefixError(err, "cannot parse OTLP record JSON")
} else {
c.jsonValue = v
}
return c.jsonValue, c.jsonValueErr
}

func (c *otlpContext) bodyBytesWithoutLock() ([]byte, error) {
if c.bodyBytes != nil || c.bodyBytesErr != nil {
return c.bodyBytes, c.bodyBytesErr
}
if c.bodyMap == nil {
c.bodyBytes = []byte("{}")
return c.bodyBytes, nil
}
b, err := json.Marshal(c.bodyMap)
if err != nil {
c.bodyBytesErr = errors.PrefixError(err, "cannot serialize OTLP record body to JSON")
return nil, c.bodyBytesErr
}
c.bodyBytes = b
return c.bodyBytes, nil
}
130 changes: 130 additions & 0 deletions internal/pkg/service/stream/mapping/recordctx/otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package recordctx

import (
"context"
stdjson "encoding/json"
"net"
"testing"
"time"

"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/valyala/fastjson"
)

func TestOTLPContext_BodyMap_PassesThroughWithoutParsing(t *testing.T) {
t.Parallel()

body := orderedmap.New()
body.Set("severity_text", "INFO")
body.Set("body", "hello")

c := FromOTLP(context.Background(), time.Now(), net.IPv4(127, 0, 0, 1), nil, body)

gotMap, err := c.BodyMap()
require.NoError(t, err)
assert.Same(t, body, gotMap, "BodyMap should return the pre-flattened map without copying")
}

func TestOTLPContext_BodyBytes_LazyJSONMarshal(t *testing.T) {
t.Parallel()

body := orderedmap.New()
body.Set("severity_text", "WARN")
body.Set("count", 42)

c := FromOTLP(context.Background(), time.Now(), net.IPv4(10, 0, 0, 1), nil, body)

bytesA, err := c.BodyBytes()
require.NoError(t, err)

// Decoding the result must yield the same fields.
decoded := map[string]any{}
require.NoError(t, stdjson.Unmarshal(bytesA, &decoded))
assert.Equal(t, "WARN", decoded["severity_text"])
assert.InDelta(t, 42.0, decoded["count"], 0)

// Subsequent calls must return the cached slice (same pointer).
bytesB, err := c.BodyBytes()
require.NoError(t, err)
assert.Equal(t, &bytesA[0], &bytesB[0], "BodyBytes should cache the marshaled body")
}

func TestOTLPContext_BodyLength(t *testing.T) {
t.Parallel()

body := orderedmap.New()
body.Set("k", "v")
c := FromOTLP(context.Background(), time.Now(), nil, nil, body)

expected, err := c.BodyBytes()
require.NoError(t, err)
assert.Equal(t, len(expected), c.BodyLength())
}

func TestOTLPContext_JSONValue(t *testing.T) {
t.Parallel()

body := orderedmap.New()
body.Set("severity_text", "ERROR")
c := FromOTLP(context.Background(), time.Now(), nil, nil, body)

pool := &fastjson.ParserPool{}
v, err := c.JSONValue(pool)
require.NoError(t, err)
require.NotNil(t, v)
assert.Equal(t, "ERROR", string(v.GetStringBytes("severity_text")))
}

func TestOTLPContext_TimestampAndClientIP(t *testing.T) {
t.Parallel()

now := time.Date(2024, 5, 11, 12, 0, 0, 0, time.UTC)
ip := net.IPv4(192, 168, 1, 100)
c := FromOTLP(context.Background(), now, ip, nil, orderedmap.New())

assert.Equal(t, now, c.Timestamp())
assert.True(t, c.ClientIP().Equal(ip))
}

func TestOTLPContext_HeadersMap_NilSafe(t *testing.T) {
t.Parallel()

c := FromOTLP(context.Background(), time.Now(), nil, nil, orderedmap.New())
m := c.HeadersMap()
require.NotNil(t, m)
assert.Equal(t, 0, m.Len())
}

func TestOTLPContext_HeadersString(t *testing.T) {
t.Parallel()

headers := orderedmap.New()
headers.Set("Content-Type", "application/x-protobuf")
headers.Set("User-Agent", "otel-go/1.0")

c := FromOTLP(context.Background(), time.Now(), nil, headers, orderedmap.New())
s := c.HeadersString()

// Canonical header names, sorted, newline-terminated.
expected := "Content-Type: application/x-protobuf\nUser-Agent: otel-go/1.0\n"
assert.Equal(t, expected, s)
}

func TestOTLPContext_ReleaseBuffers(t *testing.T) {
t.Parallel()

body := orderedmap.New()
body.Set("k", "v")
c := FromOTLP(context.Background(), time.Now(), nil, nil, body)

_, err := c.BodyBytes()
require.NoError(t, err)
c.ReleaseBuffers()

// BodyMap survives — that's the source of truth.
got, err := c.BodyMap()
require.NoError(t, err)
assert.Same(t, body, got)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
sinkRouter "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/router"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/source/dispatcher"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/source/type/otlpsource"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)
Expand Down Expand Up @@ -148,6 +149,18 @@ func Start(ctx context.Context, d dependencies, cfg Config) error {
return nil
})

// Native OTLP/HTTP endpoints. The OTLP transport rides on the existing
// HTTP source (same server, same dispatcher) — the only new pieces are
// route registration, OTLP decoding, record flattening, and OTLP-conformant
// response construction.
otlpHandler := otlpsource.New(ctx, logger, d.Clock(), dp, errorHandler)
router.Options("/otlp/<projectID>/<sourceID>/<secret>/v1/logs", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/<secret>/v1/logs", otlpHandler.HandleLogs)
router.Options("/otlp/<projectID>/<sourceID>/<secret>/v1/metrics", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/<secret>/v1/metrics", otlpHandler.HandleMetrics)
router.Options("/otlp/<projectID>/<sourceID>/<secret>/v1/traces", otlpHandler.HandleOptions)
router.Post("/otlp/<projectID>/<sourceID>/<secret>/v1/traces", otlpHandler.HandleTraces)

// Prepare HTTP server
readBufferSize, err := safecast.Convert[int](cfg.ReadBufferSize.Bytes())
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions internal/pkg/service/stream/source/type/otlpsource/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Package otlpsource adds native OTLP/HTTP endpoints to the Stream HTTP source.
//
// It decodes OpenTelemetry signals (logs in Phase 1, metrics and traces later),
// flattens each nested record into an ordered JSON map, and dispatches every
// record through the standard sink pipeline via a recordctx.Context.
package otlpsource

type Config struct {
Enabled bool `configKey:"enabled" configUsage:"Enable native OTLP/HTTP endpoints on the HTTP source."`
}

func NewConfig() Config {
return Config{
Enabled: true,
}
}
Loading
Loading