Skip to content

Commit d00d964

Browse files
committed
[KVCache] unify host block allocation through allocate_host_blocks
## Motivation 直接调用 `_host_pool.allocate()` 时不会触发驱逐,导致在 host block 空闲不足 但存在 evictable block 的情况下,`can_allocate_host_blocks` 返回 True 但分配 静默失败。 ## Modifications - `prepare_prefetch_metadata`:将 `_host_pool.allocate()` 替换为 `allocate_host_blocks()`, 空闲不足时自动驱逐 evictable host block 后再分配 - 删除未被生产代码调用的 `offload_to_host` 方法及其全部测试用例
1 parent f1850f5 commit d00d964

2 files changed

Lines changed: 3 additions & 101 deletions

File tree

fastdeploy/cache_manager/v1/cache_manager.py

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -909,45 +909,6 @@ def check_and_add_pending_backup(
909909
except Exception as e:
910910
logger.error(f"check_and_add_pending_backup error: {e}, {str(traceback.format_exc())}")
911911

912-
# ============ Host/Device Transfer Coordination ============
913-
914-
def offload_to_host(self, block_indices: List[int]) -> bool:
915-
"""
916-
Offload blocks from device to host memory.
917-
918-
This is a coordination method. Actual data transfer happens in Worker.
919-
920-
Args:
921-
block_indices: List of block indices to offload
922-
923-
Returns:
924-
True if successful, False otherwise
925-
"""
926-
try:
927-
with self._lock:
928-
# Allocate host blocks
929-
host_indices = self._host_pool.allocate(len(block_indices))
930-
if host_indices is None or len(host_indices) != len(block_indices):
931-
# Not enough host memory, release what we allocated
932-
if host_indices:
933-
self._host_pool.release(host_indices)
934-
return False
935-
936-
# Perform the offload (actual data transfer would happen in Worker)
937-
for i, dev_idx in enumerate(block_indices):
938-
host_idx = host_indices[i]
939-
metadata = self._device_pool.get_metadata(dev_idx)
940-
if metadata:
941-
self._host_pool.set_metadata(host_idx, metadata)
942-
943-
# Release device blocks
944-
self._device_pool.release(block_indices)
945-
946-
return True
947-
except Exception as e:
948-
logger.error(f"offload_to_host error: {e}, {str(traceback.format_exc())}")
949-
return False
950-
951912
def load_from_host(self, block_indices: List[int]) -> bool:
952913
"""
953914
Load blocks from host to device memory.
@@ -1063,9 +1024,9 @@ def prepare_prefetch_metadata(
10631024
if not self.can_allocate_host_blocks(len(storage_hashes)):
10641025
return []
10651026

1066-
# Allocate host blocks for prefetch
1067-
host_block_ids = self._host_pool.allocate(len(storage_hashes))
1068-
if host_block_ids is None or len(host_block_ids) == 0:
1027+
# Allocate host blocks for prefetch (evicts evictable host blocks if needed)
1028+
host_block_ids = self.allocate_host_blocks(len(storage_hashes))
1029+
if not host_block_ids:
10691030
return []
10701031

10711032
blocks = list(zip(storage_hashes, host_block_ids))

tests/cache_manager/v1/test_cache_manager.py

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -788,65 +788,6 @@ def test_prefetch_node_map_initially_empty(self):
788788
self.assertEqual(len(cache_manager._prefetch_node_map), 0)
789789

790790

791-
class TestCacheManagerOffloadToHost(unittest.TestCase):
792-
"""Tests for CacheManager.offload_to_host."""
793-
794-
def test_offload_frees_device_blocks(self):
795-
"""After offload, device blocks should be released."""
796-
cm = create_cache_manager(total_block_num=20, num_cpu_blocks=20)
797-
device_blocks = cm._device_pool.allocate(4)
798-
self.assertIsNotNone(device_blocks)
799-
free_before = cm.num_free_device_blocks
800-
801-
success = cm.offload_to_host(device_blocks)
802-
803-
self.assertTrue(success)
804-
self.assertEqual(cm.num_free_device_blocks, free_before + 4)
805-
806-
def test_offload_allocates_host_blocks(self):
807-
"""After offload, host blocks should be consumed."""
808-
cm = create_cache_manager(total_block_num=20, num_cpu_blocks=20)
809-
device_blocks = cm._device_pool.allocate(3)
810-
free_host_before = cm.num_free_host_blocks
811-
812-
cm.offload_to_host(device_blocks)
813-
814-
self.assertEqual(cm.num_free_host_blocks, free_host_before - 3)
815-
816-
def test_offload_fails_when_no_host_blocks(self):
817-
"""Offload should return False when host pool is exhausted."""
818-
cm = create_cache_manager(total_block_num=20, num_cpu_blocks=0)
819-
device_blocks = cm._device_pool.allocate(2)
820-
821-
success = cm.offload_to_host(device_blocks)
822-
self.assertFalse(success)
823-
824-
def test_offload_copies_device_metadata_to_host(self):
825-
"""Metadata on device blocks should be copied to host blocks."""
826-
from fastdeploy.cache_manager.v1.metadata import CacheBlockMetadata
827-
828-
cm = create_cache_manager(total_block_num=20, num_cpu_blocks=20)
829-
device_blocks = cm._device_pool.allocate(1)
830-
block_id = device_blocks[0]
831-
meta = CacheBlockMetadata(block_id=block_id, device_id=0, block_size=64, ref_count=5)
832-
cm._device_pool.set_metadata(block_id, meta)
833-
834-
cm.offload_to_host(device_blocks)
835-
836-
# Find the newly used host block (last used)
837-
used_host = list(cm._host_pool._used_blocks)
838-
self.assertEqual(len(used_host), 1)
839-
host_meta = cm._host_pool.get_metadata(used_host[0])
840-
self.assertIsNotNone(host_meta)
841-
self.assertEqual(host_meta.ref_count, 5)
842-
843-
def test_offload_empty_list_returns_true(self):
844-
"""Offloading empty list succeeds."""
845-
cm = create_cache_manager()
846-
success = cm.offload_to_host([])
847-
self.assertTrue(success)
848-
849-
850791
# ---------------------------------------------------------------------------
851792
# load_from_host
852793
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)