BigQuery: pending streams partitioned#4443
Conversation
| if o.conf.WriteMode == "pending_stream" { | ||
| if o.pending == nil { | ||
| return service.ErrNotConnected | ||
| } | ||
| parent := o.tableCacheKey(projectID, tableID) | ||
| if err := o.pending.Write(ctx, parent, swd.descriptorProto, rows); err != nil { | ||
| return o.handleWriteError(ctx, client, err, batch, tableID, swd.descriptor, "pending stream write") | ||
| } |
There was a problem hiding this comment.
o.pending is read here without holding connMu, but Close writes o.pending = nil under connMu.Lock() (line 953). This is both a data race (race detector will flag it) and worse — if Close runs between the nil-check on line 813 and the o.pending.Write(...) call on line 817, this dereferences a nil pointer and panics.
The fix is to capture pending under the same RLock that already snapshots client and projectID (lines 755-758), then use the local snapshot here. The "Snapshot client + resolvedProjectID together under one RLock" comment already documents this pattern — it just wasn't extended to o.pending.
Note that the existing o.pending.Wait() call inside Close is not sufficient protection: pendingStreamWriter.Write increments inflight inside itself, so if Close acquires the lock after the if o.pending == nil check but before o.pending.Write(...) runs, Wait() returns immediately (inflight=0) and Close proceeds to nil out o.pending and tear down storageClient.
|
Commits
Review
|
4748566 to
1787486
Compare
|
Commits
Review The implementation is well structured: clear separation between LGTM |
…lustering Implements CON-394 on top of the schema resolver/evolver work. Three opt-in features layered onto gcp_bigquery_write_api: Pending-stream write mode (write_mode: pending_stream) - Per-batch CreateWriteStream(Pending) → AppendRows → Finalize → BatchCommitWriteStreams. Provides exactly-once semantics within a committed batch. - Rows are split into 9MB chunks (chunkRowsByBytes) so we stay under the AppendRows 10MB hard limit with headroom for proto framing. - In-flight Writes are tracked on a WaitGroup; Close blocks on it so Finalize/BatchCommit cannot race with storage-client teardown. - Default mode (default_stream) is unchanged. Auto-create tables (auto_create_table: true) - schemaResolver gains a *tableCreator. A 404 from Metadata triggers creator.Ensure (idempotent on AlreadyExists) and retries Metadata. - Works for both static and interpolated table names; every auto-created table receives the same configured schema and partition/clustering settings. Schema / time_partitioning / clustering config - New schema YAML field: column name + canonical BQ type (aliases normalised) + mode (NULLABLE/REQUIRED/REPEATED), recursive for RECORD. - New time_partitioning block (DAY/HOUR/MONTH/YEAR + optional field, expiration, require_filter). Absence of the type field is the sentinel for "not configured". - New clustering string list (cap of 4 enforced). - Parser validates that partitioning.field exists in schema with a DATE/TIMESTAMP/DATETIME type and that clustering columns are in schema. Observability - bigquery_write_api_cached_streams (gauge) set on every cache mutation in getOrCreateStream, evictStream, and the idle-sweep. - bigquery_write_api_streams_evicted_total (counter) covers all eviction paths (LRU + on-error + idle-sweep). Tests - Unit tests for: write_mode/auto_create_table parsing, schema parsing + validation, time_partitioning + clustering parsing, schema → BQ conversion, table creator helpers, chunking, pending stream lifecycle plumbing. - Concurrent stress tests under -race for the schema resolver (Resolve/Evict/sf.Do contention) and stream cache (insert + LRU + evict contention). - Integration tests: TestIntegrationAutoCreateTable runs against the goccy emulator. TestIntegrationPendingStreamMode is gated with t.Skip because the emulator does not implement Pending streams or BatchCommitWriteStreams. Docs regenerated with the new fields and a "Write modes" / "Auto-create" / "EOS caveat" section in the output description.
1787486 to
f590cba
Compare
|
Commits Review LGTM |
| schema bigquery.Schema | ||
| partitioning *bigquery.TimePartitioning | ||
| clustering *bigquery.Clustering | ||
| requirePartFltr bool |
There was a problem hiding this comment.
Is it worth just calling this requirePartitionFilter similar to how it's defined in bigquery.TableMetadata?
No description provided.