BigQuery: cdc upsert#4453
Conversation
…lustering Implements CON-394 on top of the schema resolver/evolver work. Three opt-in features layered onto gcp_bigquery_write_api: Pending-stream write mode (write_mode: pending_stream) - Per-batch CreateWriteStream(Pending) → AppendRows → Finalize → BatchCommitWriteStreams. Provides exactly-once semantics within a committed batch. - Rows are split into 9MB chunks (chunkRowsByBytes) so we stay under the AppendRows 10MB hard limit with headroom for proto framing. - In-flight Writes are tracked on a WaitGroup; Close blocks on it so Finalize/BatchCommit cannot race with storage-client teardown. - Default mode (default_stream) is unchanged. Auto-create tables (auto_create_table: true) - schemaResolver gains a *tableCreator. A 404 from Metadata triggers creator.Ensure (idempotent on AlreadyExists) and retries Metadata. - Works for both static and interpolated table names; every auto-created table receives the same configured schema and partition/clustering settings. Schema / time_partitioning / clustering config - New schema YAML field: column name + canonical BQ type (aliases normalised) + mode (NULLABLE/REQUIRED/REPEATED), recursive for RECORD. - New time_partitioning block (DAY/HOUR/MONTH/YEAR + optional field, expiration, require_filter). Absence of the type field is the sentinel for "not configured". - New clustering string list (cap of 4 enforced). - Parser validates that partitioning.field exists in schema with a DATE/TIMESTAMP/DATETIME type and that clustering columns are in schema. Observability - bigquery_write_api_cached_streams (gauge) set on every cache mutation in getOrCreateStream, evictStream, and the idle-sweep. - bigquery_write_api_streams_evicted_total (counter) covers all eviction paths (LRU + on-error + idle-sweep). Tests - Unit tests for: write_mode/auto_create_table parsing, schema parsing + validation, time_partitioning + clustering parsing, schema → BQ conversion, table creator helpers, chunking, pending stream lifecycle plumbing. - Concurrent stress tests under -race for the schema resolver (Resolve/Evict/sf.Do contention) and stream cache (insert + LRU + evict contention). - Integration tests: TestIntegrationAutoCreateTable runs against the goccy emulator. TestIntegrationPendingStreamMode is gated with t.Skip because the emulator does not implement Pending streams or BatchCommitWriteStreams. Docs regenerated with the new fields and a "Write modes" / "Auto-create" / "EOS caveat" section in the output description.
Implements CON-395 on top of the pending-streams branch. Two new opt-in write modes layered onto gcp_bigquery_write_api: upsert and upsert_delete. Both inject BigQuery's _CHANGE_TYPE pseudo-column per row and optionally _CHANGE_SEQUENCE_NUMBER for out-of-order resolution. Write modes - write_mode: upsert — UPSERT-only rows. - write_mode: upsert_delete — UPSERT and DELETE rows. - Both use the default stream (the only stream type BigQuery's CDC contract supports). pending_stream + CDC is rejected at parse time. Pseudo-column injection - change_type: Bloblang field resolving per row to UPSERT or DELETE (case-insensitive; INSERT is rejected — BigQuery doesn't allow mixing INSERT and UPSERT/DELETE in the same write). - change_sequence_number: optional Bloblang field. Validated against BigQuery's 1-to-4-sections-of-1-to-16-hex-chars format. - Injection happens at the JSON byte level via map[string]json.RawMessage so user values (int-as-string, base64 bytes, RFC3339 timestamps) pass through byte-exact. No round-trip through map[string]any that would corrupt int64 strings above 2^53. - Per-row validation failures route to DLQ via BatchError.Failed so a single bad row never poisons the batch. rowsFailed metric counts every reject path. Primary keys - New primary_keys config (≤16 columns). Required when auto_create_table is true; otherwise the connector reads the table's declared PKs via TableConstraints.PrimaryKey and uses those. - When both config and table-declared PKs are present, they must match (same columns, same order). Mismatches fail loudly at createStream. - CDC mode without any PK source fails at createStream with a message pointing users to ALTER TABLE … ADD PRIMARY KEY. Descriptor wrapping - _CHANGE_TYPE and _CHANGE_SEQUENCE_NUMBER (when configured) are appended as proto fields at max(existing)+1, +2 in the user's descriptor. Wrapped descriptors live on the per-stream streamWithDescriptor so concurrent createStream calls cannot race. - Stream cache keying includes the write_mode suffix so CDC and non-CDC pipelines pointed at the same table cannot share a stream. message_format - CDC modes require message_format: json. The protobuf path is rejected at parse time (and defensively at runtime) because the injection works on JSON bytes. Docs and tests - New migration guide at docs/modules/components/pages/outputs/ bigquery_cdc_migration.adoc covering config translation, schema requirements, snapshot vs streaming tradeoffs, and the DML restriction on CDC tables. Bidirectional xref with the output page. - Unit tests cover: validateChangeType, validateChangeSequenceNumber, validateCDCPrimaryKeys, wrapDescriptorForCDC, injectCDCJSON (incl. byte-fidelity for >2^53 integers), cdcInjector.validateAndResolveCDC, extractPrimaryKeysFromMetadata, tableCreator.buildMetadata with PKs, and config parsing for the five new fields plus cross-field validation. - Integration tests stubbed (t.Skip) — goccy emulator does not implement CDC; real-BQ verification deferred.
|
Commits
Review Layered BigQuery features: pending-stream write mode, auto-create-tables, time partitioning + clustering, observability gauges/counters, and CDC upsert/upsert_delete write modes with LGTM |
No description provided.