Skip to content

BigQuery: pending streams partitioned#4443

Open
squiidz wants to merge 1 commit into
mainfrom
bq-pending-streams-partitioned
Open

BigQuery: pending streams partitioned#4443
squiidz wants to merge 1 commit into
mainfrom
bq-pending-streams-partitioned

Conversation

@squiidz
Copy link
Copy Markdown
Contributor

@squiidz squiidz commented May 20, 2026

No description provided.

Comment on lines +812 to +819
if o.conf.WriteMode == "pending_stream" {
if o.pending == nil {
return service.ErrNotConnected
}
parent := o.tableCacheKey(projectID, tableID)
if err := o.pending.Write(ctx, parent, swd.descriptorProto, rows); err != nil {
return o.handleWriteError(ctx, client, err, batch, tableID, swd.descriptor, "pending stream write")
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

o.pending is read here without holding connMu, but Close writes o.pending = nil under connMu.Lock() (line 953). This is both a data race (race detector will flag it) and worse — if Close runs between the nil-check on line 813 and the o.pending.Write(...) call on line 817, this dereferences a nil pointer and panics.

The fix is to capture pending under the same RLock that already snapshots client and projectID (lines 755-758), then use the local snapshot here. The "Snapshot client + resolvedProjectID together under one RLock" comment already documents this pattern — it just wasn't extended to o.pending.

Note that the existing o.pending.Wait() call inside Close is not sufficient protection: pendingStreamWriter.Write increments inflight inside itself, so if Close acquires the lock after the if o.pending == nil check but before o.pending.Write(...) runs, Wait() returns immediately (inflight=0) and Close proceeds to nil out o.pending and tear down storageClient.

@claude
Copy link
Copy Markdown

claude Bot commented May 20, 2026

Commits

  1. bigquery: address PR review feedback — vague title that gives no indication of the actual change (a substantial refactor that collapses classifyGRPCError, isPermanentBQError, and extractGRPCStatus into a single bqError type). Per the commit policy, commit titles should not be vague. Suggest something like bigquery: collapse error classifiers into bqError. The bodies of commits 4 (bigquery: rename failures metric and use sync.Map.Clear) and 5 (bigquery: address PR review feedback) also overlap (both describe the metric rename and sync.Map.Clear), which is confusing — consider squashing or making each commit's scope distinct.

Review
The core PR — pending streams, auto-create, partitioning/clustering, schema resolution/evolution, table-name sanitization, configurable timeouts, LRU cap, metrics — is well-structured. Most concurrency edges (detached contexts for cached streams / resolver / evolver, generation counter on evict, closeWg for async stream closes, pendingStreamWriter.inflight) are carefully handled.

  1. Data race / nil-deref on o.pending in WriteBatch. o.pending is read outside connMu but written under connMu.Lock() in Close (line 953). A concurrent Close between the nil-check and the call dereferences a nil pointer. See the inline comment for the suggested snapshot-under-RLock fix.

@squiidz squiidz force-pushed the bq-pending-streams-partitioned branch from 4748566 to 1787486 Compare May 20, 2026 16:07
@claude
Copy link
Copy Markdown

claude Bot commented May 20, 2026

Commits

  1. Commit 4f7c63c ("bigquery: address PR review feedback") duplicates work already in commit fca50c6 ("bigquery: rename failures metric and use sync.Map.Clear") — both rename schema_evolution_failures_totalschema_evolutions_failures_total and switch the resolver-cache close from Range+Delete to sync.Map.Clear. These should be squashed (the bqError refactor in 4f7c63c is fine to keep, but the duplicated lines look like an unsquashed fixup).
  2. Commit 4f7c63c message is vague (address PR review feedback). Even with the bullet list in the body, the headline should describe the change (e.g. bigquery: collapse classifyGRPCError into bqError type). Same softer concern applies to commit cea44c2 (address review — race fix, LRU cap, emulator fixes).
  3. Commit 1787486 headline is truncated mid-word: bigquery: add pending streams, auto-create tables, partitioning and c…. The title needs to be a complete sentence — e.g. bigquery: add pending streams, auto-create, partitioning, clustering.

Review

The implementation is well structured: clear separation between schemaResolver, schemaEvolver, tableCreator, and pendingStreamWriter; lifecycle/lock ordering is documented and consistent; license headers (RCL, year 2026), error wrapping (gerund + %w), context detachment with bounded timeouts, and configurable timeouts all match the project conventions. Test coverage is solid — unit tests for the parser/diff/classifier/LRU/sweeper plus integration tests against the goccy emulator (with documented skips for the cases the emulator can't handle).

LGTM

…lustering

Implements CON-394 on top of the schema resolver/evolver work. Three
opt-in features layered onto gcp_bigquery_write_api:

Pending-stream write mode (write_mode: pending_stream)
- Per-batch CreateWriteStream(Pending) → AppendRows → Finalize →
  BatchCommitWriteStreams. Provides exactly-once semantics within a
  committed batch.
- Rows are split into 9MB chunks (chunkRowsByBytes) so we stay under
  the AppendRows 10MB hard limit with headroom for proto framing.
- In-flight Writes are tracked on a WaitGroup; Close blocks on it so
  Finalize/BatchCommit cannot race with storage-client teardown.
- Default mode (default_stream) is unchanged.

Auto-create tables (auto_create_table: true)
- schemaResolver gains a *tableCreator. A 404 from Metadata triggers
  creator.Ensure (idempotent on AlreadyExists) and retries Metadata.
- Works for both static and interpolated table names; every
  auto-created table receives the same configured schema and
  partition/clustering settings.

Schema / time_partitioning / clustering config
- New schema YAML field: column name + canonical BQ type
  (aliases normalised) + mode (NULLABLE/REQUIRED/REPEATED), recursive
  for RECORD.
- New time_partitioning block (DAY/HOUR/MONTH/YEAR + optional field,
  expiration, require_filter). Absence of the type field is the
  sentinel for "not configured".
- New clustering string list (cap of 4 enforced).
- Parser validates that partitioning.field exists in schema with a
  DATE/TIMESTAMP/DATETIME type and that clustering columns are in
  schema.

Observability
- bigquery_write_api_cached_streams (gauge) set on every cache
  mutation in getOrCreateStream, evictStream, and the idle-sweep.
- bigquery_write_api_streams_evicted_total (counter) covers all
  eviction paths (LRU + on-error + idle-sweep).

Tests
- Unit tests for: write_mode/auto_create_table parsing, schema
  parsing + validation, time_partitioning + clustering parsing,
  schema → BQ conversion, table creator helpers, chunking, pending
  stream lifecycle plumbing.
- Concurrent stress tests under -race for the schema resolver
  (Resolve/Evict/sf.Do contention) and stream cache (insert + LRU +
  evict contention).
- Integration tests: TestIntegrationAutoCreateTable runs against the
  goccy emulator. TestIntegrationPendingStreamMode is gated with
  t.Skip because the emulator does not implement Pending streams or
  BatchCommitWriteStreams.

Docs regenerated with the new fields and a "Write modes" /
"Auto-create" / "EOS caveat" section in the output description.
@squiidz squiidz force-pushed the bq-pending-streams-partitioned branch from 1787486 to f590cba Compare May 21, 2026 15:42
@claude
Copy link
Copy Markdown

claude Bot commented May 21, 2026

Commits
LGTM

Review
Reviewed BigQuery write API additions: pending-stream mode, auto-create tables, time partitioning + clustering, plus observability metrics and concurrency stress tests. Code follows project patterns (license headers correct for RCL, lock ordering preserved via connMustreamsMu, errors wrapped with gerund form + %w, named-return parsing pattern). The Begin()/Wait() pattern in output.go#L753-L771 correctly closes the Close-vs-pending-Write race by incrementing the WaitGroup inside the RLock. Tests cover schema parsing/validation, partitioning + clustering validation, chunking, schema→BQ conversion, and concurrent stream cache + resolver paths under -race.

LGTM

schema bigquery.Schema
partitioning *bigquery.TimePartitioning
clustering *bigquery.Clustering
requirePartFltr bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth just calling this requirePartitionFilter similar to how it's defined in bigquery.TableMetadata?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants