
Commit 9ece591

added a grpc api for getting utilisation and meminfo in json

1 parent 2c102fa · commit 9ece591

9 files changed
Lines changed: 1010 additions & 17 deletions

.gitignore

Lines changed: 487 additions & 0 deletions
Large diff not rendered.

README.md

Lines changed: 80 additions & 0 deletions
@@ -96,4 +96,84 @@

*Note: The limiter daemon now automatically sets the socket permissions to `0o777`, so you do not need `sudo` to run this command from the host.*

## Reporting Compute Utilization via gRPC

In addition to `SetPriority`, the daemon exposes a `GetUtilization` RPC that reports the compute utilization of this container, per NPU device.

### How it works

Each manager thread participates in a global baton-passing scheme: only the container that currently owns the baton is allowed to issue NPU work. A background reporter thread (one per device) wakes on each baton handoff (the same `signal_counter` futex the managers use) and on fixed window deadlines. It records how long this container's manager held the baton within each window, computes `busy_us / window_us` at each deadline, and keeps a rolling history for averaging. Window boundaries stay fixed; they do not shift when baton events are processed.
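The reporter module itself is registered in `lib.rs` below, but its source is not rendered on this page. As a rough sketch of the accounting just described (fixed deadlines, busy time accumulated between baton handoffs, bounded history), consider the following; the type and method names are illustrative assumptions, not the actual `UtilizationReporter` API:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Illustrative only: `WindowAccumulator` and its methods are assumptions,
// not the actual `UtilizationReporter` internals.
struct WindowAccumulator {
    window: Duration,            // from NPU_REPORT_INTERVAL_MS
    history_scale: usize,        // from NPU_REPORT_HISTORY_SCALE
    busy_in_window: Duration,    // baton-held time inside the current window
    held_since: Option<Instant>, // Some(..) while this container holds the baton
    history: VecDeque<f64>,      // last `history_scale` window percentages
}

impl WindowAccumulator {
    /// Called on each baton handoff (each `signal_counter` futex wakeup).
    fn on_baton_event(&mut self, now: Instant, we_hold_it_now: bool) {
        if let Some(start) = self.held_since.take() {
            self.busy_in_window += now - start;
        }
        if we_hold_it_now {
            self.held_since = Some(now);
        }
        // Note: no deadline is moved here; windows stay on a fixed grid.
    }

    /// Called at each fixed window deadline; returns that window's percentage.
    fn on_deadline(&mut self, now: Instant) -> f64 {
        // Credit a hold that spans the boundary, then let it keep running.
        if let Some(start) = self.held_since {
            self.busy_in_window += now - start;
            self.held_since = Some(now);
        }
        let pct = (100.0 * self.busy_in_window.as_secs_f64()
            / self.window.as_secs_f64())
        .min(100.0);
        self.busy_in_window = Duration::ZERO;
        if self.history.len() == self.history_scale {
            self.history.pop_front(); // keep only the most recent windows
        }
        self.history.push_back(pct);
        pct
    }

    /// Mean over up to `history_scale` completed windows.
    fn recent_avg(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }
        self.history.iter().sum::<f64>() / self.history.len() as f64
    }
}
```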

The reporter only reads shared memory; it does not modify or interfere with the scheduler.

Each device entry also includes **memory** numbers:

- When **`NPU_MEM_QUOTA`** was set at limiter start (`limit_enforced: true`), totals come from the same quota tracker as the hook: `total_mb` is the configured quota (floor MB), `used_mb` is `memory_used` from shared memory (floor MB), and `free_mb` is `total_mb - used_mb`. This matches what the limiter enforces; it is **not** `rtMemGetInfoEx` in that mode.
- When **no quota** was set (`limit_enforced: false`), the daemon uses **`rtMemGetInfoEx`** on that device (after `rtSetDevice` with the visible-device index), so you still get total / used / derived free in MB.

### Environment variables

Read once at daemon startup:

| Variable | Default | Effect |
| --- | --- | --- |
| `NPU_REPORT_INTERVAL_MS` | `1000` | Window length in ms. Set to `0` to disable the reporter thread; `GetUtilization` will then return zero percentages. |
| `NPU_REPORT_HISTORY_SCALE` | `10` | Number of recent windows averaged for `utilization_recent_windows_avg_percent`. Minimum `1`. |
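
For illustration, startup parsing of these two variables might look like the sketch below. `reporter_config_from_env` is a hypothetical helper, not the actual `UtilizationReporter::from_env` (whose source is not rendered on this page), but the defaults and the minimum match the table above:

```rust
use std::time::Duration;

// Hypothetical helper; the real `UtilizationReporter::from_env` is not shown
// in this commit view. Defaults mirror the table above.
fn reporter_config_from_env() -> (Duration, usize) {
    let interval_ms: u64 = std::env::var("NPU_REPORT_INTERVAL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1000); // `0` disables the reporter thread entirely
    let history_scale: usize = std::env::var("NPU_REPORT_HISTORY_SCALE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10)
        .max(1); // clamp to the documented minimum of 1
    (Duration::from_millis(interval_ms), history_scale)
}
```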

### Example usage

```bash
grpcurl \
  -plaintext \
  -unix \
  -d '{}' \
  /tmp/hami-shared-region/npu_limiter_2.sock \
  npu_limiter.LimiterControl/GetUtilization
```

Example response:

```json
{
  "intervalMs": "1000",
  "historyScale": "10",
  "devices": [
    {
      "deviceId": 0,
      "tracked": true,
      "utilizationLastIntervalPercent": 37.25,
      "utilizationRecentWindowsAvgPercent": 32.88,
      "memory": {
        "limitEnforced": true,
        "totalMb": "10240",
        "usedMb": "1248",
        "freeMb": "8992"
      }
    }
  ]
}
```

### Field meanings

| Field | Meaning |
| --- | --- |
| `interval_ms` | Effective window length (from `NPU_REPORT_INTERVAL_MS`). |
| `history_scale` | Effective rolling-average size (from `NPU_REPORT_HISTORY_SCALE`). |
| `devices[].device_id` | Physical device id (matches entries in `ASCEND_RT_VISIBLE_DEVICES`). |
| `devices[].tracked` | `true` once the reporter has located this device's manager slot. |
| `devices[].utilization_last_interval_percent` | Utilization of the most recently completed window, `0..=100`. |
| `devices[].utilization_recent_windows_avg_percent` | Mean utilization over up to `history_scale` completed windows, `0..=100`. |
| `devices[].memory.limit_enforced` | `true` if `NPU_MEM_QUOTA` was set at daemon start for this pod. |
| `devices[].memory.total_mb` | Quota total in MB (floor) when enforced; otherwise runtime total from `rtMemGetInfoEx`. |
| `devices[].memory.used_mb` | Tracked used memory in MB (floor) when enforced; otherwise `total_mb - free_mb` from the runtime. |
| `devices[].memory.free_mb` | `total_mb - used_mb` when enforced; otherwise runtime free in MB (floor). |

Notes:

- Scope is **per container**. Multiple containers publish independently on their own UDS.
- History is in-memory and resets on daemon restart.
- "Busy" here means wall time that this container's manager held the global baton between consecutive handoffs; it does not count time spent waiting in the queue.
- If no work happens for a whole window, the value is `0%` and reports continue to arrive at the fixed cadence.

---

crates/limiter/src/externed_api.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -13,6 +13,8 @@ unsafe extern "C" {
     pub fn rtGetDevicePhyIdByIndex(device_index: u32, phy_device: *mut u32) -> i32;
     pub fn rtDeviceSynchronize() -> i32;
     pub fn rtStreamGetCaptureInfo(stream: u64, status: *mut u32, model: *mut u64) -> i32;
+    /// Same as hooked in libvnpu; used by the limiter daemon when no quota is set.
+    pub fn rtMemGetInfoEx(mem_info_type: u64, free: *mut usize, total: *mut usize) -> u64;
 }

 // RT ERROR CODE
```

crates/limiter/src/lib.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -2,6 +2,8 @@ pub mod worker;
 pub mod manager;
 pub mod shmem;
 pub mod externed_api;
+pub mod reporter;
+pub mod memory_report;
 use ctor::ctor;

 #[ctor]
```

crates/limiter/src/main.rs

Lines changed: 104 additions & 15 deletions
```diff
@@ -1,8 +1,11 @@
 // Use the library crate's name to access the module
 use limiter::manager::ContainerManager;
+use limiter::memory_report;
+use limiter::reporter::UtilizationReporter;
 use limiter::shmem::{shm_setup, GlobalRegistry, LocalContainerShmem, local_shmem_name_for};
 use std::collections::BTreeSet;
 use std::thread;
+use std::time::Duration;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::os::unix::fs::PermissionsExt;
@@ -16,11 +19,19 @@ pub mod npu_limiter {
 }

 use npu_limiter::limiter_control_server::{LimiterControl, LimiterControlServer};
-use npu_limiter::{SetPriorityRequest, SetPriorityResponse};
+use npu_limiter::{
+    DeviceUtilization, GetUtilizationRequest, GetUtilizationResponse, MemorySnapshot,
+    SetPriorityRequest, SetPriorityResponse,
+};

-#[derive(Debug, Clone)]
+/// gRPC service. Keeps a handle to the shared priority atomic plus one
+/// utilization reporter per device managed by this daemon.
+#[derive(Clone)]
 pub struct LimiterControlService {
     priority_atomic: Arc<AtomicU64>,
+    reporters: Arc<Vec<(u32, Arc<UtilizationReporter>)>>,
+    /// Read-only mapping of each device's local shmem (same names as managers).
+    local_shmems: Arc<Vec<&'static LocalContainerShmem>>,
 }

 #[tonic::async_trait]
@@ -42,6 +53,48 @@ impl LimiterControl for LimiterControlService {
             message: format!("Priority updated to {}", new_priority),
         }))
     }
+
+    async fn get_utilization(
+        &self,
+        _request: Request<GetUtilizationRequest>,
+    ) -> Result<Response<GetUtilizationResponse>, Status> {
+        // All reporters on this daemon share the same interval/history config,
+        // so any of them can supply the header. Fall back to defaults if the
+        // daemon was launched without any devices (which shouldn't happen).
+        let (interval_ms, history_scale) = self
+            .reporters
+            .first()
+            .map(|(_, r)| (r.interval_ms(), r.history_scale()))
+            .unwrap_or((0, 0));
+
+        let devices = self
+            .reporters
+            .iter()
+            .enumerate()
+            .map(|(idx, (dev, r))| {
+                let snap = r.snapshot();
+                let m = memory_report::memory_metrics(self.local_shmems[idx], idx as i32);
+                DeviceUtilization {
+                    device_id: *dev,
+                    tracked: snap.tracked,
+                    utilization_last_interval_percent: snap.last_interval_percent,
+                    utilization_recent_windows_avg_percent: snap.recent_avg_percent,
+                    memory: Some(MemorySnapshot {
+                        limit_enforced: m.limit_enforced,
+                        total_mb: m.total_mb,
+                        used_mb: m.used_mb,
+                        free_mb: m.free_mb,
+                    }),
+                }
+            })
+            .collect();
+
+        Ok(Response::new(GetUtilizationResponse {
+            interval_ms,
+            history_scale,
+            devices,
+        }))
+    }
 }

 fn parse_visible_devices() -> Vec<u32> {
@@ -82,29 +135,61 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Shared priority across all devices in this container
     let priority_atomic = Arc::new(AtomicU64::new(0.0f64.to_bits()));

+    // One utilization reporter per device. The gRPC handler reads snapshots
+    // from these Arcs; each reporter thread is started inside its device's
+    // manager thread (where we open the shared memory).
+    let mut reporters: Vec<(u32, Arc<UtilizationReporter>)> = Vec::with_capacity(devices.len());
+    for dev in &devices {
+        reporters.push((*dev, UtilizationReporter::from_env()));
+    }
+
     let mut handles = Vec::new();
-    for dev in devices {
+    for (dev_idx, dev) in devices.iter().enumerate() {
         let global_path = format!("{}_dev{}", global_base, dev);
-        let local_path = local_shmem_name_for(dev);
+        let local_path = local_shmem_name_for(*dev);
         let pid_for_thread = pid;
         let p_atomic = priority_atomic.clone();
+        let reporter = reporters[dev_idx].1.clone();

         let handle = thread::spawn(move || {
-            // 1. Create the shared memory (sole control of the mapping);
-            //    only the Manager may call create_shmem.
-            let global_reg = shm_setup::open_global_registry::<GlobalRegistry>(&global_path);
+            // 1. Open shared memory. `open_global_registry` returns a
+            //    `&'static mut GlobalRegistry`; the reporter needs a shared
+            //    `&'static GlobalRegistry` view of the same mapping, and the
+            //    manager (further below) consumes the same shared reference.
+            //    All fields are atomics, so aliasing as `&` is sound.
+            let global_reg_mut: &'static mut GlobalRegistry =
+                shm_setup::open_global_registry::<GlobalRegistry>(&global_path);
+            let global_reg: &'static GlobalRegistry =
+                unsafe { &*(global_reg_mut as *const GlobalRegistry) };
             let local_shm = shm_setup::create_shmem::<LocalContainerShmem>(local_path.as_str());

-            // 2. Initialize the Manager (call `new`).
-            let mut manager = ContainerManager::new(global_reg, local_shm, pid_for_thread as i32, p_atomic);
+            // 2. Start the utilization reporter. It locates its slot by
+            //    matching on pid, which is written during `ContainerManager::new`
+            //    just below, so a brief retry inside the reporter is expected.
+            reporter.start(global_reg, pid_for_thread);

-            // 3. Enter the main loop, continuously scheduling and handing out
-            //    tokens; this call blocks the thread until the process is killed.
+            // 3. Initialize the manager and enter its scheduling loop.
+            let mut manager =
+                ContainerManager::new(global_reg, local_shm, pid_for_thread as i32, p_atomic);
             manager.run();
         });
         handles.push(handle);
     }

+    // Map each device's local shmem for quota / used reads (managers created it above).
+    let local_shmems: Vec<&'static LocalContainerShmem> = devices
+        .iter()
+        .map(|dev| {
+            let path = local_shmem_name_for(*dev);
+            loop {
+                if let Some(ptr) = shm_setup::try_open_shmem::<LocalContainerShmem>(&path) {
+                    break unsafe { &*(ptr as *const LocalContainerShmem) };
+                }
+                thread::sleep(Duration::from_millis(20));
+            }
+        })
+        .collect();
+
     // Start gRPC server on UDS if configured
     let uds_path = std::env::var("NPU_LIMITER_UDS_PATH").unwrap_or_else(|_| "/tmp/npu_limiter.sock".to_string());

@@ -122,9 +207,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let uds_stream = UnixListenerStream::new(uds);

     log::info!("[Daemon] Starting gRPC control server on UDS: {}", uds_path);
-
-    let service = LimiterControlService { priority_atomic };
-
+
+    let service = LimiterControlService {
+        priority_atomic,
+        reporters: Arc::new(reporters),
+        local_shmems: Arc::new(local_shmems),
+    };
+
     let reflection_service = tonic_reflection::server::Builder::configure()
         .register_encoded_file_descriptor_set(npu_limiter::FILE_DESCRIPTOR_SET)
         .build_v1()?;
@@ -141,4 +230,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     }

     Ok(())
-}
+}
```
crates/limiter/src/memory_report.rs (new file; path inferred from the `pub mod memory_report;` added in lib.rs above)

Lines changed: 79 additions & 0 deletions

```rust
//! Memory metrics for gRPC reporting: quota view from `LocalContainerShmem` when
//! `NPU_MEM_QUOTA` is set; otherwise `rtMemGetInfoEx` on the device (same source
//! the hook uses when quota is not enforced).

use crate::externed_api::{rtMemGetInfoEx, rtSetDevice};
use crate::shmem::LocalContainerShmem;
use std::sync::atomic::Ordering;
use std::sync::{Mutex, OnceLock};

const MB: u64 = 1024 * 1024;
/// Matches the hook's `rtMemGetInfoEx` first argument when not using quota.
const MEM_INFO_TYPE: u64 = 0;

/// Serialize runtime memory queries so `rtSetDevice` / `rtMemGetInfoEx` do not
/// race with other threads using the runtime.
fn rt_mem_lock() -> std::sync::MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    LOCK.get_or_init(|| Mutex::new(()))
        .lock()
        .expect("memory report lock poisoned")
}

#[derive(Debug, Clone, Copy)]
pub struct MemoryMetrics {
    pub limit_enforced: bool,
    pub total_mb: u64,
    pub used_mb: u64,
    pub free_mb: u64,
}

/// `logical_device` is the index among visible devices (0, 1, …), passed to
/// `rtSetDevice` when falling back to the runtime.
pub fn memory_metrics(local: &LocalContainerShmem, logical_device: i32) -> MemoryMetrics {
    let limit = local.memory_limit.load(Ordering::Relaxed);
    if limit > 0 {
        // Quota mode: report what the limiter enforces, floored to MB.
        let total_mb = limit / MB;
        let used = local.memory_used.load(Ordering::Acquire);
        let used_mb = used / MB;
        let free_mb = total_mb.saturating_sub(used_mb);
        return MemoryMetrics {
            limit_enforced: true,
            total_mb,
            used_mb,
            free_mb,
        };
    }

    // No quota: ask the runtime, serialized behind the process-wide lock.
    let _g = rt_mem_lock();
    unsafe {
        if rtSetDevice(logical_device) != 0 {
            return MemoryMetrics {
                limit_enforced: false,
                total_mb: 0,
                used_mb: 0,
                free_mb: 0,
            };
        }
        let mut free = 0usize;
        let mut total = 0usize;
        let rc = rtMemGetInfoEx(MEM_INFO_TYPE, &mut free, &mut total);
        if rc != 0 {
            return MemoryMetrics {
                limit_enforced: false,
                total_mb: 0,
                used_mb: 0,
                free_mb: 0,
            };
        }
        let total_mb = (total as u64) / MB;
        let free_mb = (free as u64) / MB;
        let used_mb = total_mb.saturating_sub(free_mb);
        MemoryMetrics {
            limit_enforced: false,
            total_mb,
            used_mb,
            free_mb,
        }
    }
}
```
