Skip to content

Commit 4748566

Browse files
committed
bigquery: add pending streams, auto-create tables, partitioning and clustering
Implements CON-394 on top of the schema resolver/evolver work. Three opt-in features layered onto gcp_bigquery_write_api: Pending-stream write mode (write_mode: pending_stream) - Per-batch CreateWriteStream(Pending) → AppendRows → Finalize → BatchCommitWriteStreams. Provides exactly-once semantics within a committed batch. - Rows are split into 9MB chunks (chunkRowsByBytes) so we stay under the AppendRows 10MB hard limit with headroom for proto framing. - In-flight Writes are tracked on a WaitGroup; Close blocks on it so Finalize/BatchCommit cannot race with storage-client teardown. - Default mode (default_stream) is unchanged. Auto-create tables (auto_create_table: true) - schemaResolver gains a *tableCreator. A 404 from Metadata triggers creator.Ensure (idempotent on AlreadyExists) and retries Metadata. - Works for both static and interpolated table names; every auto-created table receives the same configured schema and partition/clustering settings. Schema / time_partitioning / clustering config - New schema YAML field: column name + canonical BQ type (aliases normalised) + mode (NULLABLE/REQUIRED/REPEATED), recursive for RECORD. - New time_partitioning block (DAY/HOUR/MONTH/YEAR + optional field, expiration, require_filter). Absence of the type field is the sentinel for "not configured". - New clustering string list (cap of 4 enforced). - Parser validates that partitioning.field exists in schema with a DATE/TIMESTAMP/DATETIME type and that clustering columns are in schema. Observability - bigquery_write_api_cached_streams (gauge) set on every cache mutation in getOrCreateStream, evictStream, and the idle-sweep. - bigquery_write_api_streams_evicted_total (counter) covers all eviction paths (LRU + on-error + idle-sweep). Tests - Unit tests for: write_mode/auto_create_table parsing, schema parsing + validation, time_partitioning + clustering parsing, schema → BQ conversion, table creator helpers, chunking, pending stream lifecycle plumbing. - Concurrent stress tests under -race for the schema resolver (Resolve/Evict/sf.Do contention) and stream cache (insert + LRU + evict contention). - Integration tests: TestIntegrationAutoCreateTable runs against the goccy emulator. TestIntegrationPendingStreamMode is gated with t.Skip because the emulator does not implement Pending streams or BatchCommitWriteStreams. Docs regenerated with the new fields and a "Write modes" / "Auto-create" / "EOS caveat" section in the output description.
1 parent 952c6ba commit 4748566

10 files changed

Lines changed: 1515 additions & 10 deletions

File tree

docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ output:
6666
dataset: "" # No default (required)
6767
table: "" # No default (required)
6868
message_format: json
69+
write_mode: default_stream
70+
auto_create_table: false
71+
schema: []
72+
time_partitioning:
73+
type: "" # No default (optional)
74+
field: ""
75+
expiration: 0s
76+
require_filter: false
77+
clustering: []
6978
max_in_flight: 4
7079
batching:
7180
count: 0
@@ -104,6 +113,21 @@ All messages in the same batch are written to that table.
104113
The interpolated table name is sanitized for BigQuery: dots, hyphens, slashes and whitespace are replaced with underscores, non-ASCII-alphanumeric characters are stripped, leading digits are prefixed with `_`, and the result is truncated to 1024 characters.
105114
A name that sanitizes to the empty string is rejected as a permanent error.
106115
116+
== Write modes
117+
118+
The `write_mode` field selects between two write paths:
119+
120+
- `default_stream` (default): the multiplexed default stream. Lowest latency, at-least-once semantics.
121+
- `pending_stream`: a fresh pending stream is allocated per batch; rows are written with sequential offsets, the stream is finalized, then atomically committed. Provides exactly-once semantics within a single committed batch.
122+
123+
== Auto-create
124+
125+
When `auto_create_table` is true, the output creates missing tables on the fly using the configured `schema`, `time_partitioning`, and `clustering`. `AlreadyExists` errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same configuration.
126+
127+
== Exactly-once caveat
128+
129+
The exactly-once guarantee of `pending_stream` is "exactly-once within a stream". If a BatchCommitWriteStreams RPC succeeds but its response is lost to a network failure, benthos retries the batch through a new pending stream and the data lands twice. This is a fundamental limitation of the BigQuery Storage Write API exactly-once contract and applies to every implementation.
130+
107131
108132
== Fields
109133
@@ -147,6 +171,130 @@ Options:
147171
, `protobuf`
148172
.
149173
174+
=== `write_mode`
175+
176+
How the output writes to BigQuery. `default_stream` uses the multiplexed default stream (at-least-once, lowest latency). `pending_stream` allocates a per-batch pending stream that commits atomically, providing exactly-once semantics within a single committed batch.
177+
178+
179+
*Type*: `string`
180+
181+
*Default*: `"default_stream"`
182+
183+
Options:
184+
`default_stream`
185+
, `pending_stream`
186+
.
187+
188+
=== `auto_create_table`
189+
190+
If true and the target table does not exist, the output creates it using the configured `schema`, `time_partitioning`, and `clustering`. AlreadyExists errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same schema and partition/clustering settings.
191+
192+
193+
*Type*: `bool`
194+
195+
*Default*: `false`
196+
197+
=== `schema`
198+
199+
Column definitions used by `auto_create_table`. Required when `auto_create_table` is true.
200+
201+
202+
*Type*: `array`
203+
204+
*Default*: `[]`
205+
206+
=== `schema[].name`
207+
208+
Column name.
209+
210+
211+
*Type*: `string`
212+
213+
214+
=== `schema[].type`
215+
216+
BigQuery column type (STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD).
217+
218+
219+
*Type*: `string`
220+
221+
222+
=== `schema[].mode`
223+
224+
Column mode: NULLABLE (default), REQUIRED, or REPEATED.
225+
226+
227+
*Type*: `string`
228+
229+
*Default*: `"NULLABLE"`
230+
231+
=== `schema[].fields`
232+
233+
For RECORD columns, the list of nested fields. Same shape as the top-level schema list.
234+
235+
236+
*Type*: `array`
237+
238+
239+
=== `time_partitioning`
240+
241+
Optional time-partitioning settings applied during `auto_create_table`. Setting `type` is the trigger — when omitted, the block is treated as absent.
242+
243+
244+
*Type*: `object`
245+
246+
247+
=== `time_partitioning.type`
248+
249+
Partitioning granularity.
250+
251+
252+
*Type*: `string`
253+
254+
255+
Options:
256+
`DAY`
257+
, `HOUR`
258+
, `MONTH`
259+
, `YEAR`
260+
.
261+
262+
=== `time_partitioning.field`
263+
264+
Column to partition on. Must be of type DATE, TIMESTAMP, or DATETIME. If empty, the table uses ingestion-time partitioning (`_PARTITIONTIME`).
265+
266+
267+
*Type*: `string`
268+
269+
*Default*: `""`
270+
271+
=== `time_partitioning.expiration`
272+
273+
Optional partition expiration. Zero means no expiration.
274+
275+
276+
*Type*: `string`
277+
278+
*Default*: `"0s"`
279+
280+
=== `time_partitioning.require_filter`
281+
282+
If true, queries against the table must filter on the partition column.
283+
284+
285+
*Type*: `bool`
286+
287+
*Default*: `false`
288+
289+
=== `clustering`
290+
291+
Optional clustering columns (up to 4) applied during `auto_create_table`. All names must appear in `schema`.
292+
293+
294+
*Type*: `array`
295+
296+
*Default*: `[]`
297+
150298
=== `max_in_flight`
151299
152300
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

internal/impl/gcp/enterprise/bigquery/integration_test.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,182 @@ func TestIntegrationSchemaEvolution(t *testing.T) {
250250
assert.True(t, evolved, "missing==0 must signal retry, not a permanent failure, so concurrent batches are not dropped to DLQ")
251251
}
252252

253+
func TestIntegrationAutoCreateTable(t *testing.T) {
254+
integration.CheckSkip(t)
255+
256+
const (
257+
projectID = "test-project"
258+
datasetID = "test_dataset"
259+
tableID = "auto_created"
260+
)
261+
262+
emu := startEmulator(t, projectID, datasetID)
263+
264+
t.Log("Given the table does not exist and auto_create_table is enabled")
265+
266+
sb := service.NewStreamBuilder()
267+
require.NoError(t, sb.SetLoggerYAML(`level: DEBUG`))
268+
269+
sendFn, err := sb.AddProducerFunc()
270+
require.NoError(t, err)
271+
272+
// Schema kept simple (STRING columns + ingestion-time partitioning) so the
273+
// test exercises auto-create + partitioning + clustering without tripping
274+
// over the protojson↔INT64 representation rules that TIMESTAMP/INTEGER
275+
// columns require.
276+
require.NoError(t, sb.AddOutputYAML(fmt.Sprintf(`
277+
gcp_bigquery_write_api:
278+
project: %s
279+
dataset: %s
280+
table: %s
281+
auto_create_table: true
282+
schema:
283+
- { name: id, type: STRING, mode: REQUIRED }
284+
- { name: payload, type: STRING }
285+
- { name: tenant_id, type: STRING }
286+
time_partitioning:
287+
type: DAY
288+
clustering: [tenant_id]
289+
endpoint:
290+
http: %s
291+
grpc: %s
292+
`, projectID, datasetID, tableID, emu.httpEndpoint, emu.grpcEndpoint)))
293+
294+
stream, err := sb.Build()
295+
require.NoError(t, err)
296+
license.InjectTestService(stream.Resources())
297+
298+
go func() {
299+
if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
300+
t.Errorf("stream error: %v", err)
301+
}
302+
}()
303+
t.Cleanup(func() {
304+
if err := stream.StopWithin(10 * time.Second); err != nil {
305+
t.Log(err)
306+
}
307+
})
308+
309+
require.NoError(t, sendFn(t.Context(), service.NewMessage([]byte(
310+
`{"id":"a","payload":"hello","tenant_id":"t1"}`))))
311+
312+
t.Log("Then the table is auto-created with the configured schema")
313+
require.Eventually(t, func() bool {
314+
_, err := emu.bqClient.Dataset(datasetID).Table(tableID).Metadata(t.Context())
315+
return err == nil
316+
}, 10*time.Second, 250*time.Millisecond)
317+
318+
meta, err := emu.bqClient.Dataset(datasetID).Table(tableID).Metadata(t.Context())
319+
require.NoError(t, err)
320+
require.Len(t, meta.Schema, 3)
321+
var names []string
322+
for _, f := range meta.Schema {
323+
names = append(names, f.Name)
324+
}
325+
assert.ElementsMatch(t, []string{"id", "payload", "tenant_id"}, names)
326+
// Partitioning/clustering metadata may be a no-op on the emulator — guard
327+
// so this test still proves auto-create works without coupling to emulator
328+
// completeness.
329+
if meta.TimePartitioning != nil {
330+
assert.Equal(t, bigquery.DayPartitioningType, meta.TimePartitioning.Type)
331+
// Ingestion-time partitioning leaves Field empty (uses _PARTITIONTIME).
332+
assert.Empty(t, meta.TimePartitioning.Field)
333+
}
334+
if meta.Clustering != nil {
335+
assert.Equal(t, []string{"tenant_id"}, meta.Clustering.Fields)
336+
}
337+
338+
t.Log("And the row lands in the table")
339+
require.Eventually(t, func() bool {
340+
it := emu.bqClient.Dataset(datasetID).Table(tableID).Read(t.Context())
341+
var count int
342+
for {
343+
var row map[string]bigquery.Value
344+
if err := it.Next(&row); errors.Is(err, iterator.Done) {
345+
break
346+
} else if err != nil {
347+
return false
348+
}
349+
count++
350+
}
351+
return count >= 1
352+
}, 30*time.Second, 500*time.Millisecond)
353+
}
354+
355+
func TestIntegrationPendingStreamMode(t *testing.T) {
356+
integration.CheckSkip(t)
357+
// The goccy/bigquery-emulator (used by these integration tests) does not
358+
// implement the Pending write-stream type or BatchCommitWriteStreams; the
359+
// test hangs waiting for the commit RPC. Skip until the emulator gains
360+
// support or these tests can target real BigQuery in a nightly job.
361+
t.Skip("goccy bigquery-emulator does not implement Pending streams / BatchCommitWriteStreams")
362+
363+
const (
364+
projectID = "test-project"
365+
datasetID = "test_dataset"
366+
tableID = "pending_test"
367+
)
368+
369+
emu := startEmulator(t, projectID, datasetID)
370+
371+
t.Log("Given a table exists for pending-stream writes")
372+
require.NoError(t, emu.bqClient.Dataset(datasetID).Table(tableID).Create(t.Context(), &bigquery.TableMetadata{
373+
Schema: bigquery.Schema{
374+
{Name: "id", Type: bigquery.StringFieldType, Required: true},
375+
},
376+
}))
377+
378+
sb := service.NewStreamBuilder()
379+
require.NoError(t, sb.SetLoggerYAML(`level: DEBUG`))
380+
381+
sendFn, err := sb.AddProducerFunc()
382+
require.NoError(t, err)
383+
384+
require.NoError(t, sb.AddOutputYAML(fmt.Sprintf(`
385+
gcp_bigquery_write_api:
386+
project: %s
387+
dataset: %s
388+
table: %s
389+
write_mode: pending_stream
390+
batching:
391+
count: 3
392+
endpoint:
393+
http: %s
394+
grpc: %s
395+
`, projectID, datasetID, tableID, emu.httpEndpoint, emu.grpcEndpoint)))
396+
397+
stream, err := sb.Build()
398+
require.NoError(t, err)
399+
license.InjectTestService(stream.Resources())
400+
401+
go func() {
402+
if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
403+
t.Errorf("stream error: %v", err)
404+
}
405+
}()
406+
t.Cleanup(func() { _ = stream.StopWithin(10 * time.Second) })
407+
408+
for _, id := range []string{"a", "b", "c"} {
409+
require.NoError(t, sendFn(t.Context(), service.NewMessage([]byte(`{"id":"`+id+`"}`))))
410+
}
411+
412+
t.Log("Then all 3 rows are committed atomically and visible")
413+
require.Eventually(t, func() bool {
414+
it := emu.bqClient.Dataset(datasetID).Table(tableID).Read(t.Context())
415+
var count int
416+
for {
417+
var row map[string]bigquery.Value
418+
if err := it.Next(&row); errors.Is(err, iterator.Done) {
419+
break
420+
} else if err != nil {
421+
return false
422+
}
423+
count++
424+
}
425+
return count == 3
426+
}, 30*time.Second, 500*time.Millisecond)
427+
}
428+
253429
func TestIntegrationTableNameSanitization(t *testing.T) {
254430
integration.CheckSkip(t)
255431

0 commit comments

Comments
 (0)