Add scheduling mechanism and new workload#1025
Conversation
Code Review
This pull request introduces a comprehensive disaggregated inference framework for Wan2.2 models, featuring a centralized controller for dynamic instance management and a sidecar service for cross-process communication. Key improvements include autoscaling capabilities, enhanced RDMA and Mooncake transfer logic with better error handling, and the integration of a Locust-based workload generator. The review feedback suggests several performance and architectural refinements, including refactoring hardcoded instance initialization in the controller to be configuration-driven, reusing ZMQ contexts to reduce resource overhead, eliminating redundant memory zeroing in RDMA buffers to improve throughput, and moving heavy library imports to the module level for better visibility.
```python
for instance_type in ("encoder", "transformer", "decoder"):
    address = self.create_instance(instance_type)
for _ in range(5):
    self.create_instance("transformer")
```
The number of service instances is currently hardcoded in the run method (1 encoder, 6 transformers, 1 decoder). This significantly limits the flexibility of the disaggregated system and ignores the ranks and distribution settings provided in the configuration file. This logic should be refactored to dynamically spawn instances based on the configuration to support different hardware setups. Additionally, the address variable on line 1082 is assigned but never used.
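A configuration-driven version of this spawn loop could look like the following minimal sketch. The `Controller` class, the `instance_counts` config key, and the placeholder `create_instance` body are all assumptions for illustration; only the instance types and default counts (1 encoder, 6 transformers, 1 decoder) come from the snippet above.

```python
# Assumed default topology, matching the currently hardcoded counts.
DEFAULT_INSTANCE_COUNTS = {"encoder": 1, "transformer": 6, "decoder": 1}

class Controller:
    """Illustrative stand-in for the real controller class."""

    def __init__(self, config):
        self.config = config
        self._next_port = 9000

    def create_instance(self, instance_type):
        # Placeholder for the real spawn logic; returns the bound address.
        addr = f"tcp://127.0.0.1:{self._next_port}"
        self._next_port += 1
        return addr

    def run(self):
        # Fall back to the hardcoded topology when the config omits counts.
        counts = self.config.get("instance_counts", DEFAULT_INSTANCE_COUNTS)
        # Keep every returned address so later routing/teardown can use it,
        # instead of assigning it to a variable that is never read.
        return {
            itype: [self.create_instance(itype) for _ in range(n)]
            for itype, n in counts.items()
        }
```

This keeps the default behavior identical while letting the ranks/distribution section of the config file drive how many instances of each type are spawned.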
```python
context = zmq.Context()
req = context.socket(zmq.REQ)
req.setsockopt(zmq.RCVTIMEO, 1000)
req.setsockopt(zmq.SNDTIMEO, 1000)
req.connect(req_addr)
try:
    req.send_pyobj({"cmd": str(cmd)})
    reply = req.recv_pyobj()
    if isinstance(reply, dict):
        return reply
    return None
except Exception:
    return None
finally:
    req.close(0)
    context.term()
```
Creating a new zmq.Context for every call to _query_sidecar is inefficient as contexts are heavy resources intended to be long-lived. While sockets should remain per-thread or be protected by locks, the context itself should be initialized once (e.g., in __init__) and reused across the class instance.
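The suggested refactor can be sketched as follows: one long-lived context shared by the class, with a short-lived REQ socket per call. The `SidecarClient` class name and `req_addr` attribute are illustrative, not from the PR.

```python
import zmq

class SidecarClient:
    """Illustrative sketch: share one context, create sockets per call."""

    def __init__(self, req_addr):
        self.req_addr = req_addr
        # Contexts are heavyweight and meant to be long-lived: create once.
        self.context = zmq.Context.instance()

    def _query_sidecar(self, cmd):
        # REQ sockets are cheap but not thread-safe, so they stay per-call
        # (or per-thread); only the context is reused.
        req = self.context.socket(zmq.REQ)
        req.setsockopt(zmq.RCVTIMEO, 1000)
        req.setsockopt(zmq.SNDTIMEO, 1000)
        try:
            req.connect(self.req_addr)
            req.send_pyobj({"cmd": str(cmd)})
            reply = req.recv_pyobj()
            return reply if isinstance(reply, dict) else None
        except Exception:
            return None
        finally:
            req.close(0)  # close the socket, but do NOT term() the shared context

    def close(self):
        # Terminate the shared context only once, at shutdown.
        self.context.term()
```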
```python
# Write payload to the selected slot (works for both server-local and client-remote paths).
slot_addr = self.descriptor.slot_addr + offset
self._rdma_write_bytes(slot_addr, b"\x00" * self.slot_size)
```
Zeroing the entire RDMA slot before writing the payload is redundant and impacts performance, especially for large slot sizes. The implementation already ensures atomicity by writing the length header last (line 291), and the consumer only reads the number of bytes specified by that header. Removing this zeroing step will improve production throughput.
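The write ordering the review relies on can be illustrated with this toy model, where a `bytearray` stands in for the RDMA-registered buffer. The `SlotWriter` class, the 8-byte little-endian length header, and `read_slot` are assumptions for illustration; the point is that the payload lands first and the length header is published last, so stale tail bytes in the slot are never exposed and no zeroing is needed.

```python
import struct

HEADER_SIZE = 8  # assumed layout: u64 length header at the start of each slot

class SlotWriter:
    """Toy stand-in: a bytearray plays the role of the RDMA buffer."""

    def __init__(self, slot_size):
        self.slot_size = slot_size
        self.buf = bytearray(HEADER_SIZE + slot_size)

    def _rdma_write_bytes(self, addr, data):
        # Stand-in for the real RDMA write; `addr` is an offset into self.buf.
        self.buf[addr:addr + len(data)] = data

    def write_slot(self, payload):
        # 1) Payload first. No zeroing of the unused tail of the slot,
        #    because the consumer reads exactly `length` bytes.
        self._rdma_write_bytes(HEADER_SIZE, payload)
        # 2) Length header last: this write is the atomicity point.
        self._rdma_write_bytes(0, struct.pack("<Q", len(payload)))

    def read_slot(self):
        # Consumer side: honor the header, ignore everything past it.
        length = struct.unpack_from("<Q", self.buf, 0)[0]
        return bytes(self.buf[HEADER_SIZE:HEADER_SIZE + length])
```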
```python
import numpy as np
import torch
```
Heavy libraries like numpy and torch are imported inside the _init_transformer_output_room method. It is better practice to place these at the top of the file to improve readability and make dependencies explicit, especially since this file is the entry point for the sidecar process and these libraries will be needed for most operations.
Need to fix the conflicts @zhtshr
This pull request introduces several new configuration files and significant updates to the disaggregated (disagg) connection logic and workload orchestration for the LightX2V project. The main focus is on supporting distributed inference with improved network handling, chunked data transfer, and dynamic workload simulation. The changes enhance reliability, configurability, and usability for running and testing disaggregated video inference pipelines.
Key changes:
New configuration and workload simulation
- Added a workload stages file (`wan22_i2v_workload_stages.json`) to define warmup and change phases for dynamic load testing.
- Added a `run_user.py` example script to simulate dynamic user workloads, sending requests to the controller based on stage specifications and supporting configurable request rates.

Disagg connection reliability and protocol improvements
- Added `_normalize_loopback_host` to ensure consistent use of IPv4 loopback addresses, controlled by the `DISAGG_FORCE_IPV4_LOOPBACK` environment variable, improving local and mixed-protocol deployments. [1] [2]
- Introduced chunked transfers in `send_data`, controlled by the `MOONCAKE_TRANSFER_CHUNK_BYTES` environment variable, to handle large tensors more efficiently and robustly.

Protocol and metadata updates
- Included `receiver_engine_rank` in multipart messages for both encoder and transformer threads, ensuring correct routing and status updates in distributed settings. [1] [2] [3] [4] [5]
- Updated `mooncake.py` to better select a non-loopback IPv4 address for outbound connections, enhancing compatibility in multi-host environments.

New libs
These changes collectively improve the flexibility, reliability, and scalability of the disaggregated inference pipeline, making it easier to configure, test, and deploy in distributed environments.
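The env-controlled chunked transfer mentioned above can be sketched as follows. This is a minimal illustration, not the PR's implementation: the `send_in_chunks` helper, the `transfer` callback, and the default chunk size are all assumptions; only the `MOONCAKE_TRANSFER_CHUNK_BYTES` variable name comes from the PR description.

```python
import os

# Assumed default; the PR does not state the actual fallback chunk size.
DEFAULT_CHUNK_BYTES = 64 * 1024 * 1024

def send_in_chunks(data, transfer):
    """Split `data` into chunks sized by MOONCAKE_TRANSFER_CHUNK_BYTES and
    hand each (offset, chunk) pair to the `transfer` callable."""
    chunk = int(os.environ.get("MOONCAKE_TRANSFER_CHUNK_BYTES",
                               DEFAULT_CHUNK_BYTES))
    sent = 0
    for off in range(0, len(data), chunk):
        # Each chunk goes to the transport separately, so one huge tensor
        # never has to be staged as a single transfer.
        piece = data[off:off + chunk]
        transfer(off, piece)
        sent += len(piece)
    return sent
```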