Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
version: "2"

linters:
disable-all: true
enable:
- bodyclose
- deadcode
- depguard
# - depguard # config was flagging all legitimate imports; needs proper file-scoped rules to be useful
- dogsled
# - dupl
- errcheck
# - exportloopref
# - funlen
# - gochecknoinits
# - goconst
# - gocritic
# - gocyclo
- gofmt
# - goimports
# - gomnd
# - mnd
- goprintffuncname
- gosec
- gosimple
- govet
- ineffassign
# - lll
Expand All @@ -27,38 +25,30 @@ linters:
# - noctx
- nolintlint
- staticcheck
- structcheck
# - stylecheck
- typecheck
# - unconvert
# - unparam
- unused
- varcheck
# - whitespace

# don't enable:
# - asciicheck
# - scopelint
# - gochecknoglobals
# - gocognit
# - godot
# - godox
# - goerr113
# - interfacer
# - maligned
# - err113
# - ireturn
# - nestif
# - prealloc
# - testpackage
# - revive
# - wsl

formatters:
enable:
- gofmt
# - goimports

linters-settings:
depguard:
rules:
main:
allow:
- $gostd
- github.com/databricks/databricks-sql-go
gosec:
exclude-generated: true
severity: "low"
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt-in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt-out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry has < 1% performance overhead and uses circuit breaker protection to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
15 changes: 8 additions & 7 deletions auth/oauth/u2m/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ func NewAuthenticator(hostName string, timeout time.Duration) (auth.Authenticato
cloud := oauth.InferCloudFromHost(hostName)

var clientID, redirectURL string
if cloud == oauth.AWS {
switch cloud {
case oauth.AWS:
clientID = awsClientId
redirectURL = awsRedirectURL
} else if cloud == oauth.Azure {
case oauth.Azure:
clientID = azureClientId
redirectURL = azureRedirectURL
} else if cloud == oauth.GCP {
case oauth.GCP:
clientID = gcpClientId
redirectURL = gcpRedirectURL
} else {
default:
return nil, errors.New("unhandled cloud type: " + cloud.String())
}

Expand Down Expand Up @@ -147,14 +148,14 @@ func (tsp *tokenSourceProvider) GetTokenSource() (oauth2.TokenSource, error) {
if err != nil {
return nil, err
}
defer listener.Close()
defer listener.Close() //nolint:errcheck

srv := &http.Server{
ReadHeaderTimeout: 3 * time.Second,
WriteTimeout: 30 * time.Second,
}

defer srv.Close()
defer srv.Close() //nolint:errcheck

// Start local server to wait for callback
go func() {
Expand Down Expand Up @@ -209,7 +210,7 @@ func (tsp *tokenSourceProvider) ServeHTTP(w http.ResponseWriter, r *http.Request
if resp.err != "" {
log.Error().Msg(resp.err)
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(errorHTML("Identity Provider returned an error: " + resp.err)))
_, err := w.Write([]byte(errorHTML("Identity Provider returned an error: " + resp.err))) //nolint:gosec // XSS not a concern for local OAuth callback
if err != nil {
log.Error().Err(err).Msg("unable to write error response")
}
Expand Down
6 changes: 3 additions & 3 deletions auth/tokenprovider/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (p *FederationProvider) tryTokenExchange(ctx context.Context, subjectToken
}

// Create request
req, err := http.NewRequestWithContext(ctx, "POST", exchangeURL, strings.NewReader(data.Encode()))
req, err := http.NewRequestWithContext(ctx, "POST", exchangeURL, strings.NewReader(data.Encode())) //nolint:gosec // URL is from trusted config
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
Expand All @@ -147,11 +147,11 @@ func (p *FederationProvider) tryTokenExchange(ctx context.Context, subjectToken
req.Header.Set("Accept", "*/*")

// Make request
resp, err := p.httpClient.Do(req)
resp, err := p.httpClient.Do(req) //nolint:gosec // G704: URL is from trusted configuration
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
defer resp.Body.Close() //nolint:errcheck

body, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions auth/tokenprovider/federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestFederationProvider_TokenExchangeSuccess(t *testing.T) {
assert.Equal(t, "application/x-www-form-urlencoded", r.Header.Get("Content-Type"))
assert.Equal(t, "*/*", r.Header.Get("Accept"))

// Parse form data
// Parse form data - limit body size to prevent G120
r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
err := r.ParseForm()
require.NoError(t, err)

Expand Down Expand Up @@ -155,13 +156,14 @@ func TestFederationProvider_TokenExchangeWithClientID(t *testing.T) {

// Create mock server that checks for client_id
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
err := r.ParseForm()
require.NoError(t, err)

// Verify client_id is present
assert.Equal(t, clientID, r.FormValue("client_id"))

response := map[string]interface{}{
response := map[string]interface{}{ //nolint:gosec // G101: test token, not a real credential
"access_token": "sp-wide-federation-token",
"token_type": "Bearer",
"expires_in": 3600,
Expand Down
4 changes: 2 additions & 2 deletions auth/tokenprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestExternalTokenProvider(t *testing.T) {
callCount := 0
tokenFunc := func() (string, error) {
callCount++
return "external-token-" + string(rune(callCount)), nil
return "external-token-" + string(rune(callCount)), nil //nolint:gosec // G115: test counter, values are always small
}

provider := NewExternalTokenProvider(tokenFunc)
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestExternalTokenProvider(t *testing.T) {
counter := 0
tokenFunc := func() (string, error) {
counter++
return "token-" + string(rune(counter)), nil
return "token-" + string(rune(counter)), nil //nolint:gosec // G115: test counter, values are always small
}

provider := NewExternalTokenProvider(tokenFunc)
Expand Down
59 changes: 44 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ func (c *conn) Close() error {
log := logger.WithContext(c.id, "", "")
ctx := driverctx.NewContextWithConnId(context.Background(), c.id)

// Close telemetry and release resources
// Time CloseSession so we can record DELETE_SESSION before flushing telemetry
closeStart := time.Now()
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

// Record DELETE_SESSION regardless of error (matches JDBC), then flush and release
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
_ = c.telemetry.Close(ctx)
telemetry.ReleaseForConnection(c.cfg.Host)
}

_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

if err != nil {
log.Err(err).Msg("databricks: failed to close connection")
return dbsqlerrint.NewBadConnectionError(err)
Expand Down Expand Up @@ -93,7 +96,7 @@ func (c *conn) Ping(ctx context.Context) error {
log.Err(err).Msg("databricks: failed to ping")
return dbsqlerrint.NewBadConnectionError(err)
}
defer rows.Close()
defer rows.Close() //nolint:errcheck

log.Debug().Msg("databricks: ping successful")
return nil
Expand Down Expand Up @@ -155,9 +158,13 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
newCtx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), c.id), corrId)
if !alreadyClosed && (opStatusResp == nil || opStatusResp.GetOperationState() != cli_service.TOperationState_CLOSED_STATE) {
closeOpStart := time.Now()
_, err1 := c.client.CloseOperation(newCtx, &cli_service.TCloseOperationReq{
OperationHandle: exStmtResp.OperationHandle,
})
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
}
if err1 != nil {
log.Err(err1).Msg("databricks: failed to close operation after executing statement")
closeOpErr = err1 // Capture for telemetry
Expand Down Expand Up @@ -216,7 +223,15 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for tracking row fetching metrics
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
return rows, err

}
Expand Down Expand Up @@ -381,7 +396,14 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
}
}

executeStart := time.Now()
resp, err := c.client.ExecuteStatement(ctx, &req)
// Record the Thrift call latency as a separate operation metric.
// This is distinct from the statement-level metric (BeforeExecuteWithTime), which
// measures end-to-end latency including polling and row fetching.
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeExecuteStatement, time.Since(executeStart).Milliseconds(), err)
}
var log *logger.DBSQLLogger
log, ctx = client.LoggerAndContext(ctx, resp)

Expand Down Expand Up @@ -514,11 +536,11 @@ func (c *conn) handleStagingPut(ctx context.Context, presignedUrl string, header
}
client := &http.Client{}

dat, err := os.Open(localFile)
dat, err := os.Open(localFile) //nolint:gosec // localFile is provided by the application, not user input
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading local file", err)
}
defer dat.Close()
defer dat.Close() //nolint:errcheck

info, err := dat.Stat()
if err != nil {
Expand All @@ -535,7 +557,7 @@ func (c *conn) handleStagingPut(ctx context.Context, presignedUrl string, header
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand All @@ -559,7 +581,7 @@ func (c *conn) handleStagingGet(ctx context.Context, presignedUrl string, header
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand All @@ -583,7 +605,7 @@ func (c *conn) handleStagingRemove(ctx context.Context, presignedUrl string, hea
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand Down Expand Up @@ -646,11 +668,18 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for staging operation row fetching
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
defer row.Close()
defer row.Close() //nolint:errcheck

} else {
return dbsqlerrint.NewDriverError(ctx, "staging ctx must be provided.", nil)
Expand All @@ -663,7 +692,7 @@ func (c *conn) execStagingOperation(
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error fetching staging operation results", err)
}
var stringValues []string = make([]string, 4)
stringValues := make([]string, 4)
for i, val := range sqlRow { // this will either be 3 (remove op) or 4 (put/get) elements
if s, ok := val.(string); ok {
stringValues[i] = s
Expand Down
4 changes: 2 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func TestConn_executeStatement(t *testing.T) {
for _, opTest := range operationStateTests {
closeOperationCount = 0
executeStatementCount = 0
executeStatementResp.DirectResults.OperationStatus.OperationState = &opTest.state
executeStatementResp.DirectResults.OperationStatus.DisplayMessage = &opTest.err
executeStatementResp.DirectResults.OperationStatus.OperationState = &opTest.state //nolint:gosec // G601: pointer is used only within this loop iteration
executeStatementResp.DirectResults.OperationStatus.DisplayMessage = &opTest.err //nolint:gosec // G601: pointer is used only within this loop iteration
_, err := testConn.ExecContext(context.Background(), "select 1", []driver.NamedValue{})
if opTest.err == "" {
assert.NoError(t, err)
Expand Down
Loading
Loading