Skip to content

Commit 8e3ff83

Browse files
committed
bq: add metrics, error classification, and SA impersonation
Add basic operational metrics (rows sent/failed, batches, latency, retries). Classify gRPC errors as transient or permanent for smarter retry behavior. Support service account impersonation via target_principal and delegates config fields. Make stream idle timeout and sweep interval YAML-configurable. Regenerate docs.
1 parent 2c077b5 commit 8e3ff83

3 files changed

Lines changed: 94 additions & 33 deletions

File tree

docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ output:
6767
table: "" # No default (required)
6868
message_format: json
6969
credentials_json: ""
70+
target_principal: ""
71+
delegates: []
72+
stream_idle_timeout: 5m
73+
stream_sweep_interval: 1m
7074
endpoint:
7175
http: ""
7276
grpc: ""
@@ -153,6 +157,42 @@ This field contains sensitive information that usually shouldn't be added to a c
153157
154158
*Default*: `""`
155159
160+
=== `target_principal`
161+
162+
Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.
163+
164+
165+
*Type*: `string`
166+
167+
*Default*: `""`
168+
169+
=== `delegates`
170+
171+
Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.
172+
173+
174+
*Type*: `array`
175+
176+
*Default*: `[]`
177+
178+
=== `stream_idle_timeout`
179+
180+
How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.
181+
182+
183+
*Type*: `string`
184+
185+
*Default*: `"5m"`
186+
187+
=== `stream_sweep_interval`
188+
189+
How often to check for idle streams to close.
190+
191+
192+
*Type*: `string`
193+
194+
*Default*: `"1m"`
195+
156196
=== `endpoint`
157197
158198
Optional endpoint overrides for the BigQuery and Storage Write API clients.

internal/impl/gcp/enterprise/output_bigquery_write_api.go

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,19 @@ import (
3838
)
3939

4040
const (
41-
bqwaFieldProject = "project"
42-
bqwaFieldDataset = "dataset"
43-
bqwaFieldTable = "table"
44-
bqwaFieldMessageFormat = "message_format"
45-
bqwaFieldCredentialsJSON = "credentials_json"
46-
bqwaFieldEndpoint = "endpoint"
47-
bqwaFieldEndpointHTTP = "http"
48-
bqwaFieldEndpointGRPC = "grpc"
49-
bqwaFieldBatching = "batching"
50-
bqwaFieldTargetPrincipal = "target_principal"
51-
bqwaFieldDelegates = "delegates"
41+
bqwaFieldProject = "project"
42+
bqwaFieldDataset = "dataset"
43+
bqwaFieldTable = "table"
44+
bqwaFieldMessageFormat = "message_format"
45+
bqwaFieldCredentialsJSON = "credentials_json"
46+
bqwaFieldEndpoint = "endpoint"
47+
bqwaFieldEndpointHTTP = "http"
48+
bqwaFieldEndpointGRPC = "grpc"
49+
bqwaFieldBatching = "batching"
50+
bqwaFieldTargetPrincipal = "target_principal"
51+
bqwaFieldDelegates = "delegates"
52+
bqwaFieldStreamIdleTimeout = "stream_idle_timeout"
53+
bqwaFieldStreamSweepInterval = "stream_sweep_interval"
5254
)
5355

5456
func init() {
@@ -113,6 +115,14 @@ each batch; all messages in the same batch are written to that table.
113115
Description("Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.").
114116
Advanced().
115117
Default([]any{}),
118+
service.NewStringField(bqwaFieldStreamIdleTimeout).
119+
Description("How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.").
120+
Advanced().
121+
Default("5m"),
122+
service.NewStringField(bqwaFieldStreamSweepInterval).
123+
Description("How often to check for idle streams to close.").
124+
Advanced().
125+
Default("1m"),
116126
service.NewObjectField(bqwaFieldEndpoint,
117127
service.NewStringField(bqwaFieldEndpointHTTP).
118128
Description("Override the BigQuery HTTP endpoint. Useful for local emulators.").
@@ -129,14 +139,16 @@ each batch; all messages in the same batch are written to that table.
129139
}
130140

131141
type bigQueryWriteAPIConfig struct {
132-
ProjectID string
133-
DatasetID string
134-
MessageFormat string
135-
CredentialsJSON string
136-
TargetPrincipal string
137-
Delegates []string
138-
EndpointHTTP string
139-
EndpointGRPC string
142+
ProjectID string
143+
DatasetID string
144+
MessageFormat string
145+
CredentialsJSON string
146+
TargetPrincipal string
147+
Delegates []string
148+
StreamIdleTimeout time.Duration
149+
StreamSweepInterval time.Duration
150+
EndpointHTTP string
151+
EndpointGRPC string
140152
}
141153

142154
func bigQueryWriteAPIConfigFromParsed(pConf *service.ParsedConfig) (conf bigQueryWriteAPIConfig, err error) {
@@ -161,6 +173,21 @@ func bigQueryWriteAPIConfigFromParsed(pConf *service.ParsedConfig) (conf bigQuer
161173
if conf.Delegates, err = pConf.FieldStringList(bqwaFieldDelegates); err != nil {
162174
return
163175
}
176+
var idleTimeoutStr, sweepIntervalStr string
177+
if idleTimeoutStr, err = pConf.FieldString(bqwaFieldStreamIdleTimeout); err != nil {
178+
return
179+
}
180+
if conf.StreamIdleTimeout, err = time.ParseDuration(idleTimeoutStr); err != nil {
181+
err = fmt.Errorf("invalid %s: %w", bqwaFieldStreamIdleTimeout, err)
182+
return
183+
}
184+
if sweepIntervalStr, err = pConf.FieldString(bqwaFieldStreamSweepInterval); err != nil {
185+
return
186+
}
187+
if conf.StreamSweepInterval, err = time.ParseDuration(sweepIntervalStr); err != nil {
188+
err = fmt.Errorf("invalid %s: %w", bqwaFieldStreamSweepInterval, err)
189+
return
190+
}
164191
epConf := pConf.Namespace(bqwaFieldEndpoint)
165192
if conf.EndpointHTTP, err = epConf.FieldString(bqwaFieldEndpointHTTP); err != nil {
166193
return
@@ -171,16 +198,6 @@ func bigQueryWriteAPIConfigFromParsed(pConf *service.ParsedConfig) (conf bigQuer
171198
return
172199
}
173200

174-
const (
175-
// streamIdleTimeout is how long a cached stream can remain unused before
176-
// being eligible for eviction by the idle sweep.
177-
streamIdleTimeout = 5 * time.Minute
178-
179-
// streamSweepInterval is how often the background goroutine checks for
180-
// idle streams.
181-
streamSweepInterval = 1 * time.Minute
182-
)
183-
184201
type bqwaMetrics struct {
185202
rowsSent *service.MetricCounter
186203
rowsFailed *service.MetricCounter
@@ -556,12 +573,12 @@ func (o *bigQueryWriteAPIOutput) evictStream(cacheKey string) {
556573
}
557574

558575
// sweepIdleStreams periodically evicts streams that haven't been used within
559-
// streamIdleTimeout. This prevents unbounded growth of the stream cache when
576+
// the configured idle timeout. This prevents unbounded growth of the stream cache when
560577
// the table field uses interpolation and routes to many distinct tables.
561578
func (o *bigQueryWriteAPIOutput) sweepIdleStreams() {
562579
defer o.sweepWg.Done()
563580

564-
ticker := time.NewTicker(streamSweepInterval)
581+
ticker := time.NewTicker(o.conf.StreamSweepInterval)
565582
defer ticker.Stop()
566583

567584
for {
@@ -581,7 +598,7 @@ func (o *bigQueryWriteAPIOutput) sweepIdleStreams() {
581598
o.streamsMu.Lock()
582599
for key, swd := range o.streams {
583600
lastUsed := time.Unix(0, swd.lastUsed.Load())
584-
if now.Sub(lastUsed) > streamIdleTimeout {
601+
if now.Sub(lastUsed) > o.conf.StreamIdleTimeout {
585602
toClose = append(toClose, evicted{key, swd})
586603
delete(o.streams, key)
587604
}

internal/impl/gcp/enterprise/output_bigquery_write_api_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ func TestDescriptorProtoToMessageDescriptorErrors(t *testing.T) {
241241

242242
func TestSweepIdleStreams(t *testing.T) {
243243
out := &bigQueryWriteAPIOutput{
244+
conf: bigQueryWriteAPIConfig{
245+
StreamIdleTimeout: 5 * time.Minute,
246+
StreamSweepInterval: 1 * time.Minute,
247+
},
244248
log: service.MockResources().Logger(),
245249
streams: make(map[string]*streamWithDescriptor),
246250
stopSweep: make(chan struct{}),
@@ -273,7 +277,7 @@ func TestSweepIdleStreams(t *testing.T) {
273277
var evictedKeys []string
274278
for key, swd := range out.streams {
275279
lastUsed := time.Unix(0, swd.lastUsed.Load())
276-
if now.Sub(lastUsed) > streamIdleTimeout {
280+
if now.Sub(lastUsed) > out.conf.StreamIdleTimeout {
277281
evictedKeys = append(evictedKeys, key)
278282
delete(out.streams, key)
279283
}

0 commit comments

Comments
 (0)