Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
16044cc
feat(go/adbc): prototype OpenTelemetry rotating trace file exporter
birschick-bq Apr 22, 2025
c585e89
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq Apr 22, 2025
f16349a
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq Apr 22, 2025
9e552d0
corrected lint issue
birschick-bq Apr 22, 2025
9bc88dd
corrected lint issues
birschick-bq Apr 22, 2025
27a6f12
improved instantiation
birschick-bq Apr 22, 2025
972f7a6
Handle exporter environment variable. Implement in base classes.
birschick-bq Apr 28, 2025
b282bb0
Correct lint issues.
birschick-bq Apr 28, 2025
ec140ee
More lint issues
birschick-bq Apr 28, 2025
766b191
Remove rotating file writer
birschick-bq Apr 28, 2025
dfa67f2
Small formatting improvements
birschick-bq Apr 28, 2025
706924c
Refactor InitTracer to database, instead of connection
birschick-bq Apr 29, 2025
fe248aa
Created driverbase.statement for implementing OTelTracing as a base c…
birschick-bq Apr 29, 2025
b9c9300
Simplify connection
birschick-bq Apr 29, 2025
317d056
Increased instrumentation example
birschick-bq Apr 30, 2025
a274a21
Corrected lint issue
birschick-bq Apr 30, 2025
2539a33
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq Apr 30, 2025
ef3e25f
Attempt to correct 'bad file descriptor' test error
birschick-bq Apr 30, 2025
f2ba0a1
Correctly implemented file re-use
birschick-bq Apr 30, 2025
7c6fb51
Fixed file size test
birschick-bq Apr 30, 2025
1d79b0c
Revert inadvertant change on c#
birschick-bq Apr 30, 2025
8ac79df
Updates for code review comments
birschick-bq May 3, 2025
06c9884
Fix lint issues
birschick-bq May 3, 2025
591a639
Fix lint issues
birschick-bq May 3, 2025
86ff8ab
Fix empty traces exporter
birschick-bq May 3, 2025
89e31ed
Fix directory permissions on Linux
birschick-bq May 3, 2025
32c533d
Resolve more review comments
birschick-bq May 3, 2025
9615b50
Resolve more reviev comments
birschick-bq May 3, 2025
95b416d
Add NewDatabaseWithContext and two review comments
birschick-bq May 5, 2025
35613cd
remove extraneous file
birschick-bq May 5, 2025
0da4874
refactor methods with context to revert breaking change
birschick-bq May 6, 2025
038bd9b
code review updates
birschick-bq May 6, 2025
5b52eb3
code review updates
birschick-bq May 6, 2025
4bdc7c8
some more code review updates
birschick-bq May 6, 2025
762fb21
unit test for GetSetTraceParent
birschick-bq May 8, 2025
07ae106
code review changes
birschick-bq May 12, 2025
6a11233
remove unit test for validating trace parent
birschick-bq May 12, 2025
d6745ae
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq May 13, 2025
b4cf69a
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq May 13, 2025
5a72e2d
Merge branch 'main' into dev/birschick-bq/go-otel-tracing
birschick-bq May 13, 2025
705a3bf
Mark telemetry options with EXPERIMENTAL in go docs comments
birschick-bq May 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go/adbc/adbc.go
Comment thread
birschick-bq marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,19 @@ const (
OptionKeyURI = "uri"
OptionKeyUsername = "username"
OptionKeyPassword = "password"
// EXPERIMENTAL. Sets/Gets the trace parent on OpenTelemetry traces
OptionKeyTelemetryTraceParent = "adbc.telemetry.trace_parent"
)

// EXPERIMENTAL. Traces Telemetry exporter option type
type OptionTelemetryExporter string

// EXPERIMENTAL. Traces Telemetry exporter options
const (
TelemetryExporterNone OptionTelemetryExporter = "none"
TelemetryExporterOtlp OptionTelemetryExporter = "otlp"
TelemetryExporterConsole OptionTelemetryExporter = "console"
TelemetryExporterAdbcFile OptionTelemetryExporter = "adbcfile"
)

type OptionIsolationLevel string
Expand Down
11 changes: 10 additions & 1 deletion go/adbc/driver/bigquery/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package bigquery

import (
"context"
"fmt"
"runtime/debug"
"strings"
Expand Down Expand Up @@ -111,8 +112,16 @@ func NewDriver(alloc memory.Allocator) adbc.Driver {
}

func (d *driverImpl) NewDatabase(opts map[string]string) (adbc.Database, error) {
return d.NewDatabaseWithContext(context.Background(), opts)
}

func (d *driverImpl) NewDatabaseWithContext(ctx context.Context, opts map[string]string) (adbc.Database, error) {
dbBase, err := driverbase.NewDatabaseImplBase(ctx, &d.DriverImplBase)
if err != nil {
return nil, err
}
db := &databaseImpl{
DatabaseImplBase: driverbase.NewDatabaseImplBase(&d.DriverImplBase),
DatabaseImplBase: dbBase,
authType: OptionValueAuthTypeDefault,
}
if err := db.SetOptions(opts); err != nil {
Expand Down
19 changes: 16 additions & 3 deletions go/adbc/driver/flightsql/flightsql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package flightsql

import (
"context"
"net/url"
"time"

Expand Down Expand Up @@ -101,6 +102,7 @@ type driverImpl struct {
type Driver interface {
adbc.Driver
NewDatabaseWithOptions(map[string]string, ...grpc.DialOption) (adbc.Database, error)
NewDatabaseWithOptionsContext(context.Context, map[string]string, ...grpc.DialOption) (adbc.Database, error)
}

// NewDriver creates a new Flight SQL driver using the given Arrow allocator.
Expand All @@ -115,6 +117,10 @@ func NewDriver(alloc memory.Allocator) Driver {
// This enables the use of additional grpc client options not directly exposed by the options map.
// such as grpc.WithStatsHandler() for enabling various telemetry handlers.
func (d *driverImpl) NewDatabaseWithOptions(opts map[string]string, userDialOpts ...grpc.DialOption) (adbc.Database, error) {
return d.NewDatabaseWithOptionsContext(context.Background(), opts, userDialOpts...)
}

func (d *driverImpl) NewDatabaseWithOptionsContext(ctx context.Context, opts map[string]string, userDialOpts ...grpc.DialOption) (adbc.Database, error) {
opts = maps.Clone(opts)
uri, ok := opts[adbc.OptionKeyURI]
if !ok {
Expand All @@ -125,8 +131,12 @@ func (d *driverImpl) NewDatabaseWithOptions(opts map[string]string, userDialOpts
}
delete(opts, adbc.OptionKeyURI)

dbBase, err := driverbase.NewDatabaseImplBase(ctx, &d.DriverImplBase)
if err != nil {
return nil, err
}
db := &databaseImpl{
DatabaseImplBase: driverbase.NewDatabaseImplBase(&d.DriverImplBase),
DatabaseImplBase: dbBase,
timeout: timeoutOption{
// Match gRPC default
connectTimeout: time.Second * 20,
Expand All @@ -135,7 +145,6 @@ func (d *driverImpl) NewDatabaseWithOptions(opts map[string]string, userDialOpts
userDialOpts: userDialOpts,
}

var err error
if db.uri, err = url.Parse(uri); err != nil {
return nil, adbc.Error{Msg: err.Error(), Code: adbc.StatusInvalidArgument}
}
Expand All @@ -154,5 +163,9 @@ func (d *driverImpl) NewDatabaseWithOptions(opts map[string]string, userDialOpts

// NewDatabase creates a new Flight SQL database using the given options.
func (d *driverImpl) NewDatabase(opts map[string]string) (adbc.Database, error) {
return d.NewDatabaseWithOptions(opts)
return d.NewDatabaseWithContext(context.Background(), opts)
}

func (d *driverImpl) NewDatabaseWithContext(ctx context.Context, opts map[string]string) (adbc.Database, error) {
return d.NewDatabaseWithOptionsContext(ctx, opts)
}
76 changes: 71 additions & 5 deletions go/adbc/driver/internal/driverbase/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,31 @@ import (
"errors"
"fmt"
"log/slog"
"strings"

"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

const (
ConnectionMessageOptionUnknown = "Unknown connection option"
ConnectionMessageOptionUnsupported = "Unsupported connection option"
ConnectionMessageCannotCommit = "Cannot commit when autocommit is enabled"
ConnectionMessageCannotRollback = "Cannot rollback when autocommit is enabled"
ConnectionMessageOptionUnknown = "Unknown connection option"
ConnectionMessageOptionUnsupported = "Unsupported connection option"
ConnectionMessageCannotCommit = "Cannot commit when autocommit is enabled"
ConnectionMessageCannotRollback = "Cannot rollback when autocommit is enabled"
ConnectionMessageTraceParentIncorrectFormat = "Incorrect or unsupported trace parent format"
)

// ConnectionImpl is an interface that drivers implement to provide
// vendor-specific functionality.
type ConnectionImpl interface {
adbc.Connection
adbc.GetSetOptions
adbc.OTelTracing
Base() *ConnectionImplBase
}

Expand Down Expand Up @@ -107,9 +112,12 @@ type ConnectionImplBase struct {
ErrorHelper ErrorHelper
DriverInfo *DriverInfo
Logger *slog.Logger
Tracer trace.Tracer

Autocommit bool
Closed bool

traceParent string
}

// NewConnectionImplBase instantiates ConnectionImplBase.
Expand All @@ -122,6 +130,7 @@ func NewConnectionImplBase(database *DatabaseImplBase) ConnectionImplBase {
ErrorHelper: database.ErrorHelper,
DriverInfo: database.DriverInfo,
Logger: database.Logger,
Tracer: database.Tracer,
Autocommit: true,
Closed: false,
}
Expand Down Expand Up @@ -223,6 +232,10 @@ func (base *ConnectionImplBase) ReadPartition(ctx context.Context, serializedPar
}

func (base *ConnectionImplBase) GetOption(key string) (string, error) {
switch strings.ToLower(key) {
case adbc.OptionKeyTelemetryTraceParent:
return base.GetTraceParent(), nil
}
return "", base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

Expand All @@ -239,9 +252,12 @@ func (base *ConnectionImplBase) GetOptionInt(key string) (int64, error) {
}

func (base *ConnectionImplBase) SetOption(key string, val string) error {
switch key {
switch strings.ToLower(key) {
case adbc.OptionKeyAutoCommit:
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnsupported, key)
case adbc.OptionKeyTelemetryTraceParent:
base.SetTraceParent(strings.TrimSpace(val))
return nil
}
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnknown, key)
}
Expand Down Expand Up @@ -332,6 +348,23 @@ func (b *ConnectionBuilder) Connection() Connection {
return conn
}

func (cnxn *ConnectionImplBase) GetTraceParent() string {
return cnxn.traceParent
}

func (cnxn *ConnectionImplBase) SetTraceParent(traceParent string) {
cnxn.traceParent = traceParent
}

func (cnxn *ConnectionImplBase) StartSpan(
ctx context.Context,
spanName string,
opts ...trace.SpanStartOption,
) (context.Context, trace.Span) {
ctx, _ = maybeAddTraceParent(ctx, cnxn, nil)
return cnxn.Tracer.Start(ctx, spanName, opts...)
}

// GetObjects implements Connection.
func (cnxn *connection) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) {
helper := cnxn.dbObjectsEnumerator
Expand Down Expand Up @@ -714,4 +747,37 @@ func ValueOrZero[T any](val *T) T {
return *val
}

func maybeAddTraceParent(ctx context.Context, cnxn adbc.OTelTracing, st adbc.OTelTracing) (context.Context, error) {
var traceParentStr string
if st != nil && st.GetTraceParent() != "" {
traceParentStr = st.GetTraceParent()
} else if cnxn != nil && cnxn.GetTraceParent() != "" {
traceParentStr = cnxn.GetTraceParent()
}
if traceParentStr != "" {
spanContext, err := propagateTraceParent(ctx, traceParentStr)
if err != nil {
return ctx, err
}
ctx = trace.ContextWithRemoteSpanContext(ctx, spanContext)
}
return ctx, nil
}

func propagateTraceParent(ctx context.Context, traceParentStr string) (trace.SpanContext, error) {
if strings.TrimSpace(traceParentStr) == "" {
return trace.SpanContext{}, errors.New("traceparent string is empty")
}

propagator := propagation.TraceContext{}
carrier := propagation.MapCarrier{"traceparent": traceParentStr}
extractedContext := propagator.Extract(ctx, carrier)

spanContext := trace.SpanContextFromContext(extractedContext)
if !spanContext.IsValid() {
return trace.SpanContext{}, errors.New("invalid traceparent string")
}
return spanContext, nil
}

var _ ConnectionImpl = (*ConnectionImplBase)(nil)
Loading
Loading