|
14 | 14 |
|
15 | 15 | from __future__ import annotations |
16 | 16 |
|
| 17 | +import concurrent.futures |
17 | 18 | import math |
18 | 19 | import threading |
19 | 20 | from typing import Literal, Mapping, Optional, Sequence, Tuple |
|
28 | 29 | from bigframes import exceptions as bfe |
29 | 30 | import bigframes.constants |
30 | 31 | import bigframes.core |
31 | | -from bigframes.core import bq_data, compile, local_data, rewrite |
| 32 | +from bigframes.core import bq_data, compile, rewrite |
32 | 33 | from bigframes.core.compile.sqlglot import sql as sg_sql |
33 | 34 | from bigframes.core.compile.sqlglot import sqlglot_ir |
34 | 35 | import bigframes.core.events |
@@ -514,13 +515,35 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): |
514 | 515 | Replace large local sources with the uploaded version of those datasources. |
515 | 516 | """ |
516 | 517 | # Step 1: Upload all previously un-uploaded data |
| 518 | + needs_upload = [] |
517 | 519 | for leaf in original_root.unique_nodes(): |
518 | 520 | if isinstance(leaf, nodes.ReadLocalNode): |
519 | 521 | if ( |
520 | 522 | leaf.local_data_source.metadata.total_bytes |
521 | 523 | > bigframes.constants.MAX_INLINE_BYTES |
522 | 524 | ): |
523 | | - self._upload_local_data(leaf.local_data_source) |
| 525 | + needs_upload.append(leaf.local_data_source) |
| 526 | + |
| 527 | + futures = [] |
| 528 | + try: |
| 529 | + for local_source in needs_upload: |
| 530 | + future = self.loader.read_data_async( |
| 531 | + local_source, bigframes.core.guid.generate_guid() |
| 532 | + ) |
| 533 | + future.add_done_callback( |
| 534 | +                lambda f, local_source=local_source: self.cache.cache_remote_replacement(
| 535 | + local_source, f.result() |
| 536 | + ) |
| 537 | + ) |
| 538 | + futures.append(future) |
| 539 | + concurrent.futures.wait(futures) |
| 540 | + for future in futures: |
| 541 | + future.result() |
| 542 | + except Exception as e: |
| 543 | + # cancel all futures |
| 544 | + for future in futures: |
| 545 | + future.cancel() |
| 546 | +            raise
524 | 547 |
|
525 | 548 | # Step 2: Replace local scans with remote scans |
526 | 549 | def map_local_scans(node: nodes.BigFrameNode): |
@@ -550,18 +573,6 @@ def map_local_scans(node: nodes.BigFrameNode): |
550 | 573 |
|
551 | 574 | return original_root.bottom_up(map_local_scans) |
552 | 575 |
|
553 | | - def _upload_local_data(self, local_table: local_data.ManagedArrowTable): |
554 | | - if self.cache.get_uploaded_local_data(local_table) is not None: |
555 | | - return |
556 | | - # Lock prevents concurrent repeated work, but slows things down. |
557 | | - # Might be better as a queue and a worker thread |
558 | | - with self._upload_lock: |
559 | | - if self.cache.get_uploaded_local_data(local_table) is None: |
560 | | - uploaded = self.loader.load_data_or_write_data( |
561 | | - local_table, bigframes.core.guid.generate_guid() |
562 | | - ) |
563 | | - self.cache.cache_remote_replacement(local_table, uploaded) |
564 | | - |
565 | 576 | def _execute_plan_gbq( |
566 | 577 | self, |
567 | 578 | plan: nodes.BigFrameNode, |
|
0 commit comments