4 changes: 4 additions & 0 deletions docs/source/user-guide/metrics/metrics.md
@@ -223,6 +223,10 @@ The default metrics configuration contains the following UCM metrics.
| `ucm:posix_h2s_bytes_total` | Total bytes transferred from host buffer to Posix storage. |
| `ucm:load_bytes_total` | Total bytes loaded through the UCM connector. |
| `ucm:save_bytes_total` | Total bytes saved through the UCM connector. |
| `ucm:dump_event_reshape_cache_direct_used_total` | Dump submissions synchronized with `attn_metadata.reshape_cache_event`. |
| `ucm:dump_event_reshape_cache_layer_used_total` | Dump submissions synchronized with `attn_metadata[layer_name].reshape_cache_event`. |
| `ucm:dump_event_current_stream_used_total` | Dump submissions synchronized with a UCM-recorded current stream event fallback. |
| `ucm:dump_event_sync_fallback_used_total` | Dump submissions that fell back to device synchronization because no event handle was available. |

### Gauges

3 changes: 2 additions & 1 deletion docs/source/user-guide/prefix-cache/pipeline_store.md
@@ -216,6 +216,7 @@ ucm_connectors:
use_gdr: false
enable_event_sync: true
use_layerwise: true
enable_reshape_cache_event_sync: false
enable_record_traces: false
use_lite: false
persist_token_threshold: 0
@@ -320,4 +321,4 @@ This log indicates that the **Posix Store** has received a **load or dump task**
```text
[UC][D] Posix task({task_id},{operation},{subtask_number},{size}) finished, cost {time}ms. [PID,TID]
```
This log indicates that a load or dump task in the **Posix Store** has completed, along with its execution time in **ms**.
8 changes: 8 additions & 0 deletions examples/metrics/metrics_configs.yaml
@@ -63,6 +63,14 @@ counter:
documentation: "Total bytes loaded through the UCM connector (summed across all start_load_kv calls)"
- name: "save_bytes_total"
documentation: "Total bytes saved through the UCM connector (summed across all wait_for_save calls)"
- name: "dump_event_reshape_cache_direct_used_total"
documentation: "Number of dump submissions synchronized with attn_metadata.reshape_cache_event"
- name: "dump_event_reshape_cache_layer_used_total"
documentation: "Number of dump submissions synchronized with attn_metadata[layer_name].reshape_cache_event"
- name: "dump_event_current_stream_used_total"
documentation: "Number of dump submissions synchronized with a UCM-recorded current stream event fallback"
- name: "dump_event_sync_fallback_used_total"
documentation: "Number of dump submissions that fell back to device synchronization because no event handle was available"

# Gauge metrics configuration
gauge:
3 changes: 3 additions & 0 deletions examples/ucm_config_example.yaml
@@ -26,6 +26,9 @@ ucm_connectors:

# When you use UcmNfsStore, you should set enable_event_sync to false.
enable_event_sync: true
# Use vLLM-Ascend reshape cache events to start D2H immediately after KV cache is ready.
# Enable for better dump performance.
enable_reshape_cache_event_sync: false
# Enable UCM metrics so they can be monitored online via Grafana and Prometheus.
# metrics_config_path: "/workspace/unified-cache-management/examples/metrics/metrics_configs.yaml"

35 changes: 34 additions & 1 deletion ucm/integration/vllm/device.py
@@ -13,7 +13,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from itertools import accumulate
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple, Union

import torch
from vllm.platforms import current_platform
@@ -26,12 +26,16 @@
class Device(ABC):
    def __init__(self):
        self.events = {}
        self.borrowed_events = []

Contributor

💡 Suggestion - Thread safety consideration

The borrowed_events list is accessed without synchronization. If this device is used in a multi-threaded context (e.g., multiple workers sharing the same device instance), there could be race conditions when:

  • Multiple threads call get_event_handle_from_event concurrently (append)
  • One thread clears while another is appending

Consider adding:

  1. A lock for borrowed_events operations
  2. Documentation about thread-safety assumptions (single-threaded per device)
  3. A thread-safe collection if multi-threaded access is expected
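
Option 1 could look roughly like the sketch below. `BorrowedEventRegistry` is a hypothetical helper, not something in this PR, and it only assumes that borrowed events are ordinary Python objects that must stay referenced until they are cleared.

```python
import threading
from typing import Any, List


class BorrowedEventRegistry:
    """Hypothetical helper: keeps borrowed event objects alive under a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: List[Any] = []

    def add(self, event: Any) -> None:
        # Serialize appends with concurrent clears.
        with self._lock:
            self._events.append(event)

    def clear(self) -> int:
        """Drop all references and return how many were released."""
        with self._lock:
            released = len(self._events)
            self._events.clear()
            return released


registry = BorrowedEventRegistry()


def worker() -> None:
    # Stand-in for a thread that borrows 1000 framework events.
    for _ in range(1000):
        registry.add(object())


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
released = registry.clear()
```

If each device instance is only ever touched by one worker thread, documenting that assumption (option 2) may be enough and the lock can be skipped.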


    @abstractmethod
    def get_event_handle(self) -> int:
        """Return event handle for stream sync. 0 means no event (use synchronize instead)."""
        pass

    def get_event_handle_from_event(self, event: Any) -> int:
        return 0

    @abstractmethod
    def synchronize(self):
        pass
@@ -114,11 +118,23 @@ def get_event_handle(self) -> int:
            logger.error(f"get cuda event handle failed. {e}")
            return 0

    def get_event_handle_from_event(self, event: Any) -> int:
        try:
            handle = getattr(event, "cuda_event", None)
            if handle is None or int(handle) == 0:
                return 0
            self.borrowed_events.append(event)
            return int(handle)
        except Exception as e:

Contributor

⚠️ Warning - Exception handling too broad

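Using except Exception catches every Exception subclass, so unexpected programming errors are logged and swallowed together with the expected ones. (KeyboardInterrupt and SystemExit derive from BaseException and are not caught by it.) This could still mask critical errors.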

Recommended specific exceptions:

except (AttributeError, TypeError, ValueError) as e:

This applies to both CudaDevice (line 128) and NpuDevice (line 279) implementations.
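
A minimal sketch of the narrowed handler, assuming the handle is read from an attribute and converted with `int()`. `extract_handle` and `FakeEvent` are hypothetical stand-ins used only for illustration; they are not part of the PR or of PyTorch.

```python
from typing import Any


def extract_handle(event: Any, attr: str = "cuda_event") -> int:
    """Return the integer handle stored on `event`, or 0 if it is unusable."""
    try:
        handle = getattr(event, attr, None)
        if handle is None:
            return 0
        return int(handle)
    except (AttributeError, TypeError, ValueError):
        # int() raises TypeError or ValueError for non-numeric handles.
        # Anything else (for example a RuntimeError from the driver)
        # propagates to the caller instead of being silently logged.
        return 0


class FakeEvent:
    """Hypothetical stand-in for a framework event object."""

    def __init__(self, cuda_event: Any) -> None:
        self.cuda_event = cuda_event


valid = extract_handle(FakeEvent(42))
missing = extract_handle(FakeEvent(None))
not_numeric = extract_handle(FakeEvent("abc"))
wrong_type = extract_handle(FakeEvent(object()))
```

Whether unexpected exceptions should propagate or be logged is a design choice for the connector; the sketch only shows the narrower variant.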

            logger.error(f"get cuda event handle from existing event failed. {e}")
            return 0

    def synchronize(self):
        torch.cuda.current_stream().synchronize()

    def destroy_event_handles(self):
        self.events.clear()
        self.borrowed_events.clear()

    def destroy_event_handle(self, handle: int):

Contributor

⚠️ Warning - Inconsistent cleanup in destroy_event_handle

The destroy_event_handle method only removes events from self.events dict but does not remove corresponding entries from self.borrowed_events.

If individual event handles are destroyed (rather than batch clearing via destroy_event_handles), borrowed event references could accumulate, potentially causing memory growth over time.

Consider either:

  1. Adding logic to track handle-to-event mapping for borrowed events
  2. Documenting that destroy_event_handle is only for UCM-created events, not borrowed ones
  3. Using a different structure for borrowed_events with proper tracking
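
Options 1 and 3 could be combined by keying borrowed events on their handle, as in the sketch below. `EventTracker` is a hypothetical stand-in for the bookkeeping in `Device`; the PR itself stores borrowed events in a list.

```python
from typing import Any, Dict


class EventTracker:
    """Hypothetical stand-in for the event bookkeeping in Device."""

    def __init__(self) -> None:
        self.events: Dict[int, Any] = {}           # events created by UCM
        self.borrowed_events: Dict[int, Any] = {}  # events owned by the framework

    def borrow(self, handle: int, event: Any) -> int:
        # Re-borrowing the same handle overwrites the entry instead of growing it.
        self.borrowed_events[handle] = event
        return handle

    def destroy_event_handle(self, handle: int) -> None:
        # Release the reference regardless of which table holds the handle.
        self.events.pop(handle, None)
        self.borrowed_events.pop(handle, None)

    def destroy_event_handles(self) -> None:
        self.events.clear()
        self.borrowed_events.clear()


tracker = EventTracker()
tracker.borrow(7, object())
tracker.borrow(7, object())
tracker.borrow(8, object())
count_before = len(tracker.borrowed_events)
tracker.destroy_event_handle(7)
count_after = len(tracker.borrowed_events)
```

A dictionary also bounds growth when the same framework event is borrowed for every dump submission, which a plain list would not.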

        self.events.pop(handle, None)
@@ -248,6 +264,22 @@ def get_event_handle(self) -> int:
            logger.error(f"get npu event handle failed. {e}")
            return 0

    def get_event_handle_from_event(self, event: Any) -> int:
        try:
            handle = getattr(event, "npu_event", None)
            if handle is None:
                parameter = getattr(event, "_as_parameter_", None)
                if callable(parameter):
                    parameter = parameter()
                handle = getattr(parameter, "value", None)
            if handle is None or int(handle) == 0:
                return 0
            self.borrowed_events.append(event)
            return int(handle)
        except Exception as e:

Contributor

⚠️ Warning - Exception handling too broad

Same issue as CudaDevice - using except Exception is too broad. Should catch specific exceptions like AttributeError, TypeError, ValueError.
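
The NPU lookup order (attribute first, then the ctypes-style `_as_parameter_` fallback) can be exercised in isolation with the narrowed handler. The sketch below is an illustration only: `extract_npu_handle` and the three event classes are hypothetical, and `ctypes.c_void_p` stands in for whatever object a real NPU event exposes.

```python
import ctypes
from typing import Any


def extract_npu_handle(event: Any) -> int:
    """Mirror the PR's lookup order, catching only conversion errors."""
    try:
        handle = getattr(event, "npu_event", None)
        if handle is None:
            parameter = getattr(event, "_as_parameter_", None)
            if callable(parameter):
                parameter = parameter()
            handle = getattr(parameter, "value", None)
        if handle is None:
            return 0
        return int(handle)
    except (AttributeError, TypeError, ValueError):
        return 0


class EventWithAttr:
    npu_event = 11


class EventWithParameter:
    # ctypes convention: _as_parameter_ exposes the raw pointer value.
    _as_parameter_ = ctypes.c_void_p(22)


class EventWithCallableParameter:
    def _as_parameter_(self) -> ctypes.c_void_p:
        return ctypes.c_void_p(33)


class EventWithoutHandle:
    pass


handles = [
    extract_npu_handle(EventWithAttr()),
    extract_npu_handle(EventWithParameter()),
    extract_npu_handle(EventWithCallableParameter()),
    extract_npu_handle(EventWithoutHandle()),
]
```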

            logger.error(f"get npu event handle from existing event failed. {e}")
            return 0

    def synchronize(self):
        torch.npu.current_stream().synchronize()

@@ -260,6 +292,7 @@ def destroy_event_handles(self):
except Exception as e:
logger.error(f"destroy npu event failed. {e}")
        self.events.clear()
        self.borrowed_events.clear()

    def destroy_event_handle(self, handle: int):
        import acl
55 changes: 53 additions & 2 deletions ucm/integration/vllm/ucm_connector.py
@@ -323,6 +323,7 @@ def __init__(
        self.tp_rank = self._vllm_config.parallel_config.rank
        self.block_size = self._vllm_config.cache_config.block_size
        self.is_mla = self._vllm_config.model_config.is_deepseek_mla
        self.use_mla = getattr(self._vllm_config.model_config, "use_mla", self.is_mla)

Contributor

💡 Suggestion - Unused variable

The variable self.use_mla is set here but never used in the visible code changes. If this is intentional (for future use), consider adding a comment explaining the purpose. If not needed, it should be removed to avoid confusion.

Note: self.is_mla is already used elsewhere in the code.

        self.num_layers = self._vllm_config.model_config.get_num_layers(
            self._vllm_config.parallel_config
        )
@@ -363,6 +364,9 @@ def __init__(
        self.launch_config = ucm_config.get_config()
        self.connector_configs = self.launch_config.get("ucm_connectors", [])
        self.enable_event_sync = self.launch_config.get("enable_event_sync", True)
        self.enable_reshape_cache_event_sync = self.launch_config.get(
            "enable_reshape_cache_event_sync", False
        )
        self.enable_record_traces = self.launch_config.get(
            "enable_record_traces", False
        )
@@ -826,14 +830,61 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
    def wait_for_layer_load(self, layer_name: str) -> None:
        pass

-    def _get_dump_event_handle(self) -> int:
    def _get_reshape_cache_event(
        self, layer_name: Optional[str], attn_metadata: Optional["AttentionMetadata"]
    ) -> tuple[Optional[Any], str]:
        if attn_metadata is None:
            return None, "none"

        if layer_name:
            try:
                layer_metadata = attn_metadata[layer_name]
                event = getattr(layer_metadata, "reshape_cache_event", None)
                if event is not None:
                    return event, "layer"
            except (KeyError, TypeError, AttributeError):
                pass

        event = getattr(attn_metadata, "reshape_cache_event", None)
        if event is not None:
            return event, "direct"
        return None, "direct_missing"
Comment on lines +833 to +851

Contributor

⚠️ Warning - Metric gap for missing reshape cache events

When enable_reshape_cache_event_sync is True but the reshape cache event is missing (event_source = "none" or "direct_missing"), and we fall back to current stream or device sync, there is no specific metric tracking this "expected-but-missing" case.

This creates an observability gap - operators cannot distinguish between:

  • Intentional fallback (feature disabled)
  • Expected reshape event missing (vLLM version/model mismatch)

Consider adding metrics like:

  • dump_event_reshape_cache_none_total (attn_metadata is None)
  • dump_event_reshape_cache_missing_total (event_source = "direct_missing")
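
One way to close the gap is a lookup table from `event_source` to a counter name, sketched below. The two "used" names come from this PR; the two "missing" names are the ones proposed in this comment and do not exist yet. `record_reshape_lookup` and the `Counter` standing in for `ucmmetrics.update_stats` are hypothetical.

```python
from collections import Counter
from typing import Optional

# Counters added by the PR.
SOURCE_USED_METRIC = {
    "direct": "dump_event_reshape_cache_direct_used_total",
    "layer": "dump_event_reshape_cache_layer_used_total",
}
# Counters proposed in this review comment (not in the PR).
SOURCE_MISSING_METRIC = {
    "none": "dump_event_reshape_cache_none_total",
    "direct_missing": "dump_event_reshape_cache_missing_total",
}

stats: Counter = Counter()  # stand-in for ucmmetrics.update_stats


def record_reshape_lookup(event_source: str, event_handle: int) -> Optional[str]:
    """Bump the counter matching the lookup outcome and return its name."""
    table = SOURCE_USED_METRIC if event_handle != 0 else SOURCE_MISSING_METRIC
    name = table.get(event_source)
    if name is not None:
        stats[name] += 1
    return name


used = record_reshape_lookup("layer", 5)
absent = record_reshape_lookup("none", 0)
missing = record_reshape_lookup("direct_missing", 0)
```

New counter names would presumably also need entries in `examples/metrics/metrics_configs.yaml` and the metrics table in `metrics.md`, as the PR does for the existing four.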


    def _get_dump_event_handle(
        self,
        layer_name: Optional[str] = None,
        attn_metadata: Optional["AttentionMetadata"] = None,
    ) -> int:
        if not self.enable_event_sync:
            self.device.synchronize()
            return 0

        if self.enable_reshape_cache_event_sync:
            reshape_cache_event, event_source = self._get_reshape_cache_event(
                layer_name, attn_metadata
            )
            event_handle = (
                self.device.get_event_handle_from_event(reshape_cache_event)
                if reshape_cache_event is not None
                else 0
            )
            if event_handle != 0:
                if event_source == "direct":
                    ucmmetrics.update_stats(
                        "dump_event_reshape_cache_direct_used_total", 1.0
                    )
                elif event_source == "layer":
                    ucmmetrics.update_stats(
                        "dump_event_reshape_cache_layer_used_total", 1.0
                    )
                return event_handle
Comment on lines +862 to +880

Contributor

⚠️ Warning - Metric inconsistency in handle extraction failure

When enable_reshape_cache_event_sync is True and we successfully get a reshape_cache_event but get_event_handle_from_event returns 0 (handle extraction failed), we fall back to the current stream path and track "dump_event_current_stream_used_total".

The metric doesn't reflect that we attempted reshape cache sync first but failed at handle extraction. Consider:

  1. Adding a metric for "dump_event_reshape_cache_handle_failed_total"
  2. Or updating the existing metric logic to distinguish this fallback path
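
The fallback chain with the extra counter from option 1 can be sketched as a pure function. `select_dump_event` is hypothetical, the `Counter` stands in for `ucmmetrics.update_stats`, and `dump_event_reshape_cache_handle_failed_total` is the name proposed here rather than an existing metric. The real method also calls `self.device.synchronize()` on the last path, which the sketch leaves out.

```python
from collections import Counter


def select_dump_event(
    event_source: str, reshape_handle: int, stream_handle: int, stats: Counter
) -> int:
    """Pick the handle to sync on and count which path was taken."""
    if reshape_handle != 0:
        stats[f"dump_event_reshape_cache_{event_source}_used_total"] += 1
        return reshape_handle
    if event_source in ("direct", "layer"):
        # An event object was found but no handle could be extracted.
        stats["dump_event_reshape_cache_handle_failed_total"] += 1
    if stream_handle != 0:
        stats["dump_event_current_stream_used_total"] += 1
        return stream_handle
    # No handle at all: the caller falls back to device synchronization.
    stats["dump_event_sync_fallback_used_total"] += 1
    return 0


stats: Counter = Counter()
first = select_dump_event("layer", 9, 3, stats)    # reshape event usable
second = select_dump_event("direct", 0, 3, stats)  # extraction failed
third = select_dump_event("none", 0, 0, stats)     # nothing available
```

With this split, a rising `handle_failed` count alongside `current_stream_used` would point at handle extraction rather than at a missing event.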


        event_handle = self.device.get_event_handle()
        if event_handle == 0:
            self.device.synchronize()
            ucmmetrics.update_stats("dump_event_sync_fallback_used_total", 1.0)
        else:
            ucmmetrics.update_stats("dump_event_current_stream_used_total", 1.0)
        return event_handle

def save_kv_layer(
@@ -1140,7 +1191,7 @@ def save_kv_layer(
        shard_indexs = [layer_id] * len(total_ucm_block_ids)
        try:
            layer_ptrs = np.ascontiguousarray(self.dump_total_ptrs[local_layer_id])
-            event_handle = self._get_dump_event_handle()
+            event_handle = self._get_dump_event_handle(layer_name, attn_metadata)
            task = self.store.dump_data(
                total_ucm_block_ids, shard_indexs, layer_ptrs, event_handle
            )