Refactor rollout topology and rollout metadata#1933
Conversation
53c3a19 to
71a0a3a
Compare
|
@claude review |
|
Claude: ## Review Summary Clean refactor that converges scattered topology fields into ProduceBatchResult impact: not affected IssuesCritical
Warning
Nit
VerdictREQUEST_CHANGES — the removed session_url validation is a silent safety regression that should be restored. |
|
@claude review |
|
Claude: ## Review Summary The follow-up commit (patch 2/2) addresses all critical and major findings from the prior review:
The refactor is well-structured. ProduceBatchResult impact: not affected Remaining IssuesWarning
Nit
VerdictAPPROVE — the core design is sound and all prior critical findings are resolved. The remaining items are defensive suggestions, not blockers. |
3cfb762 to
787428e
Compare
|
@claude review |
787428e to
bb56702
Compare
|
Claude: ## Review Summary Third review after commits were rebased. The topology refactor is solid: ProduceBatchResult impact: not affected IssuesNit
VerdictAPPROVE — the data model is clean, the validation in |
| weight_update_host: str | None = None | ||
| # Optional port used by NCCL external weight update groups. | ||
| weight_update_port: int | None = None | ||
|
|
There was a problem hiding this comment.
Claude: [Nit] This line keeps the or dict() defensive pattern:
backend = (rollout_config.extra_rollout_config or dict()).get("lmdeploy_backend", "pytorch")But lmdeploy.py and sglang.py in this same PR dropped the or dict():
extra_config = self.config.extra_rollout_configSince RolloutConfig.extra_rollout_config defaults to {}, both are safe — but picking one style consistently avoids confusion about whether the field can be None.
总览当前 建议保持
flowchart LR
Public[public Interface] --> Exclusive[_exclusive_lifecycle_operation]
Exclusive --> Registry[RolloutWorkerRegistry]
Exclusive --> Recovery[_restart_worker_group / _shutdown_worker_group]
Recovery --> Checkpoint[_checkpoint_not_stopping]
Public --> Notify[lock 外 listener notify / log]
重点问题1. pause + lock + stop checkpoint 分散当前
如果这些逻辑散在各个方法里,读者必须同时跟踪业务流程和并发控制。更简洁的做法是用一个 context manager 表达这个固定模式: @contextmanager
def _exclusive_lifecycle_operation(self):
with self._background_health_checks_paused():
with self._operation_lock:
self._checkpoint_not_stopping()
yield调用方只看到业务意图: with self._exclusive_lifecycle_operation():
inactive_groups = self._registry.claim_inactive_groups_for_recovery()
...这个改动的核心价值是 Locality:pause、lock、stop checkpoint 的规则只在一个地方维护。外部 Interface 不变,也不新增新的 public Seam。 2. stop check 语义分散当前实现里会反复出现 但 stop check 应该统一表达为一个 checkpoint: def _checkpoint_not_stopping(self) -> None:
if self._is_stopping():
raise InterruptedError长流程只在关键阶段前后调用它: self._checkpoint_not_stopping()
self._shutdown_worker_group(group, wait_server_down=True, best_effort=False)
self._checkpoint_not_stopping()
init_results = self._init_worker_group(group)
self._checkpoint_not_stopping()
self._offload_worker_group(group)这样 except InterruptedError:
self._shutdown_worker_group(group, wait_server_down=False, best_effort=True)
return False这个改动让 stop 控制成为一个内部 Interface,而不是散落的实现细节。 3. 高层方法应该像业务流程当前 Implementation 的阅读成本高,是因为高层方法被 pause、lock、notify、stop 细节打断。建议把高层方法写成线性业务流程。 这个规则是几个设计原则的合流:
这里不能机械理解成“每一步都抽一个函数”。结合 deletion test,只有能隐藏真实规则、提高 Locality 的私有方法才值得保留;纯转发的一行 wrapper 应该内联。
def restart_inactive_workers(self) -> None:
recovered_groups = []
failed_groups = []
with self._exclusive_lifecycle_operation():
groups = self._registry.claim_inactive_groups_for_recovery()
for group in sorted(groups, key=lambda group: group.ranks):
recovered = self._restart_worker_group(group)
updated_group = self._registry.set_group_recovery_result(group, recovered=recovered)
if recovered:
self._clear_failure_counts(group.ranks)
recovered_groups.append(updated_group or group)
else:
failed_groups.append(updated_group or group)
self._notify_worker_group_recovered(tuple(recovered_groups))
self._log_failed_recovery_groups(failed_groups)
def check_and_shutdown_inactive_workers(self) -> None:
with self._exclusive_lifecycle_operation():
newly_inactive = self._check_and_mark_failed_groups_locked(fail_fast=True)
inactive_groups = self._registry.claim_inactive_groups_for_recovery()
failed_shutdown_groups = self._shutdown_and_mark_inactive(inactive_groups)
self._notify_worker_group_inactive(newly_inactive)
self._log_failed_shutdown_groups(failed_shutdown_groups)关键点是:锁内只做 registry 状态迁移和 worker lifecycle 操作;listener 和 log 这类外部副作用放在锁外。这样主流程保持线性,副作用位置也更明确。 |
|
@claude review |
|
Claude: ## Review Summary Fourth review covering the latest two commits (patches 5/6): Both commits are clean refinements that address prior collaborator feedback:
ProduceBatchResult impact: not affected IssuesNit
VerdictAPPROVE — the refactor is well-structured, test coverage is thorough (including lifecycle, topology API, and weight-update target projection tests), and all prior findings are resolved. |
Summary
这个 PR 重构了 rollout topology 和 rollout weight update metadata 的表达方式:
RolloutTopology,作为启动期确定的rollout拓扑,指明有哪些rollout engine和server process;RolloutWorkerRegistry作为 Rollout worker 运行状态的集合,包含worker url, session url, active状态等;RolloutWeightUpdateTarget代替RolloutMetadata,它由RolloutServerProcess.weight_update_ranks和运行时的WorkerSnapshot.server_url/lifecycle_state组合而成,从RolloutWorkerRegistry映射而来;RolloutWeightUpdateInfo,更新成员变量为:rollout_config、weight_update_targets、train_rank等重构后,具体关系如下:
