-
Notifications
You must be signed in to change notification settings - Fork 749
[KVCache][Feature] Implement Cache Manager V1 Storage Transfer and Prefetch (2/n) #7529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
e9f0806
64cbe9f
80f5350
b440af6
647bc9b
ed35db1
4af7d71
2d87335
bdf2632
8729a87
3a0e50b
a049c1b
7716126
bb98c75
d141dd6
4275e1a
2e1dc2f
9c66d8a
c68f0a6
32d115f
ebf0c5f
5a67fbe
7936366
80e7281
570840a
aabfd97
544bd73
a3be152
0677e15
5857171
b50b6da
8dea20a
a656c6e
d889bef
4dcda16
5cbabea
b4f54a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| import os | ||
| import threading | ||
| import time | ||
| import traceback | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from typing import TYPE_CHECKING, Any, Dict, List, Optional | ||
|
|
||
|
|
@@ -111,6 +112,11 @@ def write_policy(self) -> Optional[str]: | |
| return self.cache_config.write_policy | ||
| return None | ||
|
|
||
| @property | ||
| def storage_enabled(self) -> bool: | ||
This comment was marked as outdated.
Sorry, something went wrong.
This comment was marked as outdated.
Sorry, something went wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 建议 建议在 # transfer_manager.py
@property
def has_storage_connector(self) -> bool:
return self._storage_connector is not None然后 return self._transfer_manager.has_storage_connector |
||
| """Whether a storage connector is available for Host↔Storage transfers.""" | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
| return getattr(self._transfer_manager, "_storage_connector", None) is not None | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
|
|
||
| def _should_wait_for_swap_out(self) -> bool: | ||
| """ | ||
| Determine if swap-out operations should wait synchronously. | ||
|
|
@@ -147,7 +153,17 @@ def submit_swap_tasks( | |
| # Note: evict returns LayerDoneCounter but we don't wait on it layer-by-layer | ||
| # (except in write_back mode where we wait synchronously via wait_all) | ||
| if evict_metadata is not None: | ||
| evict_counter = self.evict_device_to_host(evict_metadata) | ||
| # Build StorageMetadata when storage is enabled and hash_values are available, | ||
| # so that D2H eviction is automatically followed by Host→Storage backup. | ||
| storage_metadata = None | ||
| if self.storage_enabled and evict_metadata.hash_values: | ||
| storage_metadata = StorageMetadata( | ||
| hash_values=evict_metadata.hash_values, | ||
| block_ids=evict_metadata.dst_block_ids, | ||
| direction="evict", | ||
| ) | ||
|
|
||
| evict_counter = self.evict_device_to_host(evict_metadata, storage_metadata) | ||
| self._pending_evict_counters.append(evict_counter) | ||
|
|
||
| # Step 3: For write_back, wait for evict to complete before submitting swap-in | ||
|
|
@@ -622,6 +638,15 @@ def initialize_host_cache( | |
| # Share host_cache_kvs_map with transfer manager | ||
| self._transfer_manager.set_host_cache_kvs_map(self.host_cache_kvs_map) | ||
|
|
||
| # Propagate block shape so transfer manager can compute per-block byte offsets | ||
| # for prefetch_from_storage / backup_to_storage. | ||
| self._transfer_manager.set_host_block_shape( | ||
| key_shape=self._host_key_cache_shape, | ||
| value_shape=self._host_value_cache_shape, | ||
| scale_shape=self._host_cache_scale_shape, | ||
| cache_item_bytes=cache_item_bytes, | ||
| ) | ||
|
|
||
| def get_host_cache_kvs_map(self) -> Dict[str, Any]: | ||
| """ | ||
| Get the Host KV Cache pointer dictionary. | ||
|
|
@@ -641,6 +666,7 @@ def _submit_swap_task( | |
| transfer_fn_all: callable, | ||
| transfer_fn_layer: callable, | ||
| force_all_layers: bool = False, | ||
| on_success: callable = None, | ||
| ) -> LayerDoneCounter: | ||
| """ | ||
| Submit a single swap transfer task (internal method). | ||
|
|
@@ -658,6 +684,8 @@ def _submit_swap_task( | |
| transfer_fn_all: All-layer transfer function, signature (src_ids, dst_ids) -> bool. | ||
| transfer_fn_layer: Layer-by-layer transfer function, signature (layer_indices, on_layer_complete, src_ids, dst_ids) -> bool. | ||
| force_all_layers: If True, always use all-layers mode (used for D2H evict). | ||
| on_success: Optional callback invoked after a successful transfer, | ||
| signature () -> None. Runs in the same worker thread. | ||
|
|
||
| Returns: | ||
| LayerDoneCounter instance for tracking layer completion. | ||
|
|
@@ -755,9 +783,14 @@ def _do_transfer(): | |
| meta.success = result.success | ||
| meta.error_message = result.error_message | ||
|
|
||
| except Exception as e: | ||
| import traceback | ||
| # Chain next task on success | ||
| if result.success and on_success is not None: | ||
| try: | ||
| on_success() | ||
| except Exception as cb_err: | ||
| logger.error(f"[SwapTask] on_success callback failed: {cb_err}\n" f"{traceback.format_exc()}") | ||
|
|
||
| except Exception as e: | ||
| traceback.print_exc() | ||
| logger.error( | ||
| f"[SwapTask] {src_location.value}->{dst_location.value} " | ||
|
|
@@ -807,6 +840,7 @@ def load_host_to_device( | |
| def evict_device_to_host( | ||
| self, | ||
| swap_metadata: CacheSwapMetadata, | ||
| storage_metadata: Optional[StorageMetadata] = None, | ||
| ) -> LayerDoneCounter: | ||
| """ | ||
| Evict device cache to host (async). | ||
|
|
@@ -818,17 +852,32 @@ def evict_device_to_host( | |
| swap_metadata: CacheSwapMetadata containing: | ||
| - src_block_ids: Source device block IDs | ||
| - dst_block_ids: Destination host block IDs | ||
| storage_metadata: Optional StorageMetadata. If provided, a | ||
| backup_host_to_storage task is automatically submitted | ||
| after the D2H transfer succeeds (chained in the same | ||
| worker thread). | ||
|
|
||
| Returns: | ||
| LayerDoneCounter for tracking layer completion. | ||
| """ | ||
| on_success = None | ||
| if storage_metadata is not None: | ||
| host_block_ids = swap_metadata.dst_block_ids | ||
|
|
||
| def _on_success_backup(): | ||
| logger.debug(f"[EvictAndBackup] D2H done, chaining backup to storage " f"host_blocks={host_block_ids}") | ||
| self.backup_host_to_storage(host_block_ids, storage_metadata) | ||
|
|
||
| on_success = _on_success_backup | ||
|
|
||
| layer_counter = self._submit_swap_task( | ||
| meta=swap_metadata, | ||
| src_location=CacheLevel.DEVICE, | ||
| dst_location=CacheLevel.HOST, | ||
| transfer_fn_all=lambda src_ids, dst_ids: self._transfer_manager.evict_to_host_async(src_ids, dst_ids), | ||
| transfer_fn_layer=None, | ||
| force_all_layers=True, # Eviction always uses output_stream for all-layers async transfer | ||
| on_success=on_success, | ||
| ) | ||
| return layer_counter | ||
|
|
||
|
|
@@ -854,35 +903,43 @@ def prefetch_from_storage( | |
|
|
||
| handler = AsyncTaskHandler() | ||
|
|
||
| # TODO: Implement storage prefetch logic | ||
| handler.set_error("Storage prefetch not implemented yet") | ||
|
|
||
| return handler | ||
|
|
||
| def backup_device_to_storage( | ||
| self, | ||
| device_block_ids: List[int], | ||
| metadata: StorageMetadata, | ||
| ) -> AsyncTaskHandler: | ||
| """ | ||
| Backup device cache to storage (async). | ||
| hash_values = metadata.hash_values | ||
| block_ids = metadata.block_ids | ||
|
|
||
| Backup KV cache from device memory to external storage | ||
| for reuse by subsequent requests. | ||
| if not hash_values or not block_ids: | ||
| logger.info(f"[StoragePrefetch] skip: empty hash_values={hash_values}, block_ids={block_ids}") | ||
| handler.set_error("Empty hash_values or block_ids in StorageMetadata") | ||
| return handler | ||
|
|
||
| Args: | ||
| device_block_ids: Device block IDs to backup. | ||
| metadata: Storage transfer metadata. | ||
|
|
||
| Returns: | ||
| AsyncTaskHandler for tracking the async transfer task. | ||
| """ | ||
|
|
||
| handler = AsyncTaskHandler() | ||
| def _do_prefetch(): | ||
| try: | ||
| start_time = time.time() | ||
| results = self._transfer_manager.prefetch_from_storage( | ||
| hash_list=hash_values, | ||
| cpu_block_list=block_ids, | ||
| ) | ||
| elapsed = time.time() - start_time | ||
|
|
||
| # TODO: Implement storage backup logic | ||
| handler.set_error("Storage backup not implemented yet") | ||
| success = all(results) | ||
| if success: | ||
| logger.debug( | ||
| f"[StoragePrefetch] success hash_values={hash_values} " | ||
| f"block_ids={block_ids} elapsed={elapsed*1000:.3f}ms" | ||
| ) | ||
| handler.set_result(results) | ||
| else: | ||
| failed_indices = [i for i, ok in enumerate(results) if not ok] | ||
| logger.warning( | ||
| f"[StoragePrefetch] partial failure " | ||
| f"failed_indices={failed_indices} elapsed={elapsed*1000:.3f}ms" | ||
| ) | ||
| handler.set_error(f"Storage prefetch failed for blocks at indices {failed_indices}") | ||
| except Exception as e: | ||
| traceback.print_exc() | ||
| logger.error(f"[StoragePrefetch] EXCEPTION: {e}\n{traceback.format_exc()}") | ||
| handler.set_error(str(e)) | ||
|
|
||
| self._executor.submit(_do_prefetch) | ||
| return handler | ||
|
|
||
| def backup_host_to_storage( | ||
|
|
@@ -905,9 +962,42 @@ def backup_host_to_storage( | |
|
|
||
| handler = AsyncTaskHandler() | ||
|
|
||
| # TODO: Implement storage backup logic | ||
| handler.set_error("Storage backup not implemented yet") | ||
| hash_values = metadata.hash_values | ||
|
|
||
| if not host_block_ids or not hash_values: | ||
| logger.info(f"[StorageBackup] skip: empty host_block_ids={host_block_ids}, " f"hash_values={hash_values}") | ||
| handler.set_error("Empty host_block_ids or hash_values in StorageMetadata") | ||
| return handler | ||
|
|
||
| def _do_backup(): | ||
| try: | ||
| start_time = time.time() | ||
| results = self._transfer_manager.backup_to_storage( | ||
| cpu_block_list=host_block_ids, | ||
| hash_list=hash_values, | ||
| ) | ||
| elapsed = time.time() - start_time | ||
|
|
||
| success = all(results) | ||
| if success: | ||
| logger.debug( | ||
| f"[StorageBackup] success host_block_ids={host_block_ids} " | ||
| f"hash_values={hash_values} elapsed={elapsed*1000:.3f}ms" | ||
| ) | ||
| handler.set_result(results) | ||
| else: | ||
| failed_indices = [i for i, ok in enumerate(results) if not ok] | ||
| logger.warning( | ||
| f"[StorageBackup] partial failure " | ||
| f"failed_indices={failed_indices} elapsed={elapsed*1000:.3f}ms" | ||
| ) | ||
| handler.set_error(f"Storage backup failed for blocks at indices {failed_indices}") | ||
| except Exception as e: | ||
| traceback.print_exc() | ||
| logger.error(f"[StorageBackup] EXCEPTION: {e}\n{traceback.format_exc()}") | ||
| handler.set_error(str(e)) | ||
|
|
||
| self._executor.submit(_do_backup) | ||
| return handler | ||
|
|
||
| def send_to_node( | ||
|
|
||
This comment was marked as outdated.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.