Skip to content

Commit ffafdc3

Browse files
committed
bq: address review feedback
Move license check from init() to constructor. Reorder config fields logically (core fields first, advanced last). Use sentence-per-line in config descriptions for easier diffs. Add newTestOutput helper for tests. Rewrite test comments as given/when/then. Inject test license via license.InjectTestService. Regenerate docs.
1 parent 8e3ff83 commit ffafdc3

3 files changed

Lines changed: 195 additions & 171 deletions

File tree

docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc

Lines changed: 93 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ output:
4343
dataset: "" # No default (required)
4444
table: "" # No default (required)
4545
message_format: json
46-
credentials_json: ""
4746
max_in_flight: 64
4847
batching:
4948
count: 0
5049
byte_size: 0
5150
period: ""
5251
check: ""
52+
credentials_json: ""
5353
```
5454
5555
--
@@ -66,6 +66,13 @@ output:
6666
dataset: "" # No default (required)
6767
table: "" # No default (required)
6868
message_format: json
69+
max_in_flight: 64
70+
batching:
71+
count: 0
72+
byte_size: 0
73+
period: ""
74+
check: ""
75+
processors: [] # No default (optional)
6976
credentials_json: ""
7077
target_principal: ""
7178
delegates: []
@@ -74,31 +81,23 @@ output:
7481
endpoint:
7582
http: ""
7683
grpc: ""
77-
max_in_flight: 64
78-
batching:
79-
count: 0
80-
byte_size: 0
81-
period: ""
82-
check: ""
83-
processors: [] # No default (optional)
8484
```
8585
8686
--
8787
======
8888
89-
Writes messages to a BigQuery table using the Storage Write API, which provides
90-
higher throughput and lower latency than the legacy streaming API or load jobs.
89+
Writes messages to a BigQuery table using the Storage Write API.
90+
This provides higher throughput and lower latency than the legacy streaming API or load jobs.
9191
92-
Messages can be formatted as JSON (default) or raw protobuf bytes. When using
93-
JSON format the component automatically fetches the table schema and converts
94-
each message to the corresponding proto representation.
92+
Messages can be formatted as JSON (default) or raw protobuf bytes.
93+
When using JSON format the component automatically fetches the table schema and converts each message to the corresponding proto representation.
9594
9695
WARNING: The proto3 JSON mapping encodes int64 and uint64 values as strings.
97-
JSON messages with integer fields must use string values (e.g. `"age": "30"`
98-
not `"age": 30`), otherwise the write will fail with an unmarshalling error.
96+
JSON messages with integer fields must use string values (e.g. `"age": "30"` not `"age": 30`).
97+
Otherwise the write will fail with an unmarshalling error.
9998
100-
When batching is enabled the table name is resolved from the first message in
101-
each batch; all messages in the same batch are written to that table.
99+
When batching is enabled the table name is resolved from the first message in each batch.
100+
All messages in the same batch are written to that table.
102101
103102
104103
== Fields
@@ -131,7 +130,7 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
131130
132131
=== `message_format`
133132
134-
The format of input messages. Use 'json' to have the component convert JSON to proto automatically, or 'protobuf' to supply raw proto-encoded bytes.
133+
The format of input messages. Use 'json' to have the component convert JSON to proto automatically. Use 'protobuf' to supply raw proto-encoded bytes.
135134
136135
137136
*Type*: `string`
@@ -143,82 +142,6 @@ Options:
143142
, `protobuf`
144143
.
145144
146-
=== `credentials_json`
147-
148-
An optional JSON string containing GCP credentials. If empty, credentials are loaded from the environment.
149-
[CAUTION]
150-
====
151-
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
152-
====
153-
154-
155-
156-
*Type*: `string`
157-
158-
*Default*: `""`
159-
160-
=== `target_principal`
161-
162-
Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.
163-
164-
165-
*Type*: `string`
166-
167-
*Default*: `""`
168-
169-
=== `delegates`
170-
171-
Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.
172-
173-
174-
*Type*: `array`
175-
176-
*Default*: `[]`
177-
178-
=== `stream_idle_timeout`
179-
180-
How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.
181-
182-
183-
*Type*: `string`
184-
185-
*Default*: `"5m"`
186-
187-
=== `stream_sweep_interval`
188-
189-
How often to check for idle streams to close.
190-
191-
192-
*Type*: `string`
193-
194-
*Default*: `"1m"`
195-
196-
=== `endpoint`
197-
198-
Optional endpoint overrides for the BigQuery and Storage Write API clients.
199-
200-
201-
*Type*: `object`
202-
203-
204-
=== `endpoint.http`
205-
206-
Override the BigQuery HTTP endpoint. Useful for local emulators.
207-
208-
209-
*Type*: `string`
210-
211-
*Default*: `""`
212-
213-
=== `endpoint.grpc`
214-
215-
Override the BigQuery Storage gRPC endpoint. Useful for local emulators.
216-
217-
218-
*Type*: `string`
219-
220-
*Default*: `""`
221-
222145
=== `max_in_flight`
223146
224147
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
@@ -330,4 +253,80 @@ processors:
330253
format: json_array
331254
```
332255
256+
=== `credentials_json`
257+
258+
An optional JSON string containing GCP credentials. If empty, credentials are loaded from the environment.
259+
[CAUTION]
260+
====
261+
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
262+
====
263+
264+
265+
266+
*Type*: `string`
267+
268+
*Default*: `""`
269+
270+
=== `target_principal`
271+
272+
Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.
273+
274+
275+
*Type*: `string`
276+
277+
*Default*: `""`
278+
279+
=== `delegates`
280+
281+
Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.
282+
283+
284+
*Type*: `array`
285+
286+
*Default*: `[]`
287+
288+
=== `stream_idle_timeout`
289+
290+
How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.
291+
292+
293+
*Type*: `string`
294+
295+
*Default*: `"5m"`
296+
297+
=== `stream_sweep_interval`
298+
299+
How often to check for idle streams to close.
300+
301+
302+
*Type*: `string`
303+
304+
*Default*: `"1m"`
305+
306+
=== `endpoint`
307+
308+
Optional endpoint overrides for the BigQuery and Storage Write API clients.
309+
310+
311+
*Type*: `object`
312+
313+
314+
=== `endpoint.http`
315+
316+
Override the BigQuery HTTP endpoint. Useful for local emulators.
317+
318+
319+
*Type*: `string`
320+
321+
*Default*: `""`
322+
323+
=== `endpoint.grpc`
324+
325+
Override the BigQuery Storage gRPC endpoint. Useful for local emulators.
326+
327+
328+
*Type*: `string`
329+
330+
*Default*: `""`
331+
333332

internal/impl/gcp/enterprise/output_bigquery_write_api.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ func init() {
5858
func(conf *service.ParsedConfig, mgr *service.Resources) (
5959
out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error,
6060
) {
61-
if err = license.CheckRunningEnterprise(mgr); err != nil {
62-
return
63-
}
6461
if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
6562
return
6663
}
@@ -78,45 +75,56 @@ func bigQueryWriteAPISpec() *service.ConfigSpec {
7875
Categories("GCP", "Services").
7976
Summary("Streams data into BigQuery using the Storage Write API.").
8077
Description(`
81-
Writes messages to a BigQuery table using the Storage Write API, which provides
82-
higher throughput and lower latency than the legacy streaming API or load jobs.
78+
Writes messages to a BigQuery table using the Storage Write API.
79+
This provides higher throughput and lower latency than the legacy streaming API or load jobs.
8380
84-
Messages can be formatted as JSON (default) or raw protobuf bytes. When using
85-
JSON format the component automatically fetches the table schema and converts
86-
each message to the corresponding proto representation.
81+
Messages can be formatted as JSON (default) or raw protobuf bytes.
82+
When using JSON format the component automatically fetches the table schema and converts each message to the corresponding proto representation.
8783
8884
WARNING: The proto3 JSON mapping encodes int64 and uint64 values as strings.
89-
JSON messages with integer fields must use string values (e.g. `+"`"+`"age": "30"`+"`"+`
90-
not `+"`"+`"age": 30`+"`"+`), otherwise the write will fail with an unmarshalling error.
85+
JSON messages with integer fields must use string values (e.g. `+"`"+`"age": "30"`+"`"+` not `+"`"+`"age": 30`+"`"+`).
86+
Otherwise the write will fail with an unmarshalling error.
9187
92-
When batching is enabled the table name is resolved from the first message in
93-
each batch; all messages in the same batch are written to that table.
88+
When batching is enabled the table name is resolved from the first message in each batch.
89+
All messages in the same batch are written to that table.
9490
`).
9591
Fields(
9692
service.NewStringField(bqwaFieldProject).
97-
Description("The GCP project ID. If empty, the project is auto-detected from the environment.").
93+
Description("The GCP project ID."+
94+
" If empty, the project is auto-detected from the environment.").
9895
Default(""),
9996
service.NewStringField(bqwaFieldDataset).
10097
Description("The BigQuery dataset ID."),
10198
service.NewInterpolatedStringField(bqwaFieldTable).
102-
Description("The BigQuery table ID. Supports interpolation functions. When batching, resolved from the first message in each batch."),
99+
Description("The BigQuery table ID."+
100+
" Supports interpolation functions."+
101+
" When batching, resolved from the first message in each batch."),
103102
service.NewStringEnumField(bqwaFieldMessageFormat, "json", "protobuf").
104-
Description("The format of input messages. Use 'json' to have the component convert JSON to proto automatically, or 'protobuf' to supply raw proto-encoded bytes.").
103+
Description("The format of input messages."+
104+
" Use 'json' to have the component convert JSON to proto automatically."+
105+
" Use 'protobuf' to supply raw proto-encoded bytes.").
105106
Default("json"),
107+
service.NewOutputMaxInFlightField().Default(64),
108+
service.NewBatchPolicyField(bqwaFieldBatching),
106109
service.NewStringField(bqwaFieldCredentialsJSON).
107-
Description("An optional JSON string containing GCP credentials. If empty, credentials are loaded from the environment.").
110+
Description("An optional JSON string containing GCP credentials."+
111+
" If empty, credentials are loaded from the environment.").
108112
Secret().
109113
Default(""),
110114
service.NewStringField(bqwaFieldTargetPrincipal).
111-
Description("Service account email to impersonate. When set, the output obtains tokens acting as this service account. Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.").
115+
Description("Service account email to impersonate."+
116+
" When set, the output obtains tokens acting as this service account."+
117+
" Requires the caller to have roles/iam.serviceAccountTokenCreator on the target.").
112118
Advanced().
113119
Default(""),
114120
service.NewStringListField(bqwaFieldDelegates).
115-
Description("Optional delegation chain for chained service account impersonation. Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.").
121+
Description("Optional delegation chain for chained service account impersonation."+
122+
" Each service account must be granted roles/iam.serviceAccountTokenCreator on the next in the chain.").
116123
Advanced().
117124
Default([]any{}),
118125
service.NewStringField(bqwaFieldStreamIdleTimeout).
119-
Description("How long a cached stream can remain unused before being closed. Relevant when the table field uses interpolation to route to many tables.").
126+
Description("How long a cached stream can remain unused before being closed."+
127+
" Relevant when the table field uses interpolation to route to many tables.").
120128
Advanced().
121129
Default("5m"),
122130
service.NewStringField(bqwaFieldStreamSweepInterval).
@@ -125,16 +133,16 @@ each batch; all messages in the same batch are written to that table.
125133
Default("1m"),
126134
service.NewObjectField(bqwaFieldEndpoint,
127135
service.NewStringField(bqwaFieldEndpointHTTP).
128-
Description("Override the BigQuery HTTP endpoint. Useful for local emulators.").
136+
Description("Override the BigQuery HTTP endpoint."+
137+
" Useful for local emulators.").
129138
Default(""),
130139
service.NewStringField(bqwaFieldEndpointGRPC).
131-
Description("Override the BigQuery Storage gRPC endpoint. Useful for local emulators.").
140+
Description("Override the BigQuery Storage gRPC endpoint."+
141+
" Useful for local emulators.").
132142
Default(""),
133143
).
134144
Description("Optional endpoint overrides for the BigQuery and Storage Write API clients.").
135145
Advanced(),
136-
service.NewOutputMaxInFlightField().Default(64),
137-
service.NewBatchPolicyField(bqwaFieldBatching),
138146
)
139147
}
140148

@@ -285,6 +293,9 @@ type bigQueryWriteAPIOutput struct {
285293
var _ service.BatchOutput = (*bigQueryWriteAPIOutput)(nil)
286294

287295
func bigQueryWriteAPIOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*bigQueryWriteAPIOutput, error) {
296+
if err := license.CheckRunningEnterprise(mgr); err != nil {
297+
return nil, err
298+
}
288299
cfg, err := bigQueryWriteAPIConfigFromParsed(conf)
289300
if err != nil {
290301
return nil, err

0 commit comments

Comments
 (0)