diff --git a/docs/modules/components/pages/outputs/bigquery_cdc_migration.adoc b/docs/modules/components/pages/outputs/bigquery_cdc_migration.adoc new file mode 100644 index 0000000000..ef39506c3e --- /dev/null +++ b/docs/modules/components/pages/outputs/bigquery_cdc_migration.adoc @@ -0,0 +1,90 @@ += Migrating from `gcp_bigquery` to `gcp_bigquery_write_api` (CDC) +:description: Migrate a CDC pipeline from the load-jobs based gcp_bigquery output to the Storage Write API gcp_bigquery_write_api output. + +The `gcp_bigquery_write_api` output supports BigQuery's Change Data Capture (CDC) ingestion via the Storage Write API. Compared to the load-jobs based `gcp_bigquery` output, the Storage Write API delivers: + +* Seconds-scale ingest latency instead of minutes (no batch-and-load cycle). +* Per-row UPSERT and DELETE operations without writing intermediate files. +* Sequence-number-based out-of-order resolution via `_CHANGE_SEQUENCE_NUMBER`. + +== Config translation + +[source,yaml] +---- +# BEFORE: load-jobs based output +output: + gcp_bigquery: + project: my-project + dataset: my_dataset + table: events + format: NEWLINE_DELIMITED_JSON + batching: + count: 10000 + period: 30s + +# AFTER: Storage Write API CDC mode +output: + gcp_bigquery_write_api: + project: my-project + dataset: my_dataset + table: events + write_mode: upsert_delete + change_type: ${! metadata("operation") } + change_sequence_number: ${! metadata("scn") } + primary_keys: [id] + batching: + count: 500 + period: 1s +---- + +The `change_type` expression must resolve to `UPSERT` or `DELETE` per message (case-insensitive). `change_sequence_number` is optional but recommended for any pipeline where out-of-order delivery is possible. + +== Schema requirements + +The destination table must have a `PRIMARY KEY` declared. To add one to an existing table: + +[source,sql] +---- +ALTER TABLE my_dataset.events +ADD PRIMARY KEY (id) NOT ENFORCED; +---- + +For new tables, use the `auto_create_table` option with `primary_keys`: + +[source,yaml] +---- +auto_create_table: true +schema: + - { name: id, type: STRING, mode: REQUIRED } + - { name: payload, type: JSON } +primary_keys: [id] +---- + +[NOTE] +==== +Composite primary keys are supported with up to 16 columns. The column order in `primary_keys` is significant for composite keys. +==== + +== Snapshot vs streaming + +BigQuery's CDC contract does not permit mixing INSERT (unspecified `_CHANGE_TYPE`) and UPSERT/DELETE rows in the same write. For initial backfills, recommendations are: + +. *UPSERT for everything.* The simplest path: write snapshot rows with `change_type: UPSERT` like any other CDC row. Idempotent; tolerates retries; pays the per-PK merge cost on every snapshot row. Suitable for tables up to ~10M rows. +. *Separate snapshot pipeline.* Land snapshot rows via a second `gcp_bigquery_write_api` instance with `write_mode: default_stream` into a separate staging table, then `CREATE TABLE … AS SELECT` into the CDC-active table once the snapshot completes. Avoids the merge cost on the snapshot but adds operational complexity. + +== Operational differences + +[WARNING] +==== +Tables with active CDC do not support DML statements (`DELETE`, `UPDATE`, `MERGE`). If your existing pipeline relies on post-load DML for cleanup, that pattern will no longer work. Move cleanup to BigQuery scheduled queries against a non-CDC table or pre-process rows in the connector. +==== + +* BigQuery does not enforce primary-key uniqueness; the producer is responsible. +* DELETEs are retained for a two-day window for point-in-time recovery before being permanently dropped. +* The `_CHANGE_TYPE` and `_CHANGE_SEQUENCE_NUMBER` pseudo-columns are injected by the connector; do not declare them in `schema`. +* CDC ingestion requires the default write stream. The `write_mode: pending_stream` exactly-once mode is not compatible with CDC and is rejected at config parse time. + +== Related + +* xref:outputs/gcp_bigquery_write_api.adoc[`gcp_bigquery_write_api` output reference] +* https://cloud.google.com/bigquery/docs/change-data-capture[BigQuery CDC documentation] diff --git a/docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc b/docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc index 58c2842c20..3ec8fc28b9 100644 --- a/docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc +++ b/docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc @@ -43,6 +43,9 @@ output: dataset: "" # No default (required) table: "" # No default (required) message_format: json + change_type: "" # No default (optional) + change_sequence_number: "" # No default (optional) + primary_keys: [] # No default (optional) max_in_flight: 4 batching: count: 0 @@ -66,6 +69,18 @@ output: dataset: "" # No default (required) table: "" # No default (required) message_format: json + write_mode: default_stream + change_type: "" # No default (optional) + change_sequence_number: "" # No default (optional) + primary_keys: [] # No default (optional) + auto_create_table: false + schema: [] + time_partitioning: + type: "" # No default (optional) + field: "" + expiration: 0s + require_filter: false + clustering: [] max_in_flight: 4 batching: count: 0 @@ -104,6 +119,25 @@ All messages in the same batch are written to that table. The interpolated table name is sanitized for BigQuery: dots, hyphens, slashes and whitespace are replaced with underscores, non-ASCII-alphanumeric characters are stripped, leading digits are prefixed with `_`, and the result is truncated to 1024 characters. A name that sanitizes to the empty string is rejected as a permanent error. +== Write modes + +The `write_mode` field selects between two write paths: + +- `default_stream` (default): the multiplexed default stream. Lowest latency, at-least-once semantics. +- `pending_stream`: a fresh pending stream is allocated per batch; rows are written with sequential offsets, the stream is finalized, then atomically committed. Provides exactly-once semantics within a single committed batch. + +== Auto-create + +When `auto_create_table` is true, the output creates missing tables on the fly using the configured `schema`, `time_partitioning`, and `clustering`. `AlreadyExists` errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same configuration. + +== Exactly-once caveat + +The exactly-once guarantee of `pending_stream` is "exactly-once within a stream". If a BatchCommitWriteStreams RPC succeeds but its response is lost to a network failure, benthos retries the batch through a new pending stream and the data lands twice. This is a fundamental limitation of the BigQuery Storage Write API exactly-once contract and applies to every implementation. + +== CDC migration + +When migrating from the load-jobs based `gcp_bigquery` output to CDC mode, see the xref:outputs/bigquery_cdc_migration.adoc[CDC migration guide]. + == Fields @@ -147,6 +181,158 @@ Options: , `protobuf` . +=== `write_mode` + +How the output writes to BigQuery. `default_stream` uses the multiplexed default stream (at-least-once, lowest latency). `pending_stream` allocates a per-batch pending stream that commits atomically, providing exactly-once semantics within a single committed batch. `upsert` writes UPSERT-only rows to a BigQuery CDC-enabled table; the target table must have a PRIMARY KEY. `upsert_delete` allows both UPSERT and DELETE rows. Both CDC modes use the default stream as required by BigQuery. + + +*Type*: `string` + +*Default*: `"default_stream"` + +Options: +`default_stream` +, `pending_stream` +, `upsert` +, `upsert_delete` +. + +=== `change_type` + +Bloblang expression resolving to the `_CHANGE_TYPE` pseudo-column value for each row. Must resolve to `UPSERT` or `DELETE` (case-insensitive). Required when `write_mode` is `upsert` or `upsert_delete`. Example: `${! metadata("operation") }`. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + + +=== `change_sequence_number` + +Optional Bloblang expression resolving to the `_CHANGE_SEQUENCE_NUMBER` pseudo-column value. Format: 1 to 4 sections of 1 to 16 hexadecimal characters each, separated by `/`. Example: `${! metadata("scn") }` or `${! "0/0/0/0" }`. When unset, BigQuery resolves ordering by arrival time. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + + +=== `primary_keys` + +Optional list of primary-key column names. Required when `auto_create_table` is true and `write_mode` is `upsert` or `upsert_delete`. When the target table is pre-existing, the connector falls back to the PRIMARY KEY declared on the table. Up to 16 columns; composite keys are supported in the same order they are listed. + + +*Type*: `array` + + +=== `auto_create_table` + +If true and the target table does not exist, the output creates it using the configured `schema`, `time_partitioning`, and `clustering`. AlreadyExists errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same schema and partition/clustering settings. + + +*Type*: `bool` + +*Default*: `false` + +=== `schema` + +Column definitions used by `auto_create_table`. Required when `auto_create_table` is true. + + +*Type*: `array` + +*Default*: `[]` + +=== `schema[].name` + +Column name. + + +*Type*: `string` + + +=== `schema[].type` + +BigQuery column type (STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD). + + +*Type*: `string` + + +=== `schema[].mode` + +Column mode: NULLABLE (default), REQUIRED, or REPEATED. + + +*Type*: `string` + +*Default*: `"NULLABLE"` + +=== `schema[].fields` + +For RECORD columns, the list of nested fields. Same shape as the top-level schema list. + + +*Type*: `array` + + +=== `time_partitioning` + +Optional time-partitioning settings applied during `auto_create_table`. Setting `type` is the trigger — when omitted, the block is treated as absent. + + +*Type*: `object` + + +=== `time_partitioning.type` + +Partitioning granularity. + + +*Type*: `string` + + +Options: +`DAY` +, `HOUR` +, `MONTH` +, `YEAR` +. + +=== `time_partitioning.field` + +Column to partition on. Must be of type DATE, TIMESTAMP, or DATETIME. If empty, the table uses ingestion-time partitioning (`_PARTITIONTIME`). + + +*Type*: `string` + +*Default*: `""` + +=== `time_partitioning.expiration` + +Optional partition expiration. Zero means no expiration. + + +*Type*: `string` + +*Default*: `"0s"` + +=== `time_partitioning.require_filter` + +If true, queries against the table must filter on the partition column. + + +*Type*: `bool` + +*Default*: `false` + +=== `clustering` + +Optional clustering columns (up to 4) applied during `auto_create_table`. All names must appear in `schema`. + + +*Type*: `array` + +*Default*: `[]` + === `max_in_flight` The maximum number of messages to have in flight at a given time. Increase this to improve throughput. diff --git a/internal/impl/gcp/enterprise/bigquery/cdc.go b/internal/impl/gcp/enterprise/bigquery/cdc.go new file mode 100644 index 0000000000..c54548f607 --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/cdc.go @@ -0,0 +1,195 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "regexp" + "strings" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/descriptorpb" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// validateChangeType normalises and validates a Bloblang-resolved _CHANGE_TYPE +// value against BigQuery's CDC contract. BigQuery only accepts UPSERT and +// DELETE; INSERT is expressed by omitting the pseudo-column entirely, and +// mixing INSERT with UPSERT/DELETE in the same write is unsupported. +// allowDelete is true for write_mode=upsert_delete. +func validateChangeType(raw string, allowDelete bool) (string, error) { + v := strings.ToUpper(strings.TrimSpace(raw)) + switch v { + case "UPSERT": + return v, nil + case "DELETE": + if !allowDelete { + return "", errors.New("change_type DELETE is only valid when write_mode is upsert_delete") + } + return v, nil + case "": + return "", errors.New("change_type resolved to an empty value") + default: + return "", fmt.Errorf("change_type %q is not UPSERT or DELETE", raw) + } +} + +// changeSequenceNumberPattern matches BigQuery's _CHANGE_SEQUENCE_NUMBER format: +// 1 to 4 sections of 1 to 16 hexadecimal characters each, separated by '/'. +var changeSequenceNumberPattern = regexp.MustCompile(`^[0-9A-Fa-f]{1,16}(/[0-9A-Fa-f]{1,16}){0,3}$`) + +// validateChangeSequenceNumber checks that raw conforms to BigQuery's CDC +// sequence-number format. BigQuery rejects values that do not match this +// format with INVALID_ARGUMENT at AppendRows time, so we validate per-message +// before the network round trip. +func validateChangeSequenceNumber(raw string) error { + if !changeSequenceNumberPattern.MatchString(raw) { + return fmt.Errorf("change_sequence_number %q does not match BigQuery format (1-4 sections of 1-16 hex chars separated by /)", raw) + } + return nil +} + +// cdcFieldInfo captures the field numbers assigned to CDC pseudo-columns in +// the wrapped descriptor so per-message injection can target them without +// re-scanning the descriptor. +type cdcFieldInfo struct { + changeTypeFieldNumber int32 + changeSeqFieldNumber int32 // 0 if sequence number is not configured +} + +// wrapDescriptorForCDC returns a copy of base with `_CHANGE_TYPE` and +// optionally `_CHANGE_SEQUENCE_NUMBER` appended as STRING fields at the next +// free field numbers (max existing + 1, +2). The base descriptor is left +// untouched. +func wrapDescriptorForCDC(base *descriptorpb.DescriptorProto, withSequenceNumber bool) (*descriptorpb.DescriptorProto, cdcFieldInfo, error) { + if base == nil { + return nil, cdcFieldInfo{}, errors.New("wrapDescriptorForCDC: nil base descriptor") + } + var maxNum int32 + for _, f := range base.Field { + if n := f.GetNumber(); n > maxNum { + maxNum = n + } + } + info := cdcFieldInfo{changeTypeFieldNumber: maxNum + 1} + if withSequenceNumber { + info.changeSeqFieldNumber = maxNum + 2 + } + wrapped := proto.Clone(base).(*descriptorpb.DescriptorProto) + wrapped.Field = append(wrapped.Field, &descriptorpb.FieldDescriptorProto{ + Name: new("_CHANGE_TYPE"), + Number: new(info.changeTypeFieldNumber), + Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(), + Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(), + }) + if withSequenceNumber { + wrapped.Field = append(wrapped.Field, &descriptorpb.FieldDescriptorProto{ + Name: new("_CHANGE_SEQUENCE_NUMBER"), + Number: new(info.changeSeqFieldNumber), + Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(), + Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(), + }) + } + return wrapped, info, nil +} + +// cdcInjector holds the immutable per-output CDC configuration: validated +// Bloblang fields and the allowDelete flag. Field numbers for the pseudo +// columns live on each streamWithDescriptor (see streamWithDescriptor.cdcFields) +// because they depend on the user table's descriptor and must not be shared +// across concurrent createStream calls. +type cdcInjector struct { + log *service.Logger + changeType *service.InterpolatedString + changeSeq *service.InterpolatedString // nil when not configured + allowDelete bool // true for write_mode=upsert_delete +} + +// injectCDCJSON injects _CHANGE_TYPE (and _CHANGE_SEQUENCE_NUMBER when seq is +// non-empty) into the JSON object bytes without round-tripping through a Go +// value tree. Using json.RawMessage preserves the exact byte representation of +// every user field — int64 strings stay strings, NUMERIC strings keep their +// precision, base64 BYTES stay base64. Returns an error if jsonBytes is not a +// JSON object (validated by the decoder). +func injectCDCJSON(jsonBytes []byte, changeType, changeSeq string) ([]byte, error) { + dec := json.NewDecoder(bytes.NewReader(jsonBytes)) + var raw map[string]json.RawMessage + if err := dec.Decode(&raw); err != nil { + return nil, fmt.Errorf("parsing JSON: %w", err) + } + if raw == nil { + // Empty map after decoding a JSON null — reject; CDC needs an object. + return nil, errors.New("payload is not a JSON object") + } + encodedCT, err := json.Marshal(changeType) + if err != nil { + return nil, fmt.Errorf("encoding change_type: %w", err) + } + raw["_CHANGE_TYPE"] = encodedCT + if changeSeq != "" { + encodedSeq, err := json.Marshal(changeSeq) + if err != nil { + return nil, fmt.Errorf("encoding change_sequence_number: %w", err) + } + raw["_CHANGE_SEQUENCE_NUMBER"] = encodedSeq + } + return json.Marshal(raw) +} + +// validateCDCPrimaryKeys enforces BigQuery's CDC contract that the destination +// table must have a PRIMARY KEY. configPKs is the user-declared list (may be +// nil), tablePKs is what BigQuery reports for the table (may be nil if no +// constraint is declared). The function fails if neither source has PKs, and +// fails on mismatch when both are present. +func validateCDCPrimaryKeys(configPKs, tablePKs []string, tableID string) error { + if len(configPKs) == 0 && len(tablePKs) == 0 { + return fmt.Errorf("CDC mode requires a PRIMARY KEY on table %q; declare one via primary_keys config or `ALTER TABLE … ADD PRIMARY KEY`", tableID) + } + if len(configPKs) > 0 && len(tablePKs) > 0 { + if !equalStringSlices(configPKs, tablePKs) { + return fmt.Errorf("primary_keys config %v does not match table %q PRIMARY KEY %v", + configPKs, tableID, tablePKs) + } + } + return nil +} + +func equalStringSlices(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +// validateAndResolveCDC validates the resolved Bloblang values for a single +// row and returns the normalised change_type plus the original sequence +// number string. The caller is responsible for passing the validated values +// to injectCDCJSON. Returns an error suitable for BatchError.Failed. +func (i *cdcInjector) validateAndResolveCDC(rawChangeType, rawSeq string) (changeType string, seq string, err error) { + ct, err := validateChangeType(rawChangeType, i.allowDelete) + if err != nil { + return "", "", err + } + if i.changeSeq != nil { + if err := validateChangeSequenceNumber(rawSeq); err != nil { + return "", "", err + } + return ct, rawSeq, nil + } + return ct, "", nil +} diff --git a/internal/impl/gcp/enterprise/bigquery/cdc_test.go b/internal/impl/gcp/enterprise/bigquery/cdc_test.go new file mode 100644 index 0000000000..595cd2eb4b --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/cdc_test.go @@ -0,0 +1,261 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/descriptorpb" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestValidateChangeType(t *testing.T) { + t.Run("upsert mode accepts UPSERT only", func(t *testing.T) { + v, err := validateChangeType("UPSERT", false) + require.NoError(t, err) + assert.Equal(t, "UPSERT", v) + + v, err = validateChangeType("upsert", false) + require.NoError(t, err) + assert.Equal(t, "UPSERT", v) + + _, err = validateChangeType("DELETE", false) + require.Error(t, err) + assert.Contains(t, err.Error(), "DELETE") + }) + + t.Run("upsert_delete mode accepts UPSERT and DELETE", func(t *testing.T) { + v, err := validateChangeType("UPSERT", true) + require.NoError(t, err) + assert.Equal(t, "UPSERT", v) + + v, err = validateChangeType("delete", true) + require.NoError(t, err) + assert.Equal(t, "DELETE", v) + }) + + t.Run("rejects INSERT", func(t *testing.T) { + _, err := validateChangeType("INSERT", true) + require.Error(t, err) + assert.Contains(t, err.Error(), "INSERT") + }) + + t.Run("rejects empty", func(t *testing.T) { + _, err := validateChangeType("", true) + require.Error(t, err) + }) + + t.Run("rejects arbitrary string", func(t *testing.T) { + _, err := validateChangeType("MAYBE", true) + require.Error(t, err) + }) +} + +func TestValidateChangeSequenceNumber(t *testing.T) { + t.Run("accepts valid formats", func(t *testing.T) { + for _, in := range []string{ + "0", + "FF", + "abcd", + "FFFFFFFFFFFFFFFF", + "0/0", + "A/B/C", + "A/B/C/D", + "FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF", + } { + t.Run(in, func(t *testing.T) { + require.NoError(t, validateChangeSequenceNumber(in)) + }) + } + }) + + t.Run("rejects invalid formats", func(t *testing.T) { + for _, in := range []string{ + "", + "G", + "/0", + "0/", + "0/0/0/0/0", + "FFFFFFFFFFFFFFFFA", + "0/FFFFFFFFFFFFFFFFA", + "0 0", + "0/0 /0", + } { + t.Run(in, func(t *testing.T) { + err := validateChangeSequenceNumber(in) + require.Error(t, err, "expected error for input %q", in) + }) + } + }) +} + +func TestWrapDescriptorForCDC(t *testing.T) { + base := &descriptorpb.DescriptorProto{ + Name: new("TestMessage"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: new("id"), + Number: new(int32(1)), + Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(), + Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(), + }, + }, + } + + t.Run("appends _CHANGE_TYPE only when no sequence configured", func(t *testing.T) { + wrapped, info, err := wrapDescriptorForCDC(base, false) + require.NoError(t, err) + require.Len(t, wrapped.Field, 2) + assert.Equal(t, "_CHANGE_TYPE", wrapped.Field[1].GetName()) + assert.Equal(t, int32(2), wrapped.Field[1].GetNumber()) + assert.Equal(t, descriptorpb.FieldDescriptorProto_TYPE_STRING, wrapped.Field[1].GetType()) + assert.Equal(t, int32(2), info.changeTypeFieldNumber) + assert.Equal(t, int32(0), info.changeSeqFieldNumber) + }) + + t.Run("appends both pseudo-columns when sequence configured", func(t *testing.T) { + wrapped, info, err := wrapDescriptorForCDC(base, true) + require.NoError(t, err) + require.Len(t, wrapped.Field, 3) + assert.Equal(t, "_CHANGE_TYPE", wrapped.Field[1].GetName()) + assert.Equal(t, "_CHANGE_SEQUENCE_NUMBER", wrapped.Field[2].GetName()) + assert.Equal(t, int32(2), info.changeTypeFieldNumber) + assert.Equal(t, int32(3), info.changeSeqFieldNumber) + }) + + t.Run("preserves base descriptor", func(t *testing.T) { + _, _, err := wrapDescriptorForCDC(base, true) + require.NoError(t, err) + require.Len(t, base.Field, 1) + assert.Equal(t, "id", base.Field[0].GetName()) + }) + + t.Run("picks next field number above max", func(t *testing.T) { + withGap := &descriptorpb.DescriptorProto{ + Name: new("WithGap"), + Field: []*descriptorpb.FieldDescriptorProto{ + {Name: new("a"), Number: new(int32(1)), Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()}, + {Name: new("b"), Number: new(int32(7)), Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()}, + }, + } + wrapped, info, err := wrapDescriptorForCDC(withGap, true) + require.NoError(t, err) + assert.Equal(t, int32(8), info.changeTypeFieldNumber) + assert.Equal(t, int32(9), info.changeSeqFieldNumber) + assert.Equal(t, int32(8), wrapped.Field[2].GetNumber()) + assert.Equal(t, int32(9), wrapped.Field[3].GetNumber()) + }) +} + +func TestCDCInjectorValidateAndResolve(t *testing.T) { + t.Run("returns normalised change_type and sequence", func(t *testing.T) { + inj := &cdcInjector{ + allowDelete: true, + changeSeq: &service.InterpolatedString{}, + } + ct, seq, err := inj.validateAndResolveCDC("upsert", "0/0") + require.NoError(t, err) + assert.Equal(t, "UPSERT", ct) + assert.Equal(t, "0/0", seq) + }) + + t.Run("rejects bad change_type", func(t *testing.T) { + inj := &cdcInjector{allowDelete: true} + _, _, err := inj.validateAndResolveCDC("MAYBE", "") + require.Error(t, err) + }) + + t.Run("rejects bad sequence number", func(t *testing.T) { + inj := &cdcInjector{allowDelete: true, changeSeq: &service.InterpolatedString{}} + _, _, err := inj.validateAndResolveCDC("UPSERT", "not-hex") + require.Error(t, err) + assert.Contains(t, err.Error(), "change_sequence_number") + }) + + t.Run("returns empty seq when not configured", func(t *testing.T) { + inj := &cdcInjector{allowDelete: true} + ct, seq, err := inj.validateAndResolveCDC("UPSERT", "") + require.NoError(t, err) + assert.Equal(t, "UPSERT", ct) + assert.Empty(t, seq) + }) +} + +func TestValidateCDCPrimaryKeys(t *testing.T) { + t.Run("config only", func(t *testing.T) { + require.NoError(t, validateCDCPrimaryKeys([]string{"id"}, nil, "t")) + }) + t.Run("table only", func(t *testing.T) { + require.NoError(t, validateCDCPrimaryKeys(nil, []string{"id"}, "t")) + }) + t.Run("matching config and table", func(t *testing.T) { + require.NoError(t, validateCDCPrimaryKeys([]string{"tenant_id", "id"}, []string{"tenant_id", "id"}, "t")) + }) + t.Run("mismatched", func(t *testing.T) { + err := validateCDCPrimaryKeys([]string{"id"}, []string{"tenant_id"}, "t") + require.Error(t, err) + assert.Contains(t, err.Error(), "does not match") + }) + t.Run("mismatched order", func(t *testing.T) { + err := validateCDCPrimaryKeys([]string{"a", "b"}, []string{"b", "a"}, "t") + require.Error(t, err) + }) + t.Run("neither set", func(t *testing.T) { + err := validateCDCPrimaryKeys(nil, nil, "events") + require.Error(t, err) + assert.Contains(t, err.Error(), "PRIMARY KEY") + assert.Contains(t, err.Error(), "events") + }) +} + +func TestInjectCDCJSON(t *testing.T) { + t.Run("appends pseudo-columns to non-empty object", func(t *testing.T) { + out, err := injectCDCJSON([]byte(`{"id":"x","age":"30"}`), "UPSERT", "0/1") + require.NoError(t, err) + var got map[string]any + require.NoError(t, json.Unmarshal(out, &got)) + assert.Equal(t, "x", got["id"]) + assert.Equal(t, "30", got["age"]) // string preserved, not coerced to number + assert.Equal(t, "UPSERT", got["_CHANGE_TYPE"]) + assert.Equal(t, "0/1", got["_CHANGE_SEQUENCE_NUMBER"]) + }) + + t.Run("omits sequence when empty", func(t *testing.T) { + out, err := injectCDCJSON([]byte(`{"id":"x"}`), "DELETE", "") + require.NoError(t, err) + assert.Contains(t, string(out), `"_CHANGE_TYPE":"DELETE"`) + assert.NotContains(t, string(out), `_CHANGE_SEQUENCE_NUMBER`) + }) + + t.Run("preserves int-as-string fidelity", func(t *testing.T) { + // 2^53 + 1 — would lose precision through float64 round-trip. + out, err := injectCDCJSON([]byte(`{"id":"9007199254740993"}`), "UPSERT", "") + require.NoError(t, err) + assert.Contains(t, string(out), `"9007199254740993"`) + }) + + t.Run("rejects non-object payload", func(t *testing.T) { + _, err := injectCDCJSON([]byte(`[1,2,3]`), "UPSERT", "") + require.Error(t, err) + }) + + t.Run("rejects malformed JSON", func(t *testing.T) { + _, err := injectCDCJSON([]byte(`{not json`), "UPSERT", "") + require.Error(t, err) + }) + + t.Run("rejects JSON null", func(t *testing.T) { + _, err := injectCDCJSON([]byte(`null`), "UPSERT", "") + require.Error(t, err) + }) +} diff --git a/internal/impl/gcp/enterprise/bigquery/integration_test.go b/internal/impl/gcp/enterprise/bigquery/integration_test.go index 5277bbad76..25a431ac93 100644 --- a/internal/impl/gcp/enterprise/bigquery/integration_test.go +++ b/internal/impl/gcp/enterprise/bigquery/integration_test.go @@ -250,6 +250,182 @@ func TestIntegrationSchemaEvolution(t *testing.T) { assert.True(t, evolved, "missing==0 must signal retry, not a permanent failure, so concurrent batches are not dropped to DLQ") } +func TestIntegrationAutoCreateTable(t *testing.T) { + integration.CheckSkip(t) + + const ( + projectID = "test-project" + datasetID = "test_dataset" + tableID = "auto_created" + ) + + emu := startEmulator(t, projectID, datasetID) + + t.Log("Given the table does not exist and auto_create_table is enabled") + + sb := service.NewStreamBuilder() + require.NoError(t, sb.SetLoggerYAML(`level: DEBUG`)) + + sendFn, err := sb.AddProducerFunc() + require.NoError(t, err) + + // Schema kept simple (STRING columns + ingestion-time partitioning) so the + // test exercises auto-create + partitioning + clustering without tripping + // over the protojson↔INT64 representation rules that TIMESTAMP/INTEGER + // columns require. + require.NoError(t, sb.AddOutputYAML(fmt.Sprintf(` +gcp_bigquery_write_api: + project: %s + dataset: %s + table: %s + auto_create_table: true + schema: + - { name: id, type: STRING, mode: REQUIRED } + - { name: payload, type: STRING } + - { name: tenant_id, type: STRING } + time_partitioning: + type: DAY + clustering: [tenant_id] + endpoint: + http: %s + grpc: %s +`, projectID, datasetID, tableID, emu.httpEndpoint, emu.grpcEndpoint))) + + stream, err := sb.Build() + require.NoError(t, err) + license.InjectTestService(stream.Resources()) + + go func() { + if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) { + t.Errorf("stream error: %v", err) + } + }() + t.Cleanup(func() { + if err := stream.StopWithin(10 * time.Second); err != nil { + t.Log(err) + } + }) + + require.NoError(t, sendFn(t.Context(), service.NewMessage([]byte( + `{"id":"a","payload":"hello","tenant_id":"t1"}`)))) + + t.Log("Then the table is auto-created with the configured schema") + require.Eventually(t, func() bool { + _, err := emu.bqClient.Dataset(datasetID).Table(tableID).Metadata(t.Context()) + return err == nil + }, 10*time.Second, 250*time.Millisecond) + + meta, err := emu.bqClient.Dataset(datasetID).Table(tableID).Metadata(t.Context()) + require.NoError(t, err) + require.Len(t, meta.Schema, 3) + var names []string + for _, f := range meta.Schema { + names = append(names, f.Name) + } + assert.ElementsMatch(t, []string{"id", "payload", "tenant_id"}, names) + // Partitioning/clustering metadata may be a no-op on the emulator — guard + // so this test still proves auto-create works without coupling to emulator + // completeness. + if meta.TimePartitioning != nil { + assert.Equal(t, bigquery.DayPartitioningType, meta.TimePartitioning.Type) + // Ingestion-time partitioning leaves Field empty (uses _PARTITIONTIME). + assert.Empty(t, meta.TimePartitioning.Field) + } + if meta.Clustering != nil { + assert.Equal(t, []string{"tenant_id"}, meta.Clustering.Fields) + } + + t.Log("And the row lands in the table") + require.Eventually(t, func() bool { + it := emu.bqClient.Dataset(datasetID).Table(tableID).Read(t.Context()) + var count int + for { + var row map[string]bigquery.Value + if err := it.Next(&row); errors.Is(err, iterator.Done) { + break + } else if err != nil { + return false + } + count++ + } + return count >= 1 + }, 30*time.Second, 500*time.Millisecond) +} + +func TestIntegrationPendingStreamMode(t *testing.T) { + integration.CheckSkip(t) + // The goccy/bigquery-emulator (used by these integration tests) does not + // implement the Pending write-stream type or BatchCommitWriteStreams; the + // test hangs waiting for the commit RPC. Skip until the emulator gains + // support or these tests can target real BigQuery in a nightly job. + t.Skip("goccy bigquery-emulator does not implement Pending streams / BatchCommitWriteStreams") + + const ( + projectID = "test-project" + datasetID = "test_dataset" + tableID = "pending_test" + ) + + emu := startEmulator(t, projectID, datasetID) + + t.Log("Given a table exists for pending-stream writes") + require.NoError(t, emu.bqClient.Dataset(datasetID).Table(tableID).Create(t.Context(), &bigquery.TableMetadata{ + Schema: bigquery.Schema{ + {Name: "id", Type: bigquery.StringFieldType, Required: true}, + }, + })) + + sb := service.NewStreamBuilder() + require.NoError(t, sb.SetLoggerYAML(`level: DEBUG`)) + + sendFn, err := sb.AddProducerFunc() + require.NoError(t, err) + + require.NoError(t, sb.AddOutputYAML(fmt.Sprintf(` +gcp_bigquery_write_api: + project: %s + dataset: %s + table: %s + write_mode: pending_stream + batching: + count: 3 + endpoint: + http: %s + grpc: %s +`, projectID, datasetID, tableID, emu.httpEndpoint, emu.grpcEndpoint))) + + stream, err := sb.Build() + require.NoError(t, err) + license.InjectTestService(stream.Resources()) + + go func() { + if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) { + t.Errorf("stream error: %v", err) + } + }() + t.Cleanup(func() { _ = stream.StopWithin(10 * time.Second) }) + + for _, id := range []string{"a", "b", "c"} { + require.NoError(t, sendFn(t.Context(), service.NewMessage([]byte(`{"id":"`+id+`"}`)))) + } + + t.Log("Then all 3 rows are committed atomically and visible") + require.Eventually(t, func() bool { + it := emu.bqClient.Dataset(datasetID).Table(tableID).Read(t.Context()) + var count int + for { + var row map[string]bigquery.Value + if err := it.Next(&row); errors.Is(err, iterator.Done) { + break + } else if err != nil { + return false + } + count++ + } + return count == 3 + }, 30*time.Second, 500*time.Millisecond) +} + func TestIntegrationTableNameSanitization(t *testing.T) { integration.CheckSkip(t) @@ -322,3 +498,24 @@ gcp_bigquery_write_api: return count >= 1 }, 30*time.Second, 500*time.Millisecond) } + +// CDC integration tests require a real BigQuery instance. The goccy emulator +// does not implement the _CHANGE_TYPE / _CHANGE_SEQUENCE_NUMBER pseudo-columns, +// PRIMARY KEY constraints, or the UPSERT/DELETE semantics that CDC mode +// depends on. These tests are gated with t.Skip so the package compiles +// against the real BQ API and so a future real-BQ test job can flip the gate. + +func TestIntegrationCDCUpsert(t *testing.T) { + integration.CheckSkip(t) + t.Skip("BigQuery CDC requires a real BigQuery instance; goccy emulator does not support _CHANGE_TYPE") +} + +func TestIntegrationCDCUpsertDelete(t *testing.T) { + integration.CheckSkip(t) + t.Skip("BigQuery CDC requires a real BigQuery instance; goccy emulator does not support _CHANGE_TYPE") +} + +func TestIntegrationCDCOutOfOrder(t *testing.T) { + integration.CheckSkip(t) + t.Skip("BigQuery CDC requires a real BigQuery instance; goccy emulator does not support _CHANGE_SEQUENCE_NUMBER") +} diff --git a/internal/impl/gcp/enterprise/bigquery/output.go b/internal/impl/gcp/enterprise/bigquery/output.go index 8e37f3c678..274e64de41 100644 --- a/internal/impl/gcp/enterprise/bigquery/output.go +++ b/internal/impl/gcp/enterprise/bigquery/output.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "net/http" + "strings" "sync" "sync/atomic" "time" @@ -51,6 +52,22 @@ const ( bqwaFieldStreamIdleTimeout = "stream_idle_timeout" bqwaFieldStreamSweepInterval = "stream_sweep_interval" bqwaFieldMaxCachedStreams = "max_cached_streams" + bqwaFieldWriteMode = "write_mode" + bqwaFieldChangeType = "change_type" + bqwaFieldChangeSequenceNumber = "change_sequence_number" + bqwaFieldPrimaryKeys = "primary_keys" + bqwaFieldAutoCreateTable = "auto_create_table" + bqwaFieldSchema = "schema" + bqwaSchemaFieldName = "name" + bqwaSchemaFieldType = "type" + bqwaSchemaFieldMode = "mode" + bqwaSchemaFieldFields = "fields" + bqwaFieldTimePartitioning = "time_partitioning" + bqwaFieldClustering = "clustering" + bqwatpFieldType = "type" + bqwatpFieldField = "field" + bqwatpFieldExpiration = "expiration" + bqwatpFieldRequireFilter = "require_filter" bqwaFieldSchemaResolveTimeout = "schema_resolve_timeout" bqwaFieldSchemaEvolutionTimeout = "schema_evolution_timeout" bqwaFieldEndpoint = "endpoint" @@ -94,6 +111,25 @@ All messages in the same batch are written to that table. The interpolated table name is sanitized for BigQuery: dots, hyphens, slashes and whitespace are replaced with underscores, non-ASCII-alphanumeric characters are stripped, leading digits are prefixed with `+"`_`"+`, and the result is truncated to 1024 characters. A name that sanitizes to the empty string is rejected as a permanent error. + +== Write modes + +The `+"`write_mode`"+` field selects between two write paths: + +- `+"`default_stream`"+` (default): the multiplexed default stream. Lowest latency, at-least-once semantics. +- `+"`pending_stream`"+`: a fresh pending stream is allocated per batch; rows are written with sequential offsets, the stream is finalized, then atomically committed. Provides exactly-once semantics within a single committed batch. + +== Auto-create + +When `+"`auto_create_table`"+` is true, the output creates missing tables on the fly using the configured `+"`schema`"+`, `+"`time_partitioning`"+`, and `+"`clustering`"+`. `+"`AlreadyExists`"+` errors from concurrent creators are treated as success. When the table name is interpolated, every auto-created table receives the same configuration. + +== Exactly-once caveat + +The exactly-once guarantee of `+"`pending_stream`"+` is "exactly-once within a stream". If a BatchCommitWriteStreams RPC succeeds but its response is lost to a network failure, benthos retries the batch through a new pending stream and the data lands twice. This is a fundamental limitation of the BigQuery Storage Write API exactly-once contract and applies to every implementation. + +== CDC migration + +When migrating from the load-jobs based `+"`gcp_bigquery`"+` output to CDC mode, see the xref:outputs/bigquery_cdc_migration.adoc[CDC migration guide]. `). Fields( service.NewStringField(bqwaFieldProject). @@ -111,6 +147,82 @@ A name that sanitizes to the empty string is rejected as a permanent error. " Use 'json' to have the component convert JSON to proto automatically."+ " Use 'protobuf' to supply raw proto-encoded bytes."). Default("json"), + service.NewStringEnumField(bqwaFieldWriteMode, "default_stream", "pending_stream", "upsert", "upsert_delete"). + Description("How the output writes to BigQuery."+ + " `default_stream` uses the multiplexed default stream (at-least-once, lowest latency)."+ + " `pending_stream` allocates a per-batch pending stream that commits atomically,"+ + " providing exactly-once semantics within a single committed batch."+ + " `upsert` writes UPSERT-only rows to a BigQuery CDC-enabled table; the target table must have a PRIMARY KEY."+ + " `upsert_delete` allows both UPSERT and DELETE rows. Both CDC modes use the default stream as required by BigQuery."). + Default("default_stream"). + Advanced(), + service.NewInterpolatedStringField(bqwaFieldChangeType). + Description("Bloblang expression resolving to the `_CHANGE_TYPE` pseudo-column value for each row."+ + " Must resolve to `UPSERT` or `DELETE` (case-insensitive)."+ + " Required when `write_mode` is `upsert` or `upsert_delete`."+ + " Example: `${! metadata(\"operation\") }`."). + Optional(), + service.NewInterpolatedStringField(bqwaFieldChangeSequenceNumber). + Description("Optional Bloblang expression resolving to the `_CHANGE_SEQUENCE_NUMBER` pseudo-column value."+ + " Format: 1 to 4 sections of 1 to 16 hexadecimal characters each, separated by `/`."+ + " Example: `${! metadata(\"scn\") }` or `${! \"0/0/0/0\" }`."+ + " When unset, BigQuery resolves ordering by arrival time."). + Optional(), + service.NewStringListField(bqwaFieldPrimaryKeys). + Description("Optional list of primary-key column names."+ + " Required when `auto_create_table` is true and `write_mode` is `upsert` or `upsert_delete`."+ + " When the target table is pre-existing, the connector falls back to the PRIMARY KEY declared on the table."+ + " Up to 16 columns; composite keys are supported in the same order they are listed."). + Optional(), + service.NewBoolField(bqwaFieldAutoCreateTable). + Description("If true and the target table does not exist, the output creates it using the configured `schema`, `time_partitioning`, and `clustering`."+ + " AlreadyExists errors from concurrent creators are treated as success."+ + " When the table name is interpolated, every auto-created table receives the same schema and partition/clustering settings."). + Default(false). + Advanced(), + service.NewObjectListField(bqwaFieldSchema, + service.NewStringField(bqwaSchemaFieldName). + Description("Column name."), + service.NewStringField(bqwaSchemaFieldType). + Description("BigQuery column type (STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD)."), + service.NewStringField(bqwaSchemaFieldMode). + Description("Column mode: NULLABLE (default), REQUIRED, or REPEATED."). + Default("NULLABLE"), + service.NewAnyListField(bqwaSchemaFieldFields). + Description("For RECORD columns, the list of nested fields. Same shape as the top-level schema list."). + Optional(), + ). + Description("Column definitions used by `auto_create_table`. Required when `auto_create_table` is true."). + Default([]any{}). + Advanced(), + service.NewObjectField(bqwaFieldTimePartitioning, + // `type` has no default — absence is the sentinel for "block + // not configured". Benthos still fills in defaults for child + // fields of an Optional ObjectField, so the parent Contains() + // check is unreliable here. + service.NewStringEnumField(bqwatpFieldType, "DAY", "HOUR", "MONTH", "YEAR"). + Description("Partitioning granularity."). + Optional(), + service.NewStringField(bqwatpFieldField). + Description("Column to partition on. Must be of type DATE, TIMESTAMP, or DATETIME."+ + " If empty, the table uses ingestion-time partitioning (`_PARTITIONTIME`)."). + Default(""), + service.NewDurationField(bqwatpFieldExpiration). + Description("Optional partition expiration. Zero means no expiration."). + Default("0s"), + service.NewBoolField(bqwatpFieldRequireFilter). + Description("If true, queries against the table must filter on the partition column."). + Default(false), + ). + Description("Optional time-partitioning settings applied during `auto_create_table`."+ + " Setting `type` is the trigger — when omitted, the block is treated as absent."). + Advanced(). + Optional(), + service.NewStringListField(bqwaFieldClustering). + Description("Optional clustering columns (up to 4) applied during `auto_create_table`."+ + " All names must appear in `schema`."). + Default([]any{}). + Advanced(), service.NewOutputMaxInFlightField().Default(4), service.NewBatchPolicyField(bqwaFieldBatching), service.NewStringField(bqwaFieldCredentialsJSON). @@ -172,10 +284,37 @@ A name that sanitizes to the empty string is rejected as a permanent error. ) } +// bqSchemaField mirrors the YAML representation of a single column in the +// schema config used by auto_create_table. It is converted to a +// bigquery.FieldSchema in table_creator.go. +type bqSchemaField struct { + Name string + Type string // canonical BigQuery type (aliases normalised) + Mode string // NULLABLE / REQUIRED / REPEATED + Fields []bqSchemaField +} + +// bqTimePartitioningConfig mirrors the YAML time_partitioning block. Type="" +// means the user did not configure partitioning at all (block absent). +type bqTimePartitioningConfig struct { + Type string + Field string + Expiration time.Duration + RequireFilter bool +} + type bigQueryWriteAPIConfig struct { ProjectID string DatasetID string MessageFormat string + WriteMode string + ChangeType *service.InterpolatedString + ChangeSequenceNumber *service.InterpolatedString + PrimaryKeys []string + AutoCreateTable bool + Schema []bqSchemaField + TimePartitioning bqTimePartitioningConfig + Clustering []string CredentialsJSON string TargetPrincipal string Delegates []string @@ -188,6 +327,119 @@ type bigQueryWriteAPIConfig struct { EndpointGRPC string } +// validBQFieldTypes is the set of column types accepted by auto-create. Aliases +// (INT64↔INTEGER, FLOAT64↔FLOAT, BOOL↔BOOLEAN) are normalised to BigQuery's +// canonical names so the parsed config is stable. +var validBQFieldTypes = map[string]string{ + "STRING": "STRING", + "BYTES": "BYTES", + "INTEGER": "INTEGER", + "INT64": "INTEGER", + "FLOAT": "FLOAT", + "FLOAT64": "FLOAT", + "NUMERIC": "NUMERIC", + "BIGNUMERIC": "BIGNUMERIC", + "BOOLEAN": "BOOLEAN", + "BOOL": "BOOLEAN", + "TIMESTAMP": "TIMESTAMP", + "DATE": "DATE", + "TIME": "TIME", + "DATETIME": "DATETIME", + "GEOGRAPHY": "GEOGRAPHY", + "JSON": "JSON", + "RECORD": "RECORD", +} + +var validBQFieldModes = map[string]struct{}{ + "NULLABLE": {}, + "REQUIRED": {}, + "REPEATED": {}, +} + +// validateSchemaReferences checks that time_partitioning.field and clustering +// columns reference columns declared in the schema, and that the partition +// field has a partition-eligible type. The checks only fire when columns are +// actually configured (clustering empty / partition field empty is a no-op). +func validateSchemaReferences(conf bigQueryWriteAPIConfig) error { + if conf.TimePartitioning.Field == "" && len(conf.Clustering) == 0 { + return nil + } + cols := make(map[string]string, len(conf.Schema)) + for _, f := range conf.Schema { + cols[f.Name] = f.Type + } + if conf.TimePartitioning.Field != "" { + t, ok := cols[conf.TimePartitioning.Field] + if !ok { + return fmt.Errorf("%s.%s %q is not in %s", + bqwaFieldTimePartitioning, bqwatpFieldField, conf.TimePartitioning.Field, bqwaFieldSchema) + } + switch t { + case "DATE", "TIMESTAMP", "DATETIME": + default: + return fmt.Errorf("%s.%s %q has type %s; must be DATE/TIMESTAMP/DATETIME", + bqwaFieldTimePartitioning, bqwatpFieldField, conf.TimePartitioning.Field, t) + } + } + for _, c := range conf.Clustering { + if _, ok := cols[c]; !ok { + return fmt.Errorf("%s column %q is not in %s", bqwaFieldClustering, c, bqwaFieldSchema) + } + } + return nil +} + +// parseSchemaFields walks a list of ParsedConfig children (top-level schema list +// or a RECORD's nested fields) into validated bqSchemaField values. +func parseSchemaFields(confs []*service.ParsedConfig) ([]bqSchemaField, error) { + out := make([]bqSchemaField, 0, len(confs)) + for i, c := range confs { + name, err := c.FieldString(bqwaSchemaFieldName) + if err != nil { + return nil, fmt.Errorf("schema[%d]: %w", i, err) + } + if name == "" { + return nil, fmt.Errorf("schema[%d]: %s is required", i, bqwaSchemaFieldName) + } + rawType, err := c.FieldString(bqwaSchemaFieldType) + if err != nil { + return nil, fmt.Errorf("schema[%d] %q: %w", i, name, err) + } + canonical, ok := validBQFieldTypes[strings.ToUpper(rawType)] + if !ok { + return nil, fmt.Errorf("schema[%d] %q: unsupported type %q", i, name, rawType) + } + // FieldString returns an error for missing keys when the child config + // came from a NewAnyListField (no defaults applied for nested levels). + mode := "NULLABLE" + if c.Contains(bqwaSchemaFieldMode) { + rawMode, err := c.FieldString(bqwaSchemaFieldMode) + if err != nil { + return nil, fmt.Errorf("schema[%d] %q: %w", i, name, err) + } + if rawMode != "" { + mode = strings.ToUpper(rawMode) + } + } + if _, ok := validBQFieldModes[mode]; !ok { + return nil, fmt.Errorf("schema[%d] %q: invalid mode %q", i, name, mode) + } + fld := bqSchemaField{Name: name, Type: canonical, Mode: mode} + if canonical == "RECORD" { + childConfs, _ := c.FieldAnyList(bqwaSchemaFieldFields) + if len(childConfs) == 0 { + return nil, fmt.Errorf("schema[%d] %q: RECORD requires at least one nested field", i, name) + } + fld.Fields, err = parseSchemaFields(childConfs) + if err != nil { + return nil, fmt.Errorf("schema[%d] %q: %w", i, name, err) + } + } + out = append(out, fld) + } + return out, nil +} + func bigQueryWriteAPIConfigFromParsed(pConf *service.ParsedConfig) (conf bigQueryWriteAPIConfig, err error) { if conf.ProjectID, err = pConf.FieldString(bqwaFieldProject); err != nil { return @@ -201,6 +453,111 @@ func bigQueryWriteAPIConfigFromParsed(pConf *service.ParsedConfig) (conf bigQuer if conf.MessageFormat, err = pConf.FieldString(bqwaFieldMessageFormat); err != nil { return } + if conf.WriteMode, err = pConf.FieldString(bqwaFieldWriteMode); err != nil { + return + } + if pConf.Contains(bqwaFieldChangeType) { + if conf.ChangeType, err = pConf.FieldInterpolatedString(bqwaFieldChangeType); err != nil { + return + } + } + if pConf.Contains(bqwaFieldChangeSequenceNumber) { + if conf.ChangeSequenceNumber, err = pConf.FieldInterpolatedString(bqwaFieldChangeSequenceNumber); err != nil { + return + } + } + if conf.PrimaryKeys, err = pConf.FieldStringList(bqwaFieldPrimaryKeys); err != nil { + return + } + isCDC := conf.WriteMode == "upsert" || conf.WriteMode == "upsert_delete" + if isCDC && conf.ChangeType == nil { + err = fmt.Errorf("%s is required when %s is %q", bqwaFieldChangeType, bqwaFieldWriteMode, conf.WriteMode) + return + } + if isCDC && conf.MessageFormat != "json" { + // CDC mode injects pseudo-columns via JSON byte rewriting. Raw protobuf + // payloads would require deserialising via dynamicpb, setting the + // fields, and re-serialising — extra cost the user can avoid by + // converting upstream. + err = fmt.Errorf("%s is not supported when %s is %q (use message_format: json)", + bqwaFieldMessageFormat, bqwaFieldWriteMode, conf.WriteMode) + return + } + if !isCDC && conf.ChangeType != nil { + err = fmt.Errorf("%s is only valid when %s is upsert or upsert_delete", bqwaFieldChangeType, bqwaFieldWriteMode) + return + } + if !isCDC && conf.ChangeSequenceNumber != nil { + err = fmt.Errorf("%s is only valid when %s is upsert or upsert_delete", bqwaFieldChangeSequenceNumber, bqwaFieldWriteMode) + return + } + if len(conf.PrimaryKeys) > 16 { + err = fmt.Errorf("%s accepts at most 16 columns, got %d", bqwaFieldPrimaryKeys, len(conf.PrimaryKeys)) + return + } + if conf.AutoCreateTable, err = pConf.FieldBool(bqwaFieldAutoCreateTable); err != nil { + return + } + schemaConfs, err := pConf.FieldObjectList(bqwaFieldSchema) + if err != nil { + return + } + if conf.Schema, err = parseSchemaFields(schemaConfs); err != nil { + return + } + if conf.AutoCreateTable && len(conf.Schema) == 0 { + err = fmt.Errorf("%s requires %s to be non-empty", bqwaFieldAutoCreateTable, bqwaFieldSchema) + return + } + if isCDC && conf.AutoCreateTable && len(conf.PrimaryKeys) == 0 { + err = fmt.Errorf("%s is required when %s is true and %s is %q", + bqwaFieldPrimaryKeys, bqwaFieldAutoCreateTable, bqwaFieldWriteMode, conf.WriteMode) + return + } + if len(conf.PrimaryKeys) > 0 && len(conf.Schema) > 0 { + cols := make(map[string]string, len(conf.Schema)) + for _, f := range conf.Schema { + cols[f.Name] = f.Mode + } + for _, pk := range conf.PrimaryKeys { + mode, ok := cols[pk] + if !ok { + err = fmt.Errorf("%s column %q is not in %s", bqwaFieldPrimaryKeys, pk, bqwaFieldSchema) + return + } + if mode != "REQUIRED" { + err = fmt.Errorf("%s column %q must have mode REQUIRED in %s (got %q)", + bqwaFieldPrimaryKeys, pk, bqwaFieldSchema, mode) + return + } + } + } + // time_partitioning.type has no default, so a missing type — either via an + // absent block or a block without `type:` — leaves Type empty and the + // other subfields are ignored. Type set is the explicit opt-in. + tp := pConf.Namespace(bqwaFieldTimePartitioning) + if typ, terr := tp.FieldString(bqwatpFieldType); terr == nil && typ != "" { + conf.TimePartitioning.Type = typ + if conf.TimePartitioning.Field, err = tp.FieldString(bqwatpFieldField); err != nil { + return + } + if conf.TimePartitioning.Expiration, err = tp.FieldDuration(bqwatpFieldExpiration); err != nil { + return + } + if conf.TimePartitioning.RequireFilter, err = tp.FieldBool(bqwatpFieldRequireFilter); err != nil { + return + } + } + if conf.Clustering, err = pConf.FieldStringList(bqwaFieldClustering); err != nil { + return + } + if len(conf.Clustering) > 4 { + err = fmt.Errorf("%s accepts at most 4 columns, got %d", bqwaFieldClustering, len(conf.Clustering)) + return + } + if err = validateSchemaReferences(conf); err != nil { + return + } if conf.CredentialsJSON, err = pConf.FieldString(bqwaFieldCredentialsJSON); err != nil { return } @@ -267,6 +624,13 @@ type bqwaMetrics struct { retries *service.MetricCounter schemaEvolutions *service.MetricCounter schemaEvolutionFailures *service.MetricCounter + // cachedStreams reflects the current size of the default-stream cache so + // operators can tell whether max_cached_streams is sized correctly. + cachedStreams *service.MetricGauge + // streamsEvicted counts every cache eviction (idle-sweep + LRU + on-error). + // A spike here without a corresponding writer-error spike usually indicates + // max_cached_streams is too low for the table fan-out. + streamsEvicted *service.MetricCounter } func newBQWAMetrics(m *service.Metrics) *bqwaMetrics { @@ -278,6 +642,8 @@ func newBQWAMetrics(m *service.Metrics) *bqwaMetrics { retries: m.NewCounter("bigquery_write_api_retries_total"), schemaEvolutions: m.NewCounter("bigquery_write_api_schema_evolutions_total"), schemaEvolutionFailures: m.NewCounter("bigquery_write_api_schema_evolutions_failures_total"), + cachedStreams: m.NewGauge("bigquery_write_api_cached_streams"), + streamsEvicted: m.NewCounter("bigquery_write_api_streams_evicted_total"), } } @@ -350,9 +716,10 @@ func classifyBQError(err error) bqError { } type streamWithDescriptor struct { - stream *managedwriter.ManagedStream - descriptor protoreflect.MessageDescriptor - lastUsed atomic.Int64 // UnixNano timestamp, safe for concurrent access + stream *managedwriter.ManagedStream + descriptor protoreflect.MessageDescriptor + descriptorProto *descriptorpb.DescriptorProto // needed by pendingStreamWriter (write_mode=pending_stream) + lastUsed atomic.Int64 // UnixNano timestamp, safe for concurrent access } type bigQueryWriteAPIOutput struct { @@ -367,6 +734,15 @@ type bigQueryWriteAPIOutput struct { client *bigquery.Client storageClient *managedwriter.Client resolvedProjectID string + // pending is non-nil while connected when write_mode=pending_stream. + // It wraps storageClient and runs the per-batch Create/Append/Finalize/Commit + // lifecycle. Nil-checked rather than gated on conf, so a pending-mode write + // against a disconnected output cleanly returns ErrNotConnected. + pending *pendingStreamWriter + // cdc is non-nil when write_mode is upsert or upsert_delete. Holds the + // Bloblang fields and the wrapped descriptor; injected per-row into + // writeBatchCDC. + cdc *cdcInjector // Lock ordering: connMu must always be acquired before streamsMu to // prevent deadlocks. Close() acquires connMu then streamsMu; @@ -398,14 +774,34 @@ func bigQueryWriteAPIOutputFromConfig(conf *service.ParsedConfig, mgr *service.R return nil, err } + creator, err := newTableCreator(cfg, mgr.Logger()) + if err != nil { + return nil, err + } + + var cdc *cdcInjector + if cfg.WriteMode == "upsert" || cfg.WriteMode == "upsert_delete" { + cdc = &cdcInjector{ + log: mgr.Logger(), + changeType: cfg.ChangeType, + changeSeq: cfg.ChangeSequenceNumber, + allowDelete: cfg.WriteMode == "upsert_delete", + } + } + return &bigQueryWriteAPIOutput{ conf: cfg, tableInterp: tableInterp, log: mgr.Logger(), metrics: newBQWAMetrics(mgr.Metrics()), streams: make(map[string]*streamWithDescriptor), - resolver: &schemaResolver{log: mgr.Logger(), resolveTimeout: cfg.SchemaResolveTimeout}, - evolver: &schemaEvolver{log: mgr.Logger(), evolveTimeout: cfg.SchemaEvolutionTimeout}, + resolver: &schemaResolver{ + log: mgr.Logger(), + resolveTimeout: cfg.SchemaResolveTimeout, + creator: creator, + }, + evolver: &schemaEvolver{log: mgr.Logger(), evolveTimeout: cfg.SchemaEvolutionTimeout}, + cdc: cdc, }, nil } @@ -449,6 +845,7 @@ func (o *bigQueryWriteAPIOutput) Connect(ctx context.Context) error { o.resolvedProjectID = resolvedProject o.client = bqClient o.storageClient = storageClient + o.pending = &pendingStreamWriter{storage: storageClient} o.stopSweep = make(chan struct{}) o.sweepWg.Add(1) go o.sweepIdleStreams(o.stopSweep) @@ -460,12 +857,25 @@ func (o *bigQueryWriteAPIOutput) WriteBatch(ctx context.Context, batch service.M return nil } - // Snapshot client + resolvedProjectID together under one RLock so a - // concurrent Connect-after-Close (which rewrites both) cannot tear them. + // Snapshot client, resolvedProjectID, and pending together under one + // RLock so a concurrent Connect-after-Close (which rewrites all three) + // cannot tear them apart. In pending-stream mode we also call Begin() + // while holding the lock so the inflight counter is incremented before + // Close can acquire its write lock; this closes the race where Close + // would observe inflight=0 between the WriteBatch lock release and the + // pending.Write entry. o.connMu.RLock() client := o.client projectID := o.resolvedProjectID + pending := o.pending + var pendingDone func() + if o.conf.WriteMode == "pending_stream" && pending != nil { + pendingDone = pending.Begin() + } o.connMu.RUnlock() + if pendingDone != nil { + defer pendingDone() + } if client == nil { return service.ErrNotConnected @@ -491,6 +901,13 @@ func (o *bigQueryWriteAPIOutput) WriteBatch(ctx context.Context, batch service.M return fmt.Errorf("getting stream for table %q: %w", tableID, err) } + // CDC write modes inject _CHANGE_TYPE (and optionally _CHANGE_SEQUENCE_NUMBER) + // per row. Bad rows go to DLQ via BatchError.Failed(i, err) so good rows in + // the same batch still get written. + if o.cdc != nil { + return o.writeBatchCDC(ctx, client, swd, cacheKey, batch, tableID, start) + } + rows := make([][]byte, 0, len(batch)) for i, msg := range batch { msgBytes, err := msg.AsBytes() @@ -515,6 +932,24 @@ func (o *bigQueryWriteAPIOutput) WriteBatch(ctx context.Context, batch service.M rows = append(rows, protoBytes) } + // Pending-stream mode bypasses the cached default-stream and runs a fresh + // Create/Append/Finalize/Commit lifecycle per batch. Schema-mismatch and + // permanent errors flow through handleWriteError just like the default + // path so evolution + DLQ semantics stay consistent. + if o.conf.WriteMode == "pending_stream" { + if pending == nil { + return service.ErrNotConnected + } + parent := o.tableCacheKey(projectID, tableID) + if err := pending.Write(ctx, parent, swd.descriptorProto, rows); err != nil { + return o.handleWriteError(ctx, client, err, batch, tableID, swd.descriptor, "pending stream write") + } + o.metrics.batchLatency.Timing(time.Since(start).Nanoseconds()) + o.metrics.batchesSent.Incr(1) + o.metrics.rowsSent.Incr(int64(len(batch))) + return nil + } + result, err := swd.stream.AppendRows(ctx, rows) if err != nil { o.evictStream(cacheKey) @@ -564,6 +999,135 @@ func permanentBatchError(batch service.MessageBatch, err error) error { return batchErr } +// appendRowFailure lazily constructs and appends to a BatchError so the caller +// can collect per-row CDC validation failures without committing to a sentinel +// error when there are no failures. +func appendRowFailure(be *service.BatchError, batch service.MessageBatch, idx int, err error) *service.BatchError { + if be == nil { + be = service.NewBatchError(batch, errors.New("CDC row validation errors")) + } + return be.Failed(idx, err) +} + +// writeBatchCDC executes the CDC write path: per row, resolve the Bloblang +// fields, validate the values, inject _CHANGE_TYPE (and optionally +// _CHANGE_SEQUENCE_NUMBER) into the JSON payload, marshal to proto via the +// wrapped descriptor, and AppendRows the result. Per-row validation failures +// are collected into a BatchError so good rows in the same batch are still +// written. +func (o *bigQueryWriteAPIOutput) writeBatchCDC( + ctx context.Context, + client *bigquery.Client, + swd *streamWithDescriptor, + cacheKey string, + batch service.MessageBatch, + tableID string, + start time.Time, +) error { + if o.conf.MessageFormat != "json" { + // Defensive — the parser already rejects message_format != json in CDC + // modes. If this fires it means the parser missed a case. + return permanentBatchError(batch, fmt.Errorf("CDC modes require message_format: json (got %q)", o.conf.MessageFormat)) + } + + var batchErr *service.BatchError + rows := make([][]byte, 0, len(batch)) + rowIndex := make([]int, 0, len(batch)) + validationFailures := 0 + + for i, msg := range batch { + ct, err := batch.TryInterpolatedString(i, o.cdc.changeType) + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, fmt.Errorf("interpolating change_type: %w", err)) + validationFailures++ + continue + } + var seq string + if o.cdc.changeSeq != nil { + seq, err = batch.TryInterpolatedString(i, o.cdc.changeSeq) + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, fmt.Errorf("interpolating change_sequence_number: %w", err)) + validationFailures++ + continue + } + } + + validatedCT, validatedSeq, err := o.cdc.validateAndResolveCDC(ct, seq) + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, err) + validationFailures++ + continue + } + + msgBytes, err := msg.AsBytes() + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, fmt.Errorf("reading message: %w", err)) + validationFailures++ + continue + } + + injectedBytes, err := injectCDCJSON(msgBytes, validatedCT, validatedSeq) + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, err) + validationFailures++ + continue + } + protoBytes, err := jsonToProtoBytes(injectedBytes, swd.descriptor) + if err != nil { + batchErr = appendRowFailure(batchErr, batch, i, fmt.Errorf("converting JSON to proto: %w", err)) + validationFailures++ + continue + } + rows = append(rows, protoBytes) + rowIndex = append(rowIndex, i) + } + + if validationFailures > 0 { + o.metrics.rowsFailed.Incr(int64(validationFailures)) + } + + if len(rows) == 0 { + if batchErr != nil { + return batchErr + } + return nil + } + + result, err := swd.stream.AppendRows(ctx, rows) + if err != nil { + o.evictStream(cacheKey) + return o.handleWriteError(ctx, client, err, batch, tableID, swd.descriptor, "appending CDC rows") + } + resp, err := result.FullResponse(ctx) + if err != nil { + o.evictStream(cacheKey) + return o.handleWriteError(ctx, client, err, batch, tableID, swd.descriptor, "waiting for CDC append result") + } + + o.metrics.batchLatency.Timing(time.Since(start).Nanoseconds()) + if resp.GetUpdatedSchema() != nil { + o.log.Infof("BigQuery reported schema update for table %q, evicting cached stream", tableID) + o.evictStream(cacheKey) + o.resolver.Evict(tableID) + } + + if rowErrs := resp.GetRowErrors(); len(rowErrs) > 0 { + for _, re := range rowErrs { + idx := int(re.GetIndex()) + if idx >= 0 && idx < len(rowIndex) { + batchErr = appendRowFailure(batchErr, batch, rowIndex[idx], fmt.Errorf("row %d: code %d: %s", idx, re.GetCode(), re.GetMessage())) + } + } + o.metrics.rowsFailed.Incr(int64(len(rowErrs))) + } + o.metrics.batchesSent.Incr(1) + o.metrics.rowsSent.Incr(int64(len(rows) - len(resp.GetRowErrors()))) + if batchErr != nil { + return batchErr + } + return nil +} + // handleWriteError classifies a gRPC error from an append or response and // returns the appropriate error for benthos: a BatchError (permanent, no retry) // or a plain error (transient, retry). @@ -612,6 +1176,14 @@ func (o *bigQueryWriteAPIOutput) Close(ctx context.Context) error { // sweepIdleStreams) so they don't race with client shutdown. o.closeWg.Wait() + // Wait for in-flight pending-stream Writes so they finish their + // Finalize/BatchCommit calls before the underlying storage client is torn + // down. Without this an in-flight pending Write would observe a closed + // gRPC connection mid-commit and surface a permanent batch error. + if o.pending != nil { + o.pending.Wait() + } + o.streamsMu.Lock() streams := o.streams o.streams = make(map[string]*streamWithDescriptor) @@ -634,6 +1206,7 @@ func (o *bigQueryWriteAPIOutput) Close(ctx context.Context) error { } o.storageClient = nil } + o.pending = nil if o.client != nil { if err := o.client.Close(); err != nil { @@ -720,7 +1293,13 @@ func (o *bigQueryWriteAPIOutput) closeStreamAsync(s *managedwriter.ManagedStream } func (o *bigQueryWriteAPIOutput) getOrCreateStream(ctx context.Context, client *bigquery.Client, projectID, tableID string) (*streamWithDescriptor, string, error) { - cacheKey := o.tableCacheKey(projectID, tableID) + // parentPath is the BigQuery resource path used as the AppendRows + // destination. cacheKey adds a write-mode suffix so a CDC pipeline and a + // non-CDC pipeline pointed at the same table cannot share a stream — the + // underlying ManagedStream is bound to its schema descriptor, and the CDC + // path wraps the descriptor with pseudo-columns. + parentPath := o.tableCacheKey(projectID, tableID) + cacheKey := parentPath + "#mode=" + o.conf.WriteMode now := time.Now() @@ -734,7 +1313,7 @@ func (o *bigQueryWriteAPIOutput) getOrCreateStream(ctx context.Context, client * o.streamsMu.RUnlock() // Slow path: create stream without holding the lock (network I/O). - swd, err := o.createStream(ctx, client, cacheKey, tableID) + swd, err := o.createStream(ctx, client, parentPath, tableID) if err != nil { return nil, cacheKey, err } @@ -774,8 +1353,11 @@ func (o *bigQueryWriteAPIOutput) getOrCreateStream(ctx context.Context, client * delete(o.streams, lruKey) } } + size := int64(len(o.streams)) o.streamsMu.Unlock() + o.metrics.cachedStreams.Set(size) if evicted != nil { + o.metrics.streamsEvicted.Incr(1) o.closeStreamAsync(evicted.stream) } return swd, cacheKey, nil @@ -789,9 +1371,12 @@ func (o *bigQueryWriteAPIOutput) evictStream(cacheKey string) { o.streamsMu.Lock() swd, exists := o.streams[cacheKey] delete(o.streams, cacheKey) + size := int64(len(o.streams)) o.streamsMu.Unlock() + o.metrics.cachedStreams.Set(size) if exists { + o.metrics.streamsEvicted.Incr(1) o.closeStreamAsync(swd.stream) } } @@ -827,8 +1412,13 @@ func (o *bigQueryWriteAPIOutput) sweepIdleStreams(stop <-chan struct{}) { delete(o.streams, key) } } + size := int64(len(o.streams)) o.streamsMu.Unlock() + if len(toClose) > 0 { + o.metrics.cachedStreams.Set(size) + o.metrics.streamsEvicted.Incr(int64(len(toClose))) + } for _, e := range toClose { o.log.Debugf("Closing idle BigQuery stream for %s", e.key) o.closeStreamAsync(e.swd.stream) @@ -836,7 +1426,7 @@ func (o *bigQueryWriteAPIOutput) sweepIdleStreams(stop <-chan struct{}) { } } -func (o *bigQueryWriteAPIOutput) createStream(ctx context.Context, client *bigquery.Client, cacheKey, tableID string) (*streamWithDescriptor, error) { +func (o *bigQueryWriteAPIOutput) createStream(ctx context.Context, client *bigquery.Client, parentPath, tableID string) (*streamWithDescriptor, error) { o.connMu.RLock() storageClient := o.storageClient o.connMu.RUnlock() @@ -850,23 +1440,50 @@ func (o *bigQueryWriteAPIOutput) createStream(ctx context.Context, client *bigqu return nil, err } + // In CDC mode, append _CHANGE_TYPE (and optionally _CHANGE_SEQUENCE_NUMBER) + // to the user's descriptor and use the wrapped version for the managed + // stream. The resolver's cached descriptor stays unchanged. Field numbers + // are stamped onto the per-stream streamWithDescriptor so concurrent + // createStream calls cannot race, and so heterogeneous user schemas across + // tables in the same pipeline each get the correct field numbers. + descriptorProto := rs.descriptorProto + descriptor := rs.messageDescriptor + if o.cdc != nil { + // CDC requires at least one PK source. If primary_keys is configured + // AND the table already declares PKs, both must agree. + if err := validateCDCPrimaryKeys(o.conf.PrimaryKeys, rs.primaryKeys, tableID); err != nil { + return nil, err + } + wrapped, _, err := wrapDescriptorForCDC(rs.descriptorProto, o.cdc.changeSeq != nil) + if err != nil { + return nil, fmt.Errorf("wrapping descriptor for CDC: %w", err) + } + wrappedMD, err := descriptorProtoToMessageDescriptor(wrapped) + if err != nil { + return nil, fmt.Errorf("building wrapped message descriptor: %w", err) + } + descriptorProto = wrapped + descriptor = wrappedMD + } + // Detach from the per-batch ctx: the cached stream outlives this WriteBatch // and is reused by every subsequent batch routing to the same table. If the // stream were bound to this ctx, cancellation of the first batch (per-message // deadline, source shutdown, ack timeout) would block all later AppendRows // against the cached stream until the idle sweeper evicted it. ms, err := storageClient.NewManagedStream(context.WithoutCancel(ctx), - managedwriter.WithDestinationTable(cacheKey), + managedwriter.WithDestinationTable(parentPath), managedwriter.WithType(managedwriter.DefaultStream), - managedwriter.WithSchemaDescriptor(rs.descriptorProto), + managedwriter.WithSchemaDescriptor(descriptorProto), ) if err != nil { - return nil, fmt.Errorf("creating managed stream for %q: %w", cacheKey, err) + return nil, fmt.Errorf("creating managed stream for %q: %w", parentPath, err) } return &streamWithDescriptor{ - stream: ms, - descriptor: rs.messageDescriptor, + stream: ms, + descriptor: descriptor, + descriptorProto: descriptorProto, }, nil } diff --git a/internal/impl/gcp/enterprise/bigquery/output_test.go b/internal/impl/gcp/enterprise/bigquery/output_test.go index 62781ea373..e4def0f37b 100644 --- a/internal/impl/gcp/enterprise/bigquery/output_test.go +++ b/internal/impl/gcp/enterprise/bigquery/output_test.go @@ -11,6 +11,8 @@ package bigquery import ( "errors" "fmt" + "strings" + "sync" "testing" "time" @@ -64,6 +66,8 @@ table: my_table assert.Equal(t, bigquery.DetectProjectID, cfg.ProjectID) assert.Equal(t, "my_dataset", cfg.DatasetID) assert.Equal(t, "json", cfg.MessageFormat) + assert.Equal(t, "default_stream", cfg.WriteMode) + assert.False(t, cfg.AutoCreateTable) assert.Empty(t, cfg.CredentialsJSON) assert.Empty(t, cfg.TargetPrincipal) assert.Empty(t, cfg.Delegates) @@ -83,6 +87,10 @@ project: my-project dataset: my_dataset table: my_table message_format: protobuf +write_mode: pending_stream +auto_create_table: true +schema: + - { name: id, type: STRING, mode: REQUIRED } credentials_json: '{"type":"service_account"}' target_principal: "sa@project.iam.gserviceaccount.com" delegates: @@ -105,6 +113,8 @@ endpoint: assert.Equal(t, "my-project", cfg.ProjectID) assert.Equal(t, "my_dataset", cfg.DatasetID) assert.Equal(t, "protobuf", cfg.MessageFormat) + assert.Equal(t, "pending_stream", cfg.WriteMode) + assert.True(t, cfg.AutoCreateTable) assert.Equal(t, `{"type":"service_account"}`, cfg.CredentialsJSON) assert.Equal(t, "sa@project.iam.gserviceaccount.com", cfg.TargetPrincipal) assert.Equal(t, []string{"delegate@project.iam.gserviceaccount.com"}, cfg.Delegates) @@ -116,6 +126,215 @@ endpoint: assert.Equal(t, "localhost:9060", cfg.EndpointGRPC) } +func TestBigQueryWriteAPIConfigParsingWriteModeUpsert(t *testing.T) { + spec := bigQueryWriteAPISpec() + for _, mode := range []string{"upsert", "upsert_delete"} { + t.Run(mode, func(t *testing.T) { + yaml := fmt.Sprintf(` +dataset: my_dataset +table: my_table +write_mode: %s +change_type: ${! metadata("operation") } +`, mode) + pConf, err := spec.ParseYAML(yaml, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + assert.Equal(t, mode, cfg.WriteMode) + }) + } +} + +func TestBigQueryWriteAPIConfigChangeType(t *testing.T) { + spec := bigQueryWriteAPISpec() + + t.Run("change_type parsed for upsert mode", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +`, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + require.NotNil(t, cfg.ChangeType) + }) + + t.Run("change_type required when upsert", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +`, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), "change_type") + }) + + t.Run("change_type not required when default_stream", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +`, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + assert.Nil(t, cfg.ChangeType) + }) +} + +func TestBigQueryWriteAPIConfigChangeSequenceNumber(t *testing.T) { + spec := bigQueryWriteAPISpec() + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +change_sequence_number: ${! metadata("scn") } +`, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + require.NotNil(t, cfg.ChangeSequenceNumber) +} + +func TestBigQueryWriteAPIConfigPrimaryKeys(t *testing.T) { + spec := bigQueryWriteAPISpec() + + t.Run("primary_keys parsed", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +primary_keys: [id, tenant_id] +`, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + assert.Equal(t, []string{"id", "tenant_id"}, cfg.PrimaryKeys) + }) + + t.Run("max 16 columns", func(t *testing.T) { + keys := make([]string, 17) + for i := range keys { + keys[i] = fmt.Sprintf("c%d", i) + } + pConf, err := spec.ParseYAML(fmt.Sprintf(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +primary_keys: [%s] +`, strings.Join(keys, ", ")), nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), "16") + }) + + t.Run("required when auto_create_table and CDC mode", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +auto_create_table: true +schema: + - { name: id, type: STRING, mode: REQUIRED } +`, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), "primary_keys") + }) + + t.Run("PK column must exist in schema when both set", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +auto_create_table: true +schema: + - { name: id, type: STRING, mode: REQUIRED } +primary_keys: [unknown_col] +`, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown_col") + }) + + t.Run("PK column must be REQUIRED when both set", func(t *testing.T) { + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +auto_create_table: true +schema: + - { name: id, type: STRING, mode: NULLABLE } +primary_keys: [id] +`, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), "REQUIRED") + }) +} + +func TestBigQueryWriteAPIConfigCDCIncompatibilities(t *testing.T) { + spec := bigQueryWriteAPISpec() + cases := []struct { + name string + yaml string + errMsg string + }{ + { + name: "pending_stream rejects change_type", + yaml: ` +dataset: my_dataset +table: my_table +write_mode: pending_stream +change_type: ${! metadata("operation") } +`, + errMsg: "change_type", + }, + { + name: "default_stream rejects change_sequence_number", + yaml: ` +dataset: my_dataset +table: my_table +change_sequence_number: ${! metadata("scn") } +`, + errMsg: "change_sequence_number", + }, + { + name: "upsert rejects message_format: protobuf", + yaml: ` +dataset: my_dataset +table: my_table +write_mode: upsert +change_type: ${! metadata("operation") } +message_format: protobuf +`, + errMsg: "message_format", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + pConf, err := spec.ParseYAML(tc.yaml, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errMsg) + }) + } +} + func TestBigQueryWriteAPIConfigParsingRejectsNonPositiveDurations(t *testing.T) { spec := bigQueryWriteAPISpec() for _, tc := range []struct { @@ -180,6 +399,213 @@ delegates: } } +func TestSchemaParsing(t *testing.T) { + spec := bigQueryWriteAPISpec() + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: id, type: STRING, mode: REQUIRED } + - { name: tags, type: STRING, mode: REPEATED } + - { name: age, type: INT64 } + - name: address + type: RECORD + fields: + - { name: line1, type: STRING } + - { name: city, type: STRING, mode: REQUIRED } +`, nil) + require.NoError(t, err) + + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + require.Len(t, cfg.Schema, 4) + assert.Equal(t, "id", cfg.Schema[0].Name) + assert.Equal(t, "STRING", cfg.Schema[0].Type) + assert.Equal(t, "REQUIRED", cfg.Schema[0].Mode) + assert.Equal(t, "REPEATED", cfg.Schema[1].Mode) + // INT64 alias normalises to INTEGER. + assert.Equal(t, "INTEGER", cfg.Schema[2].Type) + assert.Equal(t, "NULLABLE", cfg.Schema[2].Mode) + assert.Equal(t, "RECORD", cfg.Schema[3].Type) + require.Len(t, cfg.Schema[3].Fields, 2) + assert.Equal(t, "line1", cfg.Schema[3].Fields[0].Name) + assert.Equal(t, "city", cfg.Schema[3].Fields[1].Name) + assert.Equal(t, "REQUIRED", cfg.Schema[3].Fields[1].Mode) +} + +func TestSchemaValidation(t *testing.T) { + spec := bigQueryWriteAPISpec() + for _, tc := range []struct { + name string + yaml string + errMsg string + }{ + { + name: "auto_create_table without schema", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +`, + errMsg: bqwaFieldSchema, + }, + { + name: "invalid column type", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: id, type: NOTATYPE } +`, + errMsg: "NOTATYPE", + }, + { + name: "invalid mode", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: id, type: STRING, mode: WEIRD } +`, + errMsg: "WEIRD", + }, + { + name: "record without fields", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: addr, type: RECORD } +`, + errMsg: "RECORD", + }, + } { + t.Run(tc.name, func(t *testing.T) { + pConf, err := spec.ParseYAML(tc.yaml, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errMsg) + }) + } +} + +func TestPartitioningClusteringParsing(t *testing.T) { + spec := bigQueryWriteAPISpec() + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: id, type: STRING } + - { name: created_at, type: TIMESTAMP } + - { name: user_id, type: STRING } +time_partitioning: + type: HOUR + field: created_at + expiration: 24h + require_filter: true +clustering: + - user_id + - id +`, nil) + require.NoError(t, err) + + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + assert.Equal(t, "HOUR", cfg.TimePartitioning.Type) + assert.Equal(t, "created_at", cfg.TimePartitioning.Field) + assert.Equal(t, 24*time.Hour, cfg.TimePartitioning.Expiration) + assert.True(t, cfg.TimePartitioning.RequireFilter) + assert.Equal(t, []string{"user_id", "id"}, cfg.Clustering) +} + +func TestPartitioningClusteringDefaults(t *testing.T) { + // Absent partition block leaves Type empty (sentinel for "not configured"). + spec := bigQueryWriteAPISpec() + pConf, err := spec.ParseYAML(` +dataset: my_dataset +table: my_table +`, nil) + require.NoError(t, err) + cfg, err := bigQueryWriteAPIConfigFromParsed(pConf) + require.NoError(t, err) + assert.Empty(t, cfg.TimePartitioning.Type) + assert.Empty(t, cfg.TimePartitioning.Field) + assert.Empty(t, cfg.Clustering) +} + +func TestPartitioningClusteringValidation(t *testing.T) { + spec := bigQueryWriteAPISpec() + for _, tc := range []struct { + name string + yaml string + errMsg string + }{ + { + name: "partition field not in schema", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: [{ name: id, type: STRING }] +time_partitioning: { type: DAY, field: missing_col } +`, + errMsg: "missing_col", + }, + { + name: "partition field wrong type", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: [{ name: id, type: STRING }] +time_partitioning: { type: DAY, field: id } +`, + errMsg: "DATE/TIMESTAMP/DATETIME", + }, + { + name: "clustering column not in schema", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: [{ name: id, type: STRING }] +clustering: [missing_col] +`, + errMsg: "missing_col", + }, + { + name: "too many clustering columns", + yaml: ` +dataset: my_dataset +table: my_table +auto_create_table: true +schema: + - { name: a, type: STRING } + - { name: b, type: STRING } + - { name: c, type: STRING } + - { name: d, type: STRING } + - { name: e, type: STRING } +clustering: [a, b, c, d, e] +`, + errMsg: "at most 4", + }, + } { + t.Run(tc.name, func(t *testing.T) { + pConf, err := spec.ParseYAML(tc.yaml, nil) + require.NoError(t, err) + _, err = bigQueryWriteAPIConfigFromParsed(pConf) + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errMsg) + }) + } +} + func TestJSONToProtoConversion(t *testing.T) { bqSchema := bigquery.Schema{ {Name: "name", Type: bigquery.StringFieldType}, @@ -339,6 +765,93 @@ func TestDescriptorProtoToMessageDescriptorErrors(t *testing.T) { }) } +// TestStreamCacheConcurrentStress exercises the stream cache from many +// goroutines doing reads (fast path), inserts (with LRU eviction), and +// targeted evictions. The point isn't to assert final cache contents — Go's +// race detector and the lock invariants are what we're stressing. +func TestStreamCacheConcurrentStress(t *testing.T) { + out := newTestOutput(t, ` +dataset: my_dataset +table: my_table +max_cached_streams: 16 +`) + out.streams = make(map[string]*streamWithDescriptor) + + // Seed enough entries to make the LRU scan non-trivial. + for i := range 8 { + k := fmt.Sprintf("projects/p/datasets/d/tables/seed%d", i) + swd := &streamWithDescriptor{} + swd.lastUsed.Store(time.Now().Add(time.Duration(-i) * time.Second).UnixNano()) + out.streams[k] = swd + } + + const ( + goroutines = 16 + opsPerWorker = 500 + ) + var wg sync.WaitGroup + wg.Add(goroutines) + for g := range goroutines { + go func(id int) { + defer wg.Done() + for i := range opsPerWorker { + key := fmt.Sprintf("projects/p/datasets/d/tables/t%d", (id*opsPerWorker+i)%24) + switch i % 4 { + case 0: + // Fast-path read with lastUsed update. + out.streamsMu.RLock() + if cached, exists := out.streams[key]; exists { + cached.lastUsed.Store(time.Now().UnixNano()) + } + out.streamsMu.RUnlock() + case 1: + // Insert path mimicking getOrCreateStream's tail. + out.streamsMu.Lock() + if _, exists := out.streams[key]; !exists { + newSwd := &streamWithDescriptor{} + newSwd.lastUsed.Store(time.Now().UnixNano()) + out.streams[key] = newSwd + // Drive the LRU pass under contention. + if len(out.streams) > out.conf.MaxCachedStreams { + var lruKey string + var lruTS int64 = -1 + for k, s := range out.streams { + if k == key { + continue + } + ts := s.lastUsed.Load() + if lruTS == -1 || ts < lruTS { + lruKey = k + lruTS = ts + } + } + if lruKey != "" { + delete(out.streams, lruKey) + } + } + } + out.streamsMu.Unlock() + case 2: + out.evictStream(key) + case 3: + out.streamsMu.RLock() + _ = len(out.streams) + out.streamsMu.RUnlock() + } + } + }(g) + } + wg.Wait() + + // The cap is a soft limit but should be honoured under the contention + // pattern above (each insert evicts at most one extra entry). + out.streamsMu.RLock() + size := len(out.streams) + out.streamsMu.RUnlock() + assert.LessOrEqual(t, size, out.conf.MaxCachedStreams+1, + "cache should stay within ~MaxCachedStreams under concurrent insert/evict") +} + func TestSweepIdleStreams(t *testing.T) { // Given an output with a short sweep interval and a stale stream. out := newTestOutput(t, ` @@ -524,6 +1037,8 @@ func TestMetricsInitialization(t *testing.T) { require.NotNil(t, m.retries) require.NotNil(t, m.schemaEvolutions) require.NotNil(t, m.schemaEvolutionFailures) + require.NotNil(t, m.cachedStreams) + require.NotNil(t, m.streamsEvicted) } func TestBuildAuthOpts(t *testing.T) { diff --git a/internal/impl/gcp/enterprise/bigquery/schema_resolver.go b/internal/impl/gcp/enterprise/bigquery/schema_resolver.go index 5546dc078d..a26be855d3 100644 --- a/internal/impl/gcp/enterprise/bigquery/schema_resolver.go +++ b/internal/impl/gcp/enterprise/bigquery/schema_resolver.go @@ -12,6 +12,7 @@ import ( "context" "errors" "fmt" + "net/http" "sync" "sync/atomic" "time" @@ -19,6 +20,7 @@ import ( bq "cloud.google.com/go/bigquery" "cloud.google.com/go/bigquery/storage/managedwriter/adapt" "golang.org/x/sync/singleflight" + "google.golang.org/api/googleapi" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/descriptorpb" @@ -26,10 +28,12 @@ import ( ) // resolvedSchema holds both descriptor forms (DescriptorProto for the managed -// writer, MessageDescriptor for JSON-to-proto conversion). +// writer, MessageDescriptor for JSON-to-proto conversion) plus the table's +// declared primary keys (nil when the table has no PK constraint). type resolvedSchema struct { descriptorProto *descriptorpb.DescriptorProto messageDescriptor protoreflect.MessageDescriptor + primaryKeys []string } // schemaResolver caches resolved schemas per table and deduplicates concurrent @@ -47,6 +51,9 @@ type schemaResolver struct { // refuse to store its now-stale result. atomic to avoid taking a lock on // the cache-miss path. generation atomic.Int64 + // creator is non-nil when auto_create_table is enabled. On a 404 from + // Metadata, Resolve calls creator.Ensure then retries the fetch. + creator *tableCreator } // Resolve returns a resolved schema for the given table by fetching the @@ -82,7 +89,7 @@ func (r *schemaResolver) Resolve(ctx context.Context, client *bq.Client, dataset genBefore := r.generation.Load() fetchCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), r.resolveTimeout) defer cancel() - rs, err := resolveFromBQTable(fetchCtx, client, datasetID, tableID) + rs, err := resolveFromBQTable(fetchCtx, client, r.creator, datasetID, tableID) if err != nil { return nil, err } @@ -111,10 +118,21 @@ func (r *schemaResolver) Evict(tableID string) { r.cache.Delete(tableID) } -func resolveFromBQTable(ctx context.Context, client *bq.Client, datasetID, tableID string) (*resolvedSchema, error) { +func resolveFromBQTable(ctx context.Context, client *bq.Client, creator *tableCreator, datasetID, tableID string) (*resolvedSchema, error) { meta, err := client.Dataset(datasetID).Table(tableID).Metadata(ctx) if err != nil { - return nil, err + // 404 + auto_create_table → create the table and retry. Any other + // error short-circuits. + var apiErr *googleapi.Error + if creator != nil && errors.As(err, &apiErr) && apiErr.Code == http.StatusNotFound { + if cerr := creator.Ensure(ctx, client, datasetID, tableID); cerr != nil { + return nil, fmt.Errorf("auto-create on 404: %w", cerr) + } + meta, err = client.Dataset(datasetID).Table(tableID).Metadata(ctx) + } + if err != nil { + return nil, err + } } tableSchema, err := adapt.BQSchemaToStorageTableSchema(meta.Schema) @@ -145,5 +163,23 @@ func resolveFromBQTable(ctx context.Context, client *bq.Client, datasetID, table return &resolvedSchema{ descriptorProto: normalized, messageDescriptor: md, + primaryKeys: extractPrimaryKeysFromMetadata(meta), }, nil } + +// extractPrimaryKeysFromMetadata reads the PRIMARY KEY declaration from a +// fetched BigQuery table metadata. Returns nil if the table has no primary +// key declared. The returned slice preserves the column order declared in +// BigQuery, which is significant for composite keys. +func extractPrimaryKeysFromMetadata(meta *bq.TableMetadata) []string { + if meta == nil || meta.TableConstraints == nil || meta.TableConstraints.PrimaryKey == nil { + return nil + } + cols := meta.TableConstraints.PrimaryKey.Columns + if len(cols) == 0 { + return nil + } + out := make([]string, len(cols)) + copy(out, cols) + return out +} diff --git a/internal/impl/gcp/enterprise/bigquery/schema_resolver_test.go b/internal/impl/gcp/enterprise/bigquery/schema_resolver_test.go index 6b14e0e672..d182634493 100644 --- a/internal/impl/gcp/enterprise/bigquery/schema_resolver_test.go +++ b/internal/impl/gcp/enterprise/bigquery/schema_resolver_test.go @@ -9,8 +9,12 @@ package bigquery import ( + "fmt" + "sync" "testing" + "time" + "cloud.google.com/go/bigquery" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/reflect/protoreflect" @@ -110,6 +114,83 @@ func TestResolverEvictNonexistent(t *testing.T) { }) } +// TestResolverConcurrentStress hammers Resolve, Evict, and direct cache +// mutations from many goroutines for a short duration. With -race this +// surfaces any unsynchronised access to the resolver's internal state. We +// intentionally include a write path that uses singleflight (mimicking a +// real Resolve cache miss followed by a Store) so the generation guard is +// exercised under contention rather than only sequentially. +func TestResolverConcurrentStress(t *testing.T) { + r := &schemaResolver{ + log: service.MockResources().Logger(), + resolveTimeout: time.Second, + } + md := buildTestMessageDescriptor(t) + const ( + goroutines = 32 + opsPerWorker = 500 + uniqueTables = 8 + ) + var wg sync.WaitGroup + wg.Add(goroutines) + for g := range goroutines { + go func(id int) { + defer wg.Done() + for i := range opsPerWorker { + table := fmt.Sprintf("t%d", (id+i)%uniqueTables) + switch i % 4 { + case 0: + // Cache-hit path (or miss into singleflight fallback). + _, _, _ = r.sf.Do(table, func() (any, error) { + genBefore := r.generation.Load() + rs := &resolvedSchema{messageDescriptor: md} + if r.generation.Load() == genBefore { + r.cache.Store(table, rs) + } + return rs, nil + }) + case 1: + _, _ = r.cache.Load(table) + case 2: + r.Evict(table) + case 3: + // Force a generation bump without touching the cache to stress + // the atomic counter under contention. + r.generation.Add(1) + } + } + }(g) + } + wg.Wait() + // No assertions on final state — this test exists to fail under -race if + // any access path is unsafe. The build/test passes when atomic + lock + // invariants hold. + assert.GreaterOrEqual(t, r.generation.Load(), int64(0)) +} + +func TestExtractPrimaryKeysFromMetadata(t *testing.T) { + t.Run("returns columns in declared order", func(t *testing.T) { + meta := &bigquery.TableMetadata{ + TableConstraints: &bigquery.TableConstraints{ + PrimaryKey: &bigquery.PrimaryKey{Columns: []string{"tenant_id", "id"}}, + }, + } + pks := extractPrimaryKeysFromMetadata(meta) + assert.Equal(t, []string{"tenant_id", "id"}, pks) + }) + + t.Run("returns nil when no constraints", func(t *testing.T) { + assert.Nil(t, extractPrimaryKeysFromMetadata(&bigquery.TableMetadata{})) + assert.Nil(t, extractPrimaryKeysFromMetadata(&bigquery.TableMetadata{ + TableConstraints: &bigquery.TableConstraints{}, + })) + }) + + t.Run("returns nil on nil metadata", func(t *testing.T) { + assert.Nil(t, extractPrimaryKeysFromMetadata(nil)) + }) +} + // buildTestMessageDescriptor creates a simple (name STRING, age INT64) // message descriptor for unit tests. func buildTestMessageDescriptor(t *testing.T) protoreflect.MessageDescriptor { diff --git a/internal/impl/gcp/enterprise/bigquery/table_creator.go b/internal/impl/gcp/enterprise/bigquery/table_creator.go new file mode 100644 index 0000000000..65735c7871 --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/table_creator.go @@ -0,0 +1,174 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "context" + "errors" + "fmt" + "net/http" + + "cloud.google.com/go/bigquery" + "google.golang.org/api/googleapi" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// yamlSchemaTypes maps the canonical YAML column types to BigQuery field +// types. The config parser normalises aliases (INT64→INTEGER etc.) before +// this mapping is consulted. +var yamlSchemaTypes = map[string]bigquery.FieldType{ + "STRING": bigquery.StringFieldType, + "BYTES": bigquery.BytesFieldType, + "INTEGER": bigquery.IntegerFieldType, + "FLOAT": bigquery.FloatFieldType, + "NUMERIC": bigquery.NumericFieldType, + "BIGNUMERIC": bigquery.BigNumericFieldType, + "BOOLEAN": bigquery.BooleanFieldType, + "TIMESTAMP": bigquery.TimestampFieldType, + "DATE": bigquery.DateFieldType, + "TIME": bigquery.TimeFieldType, + "DATETIME": bigquery.DateTimeFieldType, + "GEOGRAPHY": bigquery.GeographyFieldType, + "JSON": bigquery.JSONFieldType, + "RECORD": bigquery.RecordFieldType, +} + +// yamlPartitioningTypes maps the YAML enum to BQ's TimePartitioningType. The +// parser validates the input enum, so missing entries here would be a bug. +var yamlPartitioningTypes = map[string]bigquery.TimePartitioningType{ + "DAY": bigquery.DayPartitioningType, + "HOUR": bigquery.HourPartitioningType, + "MONTH": bigquery.MonthPartitioningType, + "YEAR": bigquery.YearPartitioningType, +} + +// yamlTimePartitioningToBQ returns the BigQuery time-partitioning struct, or +// nil if no partitioning was configured. The config parser leaves Type empty +// when the user didn't supply the `type` field (the explicit opt-in trigger). +// RequireFilter is intentionally not set here — the bigquery SDK marks the +// per-partitioning RequirePartitionFilter field as deprecated in favour of +// the top-level TableMetadata.RequirePartitionFilter, which tableCreator +// applies separately. +func yamlTimePartitioningToBQ(c bqTimePartitioningConfig) *bigquery.TimePartitioning { + if c.Type == "" { + return nil + } + return &bigquery.TimePartitioning{ + Type: yamlPartitioningTypes[c.Type], + Field: c.Field, + Expiration: c.Expiration, + } +} + +// yamlClusteringToBQ returns the BigQuery clustering struct, or nil if no +// clustering columns were configured. +func yamlClusteringToBQ(cols []string) *bigquery.Clustering { + if len(cols) == 0 { + return nil + } + return &bigquery.Clustering{Fields: cols} +} + +// tableCreator builds bigquery.TableMetadata from the parsed config and +// creates BigQuery tables idempotently. It is invoked lazily by schemaResolver +// when Metadata returns 404 and auto_create_table is enabled, so it handles +// both static and interpolated table names. +type tableCreator struct { + schema bigquery.Schema + partitioning *bigquery.TimePartitioning + clustering *bigquery.Clustering + requirePartFltr bool + primaryKeys []string + log *service.Logger +} + +// newTableCreator builds a tableCreator from the parsed config. Returns nil +// when auto_create_table is disabled — callers nil-check to skip the create +// path entirely. +func newTableCreator(conf bigQueryWriteAPIConfig, log *service.Logger) (*tableCreator, error) { + if !conf.AutoCreateTable { + return nil, nil + } + schema, err := yamlSchemaToBQSchema(conf.Schema) + if err != nil { + return nil, fmt.Errorf("building auto-create schema: %w", err) + } + return &tableCreator{ + schema: schema, + partitioning: yamlTimePartitioningToBQ(conf.TimePartitioning), + clustering: yamlClusteringToBQ(conf.Clustering), + requirePartFltr: conf.TimePartitioning.RequireFilter, + primaryKeys: conf.PrimaryKeys, + log: log, + }, nil +} + +// buildMetadata returns the bigquery.TableMetadata used by Ensure. Extracted +// from the inline literal so unit tests can exercise the PrimaryKey wiring +// without going through Create. +func (tc *tableCreator) buildMetadata() *bigquery.TableMetadata { + meta := &bigquery.TableMetadata{ + Schema: tc.schema, + TimePartitioning: tc.partitioning, + Clustering: tc.clustering, + RequirePartitionFilter: tc.requirePartFltr, + } + if len(tc.primaryKeys) > 0 { + meta.TableConstraints = &bigquery.TableConstraints{ + PrimaryKey: &bigquery.PrimaryKey{Columns: append([]string{}, tc.primaryKeys...)}, + } + } + return meta +} + +// Ensure creates the table if it does not exist. AlreadyExists (HTTP 409) is +// treated as success so concurrent creators race-tolerantly. Other errors +// propagate so the caller (schemaResolver) can classify and retry. +func (tc *tableCreator) Ensure(ctx context.Context, client *bigquery.Client, datasetID, tableID string) error { + err := client.Dataset(datasetID).Table(tableID).Create(ctx, tc.buildMetadata()) + if err == nil { + tc.log.Infof("Auto-created BigQuery table %s.%s", datasetID, tableID) + return nil + } + var apiErr *googleapi.Error + if errors.As(err, &apiErr) && apiErr.Code == http.StatusConflict { + // Another writer created the table first — that's the contract. + return nil + } + return fmt.Errorf("creating table %s.%s: %w", datasetID, tableID, err) +} + +// yamlSchemaToBQSchema converts the parsed YAML schema (a list of +// bqSchemaField) into a bigquery.Schema. The input is assumed to have already +// passed config-time validation, so canonical types and modes are guaranteed. +func yamlSchemaToBQSchema(fields []bqSchemaField) (bigquery.Schema, error) { + out := make(bigquery.Schema, 0, len(fields)) + for _, f := range fields { + bqType, ok := yamlSchemaTypes[f.Type] + if !ok { + return nil, fmt.Errorf("schema field %q: unsupported type %q", f.Name, f.Type) + } + fs := &bigquery.FieldSchema{ + Name: f.Name, + Type: bqType, + Required: f.Mode == "REQUIRED", + Repeated: f.Mode == "REPEATED", + } + if bqType == bigquery.RecordFieldType { + nested, err := yamlSchemaToBQSchema(f.Fields) + if err != nil { + return nil, err + } + fs.Schema = nested + } + out = append(out, fs) + } + return out, nil +} diff --git a/internal/impl/gcp/enterprise/bigquery/table_creator_test.go b/internal/impl/gcp/enterprise/bigquery/table_creator_test.go new file mode 100644 index 0000000000..7f157e3d44 --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/table_creator_test.go @@ -0,0 +1,135 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "testing" + "time" + + "cloud.google.com/go/bigquery" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestYAMLSchemaToBQSchema(t *testing.T) { + in := []bqSchemaField{ + {Name: "id", Type: "STRING", Mode: "REQUIRED"}, + {Name: "tags", Type: "STRING", Mode: "REPEATED"}, + {Name: "age", Type: "INTEGER", Mode: "NULLABLE"}, + {Name: "address", Type: "RECORD", Mode: "NULLABLE", Fields: []bqSchemaField{ + {Name: "line1", Type: "STRING", Mode: "NULLABLE"}, + {Name: "city", Type: "STRING", Mode: "REQUIRED"}, + }}, + } + + got, err := yamlSchemaToBQSchema(in) + require.NoError(t, err) + require.Len(t, got, 4) + + assert.Equal(t, "id", got[0].Name) + assert.Equal(t, bigquery.StringFieldType, got[0].Type) + assert.True(t, got[0].Required) + assert.False(t, got[0].Repeated) + + assert.True(t, got[1].Repeated, "REPEATED maps to Repeated=true") + assert.False(t, got[1].Required, "REPEATED implies Required=false") + + assert.Equal(t, bigquery.IntegerFieldType, got[2].Type) + + assert.Equal(t, bigquery.RecordFieldType, got[3].Type) + require.Len(t, got[3].Schema, 2) + assert.Equal(t, "city", got[3].Schema[1].Name) + assert.True(t, got[3].Schema[1].Required) +} + +func TestYAMLTimePartitioningToBQ(t *testing.T) { + in := bqTimePartitioningConfig{Type: "HOUR", Field: "created_at", Expiration: 24 * time.Hour, RequireFilter: true} + got := yamlTimePartitioningToBQ(in) + require.NotNil(t, got) + assert.Equal(t, bigquery.HourPartitioningType, got.Type) + assert.Equal(t, "created_at", got.Field) + assert.Equal(t, 24*time.Hour, got.Expiration) +} + +func TestYAMLTimePartitioningEmpty(t *testing.T) { + // Zero-value config means no partitioning at all (Type empty). + assert.Nil(t, yamlTimePartitioningToBQ(bqTimePartitioningConfig{})) +} + +func TestYAMLTimePartitioningIngestionTime(t *testing.T) { + // Type set but field empty = ingestion-time partitioning. + got := yamlTimePartitioningToBQ(bqTimePartitioningConfig{Type: "DAY"}) + require.NotNil(t, got) + assert.Equal(t, bigquery.DayPartitioningType, got.Type) + assert.Empty(t, got.Field) +} + +func TestYAMLClusteringToBQ(t *testing.T) { + got := yamlClusteringToBQ([]string{"user_id", "tenant_id"}) + require.NotNil(t, got) + assert.Equal(t, []string{"user_id", "tenant_id"}, got.Fields) + + assert.Nil(t, yamlClusteringToBQ(nil), "empty list means no clustering") +} + +func TestYAMLSchemaTypeMapping(t *testing.T) { + cases := []struct { + in string + want bigquery.FieldType + }{ + {"STRING", bigquery.StringFieldType}, + {"BYTES", bigquery.BytesFieldType}, + {"INTEGER", bigquery.IntegerFieldType}, + {"FLOAT", bigquery.FloatFieldType}, + {"NUMERIC", bigquery.NumericFieldType}, + {"BIGNUMERIC", bigquery.BigNumericFieldType}, + {"BOOLEAN", bigquery.BooleanFieldType}, + {"TIMESTAMP", bigquery.TimestampFieldType}, + {"DATE", bigquery.DateFieldType}, + {"TIME", bigquery.TimeFieldType}, + {"DATETIME", bigquery.DateTimeFieldType}, + {"GEOGRAPHY", bigquery.GeographyFieldType}, + {"JSON", bigquery.JSONFieldType}, + {"RECORD", bigquery.RecordFieldType}, + } + for _, c := range cases { + t.Run(c.in, func(t *testing.T) { + in := []bqSchemaField{{Name: "x", Type: c.in, Mode: "NULLABLE"}} + if c.in == "RECORD" { + in[0].Fields = []bqSchemaField{{Name: "y", Type: "STRING", Mode: "NULLABLE"}} + } + got, err := yamlSchemaToBQSchema(in) + require.NoError(t, err) + assert.Equal(t, c.want, got[0].Type) + }) + } +} + +func TestTableCreatorBuildsPrimaryKey(t *testing.T) { + creator := &tableCreator{ + schema: bigquery.Schema{ + {Name: "id", Type: bigquery.StringFieldType, Required: true}, + }, + primaryKeys: []string{"id"}, + } + meta := creator.buildMetadata() + require.NotNil(t, meta.TableConstraints) + require.NotNil(t, meta.TableConstraints.PrimaryKey) + assert.Equal(t, []string{"id"}, meta.TableConstraints.PrimaryKey.Columns) +} + +func TestTableCreatorNoPrimaryKey(t *testing.T) { + creator := &tableCreator{ + schema: bigquery.Schema{ + {Name: "id", Type: bigquery.StringFieldType, Required: true}, + }, + } + meta := creator.buildMetadata() + assert.Nil(t, meta.TableConstraints) +} diff --git a/internal/impl/gcp/enterprise/bigquery/write_mode.go b/internal/impl/gcp/enterprise/bigquery/write_mode.go new file mode 100644 index 0000000000..d2ea40db20 --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/write_mode.go @@ -0,0 +1,161 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "context" + "fmt" + "sync" + + "cloud.google.com/go/bigquery/storage/apiv1/storagepb" + "cloud.google.com/go/bigquery/storage/managedwriter" + "google.golang.org/protobuf/types/descriptorpb" +) + +// maxAppendRowsBytes is a conservative chunk-size limit for AppendRows. The +// API's hard limit is 10MB per request; we leave headroom for proto framing +// and gRPC overhead. +const maxAppendRowsBytes = 9 * 1024 * 1024 + +// pendingStreamWriter writes a single batch to BigQuery using a freshly +// allocated pending stream. The lifecycle per WriteBatch is: +// +// 1. CreateWriteStream(Pending) — fresh stream per batch. +// 2. AppendRows for each chunk with sequential offsets. +// 3. FinalizeWriteStream. +// 4. BatchCommitWriteStreams — atomic commit. +// +// Failure semantics: any error before commit leaves an uncommitted stream +// (data not visible). benthos retries via a fresh stream, giving clean +// at-least-once semantics with exactly-once-within-stream guarantees on the +// data that does land. A successful commit whose response is lost on the +// wire produces a duplicate-commit retry — this is a fundamental limit of +// the BQ Storage Write API exactly-once contract. +type pendingStreamWriter struct { + storage *managedwriter.Client + // inflight tracks in-progress Write calls so Close can wait for them to + // finish before the underlying storage client is torn down. Without this, + // a benthos shutdown that races with a pending Write would tear the gRPC + // connection mid-Finalize/BatchCommit and surface a permanent batch error + // instead of a clean retry. + inflight sync.WaitGroup +} + +// Begin registers an intent to run Write, returning a release function the +// caller must defer. It exists so the caller can register the in-flight +// operation while still holding the lock that synchronises Close — Write +// itself can only Add(1) once it actually runs, which leaves a window where +// Close.Wait() observes inflight=0 between the lock release and Write entry. +// Callers should always do: +// +// pending := snapshot under connMu.RLock +// done := pending.Begin() +// defer done() +// connMu.RUnlock +// ... pending.Write(...) +func (p *pendingStreamWriter) Begin() func() { + p.inflight.Add(1) + return p.inflight.Done +} + +// Wait blocks until every in-flight Write returns. Intended for output.Close +// to call before tearing down the underlying managedwriter.Client. Safe to +// call repeatedly. +func (p *pendingStreamWriter) Wait() { + p.inflight.Wait() +} + +// Write executes the pending-stream lifecycle for a single batch. parent is +// the BigQuery resource path of the destination table +// (`projects/{p}/datasets/{d}/tables/{t}`). descriptorProto must match the +// rows' serialised proto encoding. +// +// The caller is expected to have invoked Begin() before this call so Close +// can correctly observe the in-flight count. Write does not register itself +// because the registration must happen under the caller's connMu RLock. +func (p *pendingStreamWriter) Write( + ctx context.Context, + parent string, + descriptorProto *descriptorpb.DescriptorProto, + rows [][]byte, +) error { + if len(rows) == 0 { + return nil + } + + stream, err := p.storage.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{ + Parent: parent, + WriteStream: &storagepb.WriteStream{ + Type: storagepb.WriteStream_PENDING, + }, + }) + if err != nil { + return fmt.Errorf("creating pending write stream: %w", err) + } + + ms, err := p.storage.NewManagedStream(ctx, + managedwriter.WithStreamName(stream.GetName()), + managedwriter.WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + return fmt.Errorf("opening managed stream for %s: %w", stream.GetName(), err) + } + defer func() { _ = ms.Close() }() + + chunks := chunkRowsByBytes(rows, maxAppendRowsBytes) + var offset int64 + for _, chunk := range chunks { + result, err := ms.AppendRows(ctx, chunk, managedwriter.WithOffset(offset)) + if err != nil { + return fmt.Errorf("AppendRows at offset %d: %w", offset, err) + } + if _, err := result.GetResult(ctx); err != nil { + return fmt.Errorf("AppendRows ack at offset %d: %w", offset, err) + } + offset += int64(len(chunk)) + } + + if _, err := ms.Finalize(ctx); err != nil { + return fmt.Errorf("FinalizeWriteStream %s: %w", stream.GetName(), err) + } + + if _, err := p.storage.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{ + Parent: parent, + WriteStreams: []string{stream.GetName()}, + }); err != nil { + return fmt.Errorf("BatchCommitWriteStreams %s: %w", stream.GetName(), err) + } + return nil +} + +// chunkRowsByBytes splits a row slice into smaller slices such that each +// chunk's total payload byte size stays under maxBytes. A single row that +// already exceeds maxBytes is emitted in its own chunk; the API will reject +// it server-side and the caller surfaces that as a permanent error. +func chunkRowsByBytes(rows [][]byte, maxBytes int) [][][]byte { + if len(rows) == 0 { + return nil + } + var chunks [][][]byte + var current [][]byte + var currentBytes int + for _, r := range rows { + if len(current) > 0 && currentBytes+len(r) > maxBytes { + chunks = append(chunks, current) + current = nil + currentBytes = 0 + } + current = append(current, r) + currentBytes += len(r) + } + if len(current) > 0 { + chunks = append(chunks, current) + } + return chunks +} diff --git a/internal/impl/gcp/enterprise/bigquery/write_mode_test.go b/internal/impl/gcp/enterprise/bigquery/write_mode_test.go new file mode 100644 index 0000000000..c759ac7643 --- /dev/null +++ b/internal/impl/gcp/enterprise/bigquery/write_mode_test.go @@ -0,0 +1,60 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package bigquery + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChunkRowsByBytes(t *testing.T) { + t.Run("empty", func(t *testing.T) { + got := chunkRowsByBytes(nil, 1024) + assert.Empty(t, got) + }) + + t.Run("single row fits", func(t *testing.T) { + rows := [][]byte{[]byte("hello")} + got := chunkRowsByBytes(rows, 1024) + require.Len(t, got, 1) + assert.Equal(t, rows, got[0]) + }) + + t.Run("splits at byte threshold", func(t *testing.T) { + // 5 rows of 100 bytes each, max=250 → 3 chunks of [2, 2, 1]. + rows := make([][]byte, 5) + for i := range rows { + rows[i] = bytes.Repeat([]byte{'a'}, 100) + } + got := chunkRowsByBytes(rows, 250) + require.Len(t, got, 3) + assert.Len(t, got[0], 2) + assert.Len(t, got[1], 2) + assert.Len(t, got[2], 1) + }) + + t.Run("oversized single row still emits", func(t *testing.T) { + // One oversized row is emitted as its own chunk; the API will reject + // it server-side, which is the expected error path. + rows := [][]byte{bytes.Repeat([]byte{'b'}, 1024)} + got := chunkRowsByBytes(rows, 100) + require.Len(t, got, 1) + assert.Len(t, got[0], 1) + }) + + t.Run("all rows fit in one chunk", func(t *testing.T) { + rows := [][]byte{[]byte("a"), []byte("b"), []byte("c")} + got := chunkRowsByBytes(rows, 100) + require.Len(t, got, 1) + assert.Equal(t, rows, got[0]) + }) +}