90 changes: 90 additions & 0 deletions docs/modules/components/pages/outputs/bigquery_cdc_migration.adoc
@@ -0,0 +1,90 @@
= Migrating from `gcp_bigquery` to `gcp_bigquery_write_api` (CDC)
:description: Migrate a CDC pipeline from the load-jobs based gcp_bigquery output to the Storage Write API gcp_bigquery_write_api output.

The `gcp_bigquery_write_api` output supports BigQuery's Change Data Capture (CDC) ingestion via the Storage Write API. Compared to the load-jobs based `gcp_bigquery` output, the Storage Write API delivers:

* Seconds-scale ingest latency instead of minutes (no batch-and-load cycle).
* Per-row UPSERT and DELETE operations without writing intermediate files.
* Sequence-number-based out-of-order resolution via `_CHANGE_SEQUENCE_NUMBER`.

== Config translation

[source,yaml]
----
# BEFORE: load-jobs based output
output:
  gcp_bigquery:
    project: my-project
    dataset: my_dataset
    table: events
    format: NEWLINE_DELIMITED_JSON
    batching:
      count: 10000
      period: 30s

# AFTER: Storage Write API CDC mode
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: events
    write_mode: upsert_delete
    change_type: ${! metadata("operation") }
    change_sequence_number: ${! metadata("scn") }
    primary_keys: [id]
    batching:
      count: 500
      period: 1s
----

The `change_type` expression must resolve to `UPSERT` or `DELETE` per message (case-insensitive). `change_sequence_number` is optional but recommended for any pipeline where out-of-order delivery is possible.
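
As a rough sketch of where the `operation` metadata key might come from: assuming a Debezium-style payload whose `op` field is `d` for deletes (the field name and its values are assumptions about the upstream source, not part of this output's contract), a `mutation` processor could set the key before the output reads it:

[source,yaml]
----
pipeline:
  processors:
    - mutation: |
        # Hypothetical source field: `op` is "d" for deletes; every other value is treated as an upsert.
        meta operation = if this.op == "d" { "DELETE" } else { "UPSERT" }
----

The payload itself is left untouched; only the metadata key consumed by `change_type` is added.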

== Schema requirements

The destination table must have a `PRIMARY KEY` declared. To add one to an existing table:

[source,sql]
----
ALTER TABLE my_dataset.events
ADD PRIMARY KEY (id) NOT ENFORCED;
----

For new tables, use the `auto_create_table` option with `primary_keys`:

[source,yaml]
----
auto_create_table: true
schema:
- { name: id, type: STRING, mode: REQUIRED }
- { name: payload, type: JSON }
primary_keys: [id]
----

[NOTE]
====
Composite primary keys are supported with up to 16 columns. The column order in `primary_keys` is significant for composite keys.
====
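
A minimal sketch of a composite key with auto-creation, built only from the fields described in this guide. The table and column names (`order_items`, `order_id`, `line_no`) are hypothetical:

[source,yaml]
----
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: order_items            # hypothetical table name
    write_mode: upsert_delete
    change_type: ${! metadata("operation") }
    auto_create_table: true
    schema:
      - { name: order_id, type: STRING, mode: REQUIRED }
      - { name: line_no, type: INT64, mode: REQUIRED }
      - { name: payload, type: JSON }
    # Order matters for composite keys: order_id first, then line_no.
    primary_keys: [order_id, line_no]
----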

== Snapshot vs streaming

BigQuery's CDC contract does not permit mixing INSERT (unspecified `_CHANGE_TYPE`) and UPSERT/DELETE rows in the same write. For initial backfills, there are two recommended approaches:

. *UPSERT for everything.* The simplest path: write snapshot rows with `change_type: UPSERT` like any other CDC row. This is idempotent and tolerates retries, but pays the per-PK merge cost on every snapshot row. Suitable for tables up to ~10M rows.
. *Separate snapshot pipeline.* Land snapshot rows via a second `gcp_bigquery_write_api` instance with `write_mode: default_stream` into a separate staging table, then `CREATE TABLE … AS SELECT` into the CDC-active table once the snapshot completes. This avoids the merge cost on the snapshot but adds operational complexity.
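
For the second approach, the snapshot pipeline's output might look roughly like the following. The staging table name and the batching values are illustrative assumptions, not recommendations:

[source,yaml]
----
# Snapshot-only pipeline: plain appends into a staging table, no CDC fields set.
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: events_snapshot_staging   # hypothetical staging table
    write_mode: default_stream
    batching:
      count: 10000
      period: 5s
----

The streaming pipeline keeps the `upsert_delete` configuration shown under Config translation and targets the CDC-active table.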

== Operational differences

[WARNING]
====
Tables with active CDC do not support DML statements (`DELETE`, `UPDATE`, `MERGE`). If your existing pipeline relies on post-load DML for cleanup, that pattern will no longer work. Move cleanup to BigQuery scheduled queries against a non-CDC table or pre-process rows in the connector.
====

* BigQuery does not enforce primary-key uniqueness; the producer is responsible.
* DELETEs are retained for a two-day window for point-in-time recovery before being permanently dropped.
* The `_CHANGE_TYPE` and `_CHANGE_SEQUENCE_NUMBER` pseudo-columns are injected by the connector; do not declare them in `schema`.
* CDC ingestion requires the default write stream. The `write_mode: pending_stream` exactly-once mode is not compatible with CDC and is rejected at config parse time.

== Related

* xref:outputs/gcp_bigquery_write_api.adoc[`gcp_bigquery_write_api` output reference]
* https://cloud.google.com/bigquery/docs/change-data-capture[BigQuery CDC documentation]
186 changes: 186 additions & 0 deletions docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc
@@ -43,6 +43,9 @@ output:
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    change_type: "" # No default (optional)
    change_sequence_number: "" # No default (optional)
    primary_keys: [] # No default (optional)
    max_in_flight: 4
    batching:
      count: 0
@@ -66,6 +69,18 @@ output:
    dataset: "" # No default (required)
    table: "" # No default (required)
    message_format: json
    write_mode: default_stream
    change_type: "" # No default (optional)
    change_sequence_number: "" # No default (optional)
    primary_keys: [] # No default (optional)
    auto_create_table: false
    schema: []
    time_partitioning:
      type: "" # No default (optional)
      field: ""
      expiration: 0s
      require_filter: false
    clustering: []
    max_in_flight: 4
    batching:
      count: 0
@@ -104,6 +119,25 @@ All messages in the same batch are written to that table.
The interpolated table name is sanitized for BigQuery: dots, hyphens, slashes and whitespace are replaced with underscores, non-ASCII-alphanumeric characters are stripped, leading digits are prefixed with `_`, and the result is truncated to 1024 characters.
A name that sanitizes to the empty string is rejected as a permanent error.

== Write modes

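The `write_mode` field selects one of four write paths:

- `default_stream` (default): the multiplexed default stream. Lowest latency, at-least-once semantics.
- `pending_stream`: a fresh pending stream is allocated per batch; rows are written with sequential offsets, the stream is finalized, then atomically committed. Provides exactly-once semantics within a single committed batch.
- `upsert`: writes UPSERT-only rows to a CDC-enabled table over the default stream. The target table must have a PRIMARY KEY.
- `upsert_delete`: as `upsert`, but allows both UPSERT and DELETE rows.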
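
As an illustration, a non-CDC pipeline that opts into the per-batch pending stream might be configured as below. The batching values are placeholders; because each batch allocates, finalizes, and commits its own stream, larger batches are assumed here to amortize that overhead:

[source,yaml]
----
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: events
    write_mode: pending_stream   # one pending stream per batch, committed atomically
    batching:
      count: 1000                # illustrative values only
      period: 5s
----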

== Auto-create

When `auto_create_table` is true, the output creates missing tables on the fly using the configured `schema`, `time_partitioning`, and `clustering`. `AlreadyExists` errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same configuration.
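
A sketch of an auto-created, partitioned, and clustered table using the fields documented on this page. The column names (`tenant`, `created_at`) and the 90-day expiration are assumptions chosen for illustration:

[source,yaml]
----
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: events
    auto_create_table: true
    schema:
      - { name: id, type: STRING, mode: REQUIRED }
      - { name: tenant, type: STRING }
      - { name: created_at, type: TIMESTAMP, mode: REQUIRED }
      - { name: payload, type: JSON }
    time_partitioning:
      type: DAY
      field: created_at      # must be a DATE, TIMESTAMP, or DATETIME column
      expiration: 2160h      # 90 days; 0s means partitions never expire
    clustering: [tenant]     # every clustering column must appear in schema
----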

== Exactly-once caveat

The exactly-once guarantee of `pending_stream` holds only within a single stream. If a `BatchCommitWriteStreams` RPC succeeds but its response is lost to a network failure, Benthos retries the batch through a new pending stream and the data lands twice. This is a limitation of the BigQuery Storage Write API exactly-once contract rather than of this output, and it applies to every implementation.

== CDC migration

When migrating from the load-jobs based `gcp_bigquery` output to CDC mode, see the xref:outputs/bigquery_cdc_migration.adoc[CDC migration guide].


== Fields

@@ -147,6 +181,158 @@ Options:
, `protobuf`
.

=== `write_mode`

How the output writes to BigQuery. `default_stream` uses the multiplexed default stream (at-least-once, lowest latency). `pending_stream` allocates a per-batch pending stream that commits atomically, providing exactly-once semantics within a single committed batch. `upsert` writes UPSERT-only rows to a BigQuery CDC-enabled table; the target table must have a PRIMARY KEY. `upsert_delete` allows both UPSERT and DELETE rows. Both CDC modes use the default stream as required by BigQuery.


*Type*: `string`

*Default*: `"default_stream"`

Options: `default_stream`, `pending_stream`, `upsert`, `upsert_delete`.

=== `change_type`

Bloblang expression resolving to the `_CHANGE_TYPE` pseudo-column value for each row. Must resolve to `UPSERT` or `DELETE` (case-insensitive). Required when `write_mode` is `upsert` or `upsert_delete`. Example: `${! metadata("operation") }`.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`
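
For example, a feed that never emits deletes could pair `write_mode: upsert` with a constant value instead of a per-message lookup. This is a sketch that assumes a static string is accepted wherever an interpolated one is:

[source,yaml]
----
output:
  gcp_bigquery_write_api:
    project: my-project
    dataset: my_dataset
    table: events
    write_mode: upsert
    change_type: UPSERT   # constant: every row is written as an upsert
----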


=== `change_sequence_number`

Optional Bloblang expression resolving to the `_CHANGE_SEQUENCE_NUMBER` pseudo-column value. Format: 1 to 4 sections of 1 to 16 hexadecimal characters each, separated by `/`. Example: `${! metadata("scn") }` or `${! "0/0/0/0" }`. When unset, BigQuery resolves ordering by arrival time.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


=== `primary_keys`

Optional list of primary-key column names. Required when `auto_create_table` is true and `write_mode` is `upsert` or `upsert_delete`. When the target table already exists, the connector falls back to the PRIMARY KEY declared on the table. Up to 16 columns are allowed; for composite keys, the key columns are declared in the order listed.


*Type*: `array`


=== `auto_create_table`

If true and the target table does not exist, the output creates it using the configured `schema`, `time_partitioning`, and `clustering`. AlreadyExists errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same schema and partition/clustering settings.


*Type*: `bool`

*Default*: `false`

=== `schema`

Column definitions used by `auto_create_table`. Required when `auto_create_table` is true.


*Type*: `array`

*Default*: `[]`

=== `schema[].name`

Column name.


*Type*: `string`


=== `schema[].type`

BigQuery column type (STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD).


*Type*: `string`


=== `schema[].mode`

Column mode: NULLABLE (default), REQUIRED, or REPEATED.


*Type*: `string`

*Default*: `"NULLABLE"`

=== `schema[].fields`

For RECORD columns, the list of nested fields. Same shape as the top-level schema list.


*Type*: `array`


=== `time_partitioning`

Optional time-partitioning settings applied during `auto_create_table`. The block takes effect only when `type` is set; when `type` is omitted, the whole block is treated as absent.


*Type*: `object`


=== `time_partitioning.type`

Partitioning granularity.


*Type*: `string`


Options: `DAY`, `HOUR`, `MONTH`, `YEAR`.

=== `time_partitioning.field`

Column to partition on. Must be of type DATE, TIMESTAMP, or DATETIME. If empty, the table uses ingestion-time partitioning (`_PARTITIONTIME`).


*Type*: `string`

*Default*: `""`

=== `time_partitioning.expiration`

Optional partition expiration. Zero means no expiration.


*Type*: `string`

*Default*: `"0s"`

=== `time_partitioning.require_filter`

If true, queries against the table must filter on the partition column.


*Type*: `bool`

*Default*: `false`

=== `clustering`

Optional clustering columns (up to 4) applied during `auto_create_table`. All names must appear in `schema`.


*Type*: `array`

*Default*: `[]`

=== `max_in_flight`

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.