Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pkg/gofr/container/datasources.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ type CassandraBatchWithContext interface {
ExecuteBatchCASWithCtx(ctx context.Context, name string, dest ...any) (bool, error)
}

// CassandraProvider extends CassandraWithContext with logger/metrics/tracer setup hooks.
//
// Deprecated: AddCassandra now accepts [CassandraWithContext] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [CassandraWithContext] instead.
type CassandraProvider interface {
CassandraWithContext

Expand All @@ -201,6 +205,10 @@ type Clickhouse interface {
HealthChecker
}

// ClickhouseProvider extends Clickhouse with logger/metrics/tracer setup hooks.
//
// Deprecated: AddClickhouse now accepts [Clickhouse] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Clickhouse] instead.
type ClickhouseProvider interface {
Clickhouse

Expand All @@ -222,6 +230,10 @@ type OracleTx interface {
Rollback() error
}

// OracleProvider extends OracleDB with logger/metrics/tracer setup hooks.
//
// Deprecated: AddOracle now accepts [OracleDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [OracleDB] instead.
type OracleProvider interface {
OracleDB

Expand Down Expand Up @@ -292,6 +304,9 @@ type Transaction interface {

// MongoProvider is an interface that extends Mongo with additional methods for logging, metrics, and connection management.
// Which is used for initializing datasource.
//
// Deprecated: AddMongo now accepts [Mongo] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Mongo] instead.
type MongoProvider interface {
Mongo

Expand Down Expand Up @@ -337,6 +352,9 @@ type SurrealDB interface {

// SurrealBDProvider is an interface that extends SurrealDB with additional methods for logging, metrics, or connection management.
// It is typically used for initializing and managing SurrealDB-based data sources.
//
// Deprecated: AddSurrealDB now accepts [SurrealDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [SurrealDB] instead.
type SurrealBDProvider interface {
SurrealDB

Expand Down Expand Up @@ -371,12 +389,20 @@ type KVStore interface {
HealthChecker
}

// KVStoreProvider extends KVStore with logger/metrics/tracer setup hooks.
//
// Deprecated: AddKVStore now accepts [KVStore] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [KVStore] instead.
type KVStoreProvider interface {
KVStore

provider
}

// PubSubProvider extends pubsub.Client with logger/metrics/tracer setup hooks.
//
// Deprecated: AddPubSub now accepts [pubsub.Client] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [pubsub.Client] instead.
type PubSubProvider interface {
pubsub.Client

Expand All @@ -398,6 +424,10 @@ type Solr interface {
HealthChecker
}

// SolrProvider extends Solr with logger/metrics/tracer setup hooks.
//
// Deprecated: AddSolr now accepts [Solr] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Solr] instead.
type SolrProvider interface {
Solr

Expand Down Expand Up @@ -485,11 +515,18 @@ type Dgraph interface {
}

// DgraphProvider extends Dgraph with connection management capabilities.
//
// Deprecated: AddDgraph now accepts [Dgraph] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Dgraph] instead.
type DgraphProvider interface {
Dgraph
provider
}

// OpenTSDBProvider extends OpenTSDB with logger/metrics/tracer setup hooks.
//
// Deprecated: AddOpenTSDB now accepts [OpenTSDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [OpenTSDB] instead.
type OpenTSDBProvider interface {
OpenTSDB
provider
Expand Down Expand Up @@ -629,6 +666,10 @@ type ScyllaDB interface {
HealthChecker
}

// ScyllaDBProvider extends ScyllaDB with logger/metrics/tracer setup hooks.
//
// Deprecated: AddScyllaDB now accepts [ScyllaDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [ScyllaDB] instead.
type ScyllaDBProvider interface {
ScyllaDB
provider
Expand Down Expand Up @@ -698,6 +739,9 @@ type ArangoDB interface {
}

// ArangoDBProvider is an interface that extends ArangoDB with additional methods for logging, metrics, and connection management.
//
// Deprecated: AddArangoDB now accepts [ArangoDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [ArangoDB] instead.
type ArangoDBProvider interface {
ArangoDB

Expand Down Expand Up @@ -738,6 +782,9 @@ type Elasticsearch interface {
}

// ElasticsearchProvider an interface that extends Elasticsearch with additional methods for logging, metrics, and connection management.
//
// Deprecated: AddElasticsearch now accepts [Elasticsearch] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Elasticsearch] instead.
type ElasticsearchProvider interface {
Elasticsearch

Expand Down Expand Up @@ -780,6 +827,9 @@ type Couchbase interface {
// CouchbaseProvider is an interface that extends Couchbase with additional methods
// for logging, metrics, tracing, and connection management, aligning with other
// data source providers in your package.
//
// Deprecated: AddCouchbase now accepts [Couchbase] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [Couchbase] instead.
type CouchbaseProvider interface {
Couchbase

Expand Down Expand Up @@ -832,6 +882,9 @@ type InfluxDB interface {
}

// InfluxDBProvider an interface that extends InfluxDB with additional methods for logging, metrics, and connection management.
//
// Deprecated: AddInfluxDB now accepts [InfluxDB] directly via duck-typed
// instrumentation. The Provider shim is retained only for backwards compatibility — implement [InfluxDB] instead.
type InfluxDBProvider interface {
InfluxDB

Expand Down
40 changes: 21 additions & 19 deletions pkg/gofr/datasource/arangodb/arango.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,10 @@ func (c *Client) validateConfig() error {
// fmt.Printf("User: %+v\n", doc)
// }
func (c *Client) Query(ctx context.Context, dbName, query string, bindVars map[string]any, result any, options ...map[string]any) error {
tracerCtx, span := c.addTrace(ctx, "query", map[string]string{"DB": dbName})
startTime := time.Now()

defer c.sendOperationStats(&QueryLog{Operation: "query",
Database: dbName, Query: query}, startTime, "query", span)
ctx, done := c.instrumentOp(ctx, &QueryLog{Operation: "query", Database: dbName, Query: query})
defer done()

db, err := c.client.GetDatabase(tracerCtx, dbName, nil)
db, err := c.client.GetDatabase(ctx, dbName, nil)
if err != nil {
return err
}
Expand All @@ -253,7 +250,7 @@ func (c *Client) Query(ctx context.Context, dbName, query string, bindVars map[s

queryOptions.BindVars = bindVars

cursor, err := db.Query(tracerCtx, query, &queryOptions)
cursor, err := db.Query(ctx, query, &queryOptions)
if err != nil {
return err
}
Expand All @@ -268,7 +265,7 @@ func (c *Client) Query(ctx context.Context, dbName, query string, bindVars map[s
for {
var doc map[string]any

_, err = cursor.ReadDocument(tracerCtx, &doc)
_, err = cursor.ReadDocument(ctx, &doc)
if arangoShared.IsNoMoreDocuments(err) {
break
}
Expand Down Expand Up @@ -326,21 +323,26 @@ func (c *Client) addTrace(ctx context.Context, operation string, attributes map[
return ctx, nil
}

func (c *Client) sendOperationStats(ql *QueryLog, startTime time.Time, method string, span trace.Span) {
duration := time.Since(startTime).Microseconds()
ql.Duration = duration
// instrumentOp starts a trace span, captures the start time, and returns the traced context
// along with a cleanup function. The cleanup function logs the operation, records metrics,
// and ends the span. It should be deferred immediately after calling instrumentOp.
func (c *Client) instrumentOp(ctx context.Context, ql *QueryLog) (tracedCtx context.Context, done func()) {
tracerCtx, span := c.addTrace(ctx, ql.Operation, ql.traceAttrs())
startTime := time.Now()

c.logger.Debug(ql)
return tracerCtx, func() {
duration := time.Since(startTime).Microseconds()
ql.Duration = duration

c.metrics.RecordHistogram(context.Background(), "app_arango_stats", float64(duration),
"endpoint", c.endpoint,
"type", ql.Query,
)
c.logger.Debug(ql)

if span != nil {
defer span.End()
c.metrics.RecordHistogram(context.Background(), "app_arango_stats", float64(duration),
"endpoint", c.endpoint, "type", ql.Operation)

span.SetAttributes(attribute.Int64(fmt.Sprintf("arangodb.%v.duration", method), duration))
if span != nil {
span.SetAttributes(attribute.Int64(fmt.Sprintf("arangodb.%v.duration", ql.Operation), duration))
span.End()
}
}
}

Expand Down
65 changes: 24 additions & 41 deletions pkg/gofr/datasource/arangodb/arango_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package arangodb

import (
"context"
"time"

"github.com/arangodb/go-driver/v2/arangodb"
)
Expand All @@ -15,13 +14,11 @@ type DB struct {
// It first checks if the database already exists before attempting to create it.
// Returns ErrDatabaseExists if the database already exists.
func (d *DB) CreateDB(ctx context.Context, database string) error {
tracerCtx, span := d.client.addTrace(ctx, "createDB", map[string]string{"DB": database})
startTime := time.Now()

defer d.client.sendOperationStats(&QueryLog{Operation: "createDB", Database: database}, startTime, "createDB", span)
ctx, done := d.client.instrumentOp(ctx, &QueryLog{Operation: "createDB", Database: database})
defer done()

// Check if the database already exists
exists, err := d.client.client.DatabaseExists(tracerCtx, database)
exists, err := d.client.client.DatabaseExists(ctx, database)
if err != nil {
return err
}
Expand All @@ -31,24 +28,22 @@ func (d *DB) CreateDB(ctx context.Context, database string) error {
return ErrDatabaseExists
}

_, err = d.client.client.CreateDatabase(tracerCtx, database, nil)
_, err = d.client.client.CreateDatabase(ctx, database, nil)

return err
}

// DropDB deletes a database from ArangoDB.
func (d *DB) DropDB(ctx context.Context, database string) error {
tracerCtx, span := d.client.addTrace(ctx, "dropDB", map[string]string{"DB": database})
startTime := time.Now()

defer d.client.sendOperationStats(&QueryLog{Operation: "dropDB", Database: database}, startTime, "dropDB", span)
ctx, done := d.client.instrumentOp(ctx, &QueryLog{Operation: "dropDB", Database: database})
defer done()

db, err := d.client.client.GetDatabase(tracerCtx, database, &arangodb.GetDatabaseOptions{})
db, err := d.client.client.GetDatabase(ctx, database, &arangodb.GetDatabaseOptions{})
if err != nil {
return err
}

err = db.Remove(tracerCtx)
err = db.Remove(ctx)
if err != nil {
return err
}
Expand All @@ -60,19 +55,17 @@ func (d *DB) DropDB(ctx context.Context, database string) error {
// It first checks if the collection already exists before attempting to create it.
// Returns ErrCollectionExists if the collection already exists.
func (d *DB) CreateCollection(ctx context.Context, database, collection string, isEdge bool) error {
tracerCtx, span := d.client.addTrace(ctx, "createCollection", map[string]string{"collection": collection})
startTime := time.Now()
ctx, done := d.client.instrumentOp(ctx, &QueryLog{Operation: "createCollection", Database: database,
Collection: collection, Filter: isEdge})
defer done()

defer d.client.sendOperationStats(&QueryLog{Operation: "createCollection", Database: database,
Collection: collection, Filter: isEdge}, startTime, "createCollection", span)

db, err := d.client.client.GetDatabase(tracerCtx, database, nil)
db, err := d.client.client.GetDatabase(ctx, database, nil)
if err != nil {
return err
}

// Check if the collection already exists
exists, err := db.CollectionExists(tracerCtx, collection)
exists, err := db.CollectionExists(ctx, collection)
if err != nil {
return err
}
Expand All @@ -89,16 +82,23 @@ func (d *DB) CreateCollection(ctx context.Context, database, collection string,

options := arangodb.CreateCollectionPropertiesV2{Type: &collectionType}

_, err = db.CreateCollectionV2(tracerCtx, collection, &options)
_, err = db.CreateCollectionV2(ctx, collection, &options)

return err
}

// DropCollection deletes an existing collection from a database.
func (d *DB) DropCollection(ctx context.Context, database, collectionName string) error {
return d.handleCollectionOperation(ctx, "dropCollection", database, collectionName, func(collection arangodb.Collection) error {
return collection.Remove(ctx)
})
ctx, done := d.client.instrumentOp(ctx, &QueryLog{Operation: "dropCollection", Database: database,
Collection: collectionName})
defer done()

collection, err := d.getCollection(ctx, database, collectionName)
if err != nil {
return err
}

return collection.Remove(ctx)
}

func (d *DB) getCollection(ctx context.Context, dbName, collectionName string) (arangodb.Collection, error) {
Expand All @@ -114,20 +114,3 @@ func (d *DB) getCollection(ctx context.Context, dbName, collectionName string) (

return collection, nil
}

// handleCollectionOperation handles common logic for collection operations.
func (d *DB) handleCollectionOperation(ctx context.Context, operation, database, collectionName string,
action func(arangodb.Collection) error) error {
tracerCtx, span := d.client.addTrace(ctx, operation, map[string]string{"collection": collectionName})
startTime := time.Now()

defer d.client.sendOperationStats(&QueryLog{Operation: operation, Database: database,
Collection: collectionName}, startTime, operation, span)

collection, err := d.getCollection(tracerCtx, database, collectionName)
if err != nil {
return err
}

return action(collection)
}
Loading
Loading