Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aphrodite/config/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ class CacheConfig:
num_cpu_blocks: int | None = field(default=None, init=False)
"""The number of blocks to allocate for CPU memory."""

# Will be set after model loading.
kv_bytes_per_block: int | None = field(default=None, init=False)
"""The number of KV bytes per block, across all workers."""

kv_sharing_fast_prefill: bool = False
"""This feature is work in progress and no prefill optimization takes place
with this flag enabled currently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import torch
from lmcache import utils
from lmcache.config import LMCacheEngineMetadata
from lmcache.logging import init_logger
from lmcache.observability import LMCStatsMonitor
from lmcache.utils import _lmcache_nvtx_annotate
from lmcache.v1.cache_engine import LMCacheEngine, LMCacheEngineBuilder
Expand All @@ -20,10 +19,6 @@
VLLMPagedMemGPUConnectorV2,
VLLMPagedMemLayerwiseGPUConnector)
from lmcache.v1.internal_api_server.api_server import InternalAPIServer
from lmcache.v1.lookup_client import LookupClientFactory
from lmcache.v1.lookup_client.lmcache_async_lookup_client import (
LMCacheAsyncLookupServer)
from lmcache.v1.offload_server.zmq_server import ZMQOffloadServer
from lmcache.v1.plugin.plugin_launcher import PluginLauncher

from aphrodite.common.sampling_params import SamplingParams
Expand All @@ -35,11 +30,16 @@
lmcache_get_or_create_config, mla_enabled)
from aphrodite.distributed.parallel_state import (
get_tensor_model_parallel_rank, get_tp_group)
from aphrodite.logger import init_logger
from aphrodite.utils.math_utils import cdiv
from aphrodite.utils.torch_utils import get_kv_cache_torch_dtype
from aphrodite.v1.core.sched.output import SchedulerOutput
from aphrodite.version import __version__ as APHRODITE_VERSION

from .lookup_client import LookupClientFactory
from .lookup_client.lmcache_async_lookup_client import LMCacheAsyncLookupServer
from .offload_server.zmq_server import ZMQOffloadServer

if TYPE_CHECKING:
from aphrodite.attention.backends.abstract import AttentionMetadata
from aphrodite.forward_context import ForwardContext
Expand Down Expand Up @@ -819,7 +819,7 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
slot_mapping = request.slot_mapping.cuda()
assert len(tokens) == len(slot_mapping)

self._stats_monitor.update_interval_aphrodite_hit_tokens(
self._stats_monitor.update_interval_vllm_hit_tokens(
request.load_spec.aphrodite_cached_tokens
)
Comment on lines +822 to 824

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The method name update_interval_vllm_hit_tokens seems to be a leftover from vllm, which could be confusing in the aphrodite codebase. While this is inherited from the lmcache dependency, consider aliasing or wrapping this to use a more consistent naming convention (e.g., update_interval_aphrodite_hit_tokens) if possible, to improve code clarity and maintainability.

token_mask = torch.ones(len(tokens), dtype=torch.bool)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-License-Identifier: Apache-2.0
from .abstract_client import LookupClientInterface
from .factory import LookupClientFactory
from .lmcache_lookup_client import LMCacheLookupClient, LMCacheLookupServer
from .mooncake_lookup_client import MooncakeLookupClient

__all__ = [
"LookupClientInterface",
"LookupClientFactory",
"MooncakeLookupClient",
"LMCacheLookupClient",
"LMCacheLookupServer",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# SPDX-License-Identifier: Apache-2.0
# Standard
import abc
from typing import TYPE_CHECKING, Optional, Union

import torch

if TYPE_CHECKING:
pass


class LookupClientInterface(metaclass=abc.ABCMeta):
"""Abstract interface for lookup clients."""

@abc.abstractmethod
def lookup(
self,
token_ids: Union[torch.Tensor, list[int]],
lookup_id: str,
request_configs: Optional[dict] = None,
) -> Optional[int]:
"""
Perform lookup for the given token IDs.

Args:
token_ids: The token IDs to lookup

lookup_id: The lookup ID to associate with the lookup

request_configs: The configs of the request,
includes tags and the other configs

Returns:
The number of tokens that can be loaded from cache.
None indicates the lookup/prefetch is in progress.
"""
raise NotImplementedError

@abc.abstractmethod
def close(self) -> None:
"""Close the lookup client and clean up resources."""
raise NotImplementedError

def supports_producer_reuse(self) -> bool:
"""
Return whether this lookup client supports producer KV cache reuse.

Returns:
True if producer reuse is supported, False otherwise
"""
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# SPDX-License-Identifier: Apache-2.0
# Standard
from typing import TYPE_CHECKING, Optional, Union

from lmcache.v1.cache_engine import LMCacheEngine
from lmcache.v1.config import LMCacheEngineConfig

from aphrodite.logger import init_logger

from .abstract_client import LookupClientInterface
from .hit_limit_lookup_client import HitLimitLookupClient
from .mooncake_lookup_client import MooncakeLookupClient

if TYPE_CHECKING:
from aphrodite.config import AphroditeConfig

from .lmcache_async_lookup_client import LMCacheAsyncLookupServer
from .lmcache_lookup_client import LMCacheLookupServer

logger = init_logger(__name__)


class LookupClientFactory:
"""Factory for creating lookup clients and servers based on configuration."""

@staticmethod
def create_lookup_client(
aphrodite_config: "AphroditeConfig",
config: LMCacheEngineConfig,
) -> LookupClientInterface:
"""
Create a lookup client based on the configuration.

Args:
aphrodite_config: The Aphrodite configuration
config: The LMCache engine configuration

Returns:
A lookup client instance
"""

# Check if external_lookup_client is configured
if config.external_lookup_client is not None:
if config.enable_async_loading:
raise ValueError(
"Asynchronous loading is not supported for external lookup clients."
)
client = LookupClientFactory._create_external_lookup_client(
config.external_lookup_client, aphrodite_config
)
else:
from .lmcache_async_lookup_client import LMCacheAsyncLookupClient
from .lmcache_lookup_client import LMCacheLookupClient

if config.enable_async_loading:
client = LMCacheAsyncLookupClient(aphrodite_config)
else:
client = LMCacheLookupClient(aphrodite_config)

if config.hit_miss_ratio is not None and 0 <= config.hit_miss_ratio <= 1:
return HitLimitLookupClient(client, config)
return client

@staticmethod
def create_lookup_server(
lmcache_engine: LMCacheEngine,
aphrodite_config: "AphroditeConfig",
) -> Optional[Union["LMCacheLookupServer", "LMCacheAsyncLookupServer"]]:
"""
Create a lookup server based on the configuration.

Args:
lmcache_engine: The LMCache engine instance
aphrodite_config: The Aphrodite configuration

Returns:
A lookup server instance, or None if no server should be created
"""
config = lmcache_engine.config
assert isinstance(config, LMCacheEngineConfig), (
"LMCache v1 config is expected for lookup server and client"
)

# Only create the KV lookup API server on worker rank 0
# when there are multiple workers and when not using external lookup client
create_lookup_server_only_on_worker_0_for_mla = config.get_extra_config_value(
"create_lookup_server_only_on_worker_0_for_mla",
lmcache_engine.metadata.use_mla,
)

if config.external_lookup_client is None and (
not create_lookup_server_only_on_worker_0_for_mla
or lmcache_engine.metadata.worker_id == 0
):
from .lmcache_async_lookup_client import LMCacheAsyncLookupServer
from .lmcache_lookup_client import LMCacheLookupServer

if config.enable_async_loading:
return LMCacheAsyncLookupServer(lmcache_engine, aphrodite_config)
else:
return LMCacheLookupServer(lmcache_engine, aphrodite_config)

return None

@staticmethod
def _create_external_lookup_client(
external_lookup_uri: str,
aphrodite_config: "AphroditeConfig",
) -> LookupClientInterface:
"""
Create an external lookup client based on the URI format.

Args:
external_lookup_uri: URI in format <scheme>://<address>
aphrodite_config: The Aphrodite configuration

Returns:
A lookup client instance

Raises:
ValueError: If the URI format is unsupported
"""
# Parse URI scheme and address
if "://" not in external_lookup_uri:
raise ValueError(
f"Invalid external lookup client URI format: {external_lookup_uri}. "
"Expected format: <scheme>://<address>"
)

scheme, address = external_lookup_uri.split("://", 1)

# Route to appropriate client based on scheme
if scheme == "mooncakestore":
return LookupClientFactory._create_mooncake_lookup_client(
address, aphrodite_config
)
else:
raise ValueError(
f"Unsupported external lookup client scheme: {scheme}. "
"Supported schemes: mooncakestore"
)

@staticmethod
def _create_mooncake_lookup_client(
master_address: str,
aphrodite_config: "AphroditeConfig",
) -> "MooncakeLookupClient":
"""Create a MooncakeLookupClient instance."""
from .mooncake_lookup_client import MooncakeLookupClient

return MooncakeLookupClient(aphrodite_config, master_address)
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# SPDX-License-Identifier: Apache-2.0
# Standard
from typing import Optional, Union

# Third Party
import torch
from lmcache.v1.config import LMCacheEngineConfig

# First Party
from aphrodite.logger import init_logger

from .abstract_client import LookupClientInterface

logger = init_logger(__name__)


"""
HitLimitLookupClient now is used for test, when lookup is called, cal the cache hit,
- if the cache hit <= (1 - hit_miss_ratio), direct return the result
- if the cache hit > (1 - hit_miss_ratio), re-compute the result by hit_miss_ratio
"""


class HitLimitLookupClient(LookupClientInterface):
def __init__(
self, actual_lookup_client: LookupClientInterface, config: LMCacheEngineConfig
):
assert config.hit_miss_ratio is not None and 0 <= config.hit_miss_ratio <= 1
self.actual_lookup_client = actual_lookup_client
self.hit_ratio_upper = 1 - config.hit_miss_ratio
self.chunk_size = config.chunk_size
logger.info(
f"create HitLimitLookupClient succeed, the hit ratio upper"
f"is {self.hit_ratio_upper}, chunk size is {self.chunk_size}"
)

def lookup(
self,
token_ids: Union[torch.Tensor, list[int]],
lookup_id: str,
request_configs: Optional[dict] = None,
) -> Optional[int]:
# get real hit tokens
result = self.actual_lookup_client.lookup(token_ids, lookup_id, request_configs)
if result is not None:
total_tokens_length = len(token_ids)
assert result <= total_tokens_length
current_hit_ratio = 0.0
if total_tokens_length > 0:
current_hit_ratio = result / total_tokens_length
# limit the hit tokens
if current_hit_ratio > self.hit_ratio_upper:
origin_result = result
# align to chunk size
new_result = (
int(total_tokens_length * self.hit_ratio_upper)
// self.chunk_size
* self.chunk_size
)
# check again
result = min(result, new_result)
logger.debug(
f"hit ratio upper: {self.hit_ratio_upper} is smaller than "
f"the real hit ratio {current_hit_ratio}, "
f"the origin result is {origin_result}, "
f"the new result is {new_result}, the final result is {result}"
)
return result

def supports_producer_reuse(self) -> bool:
return self.actual_lookup_client.supports_producer_reuse()

def clear_lookup_status(self, lookup_id: str) -> None:
"""Clear lookup status for the given lookup_id.

Delegates to the wrapped lookup client.
"""
if hasattr(self.actual_lookup_client, 'clear_lookup_status'):
self.actual_lookup_client.clear_lookup_status(lookup_id)

def close(self) -> None:
self.actual_lookup_client.close()
Loading