|
1 | 1 | import argparse |
2 | | -import time |
3 | | -from typing import Any, Dict, List, Optional, Union |
4 | 2 |
|
5 | | -from rich.live import Live |
6 | | -from rich.table import Table |
| 3 | +from dstack._internal.cli.commands.metrics import MetricsCommand |
| 4 | +from dstack._internal.utils.logging import get_logger |
7 | 5 |
|
8 | | -from dstack._internal.cli.commands import APIBaseCommand |
9 | | -from dstack._internal.cli.services.completion import RunNameCompleter |
10 | | -from dstack._internal.cli.utils.common import ( |
11 | | - LIVE_TABLE_PROVISION_INTERVAL_SECS, |
12 | | - LIVE_TABLE_REFRESH_RATE_PER_SEC, |
13 | | - add_row_from_dict, |
14 | | - console, |
15 | | -) |
16 | | -from dstack._internal.core.errors import CLIError |
17 | | -from dstack._internal.core.models.metrics import JobMetrics |
18 | | -from dstack.api._public import Client |
19 | | -from dstack.api._public.runs import Run |
| 6 | +logger = get_logger(__name__) |
20 | 7 |
|
21 | 8 |
|
22 | | -class StatsCommand(APIBaseCommand): |
| 9 | +class StatsCommand(MetricsCommand): |
23 | 10 | NAME = "stats" |
24 | | - DESCRIPTION = "Show run stats" |
25 | | - |
26 | | - def _register(self): |
27 | | - super()._register() |
28 | | - self._parser.add_argument("run_name").completer = RunNameCompleter() |
29 | | - self._parser.add_argument( |
30 | | - "-w", |
31 | | - "--watch", |
32 | | - help="Watch run stats in realtime", |
33 | | - action="store_true", |
34 | | - ) |
35 | 11 |
|
36 | 12 | def _command(self, args: argparse.Namespace): |
| 13 | + logger.warning("`dstack stats` is deprecated in favor of `dstack metrics`") |
37 | 14 | super()._command(args) |
38 | | - run = self.api.runs.get(run_name=args.run_name) |
39 | | - if run is None: |
40 | | - raise CLIError(f"Run {args.run_name} not found") |
41 | | - if run.status.is_finished(): |
42 | | - raise CLIError(f"Run {args.run_name} is finished") |
43 | | - metrics = _get_run_jobs_metrics(api=self.api, run=run) |
44 | | - |
45 | | - if not args.watch: |
46 | | - console.print(_get_stats_table(run, metrics)) |
47 | | - return |
48 | | - |
49 | | - try: |
50 | | - with Live(console=console, refresh_per_second=LIVE_TABLE_REFRESH_RATE_PER_SEC) as live: |
51 | | - while True: |
52 | | - live.update(_get_stats_table(run, metrics)) |
53 | | - time.sleep(LIVE_TABLE_PROVISION_INTERVAL_SECS) |
54 | | - run = self.api.runs.get(run_name=args.run_name) |
55 | | - if run is None: |
56 | | - raise CLIError(f"Run {args.run_name} not found") |
57 | | - if run.status.is_finished(): |
58 | | - raise CLIError(f"Run {args.run_name} is finished") |
59 | | - metrics = _get_run_jobs_metrics(api=self.api, run=run) |
60 | | - except KeyboardInterrupt: |
61 | | - pass |
62 | | - |
63 | | - |
64 | | -def _get_run_jobs_metrics(api: Client, run: Run) -> List[JobMetrics]: |
65 | | - metrics = [] |
66 | | - for job in run._run.jobs: |
67 | | - job_metrics = api.client.metrics.get_job_metrics( |
68 | | - project_name=api.project, |
69 | | - run_name=run.name, |
70 | | - replica_num=job.job_spec.replica_num, |
71 | | - job_num=job.job_spec.job_num, |
72 | | - ) |
73 | | - metrics.append(job_metrics) |
74 | | - return metrics |
75 | | - |
76 | | - |
77 | | -def _get_stats_table(run: Run, metrics: List[JobMetrics]) -> Table: |
78 | | - table = Table(box=None) |
79 | | - table.add_column("NAME", style="bold", no_wrap=True) |
80 | | - table.add_column("CPU") |
81 | | - table.add_column("MEMORY") |
82 | | - table.add_column("GPU") |
83 | | - |
84 | | - run_row: Dict[Union[str, int], Any] = {"NAME": run.name} |
85 | | - if len(run._run.jobs) != 1: |
86 | | - add_row_from_dict(table, run_row) |
87 | | - |
88 | | - for job, job_metrics in zip(run._run.jobs, metrics): |
89 | | - cpu_usage = _get_metric_value(job_metrics, "cpu_usage_percent") |
90 | | - if cpu_usage is not None: |
91 | | - cpu_usage = f"{cpu_usage}%" |
92 | | - memory_usage = _get_metric_value(job_metrics, "memory_working_set_bytes") |
93 | | - if memory_usage is not None: |
94 | | - memory_usage = f"{round(memory_usage / 1024 / 1024)}MB" |
95 | | - if job.job_submissions[-1].job_provisioning_data is not None: |
96 | | - memory_usage += f"/{job.job_submissions[-1].job_provisioning_data.instance_type.resources.memory_mib}MB" |
97 | | - gpu_stats = "" |
98 | | - gpus_detected_num = _get_metric_value(job_metrics, "gpus_detected_num") |
99 | | - if gpus_detected_num is not None: |
100 | | - for i in range(gpus_detected_num): |
101 | | - gpu_memory_usage = _get_metric_value(job_metrics, f"gpu_memory_usage_bytes_gpu{i}") |
102 | | - gpu_util_percent = _get_metric_value(job_metrics, f"gpu_util_percent_gpu{i}") |
103 | | - if gpu_memory_usage is not None: |
104 | | - if i != 0: |
105 | | - gpu_stats += "\n" |
106 | | - gpu_stats += f"#{i} {round(gpu_memory_usage / 1024 / 1024)}MB" |
107 | | - if job.job_submissions[-1].job_provisioning_data is not None: |
108 | | - gpu_stats += f"/{job.job_submissions[-1].job_provisioning_data.instance_type.resources.gpus[i].memory_mib}MB" |
109 | | - gpu_stats += f" {gpu_util_percent}% Util" |
110 | | - |
111 | | - job_row: Dict[Union[str, int], Any] = { |
112 | | - "NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}", |
113 | | - "CPU": cpu_usage or "-", |
114 | | - "MEMORY": memory_usage or "-", |
115 | | - "GPU": gpu_stats or "-", |
116 | | - } |
117 | | - if len(run._run.jobs) == 1: |
118 | | - job_row.update(run_row) |
119 | | - add_row_from_dict(table, job_row) |
120 | | - |
121 | | - return table |
122 | | - |
123 | | - |
124 | | -def _get_metric_value(job_metrics: JobMetrics, name: str) -> Optional[Any]: |
125 | | - for metric in job_metrics.metrics: |
126 | | - if metric.name == name: |
127 | | - return metric.values[-1] |
128 | | - return None |