-
Notifications
You must be signed in to change notification settings - Fork 68
refactor: Improve cache encapsulation #2525
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| # Copyright 2024 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
| from typing import Mapping, Optional | ||
| import weakref | ||
|
|
||
| from bigframes.core import bq_data, local_data, nodes | ||
|
|
||
| SourceIdMapping = Mapping[str, str] | ||
|
|
||
|
|
||
@dataclasses.dataclass(frozen=True)
class UploadedLocalData:
    """Immutable record pairing an uploaded BigQuery source with its column mapping.

    Instances are created by ``ExecutionCache.cache_remote_replacement`` and
    handed back by ``ExecutionCache.get_uploaded_local_data``.
    """

    # BigQuery data source that holds the uploaded copy of the local table.
    bq_source: bq_data.BigqueryDataSource
    # Local column identifier -> name of the matching BigQuery column.
    source_mapping: SourceIdMapping
|
|
||
|
|
||
class ExecutionCache:
    """Remembers previously materialized results so later plans can reuse them.

    Two independent weak-keyed mappings are maintained:

    * executed plan nodes -> the BigQuery data source holding their results
    * local Arrow tables -> the BigQuery upload of that table

    Both are ``weakref.WeakKeyDictionary`` instances, so an entry disappears
    once its key object is no longer referenced anywhere else.
    """

    def __init__(self):
        # Effectively two separate caches that don't interact.
        self._cached_executions: weakref.WeakKeyDictionary[
            nodes.BigFrameNode, bq_data.BigqueryDataSource
        ] = weakref.WeakKeyDictionary()
        # Stateful record of which local tables have already been uploaded and
        # where. NOTE(review): this is storage state rather than plan caching;
        # it may belong in a dedicated storage manager.
        self._uploaded_local_data: weakref.WeakKeyDictionary[
            local_data.ManagedArrowTable,
            UploadedLocalData,
        ] = weakref.WeakKeyDictionary()

    def subsitute_cached_subplans(self, root: nodes.BigFrameNode) -> nodes.BigFrameNode:
        """Return ``root`` with every cached node swapped for a ``CachedTableNode``.

        Nodes without a cache entry are returned unchanged. The rewrite is
        applied through ``nodes.top_down``.

        Note: the method name is misspelled ("subsitute"); it is kept as-is
        because renaming it would break existing callers.
        """

        def maybe_replace_node(node):
            # Single lookup; the local name deliberately avoids shadowing the
            # imported ``bq_data`` module.
            cached_source = self._cached_executions.get(node)
            if cached_source is None:
                return node
            # Assumption: GBQ cached table uses field name as bq column name
            scan_list = nodes.ScanList(
                tuple(nodes.ScanItem(field.id, field.id.sql) for field in node.fields)
            )
            cached_replacement = nodes.CachedTableNode(
                source=cached_source,
                scan_list=scan_list,
                table_session=node.session,
                original_node=node,
            )
            # The replacement must be indistinguishable from the original to
            # the rest of the plan.
            assert node.schema == cached_replacement.schema
            return cached_replacement

        return nodes.top_down(root, maybe_replace_node)

    def cache_results_table(
        self,
        original_root: nodes.BigFrameNode,
        data: bq_data.BigqueryDataSource,
    ):
        """Record ``data`` as the materialized result of ``original_root``.

        Any existing entry for the same node is overwritten.
        """
        self._cached_executions[original_root] = data

    ## Local data upload caching
    def cache_remote_replacement(
        self,
        local_data: local_data.ManagedArrowTable,
        bq_data: bq_data.BigqueryDataSource,
    ):
        """Record ``bq_data`` as the uploaded copy of ``local_data``.

        Local columns are matched to BigQuery columns by position and the
        resulting name mapping is stored alongside the data source.

        Raises:
            AssertionError: if the BigQuery table does not have exactly one
                more column than the local table.
        """
        local_items = local_data.schema.items
        physical_schema = bq_data.table.physical_schema
        # bq table has one extra column for offsets, those are implicit for local data
        assert len(local_items) + 1 == len(physical_schema)
        # zip stops at the shorter (local) side, so the one unpaired trailing
        # BigQuery column is left out of the mapping.
        mapping = {
            item.column: field.name
            for item, field in zip(local_items, physical_schema)
        }
        self._uploaded_local_data[local_data] = UploadedLocalData(bq_data, mapping)

    def get_uploaded_local_data(
        self, local_data: local_data.ManagedArrowTable
    ) -> Optional[UploadedLocalData]:
        """Return the upload record for ``local_data``, or ``None`` if there is none."""
        return self._uploaded_local_data.get(local_data)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class ExecutionSpec: | ||
| # This could probably be a set of side effects, rather than a single destination_spec | ||
|
||
| destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None | ||
| peek: Optional[int] = None | ||
| ordered: bool = ( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo nit: "This is state" -> "This is stateful" ?
Also, if we feel there are unresolved issues with the current implementation, we should open a bug with more details and then link it from the code with a `TODO(b/12345)`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced with a concrete description rather than ideas for changes. I don't want to file any bugs right now without a concrete payoff.