Skip to content

Commit ef856b0

Browse files
perf(bigframes): Improve write api upload throughput (#16641)
1 parent 3cc859c commit ef856b0

File tree

1 file changed

+37
-21
lines changed
  • packages/bigframes/bigframes/session

1 file changed

+37
-21
lines changed

packages/bigframes/bigframes/session/loader.py

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@
5252
import pandas
5353
import pyarrow as pa
5454
from google.cloud import bigquery_storage_v1
55-
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
55+
from google.cloud.bigquery_storage_v1 import (
56+
types as bq_storage_types,
57+
writer as bq_storage_writer,
58+
)
5659

5760
import bigframes._tools
5861
import bigframes._tools.strings
@@ -520,38 +523,51 @@ def write_data(
520523
)
521524
serialized_schema = schema.serialize().to_pybytes()
522525

523-
def stream_worker(
    work: Iterator[pa.RecordBatch], max_outstanding: int = 5
) -> str:
    """Upload the record batches in ``work`` over one pending write stream.

    Creates a PENDING write stream, pipelines up to ``max_outstanding``
    ``AppendRows`` requests at a time through an ``AppendRowsStream``
    manager, verifies every response, finalizes the stream, and returns
    its name so the caller can batch-commit it.

    Args:
        work: Arrow record batches to append, in order.
        max_outstanding: Maximum number of in-flight append requests kept
            before waiting on the oldest one (backpressure bound).

    Returns:
        The fully-qualified name of the finalized write stream.

    Raises:
        ValueError: If any append response contains row errors.
    """
    requested_stream = bq_storage_types.WriteStream(
        type_=bq_storage_types.WriteStream.Type.PENDING
    )
    stream = self._write_client.create_write_stream(
        parent=parent, write_stream=requested_stream
    )
    # The Arrow schema only needs to be transmitted once, so it lives in
    # the template request the stream manager reuses for every append.
    base_request = bq_storage_types.AppendRowsRequest(
        write_stream=stream.name,
    )
    base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema

    stream_manager = bq_storage_writer.AppendRowsStream(
        client=self._write_client, initial_request_template=base_request
    )
    stream_name = stream.name
    current_offset = 0
    # Oldest-first queue of in-flight append futures.
    futures: list[bq_storage_writer.AppendRowsFuture] = []

    def _check_result(future: bq_storage_writer.AppendRowsFuture) -> None:
        # Block on the append response and surface any per-row errors.
        row_errors = future.result().row_errors
        if row_errors:
            raise ValueError(
                f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
            )

    try:
        for batch in work:
            # Backpressure: before sending another request, wait for the
            # oldest outstanding one so at most ``max_outstanding`` appends
            # (and their serialized payloads) are in flight at once.
            if len(futures) >= max_outstanding:
                _check_result(futures.pop(0))

            request = bq_storage_types.AppendRowsRequest(offset=current_offset)
            request.arrow_rows.rows.serialized_record_batch = (
                batch.serialize().to_pybytes()
            )

            futures.append(stream_manager.send(request))
            current_offset += batch.num_rows

        # Drain the remaining in-flight appends.
        for future in futures:
            _check_result(future)
    finally:
        # Close the manager even when an append fails, so the underlying
        # gRPC connection is not leaked on the error path.
        stream_manager.close()
    self._write_client.finalize_write_stream(name=stream_name)
    return stream_name
557573

0 commit comments

Comments
 (0)