Skip to content

Commit 9ace68b

Browse files
perf(bigframes): Improve Write API upload throughput
1 parent 919fd4c commit 9ace68b

File tree

1 file changed

+25
-22
lines changed
  • packages/bigframes/bigframes/session

1 file changed

+25
-22
lines changed

packages/bigframes/bigframes/session/loader.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
import pandas
5353
import pyarrow as pa
5454
from google.cloud import bigquery_storage_v1
55-
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
55+
from google.cloud.bigquery_storage_v1 import types as bq_storage_types, writer as bq_storage_writer
5656

5757
import bigframes._tools
5858
import bigframes._tools.strings
@@ -520,38 +520,41 @@ def write_data(
520520
)
521521
serialized_schema = schema.serialize().to_pybytes()
522522

523-
def stream_worker(work: Iterator[pa.RecordBatch]) -> str:
523+
def stream_worker(work: Iterator[pa.RecordBatch], max_outstanding: int = 5) -> str:
524524
requested_stream = bq_storage_types.WriteStream(
525525
type_=bq_storage_types.WriteStream.Type.PENDING
526526
)
527527
stream = self._write_client.create_write_stream(
528528
parent=parent, write_stream=requested_stream
529529
)
530+
base_request = bq_storage_types.AppendRowsRequest(
531+
write_stream=stream.name,
532+
)
533+
base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema
534+
535+
stream_manager = bq_storage_writer.AppendRowsStream(
536+
client=self._write_client, initial_request_template=base_request
537+
)
530538
stream_name = stream.name
539+
current_offset = 0
540+
futures: list[bq_storage_writer.AppendRowsFuture] = []
531541

532-
def request_generator():
533-
current_offset = 0
534-
for batch in work:
535-
request = bq_storage_types.AppendRowsRequest(
536-
write_stream=stream.name, offset=current_offset
537-
)
542+
for batch in work:
543+
if len(futures) >= max_outstanding:
544+
futures.pop(0).result()
538545

539-
request.arrow_rows.writer_schema.serialized_schema = (
540-
serialized_schema
541-
)
542-
request.arrow_rows.rows.serialized_record_batch = (
543-
batch.serialize().to_pybytes()
544-
)
546+
request = bq_storage_types.AppendRowsRequest(offset=current_offset)
547+
request.arrow_rows.rows.serialized_record_batch = (
548+
batch.serialize().to_pybytes()
549+
)
545550

546-
yield request
547-
current_offset += batch.num_rows
551+
futures.append(stream_manager.send(request))
552+
current_offset += batch.num_rows
548553

549-
responses = self._write_client.append_rows(requests=request_generator())
550-
for resp in responses:
551-
if resp.row_errors:
552-
raise ValueError(
553-
f"Errors in stream {stream_name}: {resp.row_errors}"
554-
)
554+
for future in futures:
555+
future.result()
556+
557+
stream_manager.close()
555558
self._write_client.finalize_write_stream(name=stream_name)
556559
return stream_name
557560

0 commit comments

Comments
 (0)