-
Notifications
You must be signed in to change notification settings - Fork 186
Expand file tree
/
Copy pathprocess_result.py
More file actions
177 lines (151 loc) · 6.64 KB
/
process_result.py
File metadata and controls
177 lines (151 loc) · 6.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import sys
import json
import os
from pathlib import Path
def get_required_env_vars(required_vars):
"""Load and validate required environment variables."""
env_values = {}
missing_env_vars = []
for var_name in required_vars:
value = os.environ.get(var_name)
if value is None:
missing_env_vars.append(var_name)
env_values[var_name] = value
if missing_env_vars:
raise EnvironmentError(
f"Missing required environment variables: {', '.join(missing_env_vars)}")
return env_values
# Base required env vars
base_env = get_required_env_vars([
'RUNNER_TYPE', 'FRAMEWORK', 'PRECISION', 'SPEC_DECODING',
'RESULT_FILENAME', 'ISL', 'OSL', 'DISAGG', 'MODEL_PREFIX', 'IMAGE'
])
hw = base_env['RUNNER_TYPE']
model_prefix = base_env['MODEL_PREFIX']
framework = base_env['FRAMEWORK']
precision = base_env['PRECISION']
spec_decoding = base_env['SPEC_DECODING']
disagg = base_env['DISAGG'].lower() == 'true'
result_filename = base_env['RESULT_FILENAME']
isl = base_env['ISL']
osl = base_env['OSL']
image = base_env['IMAGE']
with open(f'{result_filename}.json') as f:
bmk_result = json.load(f)
# A failed/degenerate benchmark (server never came up, disagg warmup deadlock,
# MoRI/KV-transport failure, etc.) still writes a result JSON, but with zeroed
# throughput and latency metrics. Detect that here and fail with the real,
# actionable reason. Otherwise the tpot reciprocal below (1000.0 / tpot) raises
# an opaque "ZeroDivisionError: float division by zero" that masks the true
# server-side cause and makes every such failure look identical.
_output_tput = float(bmk_result.get('output_throughput', 0) or 0)
_total_tput = float(bmk_result.get('total_token_throughput', 0) or 0)
if _output_tput <= 0 or _total_tput <= 0:
raise SystemExit(
"FAIL: benchmark produced no decode throughput "
f"(output_throughput={_output_tput}, total_token_throughput={_total_tput}) "
f"in {result_filename}.json — the server almost certainly failed to serve "
"(disagg warmup deadlock / MoRI transport failure / server never reached "
"ready). Check the multinode_server_logs artifact for the real error."
)
data = {
'hw': hw,
'conc': int(bmk_result['max_concurrency']),
'image': image,
'model': bmk_result['model_id'],
'infmax_model_prefix': model_prefix,
'framework': framework,
'precision': precision,
'spec_decoding': spec_decoding,
'disagg': disagg,
'isl': int(isl),
'osl': int(osl),
}
is_multinode = os.environ.get('IS_MULTINODE', 'false').lower() == 'true'
if is_multinode:
# TODO: Eventually will have to have a separate condition in here for multinode disagg and
# multinode agg. For now, just assume that multinode implies disagg.
multinode_env = get_required_env_vars(['PREFILL_GPUS', 'DECODE_GPUS', 'PREFILL_NUM_WORKERS', 'PREFILL_TP',
'PREFILL_EP', 'PREFILL_DP_ATTN', 'DECODE_NUM_WORKERS', 'DECODE_TP', 'DECODE_EP', 'DECODE_DP_ATTN'])
prefill_gpus = int(multinode_env['PREFILL_GPUS'])
decode_gpus = int(multinode_env['DECODE_GPUS'])
prefill_num_workers = int(multinode_env['PREFILL_NUM_WORKERS'])
prefill_tp = int(multinode_env['PREFILL_TP'])
prefill_ep = int(multinode_env['PREFILL_EP'])
prefill_dp_attn = multinode_env['PREFILL_DP_ATTN']
decode_num_workers = int(multinode_env['DECODE_NUM_WORKERS'])
decode_tp = int(multinode_env['DECODE_TP'])
decode_ep = int(multinode_env['DECODE_EP'])
decode_dp_attn = multinode_env['DECODE_DP_ATTN']
total_gpus = prefill_gpus + decode_gpus
if total_gpus <= 0:
raise ValueError("Multinode results require at least one GPU.")
if prefill_gpus <= 0:
raise ValueError("Multinode results require at least one prefill GPU.")
output_tput_denominator = decode_gpus if decode_gpus > 0 else total_gpus
output_decode_tp = decode_tp if decode_gpus > 0 else 0
output_decode_ep = decode_ep if decode_gpus > 0 else 0
multi_node_data = {
'is_multinode': True,
'prefill_tp': prefill_tp,
'prefill_ep': prefill_ep,
'prefill_dp_attention': prefill_dp_attn,
'prefill_num_workers': prefill_num_workers,
'decode_tp': output_decode_tp,
'decode_ep': output_decode_ep,
'decode_dp_attention': decode_dp_attn,
'decode_num_workers': decode_num_workers,
'num_prefill_gpu': prefill_gpus,
'num_decode_gpu': decode_gpus,
'tput_per_gpu': float(bmk_result['total_token_throughput']) / total_gpus,
'output_tput_per_gpu': float(bmk_result['output_throughput']) / output_tput_denominator,
'input_tput_per_gpu': (float(bmk_result['total_token_throughput']) - float(bmk_result['output_throughput'])) / prefill_gpus,
}
data = data | multi_node_data
else:
if disagg:
raise ValueError("Disaggregated mode requires multinode setup.")
single_node_env = get_required_env_vars(['TP', 'EP_SIZE', 'DP_ATTENTION'])
tp_size = int(single_node_env['TP'])
ep_size = int(single_node_env['EP_SIZE'])
dp_attention = single_node_env['DP_ATTENTION']
single_node_data = {
'is_multinode': False,
'tp': tp_size,
'ep': ep_size,
'dp_attention': dp_attention,
'tput_per_gpu': float(bmk_result['total_token_throughput']) / tp_size,
'output_tput_per_gpu': float(bmk_result['output_throughput']) / tp_size,
'input_tput_per_gpu': (float(bmk_result['total_token_throughput']) - float(bmk_result['output_throughput'])) / tp_size,
}
data = data | single_node_data
for key, value in bmk_result.items():
if key.endswith('ms'):
data[key.replace('_ms', '')] = float(value) / 1000.0
if 'tpot' in key:
data[key.replace('_ms', '').replace(
'tpot', 'intvty')] = 1000.0 / float(value)
print(json.dumps(data, indent=2))
agg_path = Path(f'agg_{result_filename}.json')
with open(agg_path, 'w') as f:
json.dump(data, f, indent=2)
# Best-effort: patch measured power into the agg JSON. Never fails the run.
try:
from aggregate_power import run as _aggregate_power_run
_csv_candidates = [
os.environ.get('GPU_METRICS_CSV'),
'gpu_metrics.csv',
'/workspace/gpu_metrics.csv',
]
_csv_path = next(
(Path(p) for p in _csv_candidates if p and Path(p).is_file()),
None,
)
if _csv_path is not None:
_aggregate_power_run(
csv_path=_csv_path,
bench_result=Path(f'{result_filename}.json'),
agg_result=agg_path,
)
except Exception as exc: # noqa: BLE001 — never block on telemetry
print(f'[process_result] power aggregation skipped: {exc}', file=sys.stderr)