Skip to content

BigQuery: cdc upsert#4453

Open
squiidz wants to merge 2 commits into
mainfrom
bq-cdc-upsert
Open

BigQuery: cdc upsert#4453
squiidz wants to merge 2 commits into
mainfrom
bq-cdc-upsert

Conversation

@squiidz
Copy link
Copy Markdown
Contributor

@squiidz squiidz commented May 25, 2026

No description provided.

squiidz added 2 commits May 21, 2026 10:14
…lustering

Implements CON-394 on top of the schema resolver/evolver work. Three
opt-in features layered onto gcp_bigquery_write_api:

Pending-stream write mode (write_mode: pending_stream)
- Per-batch CreateWriteStream(Pending) → AppendRows → Finalize →
  BatchCommitWriteStreams. Provides exactly-once semantics within a
  committed batch.
- Rows are split into 9MB chunks (chunkRowsByBytes) so we stay under
  the AppendRows 10MB hard limit with headroom for proto framing.
- In-flight Writes are tracked on a WaitGroup; Close blocks on it so
  Finalize/BatchCommit cannot race with storage-client teardown.
- Default mode (default_stream) is unchanged.

Auto-create tables (auto_create_table: true)
- schemaResolver gains a *tableCreator. A 404 from Metadata triggers
  creator.Ensure (idempotent on AlreadyExists) and retries Metadata.
- Works for both static and interpolated table names; every
  auto-created table receives the same configured schema and
  partition/clustering settings.

Schema / time_partitioning / clustering config
- New schema YAML field: column name + canonical BQ type
  (aliases normalised) + mode (NULLABLE/REQUIRED/REPEATED), recursive
  for RECORD.
- New time_partitioning block (DAY/HOUR/MONTH/YEAR + optional field,
  expiration, require_filter). Absence of the type field is the
  sentinel for "not configured".
- New clustering string list (cap of 4 enforced).
- Parser validates that partitioning.field exists in schema with a
  DATE/TIMESTAMP/DATETIME type and that clustering columns are in
  schema.

Observability
- bigquery_write_api_cached_streams (gauge) set on every cache
  mutation in getOrCreateStream, evictStream, and the idle-sweep.
- bigquery_write_api_streams_evicted_total (counter) covers all
  eviction paths (LRU + on-error + idle-sweep).

Tests
- Unit tests for: write_mode/auto_create_table parsing, schema
  parsing + validation, time_partitioning + clustering parsing,
  schema → BQ conversion, table creator helpers, chunking, pending
  stream lifecycle plumbing.
- Concurrent stress tests under -race for the schema resolver
  (Resolve/Evict/sf.Do contention) and stream cache (insert + LRU +
  evict contention).
- Integration tests: TestIntegrationAutoCreateTable runs against the
  goccy emulator. TestIntegrationPendingStreamMode is gated with
  t.Skip because the emulator does not implement Pending streams or
  BatchCommitWriteStreams.

Docs regenerated with the new fields and a "Write modes" /
"Auto-create" / "EOS caveat" section in the output description.
Implements CON-395 on top of the pending-streams branch. Two new
opt-in write modes layered onto gcp_bigquery_write_api: upsert and
upsert_delete. Both inject BigQuery's _CHANGE_TYPE pseudo-column
per row and optionally _CHANGE_SEQUENCE_NUMBER for out-of-order
resolution.

Write modes
- write_mode: upsert — UPSERT-only rows.
- write_mode: upsert_delete — UPSERT and DELETE rows.
- Both use the default stream (the only stream type BigQuery's CDC
  contract supports). pending_stream + CDC is rejected at parse time.

Pseudo-column injection
- change_type: Bloblang field resolving per row to UPSERT or DELETE
  (case-insensitive; INSERT is rejected — BigQuery doesn't allow
  mixing INSERT and UPSERT/DELETE in the same write).
- change_sequence_number: optional Bloblang field. Validated against
  BigQuery's 1-to-4-sections-of-1-to-16-hex-chars format.
- Injection happens at the JSON byte level via map[string]json.RawMessage
  so user values (int-as-string, base64 bytes, RFC3339 timestamps)
  pass through byte-exact. No round-trip through map[string]any that
  would corrupt int64 strings above 2^53.
- Per-row validation failures route to DLQ via BatchError.Failed
  so a single bad row never poisons the batch. rowsFailed metric
  counts every reject path.

Primary keys
- New primary_keys config (≤16 columns). Required when auto_create_table
  is true; otherwise the connector reads the table's declared PKs via
  TableConstraints.PrimaryKey and uses those.
- When both config and table-declared PKs are present, they must match
  (same columns, same order). Mismatches fail loudly at createStream.
- CDC mode without any PK source fails at createStream with a message
  pointing users to ALTER TABLE … ADD PRIMARY KEY.

Descriptor wrapping
- _CHANGE_TYPE and _CHANGE_SEQUENCE_NUMBER (when configured) are
  appended as proto fields at max(existing)+1, +2 in the user's
  descriptor. Wrapped descriptors live on the per-stream
  streamWithDescriptor so concurrent createStream calls cannot race.
- Stream cache keying includes the write_mode suffix so CDC and
  non-CDC pipelines pointed at the same table cannot share a stream.

message_format
- CDC modes require message_format: json. The protobuf path is
  rejected at parse time (and defensively at runtime) because the
  injection works on JSON bytes.

Docs and tests
- New migration guide at docs/modules/components/pages/outputs/
  bigquery_cdc_migration.adoc covering config translation, schema
  requirements, snapshot vs streaming tradeoffs, and the DML
  restriction on CDC tables. Bidirectional xref with the output page.
- Unit tests cover: validateChangeType, validateChangeSequenceNumber,
  validateCDCPrimaryKeys, wrapDescriptorForCDC, injectCDCJSON (incl.
  byte-fidelity for >2^53 integers), cdcInjector.validateAndResolveCDC,
  extractPrimaryKeysFromMetadata, tableCreator.buildMetadata with PKs,
  and config parsing for the five new fields plus cross-field
  validation.
- Integration tests stubbed (t.Skip) — goccy emulator does not
  implement CDC; real-BQ verification deferred.
@claude
Copy link
Copy Markdown

claude Bot commented May 25, 2026

Commits

  1. Commit f590cba title is truncated mid-word: bigquery: add pending streams, auto-create tables, partitioning and c… with the body starting …lustering. Per the commit policy, the title should be a self-contained imperative sentence — splitting "clustering" across the subject and body produces a headline that does not stand alone.

  2. Both commits mix documentation changes with code changes:

    • f590cba modifies docs/modules/components/pages/outputs/gcp_bigquery_write_api.adoc alongside the implementation.
    • 9fb7297 adds docs/modules/components/pages/outputs/bigquery_cdc_migration.adoc and further edits the output page alongside the implementation.

    Per the commit policy, in multi-commit PRs documentation changes must be in a separate commit from code changes.

Review

Layered BigQuery features: pending-stream write mode, auto-create-tables, time partitioning + clustering, observability gauges/counters, and CDC upsert/upsert_delete write modes with _CHANGE_TYPE / _CHANGE_SEQUENCE_NUMBER injection. Config parsing has good cross-field validation, the locking around o.pending (Begin under connMu RLock → Wait under Lock during Close) is correctly ordered to avoid the inflight-counter race called out in the code comments, and the descriptor wrapping isolates CDC field numbers per-stream. Tests cover the parser, concurrent stress paths, and integration with the goccy emulator where supported.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant