
perf(bigframes): Improve write api upload throughput #16641

Open
TrevorBergeron wants to merge 2 commits into main from tbergeron_write_api_redo

Conversation

@TrevorBergeron
Contributor

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕

@TrevorBergeron TrevorBergeron requested review from a team as code owners April 13, 2026 22:16
@TrevorBergeron TrevorBergeron requested review from sycai and removed request for a team April 13, 2026 22:16

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the stream_worker function in loader.py to use AppendRowsStream and a futures-based approach for appending rows to BigQuery, which introduces concurrency control via a max_outstanding parameter. The review feedback identifies a regression where row-level errors are no longer checked, potentially leading to silent data loss, and suggests using a try...finally block to ensure the stream manager is properly closed.
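For orientation, here is a minimal, self-contained sketch of the bounded-in-flight pattern the summary describes. The helper upload_with_backpressure and the send callable are illustrative names, not from the PR; only the max_outstanding bound mirrors the PR's parameter:

    from concurrent.futures import Future, ThreadPoolExecutor

    def upload_with_backpressure(batches, send, max_outstanding=4):
        # Allow at most max_outstanding appends in flight: block on the
        # oldest future before sending another batch.
        futures: list[Future] = []
        for batch in batches:
            if len(futures) >= max_outstanding:
                futures.pop(0).result()
            futures.append(send(batch))
        # Drain the tail so errors from late appends still surface.
        for future in futures:
            future.result()

    # Toy usage: a thread pool stands in for AppendRowsStream.send.
    with ThreadPoolExecutor() as pool:
        upload_with_backpressure(range(10), lambda b: pool.submit(int, b))

This keeps memory and connection pressure bounded while still pipelining uploads, which is the likely source of the throughput gain.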

Comment on lines +535 to +557
+            stream_manager = bq_storage_writer.AppendRowsStream(
+                client=self._write_client, initial_request_template=base_request
+            )
             stream_name = stream.name
+            current_offset = 0
+            futures: list[bq_storage_writer.AppendRowsFuture] = []

-            def request_generator():
-                current_offset = 0
-                for batch in work:
-                    request = bq_storage_types.AppendRowsRequest(
-                        write_stream=stream.name, offset=current_offset
-                    )
-                    request.arrow_rows.writer_schema.serialized_schema = (
-                        serialized_schema
-                    )
-                    request.arrow_rows.rows.serialized_record_batch = (
-                        batch.serialize().to_pybytes()
-                    )
-                    yield request
-                    current_offset += batch.num_rows
+            for batch in work:
+                if len(futures) >= max_outstanding:
+                    futures.pop(0).result()
+
+                request = bq_storage_types.AppendRowsRequest(offset=current_offset)
+                request.arrow_rows.rows.serialized_record_batch = (
+                    batch.serialize().to_pybytes()
+                )
+
+                futures.append(stream_manager.send(request))
+                current_offset += batch.num_rows

-            responses = self._write_client.append_rows(requests=request_generator())
-            for resp in responses:
-                if resp.row_errors:
-                    raise ValueError(
-                        f"Errors in stream {stream_name}: {resp.row_errors}"
-                    )
+            for future in futures:
+                future.result()
+
+            stream_manager.close()
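Note that the new code no longer sets write_stream or writer_schema on each request: with AppendRowsStream, those travel once in the initial request template. A sketch of the assumed shape of base_request, which is constructed outside the quoted hunk:

    from google.cloud.bigquery_storage_v1 import types as bq_storage_types

    # Assumed construction (not shown in the diff): the stream name and
    # serialized Arrow schema are sent once in the template, so each
    # per-batch request carries only an offset and a record batch.
    base_request = bq_storage_types.AppendRowsRequest()
    base_request.write_stream = stream.name
    base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema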

Severity: high

The new implementation using AppendRowsStream and futures is a great performance improvement. However, it currently omits the row_errors check that was present in the previous implementation. This is a regression that could lead to silent data loss if specific rows fail to be appended due to data-related issues. Additionally, stream_manager.close() should be wrapped in a try...finally block to ensure resources are cleaned up even if an exception occurs. Raising an explicit exception for row errors aligns with the repository's preference for making unexpected states explicit in data streams.

            stream_manager = bq_storage_writer.AppendRowsStream(
                client=self._write_client, initial_request_template=base_request
            )
            try:
                for batch in work:
                    if len(futures) >= max_outstanding:
                        resp = futures.pop(0).result()
                        if resp.row_errors:
                            raise ValueError(
                                f"Errors in stream {stream_name}: {resp.row_errors}"
                            )

                    request = bq_storage_types.AppendRowsRequest(offset=current_offset)
                    request.arrow_rows.rows.serialized_record_batch = (
                        batch.serialize().to_pybytes()
                    )

                    futures.append(stream_manager.send(request))
                    current_offset += batch.num_rows

                for future in futures:
                    resp = future.result()
                    if resp.row_errors:
                        raise ValueError(
                            f"Errors in stream {stream_name}: {resp.row_errors}"
                        )
            finally:
                stream_manager.close()
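An equivalent way to get the cleanup guarantee, sketched here under the same assumptions as the suggestion above, is contextlib.closing, which calls close() on exit even when an append raises:

    import contextlib

    with contextlib.closing(stream_manager):
        for batch in work:
            ...  # send batches and check row_errors as in the suggestion

Whether to prefer it over an explicit try...finally is a style choice; both ensure the stream is closed on error.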
References
  1. For data streams, if an unexpected state or error is detected, it is preferable to raise an exception to make the state explicit rather than attempting to handle it silently.
