@@ -55,6 +55,100 @@ write_lmcache_rocm_mp_patch() {
5555"""Runtime compatibility for LMCache MP on ROCm Kimi MLA KV caches."""
5656
5757import os
58+ import threading
59+
# Opt-in switch: the allocator is only patched when
# LMCACHE_ROCM_DEMAND_PINNED_ALLOCATOR is set to exactly "1".
60+ if os.environ.get("LMCACHE_ROCM_DEMAND_PINNED_ALLOCATOR") == "1":
61+ from lmcache.v1 import lazy_memory_allocator as _lazy_memory_allocator
62+
63+ _LazyMemoryAllocator = _lazy_memory_allocator.LazyMemoryAllocator
64+
# The marker attribute is set on the class once the patch is installed, so
# importing this module again does not wrap the methods a second time.
65+ if not getattr(_LazyMemoryAllocator, "_agentic_rocm_demand_patch", False):
# Keep references to the unpatched methods; the wrappers defined below
# delegate to them.
66+ _orig_init = _LazyMemoryAllocator.__init__
67+ _orig_allocate = _LazyMemoryAllocator.allocate
68+ _orig_batched_allocate = _LazyMemoryAllocator.batched_allocate
69+
def _expand_to(self, target_size: int) -> None:
    """Grow the pinned region until ``_curr_size`` reaches ``target_size``.

    The request is rounded with ``align_to(..., PIN_CHUNK_SIZE)`` and capped
    at ``_final_size``. Memory is pinned one ``PIN_CHUNK_SIZE`` chunk at a
    time and committed in batches of at most ``COMMIT_SIZE`` bytes.

    Args:
        self: Allocator instance being expanded.
        target_size: Requested size in bytes before alignment and capping.
    """
    ceiling = self._final_size
    aligned = _lazy_memory_allocator.align_to(target_size, self.PIN_CHUNK_SIZE)
    goal = aligned if aligned < ceiling else ceiling

    # Only one caller expands at a time; a caller that waited here may find
    # the pool already large enough and leave immediately.
    with self._agentic_rocm_demand_expand_lock:
        if self._curr_size >= goal:
            return

        size_before = self._curr_size
        while self._curr_size < goal:
            batch_begin = self._curr_size
            batch_end = min(goal, batch_begin + self.COMMIT_SIZE)
            while self._curr_size < batch_end:
                self._pin_memory_chunk(self._curr_size, self.PIN_CHUNK_SIZE)
                self._curr_size += self.PIN_CHUNK_SIZE
            # Commit exactly what this batch pinned.
            self._commit_expansion(self._curr_size - batch_begin)

        self._log_expansion_progress(self._curr_size - size_before)
90+
91+ def _retry_with_demand_expansion(self, allocate_once):
92+ obj = allocate_once()
93+ step_gb = float(os.environ.get("LMCACHE_ROCM_DEMAND_PINNED_STEP_GB", "64"))
94+ step_bytes = max(self.COMMIT_SIZE, int(step_gb * (1024**3)))
95+
96+ while obj is None and self._curr_size < self._final_size:
97+ _expand_to(self, self._curr_size + step_bytes)
98+ obj = allocate_once()
99+
100+ return obj
101+
def _patched_init(self, *args, **kwargs):
    """Run the original constructor, then stop its background expansion.

    All arguments are forwarded unchanged to the saved original ``__init__``.
    Afterwards the instance gets the lock used by demand-driven expansion,
    and the expansion thread started by the original constructor is
    signalled to stop and joined.
    """
    _orig_init(self, *args, **kwargs)
    self._agentic_rocm_demand_expand_lock = threading.Lock()

    # Upstream LMCache MP expands LazyMemoryAllocator to its final pinned
    # size from a background thread. On ROCm Kimi TP4 that means vLLM only
    # reaches KV-cache registration after the whole 2.5 TB pool is pinned,
    # and the server-side IPC open path can stall before it acknowledges
    # register_kv_caches. The final capacity is left untouched; additional
    # host memory is pinned/committed only when L1 allocations need it.
    stop_flag = self._stop_expand
    stop_flag.set()
    background_worker = self._expand_thread
    background_worker.join()

    log = _lazy_memory_allocator.logger
    final_mb = self._final_size >> 20
    log.info(
        "Agentic ROCm patch: using demand-driven LMCache pinned "
        "memory expansion; final capacity remains %s MB",
        final_mb,
    )
120+
def _patched_allocate(
    self,
    shapes,
    dtypes,
    fmt=_lazy_memory_allocator.MemoryFormat.UNDEFINED,
    allocator_type=None,
):
    """Call the original ``allocate`` with demand-driven expansion on failure.

    The arguments are passed positionally to the saved original method; the
    retry/expansion policy lives in ``_retry_with_demand_expansion``.
    """

    def _attempt():
        return _orig_allocate(self, shapes, dtypes, fmt, allocator_type)

    return _retry_with_demand_expansion(self, _attempt)
132+
def _patched_batched_allocate(
    self,
    shapes,
    dtypes,
    batch_size,
    fmt=_lazy_memory_allocator.MemoryFormat.UNDEFINED,
    allocator_type=None,
):
    """Call the original ``batched_allocate`` with demand-driven expansion.

    The arguments are passed positionally to the saved original method; the
    retry/expansion policy lives in ``_retry_with_demand_expansion``.
    """

    def _attempt():
        return _orig_batched_allocate(
            self, shapes, dtypes, batch_size, fmt, allocator_type
        )

    return _retry_with_demand_expansion(self, _attempt)
147+
# Install the wrappers on the class, then set the marker attribute that the
# getattr() guard checks so the patch is applied at most once.
148+ _LazyMemoryAllocator.__init__ = _patched_init
149+ _LazyMemoryAllocator.allocate = _patched_allocate
150+ _LazyMemoryAllocator.batched_allocate = _patched_batched_allocate
151+ _LazyMemoryAllocator._agentic_rocm_demand_patch = True
58152
59153if os.environ.get("LMCACHE_ROCM_MP_BLOCK_FALLBACK") == "1":
60154 import torch
326420 write_lmcache_rocm_mp_patch " $LMCACHE_ROCM_PATCH_DIR "
327421 export LMCACHE_ROCM_MP_BLOCK_FALLBACK=1
328422 export LMCACHE_ROCM_MP_BLOCK_FALLBACK_DTYPE=bfloat16
423+ export LMCACHE_ROCM_DEMAND_PINNED_ALLOCATOR=1
329424 export PYTHONPATH=" $LMCACHE_ROCM_PATCH_DIR ${PYTHONPATH: +: $PYTHONPATH } "
330425 python3 -c " import lmcache.integration.vllm.lmcache_mp_connector" > /dev/null
331426
0 commit comments