Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func newFullNode(

mainKV := store.NewEvNodeKVStore(database)
evstore := store.New(mainKV)
if nodeConfig.Instrumentation.IsTracingEnabled() {
evstore = store.WithTracingStore(evstore)
}

headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/rpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/p2p"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/pkg/telemetry"
"github.com/evstack/ev-node/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect"
Expand Down Expand Up @@ -413,8 +414,13 @@ func NewServiceHandler(
// Register custom HTTP endpoints
RegisterCustomHTTPEndpoints(mux, store, peerManager, config, bestKnown, logger)

var handler http.Handler = mux
if config.Instrumentation.IsTracingEnabled() {
handler = telemetry.ExtractTraceContext(mux)
}

// Use h2c to support HTTP/2 without TLS
return h2c.NewHandler(mux, &http2.Server{
return h2c.NewHandler(handler, &http2.Server{
IdleTimeout: 120 * time.Second,
MaxReadFrameSize: 1 << 24,
MaxConcurrentStreams: 100,
Expand Down
337 changes: 337 additions & 0 deletions pkg/store/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
package store

import (
"context"

ds "github.com/ipfs/go-datastore"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/evstack/ev-node/types"
)

var _ Store = (*tracedStore)(nil)

type tracedStore struct {
inner Store
tracer trace.Tracer
}

// WithTracingStore wraps a Store with OpenTelemetry tracing.
func WithTracingStore(inner Store) Store {
return &tracedStore{
inner: inner,
tracer: otel.Tracer("ev-node/store"),
}
}

func (t *tracedStore) Height(ctx context.Context) (uint64, error) {
ctx, span := t.tracer.Start(ctx, "Store.Height")
defer span.End()

height, err := t.inner.Height(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return height, err
}

span.SetAttributes(attribute.Int64("height", int64(height)))
return height, nil
}

func (t *tracedStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetBlockData",
trace.WithAttributes(attribute.Int64("height", int64(height))),
)
defer span.End()

header, data, err := t.inner.GetBlockData(ctx, height)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return header, data, err
}

return header, data, nil
}

func (t *tracedStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetBlockByHash",
trace.WithAttributes(attribute.String("hash", string(hash))),
)
defer span.End()

header, data, err := t.inner.GetBlockByHash(ctx, hash)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return header, data, err
}

if header != nil {
span.SetAttributes(attribute.Int64("height", int64(header.Height())))
}
return header, data, nil
}

func (t *tracedStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetSignature",
trace.WithAttributes(attribute.Int64("height", int64(height))),
)
defer span.End()

sig, err := t.inner.GetSignature(ctx, height)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return sig, err
}

return sig, nil
}

func (t *tracedStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetSignatureByHash",
trace.WithAttributes(attribute.String("hash", string(hash))),
)
defer span.End()

sig, err := t.inner.GetSignatureByHash(ctx, hash)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return sig, err
}

return sig, nil
}

func (t *tracedStore) GetHeader(ctx context.Context, height uint64) (*types.SignedHeader, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetHeader",
trace.WithAttributes(attribute.Int64("height", int64(height))),
)
defer span.End()

header, err := t.inner.GetHeader(ctx, height)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return header, err
}

return header, nil
}

func (t *tracedStore) GetState(ctx context.Context) (types.State, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetState")
defer span.End()

state, err := t.inner.GetState(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return state, err
}

span.SetAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight)))
return state, nil
}

func (t *tracedStore) GetStateAtHeight(ctx context.Context, height uint64) (types.State, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetStateAtHeight",
trace.WithAttributes(attribute.Int64("height", int64(height))),
)
defer span.End()

state, err := t.inner.GetStateAtHeight(ctx, height)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return state, err
}

return state, nil
}

func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
ctx, span := t.tracer.Start(ctx, "Store.GetMetadata",
trace.WithAttributes(attribute.String("key", key)),
)
defer span.End()

data, err := t.inner.GetMetadata(ctx, key)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return data, err
}

span.SetAttributes(attribute.Int("value.size", len(data)))
return data, nil
}

func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error {
ctx, span := t.tracer.Start(ctx, "Store.SetMetadata",
trace.WithAttributes(
attribute.String("key", key),
attribute.Int("value.size", len(value)),
),
)
defer span.End()

err := t.inner.SetMetadata(ctx, key, value)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
ctx, span := t.tracer.Start(ctx, "Store.Rollback",
trace.WithAttributes(
attribute.Int64("height", int64(height)),
attribute.Bool("aggregator", aggregator),
),
)
defer span.End()

err := t.inner.Rollback(ctx, height, aggregator)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (t *tracedStore) Close() error {
return t.inner.Close()
}

func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) {
ctx, span := t.tracer.Start(ctx, "Store.NewBatch")
defer span.End()

batch, err := t.inner.NewBatch(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

return &tracedBatch{
inner: batch,
tracer: t.tracer,
}, nil
}

var _ Batch = (*tracedBatch)(nil)

type tracedBatch struct {
inner Batch
tracer trace.Tracer
}

func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.SaveBlockData",
trace.WithAttributes(attribute.Int64("height", int64(header.Height()))),
)
defer span.End()

err := b.inner.SaveBlockData(header, data, signature)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (b *tracedBatch) SetHeight(height uint64) error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.SetHeight",
trace.WithAttributes(attribute.Int64("height", int64(height))),
)
defer span.End()

err := b.inner.SetHeight(height)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (b *tracedBatch) UpdateState(state types.State) error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.UpdateState",
trace.WithAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))),
)
defer span.End()

err := b.inner.UpdateState(state)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (b *tracedBatch) Commit() error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.Commit")
defer span.End()

err := b.inner.Commit()
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (b *tracedBatch) Put(key ds.Key, value []byte) error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.Put",
trace.WithAttributes(
attribute.String("key", key.String()),
attribute.Int("value.size", len(value)),
),
)
defer span.End()

err := b.inner.Put(key, value)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}

func (b *tracedBatch) Delete(key ds.Key) error {
Comment thread
chatton marked this conversation as resolved.
_, span := b.tracer.Start(context.Background(), "Batch.Delete",
trace.WithAttributes(attribute.String("key", key.String())),
)
defer span.End()

err := b.inner.Delete(key)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

return nil
}
Loading
Loading