Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/metrics/metrics_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ counter:
documentation: "Total shards dispatched by Cache dump (mirror of cache_load_shards_total)"
- name: "cache_dump_backend_shards_total"
documentation: "Shards actually pushed to backend on dump (excludes !handle.Owner() skips in shared-buffer scenario)"
- name: "cache_load_queue_full_total"
documentation: "Number of Cache load submissions rejected because the waiting queue was full"
- name: "cache_dump_queue_full_total"
documentation: "Number of Cache dump submissions rejected because the waiting queue was full"
- name: "cache_backend_load_submit_errors_total"
documentation: "Number of Cache load backend submit failures"
- name: "cache_backend_load_wait_errors_total"
documentation: "Number of Cache load backend wait failures"
- name: "cache_backend_dump_submit_errors_total"
documentation: "Number of Cache dump backend submit failures"
- name: "cache_backend_dump_wait_errors_total"
documentation: "Number of Cache dump backend wait failures"
- name: "cache_h2d_errors_total"
documentation: "Number of Cache host-to-device transfer or sync failures"
- name: "cache_d2h_errors_total"
documentation: "Number of Cache device-to-host transfer, event wait, or sync failures"

# -- Pipeline Cache stage: per-worker real throughput counters ------------
# Accumulated per completed Cache task with size = shardSize * num_shards.
Expand All @@ -53,6 +69,14 @@ counter:
documentation: "Total bytes transferred from posix storage to host buffer (load path, summed per completed task)"
- name: "posix_h2s_bytes_total"
documentation: "Total bytes transferred from host buffer to posix storage (dump path, summed per completed task)"
- name: "posix_aio_timeout_total"
documentation: "Number of Posix AIO task or submit timeouts"
- name: "posix_io_timeout_total"
documentation: "Number of Posix synchronous worker task timeouts"
- name: "posix_open_errors_total"
documentation: "Number of Posix open failures"
- name: "posix_io_errors_total"
documentation: "Number of Posix read, write, or AIO completion failures"

# -- Connector layer: per-worker real throughput counters ----------------
# The legacy load_speed / save_speed histograms sample per-call speed
Expand All @@ -63,6 +87,20 @@ counter:
documentation: "Total bytes loaded through the UCM connector (summed across all start_load_kv calls)"
- name: "save_bytes_total"
documentation: "Total bytes saved through the UCM connector (summed across all wait_for_save calls)"
- name: "connector_lookup_errors_total"
documentation: "Number of connector lookup errors treated as cache misses"
- name: "connector_load_submit_errors_total"
documentation: "Number of connector load submit failures"
- name: "connector_load_wait_errors_total"
documentation: "Number of connector load wait failures"
- name: "connector_load_invalid_requests_total"
documentation: "Number of connector load failure events that invalidated request blocks"
- name: "connector_load_invalid_blocks_total"
documentation: "Number of newly invalidated vLLM block ids caused by connector load failures"
- name: "connector_dump_submit_errors_total"
documentation: "Number of connector dump submit failures"
- name: "connector_dump_wait_errors_total"
documentation: "Number of connector dump wait failures"

# Gauge metrics configuration
gauge:
Expand Down
15 changes: 13 additions & 2 deletions ucm/integration/vllm/hma_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,8 @@ def get_num_new_matched_tokens(
f"request {request.request_id} FAWA lookup error. "
f"{type(e).__name__}: {e}"
)
self._record_counter("connector_lookup_errors_total")

total_hit_block_num = wa_hbm_hit_block_num + external_hit_blocks
num_total_hit_tokens = (
external_hit_blocks * self.hash_block_size + wa_computed_tokens
Expand Down Expand Up @@ -977,7 +979,10 @@ def _wait_load_task(
f"request {load_task.request_id} wait FAWA load "
f"task label={load_task.label} error. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(load_task.anchor_vllm_block_ids)
self._record_load_error(
"connector_load_wait_errors_total",
load_task.anchor_vllm_block_ids,
)

def get_block_ids_with_load_errors(self) -> set[int]:
res = self._invalid_block_ids
Expand Down Expand Up @@ -1121,7 +1126,10 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
f"request {request_id} submit FAWA load task "
f"error. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(all_group_vllm_block_ids)
self._record_load_error(
"connector_load_submit_errors_total",
all_group_vllm_block_ids,
)

for load_task in tasks:
self._wait_load_task(load_task)
Expand Down Expand Up @@ -1259,6 +1267,7 @@ def wait_for_save(self) -> None:
except Exception as e:
self.device.destroy_event_handle(event_handle)
logger.error(f"dump FAWA kv cache failed. {type(e).__name__}: {e}")
self._record_counter("connector_dump_submit_errors_total")
if wa_dump_keys:
event_handle = self._get_dump_event_handle()
window_ptrs = np.vstack(wa_ptr_rows)
Expand All @@ -1277,6 +1286,7 @@ def wait_for_save(self) -> None:
except Exception as e:
self.device.destroy_event_handle(event_handle)
logger.error(f"dump FAWA kv cache failed. {type(e).__name__}: {e}")
self._record_counter("connector_dump_submit_errors_total")

def _poll_completed_dump_tasks(self) -> None:
"""Reap completed FAWA dump tasks without waiting for in-flight tasks."""
Expand Down Expand Up @@ -1330,6 +1340,7 @@ def _drain_best_effort_dump_tasks(self, finished_req_ids: set[str]) -> None:
f"label={dump_task.label}, keys={dump_task.key_count}, "
f"{type(e).__name__}: {e}"
)
self._record_counter("connector_dump_wait_errors_total")
finally:
self.device.destroy_event_handle(dump_task.event_handle)

Expand Down
46 changes: 38 additions & 8 deletions ucm/integration/vllm/ucm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def _drop_null_vllm_blocks(
return filtered_ucm_block_ids, filtered_vllm_block_ids


def _record_counter(name: str, value: float = 1.0) -> None:
ucmmetrics.update_stats({name: value})


@dataclass
class RequestMeta:
ucm_block_ids: list[bytes] = field(default_factory=list)
Expand Down Expand Up @@ -421,6 +425,22 @@ def __init__(
self.hash_block_size = self.block_size
self.block_size *= self.cp_world_size

@staticmethod
def _record_counter(name: str, value: float = 1.0) -> None:
_record_counter(name, value)

def _record_load_error(self, metric_name: str, block_ids: Any) -> None:
invalid_blocks = set(block_ids)
new_invalid_blocks = invalid_blocks - self._invalid_block_ids
Comment thread
qyh111 marked this conversation as resolved.
self._invalid_block_ids.update(invalid_blocks)
ucmmetrics.update_stats(
{
metric_name: 1.0,
"connector_load_invalid_requests_total": 1.0,
"connector_load_invalid_blocks_total": float(len(new_invalid_blocks)),
}
)

def generate_hash(
self, block_size: int, token_ids: List[int], parent_block_hash_value: bytes
) -> list[bytes]:
Expand Down Expand Up @@ -594,6 +614,7 @@ def get_num_new_matched_tokens(
logger.error(
f"request {request.request_id} look up error. {type(e).__name__}: {e}"
)
self._record_counter("connector_lookup_errors_total")

logger.info_once(
f"request_id: {request.request_id}, "
Expand Down Expand Up @@ -813,8 +834,9 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
logger.error(
f"request {request_id} submit load task error. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(
metadata.request_meta[request_id].load_block_ids[1]
self._record_load_error(
"connector_load_submit_errors_total",
metadata.request_meta[request_id].load_block_ids[1],
)
self._connector_worker_meta.mark_failed(request_id)
num_loaded_block -= len(ucm_block_ids)
Expand All @@ -826,8 +848,9 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
logger.error(
f"request {request_id} wait load task error. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(
metadata.request_meta[request_id].load_block_ids[1]
self._record_load_error(
"connector_load_wait_errors_total",
metadata.request_meta[request_id].load_block_ids[1],
)
self._connector_worker_meta.mark_failed(request_id)
num_loaded_block -= request_to_load_blocks.get(request_id, 0)
Expand Down Expand Up @@ -1124,8 +1147,9 @@ def _submit_request_load_tasks_for_layer(
logger.error(
f"request {request_id} submit load task for layer {layer_id} error. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(
metadata.request_meta[request_id].load_block_ids[1]
self._record_load_error(
"connector_load_submit_errors_total",
metadata.request_meta[request_id].load_block_ids[1],
)
self._failure_req_ids.add(request_id)
self._connector_worker_meta.mark_failed(request_id)
Expand Down Expand Up @@ -1189,8 +1213,9 @@ def wait_for_layer_load(self, layer_name: str) -> None:
logger.error(
f"request {request_id} wait {layer_name} load failed. {type(e).__name__}: {e}"
)
self._invalid_block_ids.update(
metadata.request_meta[request_id].load_block_ids[1]
self._record_load_error(
"connector_load_wait_errors_total",
metadata.request_meta[request_id].load_block_ids[1],
)
self._connector_worker_meta.mark_failed(request_id)
self._failure_req_ids.add(request_id)
Expand Down Expand Up @@ -1276,6 +1301,7 @@ def save_kv_layer(
logger.error(
f"submit dump task for {layer_name} failed. {type(e).__name__}: {e}"
)
self._record_counter("connector_dump_submit_errors_total")
if self.enable_event_sync and event_handle and self.device is not None:
self.device.destroy_event_handle(event_handle)
if self.is_save:
Expand Down Expand Up @@ -1542,6 +1568,7 @@ def get_num_new_matched_tokens(self, request, num_computed_tokens):
logger.error(
f"request {request.request_id} wait dump task error. {type(e).__name__}: {e}"
)
self._record_counter("connector_dump_wait_errors_total")
self.requests_meta[request.request_id] = RequestMeta()

self.total_hit_block_nums += external_hit_blocks
Expand Down Expand Up @@ -1861,6 +1888,7 @@ def lookup_external_hit_tokens(
f"full-attn group {fa.group_id} lookup error. "
f"{type(e).__name__}: {e}"
)
_record_counter("connector_lookup_errors_total")
candidates.append(0)
continue
candidates.append(max(fa_hit_blocks, 0) * fa.block_size)
Expand Down Expand Up @@ -1895,6 +1923,7 @@ def lookup_external_hit_tokens(
f"mamba-align group {sw.group_id} lookup error. "
f"{type(e).__name__}: {e}"
)
_record_counter("connector_lookup_errors_total")
return 0, 0
if not all(results):
logger.info(
Expand Down Expand Up @@ -1938,6 +1967,7 @@ def lookup_external_hit_tokens(
f"sliding window group {sw.group_id} lookup error. "
f"{type(e).__name__}: {e}"
)
_record_counter("connector_lookup_errors_total")
return 0, 0
if not all(results):
logger.info(
Expand Down
14 changes: 14 additions & 0 deletions ucm/shared/infra/thread/latch.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ class Latch {
if (this->counter_ == 0) { return; }
this->cv_.wait(lk, [this] { return this->counter_ == 0; });
}
// Abort may race with Done(); only the thread moving counter_ to zero runs finish_.
void Abort() noexcept
Comment thread
dante159753 marked this conversation as resolved.
{
auto prev = this->counter_.exchange(0, std::memory_order_acq_rel);
if (prev == 0) {
std::lock_guard<std::mutex> lg(this->mutex_);
this->cv_.notify_all();
return;
}
if (this->finish_) { this->finish_(); }
std::lock_guard<std::mutex> lg(this->mutex_);
this->cv_.notify_all();
}
size_t Pending() const noexcept { return this->counter_.load(std::memory_order_relaxed); }
bool IsTimeout(size_t timeoutMs) noexcept
{
using namespace std::chrono;
Expand Down
7 changes: 7 additions & 0 deletions ucm/store/cache/cc/dump_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void DumpQueue::Submit(TaskPtr task, WaiterPtr waiter)
auto success = waiting_.TryPush({task, waiter});
if (success) { return; }
UC_ERROR("Waiting queue full, submit dump task({}) failed.", task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_dump_queue_full_total"), 1.0);
failureSet_->Insert(task->id);
waiter->Done();
}
Expand Down Expand Up @@ -104,6 +105,7 @@ Status DumpQueue::DumpOneTask(CopyStream& stream, TaskPtr task)
auto s = stream.WaitEvent(reinterpret_cast<void*>(task->desc.prerequisiteHandle));
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to wait prerequisite event for dump task({}).", s, task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_d2h_errors_total"), 1.0);
return s;
}
}
Expand All @@ -116,6 +118,7 @@ Status DumpQueue::DumpOneTask(CopyStream& stream, TaskPtr task)
DeviceToHostGatherAsync(stream.NextStream(), shard.addrs.data(), handle.Data());
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to do D2H batch async for task({}).", s, task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_d2h_errors_total"), 1.0);
return s;
}
}
Expand All @@ -131,13 +134,15 @@ Status DumpQueue::DumpOneTask(CopyStream& stream, TaskPtr task)
auto s = stream.Synchronize();
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to sync on stream for task({}).", s, task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_d2h_errors_total"), 1.0);
return s;
}
auto tpSyncStream = NowTime::Now();
for (auto& handle : dumpCtx.bufferHandles) { handle.MarkReady(); }
auto res = backend_->Dump(std::move(backendTaskDesc));
if (!res) [[unlikely]] {
UC_ERROR("Failed({}) to submit dump task({}) to backend.", res.Error(), task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_backend_dump_submit_errors_total"), 1.0);
return res.Error();
}
dumpCtx.backendTaskHandle = res.Value();
Expand Down Expand Up @@ -186,6 +191,8 @@ void DumpQueue::BackendDumpStage()
if (s.Failure()) {
UC_ERROR("Failed({}) to wait backend({}) for task({}).", s, task.backendTaskHandle,
task.taskHandle);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_backend_dump_wait_errors_total"),
1.0);
return;
}
}
Expand Down
7 changes: 7 additions & 0 deletions ucm/store/cache/cc/load_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void LoadQueue::Submit(TaskPtr task, WaiterPtr waiter)
auto success = waiting_.TryPush({task, waiter});
if (success) { return; }
UC_ERROR("Waiting queue full, submit load task({}) failed.", task->id);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_load_queue_full_total"), 1.0);
failureSet_->Insert(task->id);
waiter->Done();
}
Expand Down Expand Up @@ -99,6 +100,8 @@ void LoadQueue::DispatchOneTask(TaskPair&& pair)
auto res = backend_->Load(std::move(backendTask));
if (!res) [[unlikely]] {
UC_ERROR("Failed({}) to submit load task({}) to backend.", res.Error(), task->id);
UC::Metrics::UpdateStats(
NAME_TO_METRIC_ID("cache_backend_load_submit_errors_total"), 1.0);
failureSet_->Insert(task->id);
waiter->Done();
return;
Expand Down Expand Up @@ -153,6 +156,7 @@ void LoadQueue::TransferOneTask(CopyStream& stream, ShardTask&& task)
task.shard.addrs.data());
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to do H2D batch async for task({}).", s, task.taskHandle);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_h2d_errors_total"), 1.0);
break;
}
auto tpH2dSubmitted = NowTime::Now();
Expand All @@ -168,6 +172,7 @@ void LoadQueue::TransferOneTask(CopyStream& stream, ShardTask&& task)
holder_.clear();
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to sync on stream for task({}).", s, task.taskHandle);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_h2d_errors_total"), 1.0);
break;
}
} while (0);
Expand All @@ -182,6 +187,8 @@ Status LoadQueue::WaitBackendTaskReady(ShardTask& task)
if (s.Failure()) [[unlikely]] {
UC_ERROR("Failed({}) to wait backend({}) for task({}).", s, task.backendTaskHandle,
task.taskHandle);
UC::Metrics::UpdateStats(NAME_TO_METRIC_ID("cache_backend_load_wait_errors_total"),
1.0);
return s;
}
task.bufferHandle.MarkReady();
Expand Down
6 changes: 4 additions & 2 deletions ucm/store/detail/template/task_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#ifndef UNIFIEDCACHE_STORE_DETAIL_TEMPLATE_TASK_WRAPPER_H
#define UNIFIEDCACHE_STORE_DETAIL_TEMPLATE_TASK_WRAPPER_H

#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include "logger/logger.h"
Expand All @@ -47,6 +48,7 @@ class TaskWrapper {
std::shared_mutex mutex_{};
virtual void Dispatch(TaskPtr t, WaiterPtr w) = 0;
virtual void Cancel(TaskPtr t) {}
virtual bool IsAio() const { return false; }

public:
Expected<TaskHandle> Submit(Task task)
Expand Down Expand Up @@ -97,12 +99,12 @@ class TaskWrapper {
while (!w->WaitForDuration(drainSliceMs)) {
UC_WARN("Task({}) has not finished after ({}) ms.", taskId, drainSliceMs);
}
failureSet_.Remove(taskId);
if (!IsAio()) { failureSet_.Remove(taskId); }
Comment thread
qyh111 marked this conversation as resolved.
return Status::Timeout();
}
auto failure = failureSet_.Contains(taskId);
Comment thread
dante159753 marked this conversation as resolved.
if (failure) [[unlikely]] {
failureSet_.Remove(taskId);
if (!IsAio()) { failureSet_.Remove(taskId); }
return Status::Error();
}
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions ucm/store/posix/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ file(GLOB_RECURSE UCM_POSIX_STORE_CC_SOURCE_FILES "./cc/*.cc")
add_library(posixstore SHARED ${UCM_POSIX_STORE_CC_SOURCE_FILES})
target_include_directories(posixstore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cc)
target_link_libraries(posixstore PUBLIC storeintf infra_logger metrics)
if(BUILD_UNIT_TESTS)
target_compile_definitions(posixstore PUBLIC UCM_ENABLE_TEST_HOOKS)
endif()
# libmetrics.so installs to ucm/shared/metrics/; posixstore.so to
# ucm/store/posix/ — ../../shared/metrics reaches it.
set_target_properties(posixstore PROPERTIES
Expand Down
Loading
Loading