Skip to content

Commit dc5edba

Browse files
committed
gs: Improve gatewayserver io tracing for ttigw & lbs lns
1 parent d24209c commit dc5edba

3 files changed

Lines changed: 101 additions & 12 deletions

File tree

pkg/gatewayserver/io/semtechws/ws.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ import (
2929

3030
"github.com/gorilla/mux"
3131
"github.com/gorilla/websocket"
32+
"go.opentelemetry.io/otel/attribute"
33+
"go.opentelemetry.io/otel/codes"
34+
"go.opentelemetry.io/otel/trace"
3235
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
3336
"go.thethings.network/lorawan-stack/v3/pkg/errors"
3437
"go.thethings.network/lorawan-stack/v3/pkg/frequencyplans"
@@ -37,6 +40,7 @@ import (
3740
"go.thethings.network/lorawan-stack/v3/pkg/log"
3841
"go.thethings.network/lorawan-stack/v3/pkg/random"
3942
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
43+
"go.thethings.network/lorawan-stack/v3/pkg/telemetry/tracing"
4044
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
4145
"go.thethings.network/lorawan-stack/v3/pkg/types"
4246
"go.thethings.network/lorawan-stack/v3/pkg/unique"
@@ -45,7 +49,10 @@ import (
4549
"google.golang.org/grpc/metadata"
4650
)
4751

48-
const pingIntervalJitter = 0.1
52+
const (
53+
pingIntervalJitter = 0.1
54+
tracerNamespace = "go.thethings.network/lorawan-stack/pkg/gatewayserver/io/semtechws"
55+
)
4956

5057
var (
5158
errGatewayID = errors.DefineInvalidArgument("invalid_gateway_id", "invalid gateway ID `{id}`")
@@ -118,6 +125,15 @@ func (s *srv) handleConnectionInfo(w http.ResponseWriter, r *http.Request) {
118125
))
119126
logger := log.FromContext(ctx)
120127

128+
tracer := tracing.FromContext(s.ctx).Tracer(tracerNamespace)
129+
ctx, span := tracer.Start(ctx, "HandleConnectionInfo",
130+
trace.WithAttributes(
131+
attribute.String("protocol", s.Protocol()),
132+
attribute.String("remote_addr", r.RemoteAddr),
133+
),
134+
)
135+
defer span.End()
136+
121137
assertAuth := func(ctx context.Context, ids *ttnpb.GatewayIdentifiers) error {
122138
ctx, hasAuth := withForwardedAuth(ctx, ids, r.Header.Get("Authorization"))
123139
if !hasAuth {
@@ -188,6 +204,21 @@ func (s *srv) handleTraffic(w http.ResponseWriter, r *http.Request) (err error)
188204
downstreamCh = make(chan []byte, 1)
189205
)
190206

207+
tracer := tracing.FromContext(s.ctx).Tracer(tracerNamespace)
208+
ctx, span := tracer.Start(ctx, "HandleTraffic",
209+
trace.WithAttributes(
210+
attribute.String("protocol", s.Protocol()),
211+
attribute.String("remote_addr", r.RemoteAddr),
212+
),
213+
)
214+
defer func() {
215+
if err != nil {
216+
span.RecordError(err)
217+
span.SetStatus(codes.Error, err.Error())
218+
}
219+
span.End()
220+
}()
221+
191222
ctx = log.NewContextWithFields(ctx, log.Fields(
192223
"endpoint", eps.Traffic,
193224
"remote_addr", r.RemoteAddr,
@@ -352,15 +383,23 @@ func (s *srv) handleTraffic(w http.ResponseWriter, r *http.Request) (err error)
352383
return err
353384
}
354385
case down := <-conn.Down():
355-
dnmsg, err := s.formatter.FromDownlink(ctx, down, conn.BandID(), time.Now(), downlinkTokens)
356-
if err != nil {
357-
logger.WithError(err).Warn("Failed to marshal downlink message")
386+
_, dnSpan := tracer.Start(ctx, "HandleDownMessage")
387+
dnmsg, dnErr := s.formatter.FromDownlink(ctx, down, conn.BandID(), time.Now(), downlinkTokens)
388+
if dnErr != nil {
389+
dnSpan.RecordError(dnErr)
390+
dnSpan.SetStatus(codes.Error, dnErr.Error())
391+
dnSpan.End()
392+
logger.WithError(dnErr).Warn("Failed to marshal downlink message")
358393
continue
359394
}
360-
if err = ws.WriteMessage(websocket.TextMessage, dnmsg); err != nil {
361-
logger.WithError(err).Warn("Failed to send downlink message")
362-
return err
395+
if dnErr = ws.WriteMessage(websocket.TextMessage, dnmsg); dnErr != nil {
396+
dnSpan.RecordError(dnErr)
397+
dnSpan.SetStatus(codes.Error, dnErr.Error())
398+
dnSpan.End()
399+
logger.WithError(dnErr).Warn("Failed to send downlink message")
400+
return dnErr
363401
}
402+
dnSpan.End()
364403
case downstream := <-downstreamCh:
365404
if err := ws.WriteMessage(websocket.TextMessage, downstream); err != nil {
366405
logger.WithError(err).Warn("Failed to send message downstream")
@@ -381,7 +420,20 @@ func (s *srv) handleTraffic(w http.ResponseWriter, r *http.Request) (err error)
381420
logger.WithError(err).Debug("Failed to read message")
382421
return err
383422
}
384-
downstream, err := s.formatter.HandleUp(ctx, data, ids, conn, time.Now(), downlinkTokens)
423+
var downstream []byte
424+
func() {
425+
_, msgSpan := tracer.Start(ctx, "HandleUpMessage",
426+
trace.WithAttributes(
427+
attribute.Int("message_size", len(data)),
428+
),
429+
)
430+
defer msgSpan.End()
431+
downstream, err = s.formatter.HandleUp(ctx, data, ids, conn, time.Now(), downlinkTokens)
432+
if err != nil {
433+
msgSpan.RecordError(err)
434+
msgSpan.SetStatus(codes.Error, err.Error())
435+
}
436+
}()
385437
if err != nil {
386438
return err
387439
}

pkg/gatewayserver/io/ttigw/ttigw.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import (
2828
"github.com/coder/websocket"
2929
"github.com/gorilla/mux"
3030
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
31+
"go.opentelemetry.io/otel/attribute"
32+
"go.opentelemetry.io/otel/codes"
33+
"go.opentelemetry.io/otel/trace"
3134
lorav1 "go.thethings.industries/pkg/api/gen/tti/gateway/data/lora/v1"
3235
ttica "go.thethings.industries/pkg/ca"
3336
"go.thethings.network/lorawan-stack/v3/pkg/auth/mtls"
@@ -51,13 +54,15 @@ const (
5154
pingIntervalJitter = 0.1
5255
pingTimeout = 30 * time.Second
5356
subprotocol = "v1.lora.data.gateway.thethings.industries"
57+
tracerNamespace = "go.thethings.network/lorawan-stack/pkg/gatewayserver/io/ttigw"
5458
)
5559

5660
// Frontend implements The Things Industries V1 gateway frontend.
5761
type Frontend struct {
5862
http.Handler
5963
server io.Server
6064
cfg Config
65+
tracer trace.Tracer
6166
}
6267

6368
var _ io.Frontend = (*Frontend)(nil)
@@ -87,6 +92,7 @@ func New(ctx context.Context, server io.Server, cfg Config) (*Frontend, error) {
8792
Handler: router,
8893
server: server,
8994
cfg: cfg,
95+
tracer: tracing.FromContext(ctx).Tracer(tracerNamespace),
9096
}
9197

9298
router.HandleFunc("/api/protocols/tti/v1", f.handleGet).Methods(http.MethodGet)
@@ -238,8 +244,23 @@ func (f *Frontend) ping(ctx context.Context, wsConn *websocket.Conn, srvConn *io
238244
}
239245
}
240246

241-
func (f *Frontend) handleConnection(wsConn *websocket.Conn, srvConn *io.Connection) error {
247+
func (f *Frontend) handleConnection(wsConn *websocket.Conn, srvConn *io.Connection) (err error) {
242248
ctx := srvConn.Context()
249+
250+
ctx, span := f.tracer.Start(ctx, "HandleConnection",
251+
trace.WithAttributes(
252+
attribute.String("gateway_uid", unique.ID(ctx, srvConn.Gateway().Ids)),
253+
attribute.String("protocol", f.Protocol()),
254+
),
255+
)
256+
defer func() {
257+
if err != nil {
258+
span.RecordError(err)
259+
span.SetStatus(codes.Error, err.Error())
260+
}
261+
span.End()
262+
}()
263+
243264
logger := log.FromContext(ctx)
244265

245266
gtwConfig, err := buildLoRaGatewayConfig(srvConn.PrimaryFrequencyPlan())
@@ -276,10 +297,10 @@ func (f *Frontend) handleConnection(wsConn *websocket.Conn, srvConn *io.Connecti
276297
return sendMessages(ctx, wsConn, msgCh)
277298
})
278299
wg.Go(func() error {
279-
return enqueueNetworkServerMessages(ctx, srvConn, gtwConfig, msgCh, dlTokens)
300+
return enqueueNetworkServerMessages(ctx, f.tracer, srvConn, gtwConfig, msgCh, dlTokens)
280301
})
281302
wg.Go(func() error {
282-
return readGatewayMessages(ctx, wsConn, srvConn, gtwConfig, dlTokens)
303+
return readGatewayMessages(ctx, f.tracer, wsConn, srvConn, gtwConfig, dlTokens)
283304
})
284305
return wg.Wait()
285306
}
@@ -308,6 +329,7 @@ func sendMessages(
308329

309330
func enqueueNetworkServerMessages(
310331
ctx context.Context,
332+
tracer trace.Tracer,
311333
srvConn *io.Connection,
312334
gtwConfig *lorav1.GatewayConfig,
313335
msgCh chan<- *lorav1.NetworkServerMessage,
@@ -320,12 +342,17 @@ func enqueueNetworkServerMessages(
320342
return ctx.Err()
321343
case down := <-srvConn.Down():
322344
logger.Debug("Send downlink message")
345+
_, dnSpan := tracer.Start(ctx, "HandleDownMessage")
323346
dlToken := dlTokens.Next(down, time.Now())
324347
msg, err := fromDownlinkMessage(gtwConfig, down)
325348
if err != nil {
349+
dnSpan.RecordError(err)
350+
dnSpan.SetStatus(codes.Error, err.Error())
351+
dnSpan.End()
326352
logger.WithError(err).Warn("Failed to convert downlink message")
327353
continue
328354
}
355+
dnSpan.End()
329356
select {
330357
case <-ctx.Done():
331358
return ctx.Err()
@@ -349,6 +376,7 @@ var (
349376

350377
func readGatewayMessages(
351378
ctx context.Context,
379+
tracer trace.Tracer,
352380
wsConn *websocket.Conn,
353381
srvConn *io.Connection,
354382
gtwConfig *lorav1.GatewayConfig,
@@ -404,9 +432,17 @@ func readGatewayMessages(
404432
continue
405433
}
406434

435+
_, msgSpan := tracer.Start(ctx, "HandleUpMessage",
436+
trace.WithAttributes(
437+
attribute.Int("message_size", len(buf)),
438+
),
439+
)
407440
if err := processGatewayMessage(ctx, srvConn, gtwConfig, dlTokens, &envelope, receivedAt); err != nil {
441+
msgSpan.RecordError(err)
442+
msgSpan.SetStatus(codes.Error, err.Error())
408443
logger.WithError(err).Warn("Failed to handle message")
409444
}
445+
msgSpan.End()
410446
}
411447
}
412448

pkg/telemetry/tracing/tracing.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
sdktrace "go.opentelemetry.io/otel/sdk/trace"
2525
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
2626
otrace "go.opentelemetry.io/otel/trace"
27+
"go.opentelemetry.io/otel/trace/noop"
2728
"go.thethings.network/lorawan-stack/v3/pkg/version"
2829
)
2930

@@ -49,7 +50,7 @@ func initResource(ctx context.Context) (*resource.Resource, error) {
4950
// If tracing is not enabled it returns a noop tracer provider instead.
5051
func Initialize(ctx context.Context, config *Config) (otrace.TracerProvider, func(context.Context) error, error) {
5152
if !config.Enable {
52-
return otrace.NewNoopTracerProvider(), func(_ context.Context) error { return nil }, nil
53+
return noop.NewTracerProvider(), func(_ context.Context) error { return nil }, nil
5354
}
5455

5556
exp, err := exporterFromConfig(ctx, config)

0 commit comments

Comments
 (0)