perf(bigframes): Improve write api upload throughput #16641
TrevorBergeron wants to merge 2 commits into main from
Conversation
Code Review
This pull request refactors the stream_worker function in loader.py to use AppendRowsStream and a futures-based approach for appending rows to BigQuery, which introduces concurrency control via a max_outstanding parameter. The review feedback identifies a regression where row-level errors are no longer checked, potentially leading to silent data loss, and suggests using a try...finally block to ensure the stream manager is properly closed.
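The bounded-concurrency pattern the PR adopts can be sketched with standard-library futures. This is an illustrative stand-in, not the BigQuery client API: `send_bounded` and its `tasks` argument are hypothetical names, and a thread pool plays the role of the write stream. The key idea is the same as in the PR: before submitting new work, block on the oldest outstanding future once the window is full.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def send_bounded(tasks, max_outstanding=4):
    """Run callables on a pool, keeping at most `max_outstanding` in flight.

    Hypothetical stand-in for the PR's stream_manager.send() loop: once the
    window of outstanding futures fills, the oldest one is awaited before
    more work is submitted.
    """
    results = []
    futures = deque()
    with ThreadPoolExecutor(max_workers=max_outstanding) as pool:
        for task in tasks:
            if len(futures) >= max_outstanding:
                # Window full: wait for the oldest in-flight task first.
                results.append(futures.popleft().result())
            futures.append(pool.submit(task))
        while futures:
            # Drain remaining in-flight work, preserving submission order.
            results.append(futures.popleft().result())
    return results


print(send_bounded([lambda i=i: i * 2 for i in range(6)], max_outstanding=2))
# → [0, 2, 4, 6, 8, 10]
```

Draining in FIFO order also means results come back in submission order, which matters when each response is checked for errors against a known offset.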
```diff
-            def request_generator():
-                current_offset = 0
-                for batch in work:
-                    request = bq_storage_types.AppendRowsRequest(
-                        write_stream=stream.name, offset=current_offset
-                    )
-                    request.arrow_rows.writer_schema.serialized_schema = (
-                        serialized_schema
-                    )
-                    request.arrow_rows.rows.serialized_record_batch = (
-                        batch.serialize().to_pybytes()
-                    )
-                    yield request
-                    current_offset += batch.num_rows
-
-            responses = self._write_client.append_rows(requests=request_generator())
-            for resp in responses:
-                if resp.row_errors:
-                    raise ValueError(
-                        f"Errors in stream {stream_name}: {resp.row_errors}"
-                    )
+            stream_manager = bq_storage_writer.AppendRowsStream(
+                client=self._write_client, initial_request_template=base_request
+            )
+            stream_name = stream.name
+            current_offset = 0
+            futures: list[bq_storage_writer.AppendRowsFuture] = []
+
+            for batch in work:
+                if len(futures) >= max_outstanding:
+                    futures.pop(0).result()
+
+                request = bq_storage_types.AppendRowsRequest(offset=current_offset)
+                request.arrow_rows.rows.serialized_record_batch = (
+                    batch.serialize().to_pybytes()
+                )
+
+                futures.append(stream_manager.send(request))
+                current_offset += batch.num_rows
+
+            for future in futures:
+                future.result()
+
+            stream_manager.close()
```
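A detail worth noting in the new loop: each request's offset must equal the cumulative row count of all previously sent batches. A minimal sketch of that bookkeeping, using made-up batch sizes in place of `batch.num_rows`:

```python
# Hypothetical batch sizes standing in for batch.num_rows values; each
# request carries the cumulative count of rows sent before it.
batch_sizes = [3, 5, 2]
offsets = []
current_offset = 0
for num_rows in batch_sizes:
    offsets.append(current_offset)  # offset attached to this request
    current_offset += num_rows

print(offsets, current_offset)  # → [0, 3, 8] 10
```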
The new implementation using AppendRowsStream and futures is a great performance improvement. However, it omits the row_errors check that the previous implementation performed on each response. This is a regression that could lead to silent data loss if individual rows fail to append due to data-related issues. Additionally, stream_manager.close() should be wrapped in a try...finally block so the stream is cleaned up even if an exception occurs mid-loop. Raising an explicit exception for row errors aligns with the repository's preference for making unexpected states in data streams explicit.
```python
stream_manager = bq_storage_writer.AppendRowsStream(
    client=self._write_client, initial_request_template=base_request
)
try:
    for batch in work:
        if len(futures) >= max_outstanding:
            resp = futures.pop(0).result()
            if resp.row_errors:
                raise ValueError(
                    f"Errors in stream {stream_name}: {resp.row_errors}"
                )
        request = bq_storage_types.AppendRowsRequest(offset=current_offset)
        request.arrow_rows.rows.serialized_record_batch = (
            batch.serialize().to_pybytes()
        )
        futures.append(stream_manager.send(request))
        current_offset += batch.num_rows
    for future in futures:
        resp = future.result()
        if resp.row_errors:
            raise ValueError(
                f"Errors in stream {stream_name}: {resp.row_errors}"
            )
finally:
    stream_manager.close()
```

References
- For data streams, if an unexpected state or error is detected, it is preferable to raise an exception to make the state explicit rather than attempting to handle it silently.
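The try...finally cleanup the review asks for is equivalent to `contextlib.closing`, which invokes `.close()` even when the body raises. `FakeStream` below is an illustrative stand-in for an AppendRowsStream-like manager, not the real class:

```python
import contextlib


class FakeStream:
    """Illustrative stand-in for an AppendRowsStream-like manager."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


stream = FakeStream()
try:
    with contextlib.closing(stream):
        # Simulate a mid-stream failure such as a row_errors ValueError.
        raise ValueError("row errors detected")
except ValueError:
    pass

print(stream.closed)  # → True: close() ran despite the exception
```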