Skip to content

Commit 03d0d4d

Browse files
chattonjulienrbrt
andauthored
feat(execution/grpc): adding support for grpc otlp (#3300)
* feat: adding support for grpc oltp * chore: fix linting * cl --------- Co-authored-by: Julien Robert <julien@rbrt.fr>
1 parent 05979c1 commit 03d0d4d

7 files changed

Lines changed: 305 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
### Changes
1313

14+
- Add support for otlp in execution/grpc. [#3300](https://github.com/evstack/ev-node/pull/3300)
1415
- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
1516
- Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket` [#3297](https://github.com/evstack/ev-node/pull/3297)
1617
- **BREAKING:** (execution/grpc)
17-
- Move execution service where it belongs in execution/grpc. []()
18+
- Move execution service where it belongs in execution/grpc. [#3302](https://github.com/evstack/ev-node/pull/3302)
1819
- Replace legacy gRPC execution `txs` payload fields with `tx_batch` so clients and servers use contiguous transaction buffers [#3297](https://github.com/evstack/ev-node/pull/3297)
1920
- Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298)
2021
- Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299)

execution/grpc/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func NewClient(url string, opts ...connect.ClientOption) (*Client, error) {
9898
if err != nil {
9999
return nil, err
100100
}
101+
opts = append([]connect.ClientOption{connect.WithInterceptors(outboundPropagationInterceptor())}, opts...)
101102
return &Client{
102103
client: v1connect.NewExecutorServiceClient(
103104
httpClient,

execution/grpc/go.mod

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,20 @@ require (
66
connectrpc.com/connect v1.19.2
77
connectrpc.com/grpcreflect v1.3.0
88
github.com/evstack/ev-node/core v1.0.0
9+
go.opentelemetry.io/otel v1.43.0
10+
go.opentelemetry.io/otel/sdk v1.43.0
11+
go.opentelemetry.io/otel/trace v1.43.0
912
golang.org/x/net v0.53.0
1013
google.golang.org/protobuf v1.36.11
1114
)
1215

13-
require golang.org/x/text v0.36.0 // indirect
16+
require (
17+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
18+
github.com/go-logr/logr v1.4.3 // indirect
19+
github.com/go-logr/stdr v1.2.2 // indirect
20+
github.com/google/uuid v1.6.0 // indirect
21+
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
22+
go.opentelemetry.io/otel/metric v1.43.0 // indirect
23+
golang.org/x/sys v0.43.0 // indirect
24+
golang.org/x/text v0.36.0 // indirect
25+
)

execution/grpc/go.sum

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,46 @@ connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo=
22
connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w=
33
connectrpc.com/grpcreflect v1.3.0 h1:Y4V+ACf8/vOb1XOc251Qun7jMB75gCUNw6llvB9csXc=
44
connectrpc.com/grpcreflect v1.3.0/go.mod h1:nfloOtCS8VUQOQ1+GTdFzVg2CJo4ZGaat8JIovCtDYs=
5+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
6+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
59
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
610
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
11+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
12+
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
13+
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
14+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
15+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
716
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
817
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
18+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
19+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
20+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
21+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
22+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
23+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
24+
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
25+
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
26+
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
27+
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
28+
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
29+
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
30+
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
31+
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
32+
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
33+
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
34+
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
35+
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
36+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
37+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
938
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
1039
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
40+
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
41+
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
1142
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
1243
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
1344
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
1445
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
46+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
47+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

execution/grpc/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
// - http.Handler: The configured HTTP handler
2626
func NewExecutorServiceHandler(executor execution.Executor, opts ...connect.HandlerOption) http.Handler {
2727
server := NewServer(executor)
28+
opts = append([]connect.HandlerOption{connect.WithInterceptors(inboundPropagationInterceptor())}, opts...)
2829

2930
mux := http.NewServeMux()
3031

execution/grpc/otel_propagation.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
6+
"connectrpc.com/connect"
7+
"go.opentelemetry.io/otel"
8+
"go.opentelemetry.io/otel/propagation"
9+
)
10+
11+
func inboundPropagationInterceptor() connect.UnaryInterceptorFunc {
12+
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
13+
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
14+
prop := otel.GetTextMapPropagator()
15+
ctx = prop.Extract(ctx, propagation.HeaderCarrier(req.Header()))
16+
return next(ctx, req)
17+
}
18+
})
19+
}
20+
21+
func outboundPropagationInterceptor() connect.UnaryInterceptorFunc {
22+
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
23+
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
24+
prop := otel.GetTextMapPropagator()
25+
prop.Inject(ctx, propagation.HeaderCarrier(req.Header()))
26+
return next(ctx, req)
27+
}
28+
})
29+
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"net/http/httptest"
6+
"testing"
7+
"time"
8+
9+
"connectrpc.com/connect"
10+
"go.opentelemetry.io/otel"
11+
"go.opentelemetry.io/otel/baggage"
12+
"go.opentelemetry.io/otel/propagation"
13+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
14+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
15+
"go.opentelemetry.io/otel/trace"
16+
17+
"github.com/evstack/ev-node/core/execution"
18+
)
19+
20+
func setupTracer(t *testing.T) (*tracetest.SpanRecorder, func()) {
21+
t.Helper()
22+
rec := tracetest.NewSpanRecorder()
23+
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec))
24+
oldTP := otel.GetTracerProvider()
25+
oldProp := otel.GetTextMapPropagator()
26+
otel.SetTracerProvider(tp)
27+
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
28+
return rec, func() {
29+
_ = tp.Shutdown(context.Background())
30+
otel.SetTracerProvider(oldTP)
31+
otel.SetTextMapPropagator(oldProp)
32+
}
33+
}
34+
35+
func TestInboundMetadataCreatesChildSpanWithSameTraceID(t *testing.T) {
36+
rec, cleanup := setupTracer(t)
37+
defer cleanup()
38+
39+
tracer := otel.Tracer("test")
40+
parentCtx, parent := tracer.Start(context.Background(), "parent")
41+
defer parent.End()
42+
parentTraceID := parent.SpanContext().TraceID()
43+
44+
mockExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) {
45+
_, span := tracer.Start(ctx, "server-child")
46+
span.End()
47+
return [][]byte{}, nil
48+
}}
49+
50+
handler := NewExecutorServiceHandler(mockExec)
51+
ts := httptest.NewServer(handler)
52+
defer ts.Close()
53+
54+
client, err := NewClient(ts.URL)
55+
if err != nil {
56+
t.Fatalf("NewClient failed: %v", err)
57+
}
58+
59+
_, err = client.GetTxs(parentCtx)
60+
if err != nil {
61+
t.Fatalf("GetTxs failed: %v", err)
62+
}
63+
64+
var found bool
65+
for _, s := range rec.Ended() {
66+
if s.Name() == "server-child" {
67+
found = true
68+
if s.SpanContext().TraceID() != parentTraceID {
69+
t.Fatalf("trace id mismatch: got %s want %s", s.SpanContext().TraceID(), parentTraceID)
70+
}
71+
}
72+
}
73+
if !found {
74+
t.Fatalf("server-child span not found")
75+
}
76+
}
77+
78+
func TestOutboundGRPCCallCarriesTraceparentMetadata(t *testing.T) {
79+
rec, cleanup := setupTracer(t)
80+
_ = rec
81+
defer cleanup()
82+
83+
tracer := otel.Tracer("test")
84+
ctx, parent := tracer.Start(context.Background(), "parent")
85+
defer parent.End()
86+
87+
gotTraceparent := ""
88+
captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
89+
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
90+
gotTraceparent = req.Header().Get("traceparent")
91+
return next(ctx, req)
92+
}
93+
})
94+
95+
mockExec := &mockExecutor{}
96+
handler := NewExecutorServiceHandler(mockExec, connect.WithInterceptors(captureHeader))
97+
ts := httptest.NewServer(handler)
98+
defer ts.Close()
99+
100+
client, err := NewClient(ts.URL)
101+
if err != nil {
102+
t.Fatalf("NewClient failed: %v", err)
103+
}
104+
105+
if _, err = client.GetTxs(ctx); err != nil {
106+
t.Fatalf("GetTxs failed: %v", err)
107+
}
108+
if gotTraceparent == "" {
109+
t.Fatalf("expected traceparent metadata to be propagated")
110+
}
111+
}
112+
113+
func TestOutboundGRPCCallCarriesPropagationHeaders(t *testing.T) {
114+
rec, cleanup := setupTracer(t)
115+
_ = rec
116+
defer cleanup()
117+
118+
tracer := otel.Tracer("test")
119+
ctx, parent := tracer.Start(context.Background(), "parent")
120+
defer parent.End()
121+
member, err := baggage.NewMember("tenant", "alpha")
122+
if err != nil {
123+
t.Fatalf("failed to create baggage member: %v", err)
124+
}
125+
bg, err := baggage.New(member)
126+
if err != nil {
127+
t.Fatalf("failed to create baggage: %v", err)
128+
}
129+
ctx = baggage.ContextWithBaggage(ctx, bg)
130+
131+
var gotTraceparent string
132+
var gotBaggage string
133+
captureHeader := connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
134+
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
135+
gotTraceparent = req.Header().Get("traceparent")
136+
gotBaggage = req.Header().Get("baggage")
137+
return next(ctx, req)
138+
}
139+
})
140+
141+
mockExec := &mockExecutor{}
142+
handler := NewExecutorServiceHandler(mockExec, connect.WithInterceptors(captureHeader))
143+
ts := httptest.NewServer(handler)
144+
defer ts.Close()
145+
146+
client, err := NewClient(ts.URL)
147+
if err != nil {
148+
t.Fatalf("NewClient failed: %v", err)
149+
}
150+
151+
if _, err = client.GetTxs(ctx); err != nil {
152+
t.Fatalf("GetTxs failed: %v", err)
153+
}
154+
155+
if gotTraceparent == "" {
156+
t.Fatalf("expected traceparent metadata to be propagated")
157+
}
158+
if gotBaggage == "" {
159+
t.Fatalf("expected baggage metadata to be propagated")
160+
}
161+
}
162+
163+
func TestEndToEndParentChildAcrossServerClientHop(t *testing.T) {
164+
rec, cleanup := setupTracer(t)
165+
defer cleanup()
166+
167+
tracer := otel.Tracer("test")
168+
var midSpan trace.Span
169+
170+
downstreamExec := &mockExecutor{getExecutionInfoFunc: func(ctx context.Context) (executionInfo execution.ExecutionInfo, err error) {
171+
_, span := tracer.Start(ctx, "downstream-child")
172+
span.End()
173+
return execution.ExecutionInfo{MaxGas: 1}, nil
174+
}}
175+
downstreamHandler := NewExecutorServiceHandler(downstreamExec)
176+
downstreamSrv := httptest.NewServer(downstreamHandler)
177+
defer downstreamSrv.Close()
178+
downstreamClient, err := NewClient(downstreamSrv.URL)
179+
if err != nil {
180+
t.Fatalf("NewClient failed: %v", err)
181+
}
182+
183+
upstreamExec := &mockExecutor{getTxsFunc: func(ctx context.Context) ([][]byte, error) {
184+
ctx, span := tracer.Start(ctx, "upstream-mid")
185+
midSpan = span
186+
defer span.End()
187+
_, err := downstreamClient.GetExecutionInfo(ctx)
188+
if err != nil {
189+
return nil, err
190+
}
191+
return [][]byte{}, nil
192+
}}
193+
upstreamHandler := NewExecutorServiceHandler(upstreamExec)
194+
upstreamSrv := httptest.NewServer(upstreamHandler)
195+
defer upstreamSrv.Close()
196+
197+
client, err := NewClient(upstreamSrv.URL)
198+
if err != nil {
199+
t.Fatalf("NewClient failed: %v", err)
200+
}
201+
202+
rootCtx, root := tracer.Start(context.Background(), "root")
203+
defer root.End()
204+
if _, err := client.GetTxs(rootCtx); err != nil {
205+
t.Fatalf("GetTxs failed: %v", err)
206+
}
207+
208+
time.Sleep(10 * time.Millisecond)
209+
210+
rootTraceID := root.SpanContext().TraceID()
211+
if midSpan.SpanContext().TraceID() != rootTraceID {
212+
t.Fatalf("mid span trace id mismatch")
213+
}
214+
var found bool
215+
for _, s := range rec.Ended() {
216+
if s.Name() == "downstream-child" {
217+
found = true
218+
if s.SpanContext().TraceID() != rootTraceID {
219+
t.Fatalf("downstream trace id mismatch")
220+
}
221+
}
222+
}
223+
if !found {
224+
t.Fatalf("downstream-child span not found")
225+
}
226+
}

0 commit comments

Comments
 (0)