Commit 7fc66a6

ChenhanYu and claude committed
feat(launcher): add vLLM container support for data synthesis
- core.py: clear the Docker ENTRYPOINT so containers with `vllm serve` as their entrypoint (e.g. vllm/vllm-openai:qwen3_5-cu130) run correctly
- vllm/query.sh: use python3 and install datasets + openai before running query.py
- query.py: add a --max-tokens arg to cap response length; reinitialize the OpenAI client after fork to avoid connection-pool corruption in datasets.map(num_proc>1)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: chenhany <chenhany@nvidia.com>
1 parent 9eb0050 · commit 7fc66a6

3 files changed

Lines changed: 23 additions & 3 deletions

tools/launcher/common/query.py

Lines changed: 20 additions & 1 deletion
```diff
@@ -44,16 +44,30 @@ def _strip_thinking(content: str) -> str:
 class LLM:
     def __init__(self, args):
         self.args = args
+        self._pid = os.getpid()
         self.client = OpenAI(base_url=args.base_url)
         self.generate(messages=[{"role": "user", "content": "Hello! /no_think"}], verbose=True)
 
+    def _ensure_client(self):
+        """Reinitialize the HTTP client if we've been forked into a new process.
+
+        datasets.map(num_proc>1) forks worker processes that inherit the parent's
+        connection pool. Reusing inherited sockets across processes causes
+        "Invalid HTTP request" errors. Creating a fresh client per-process avoids this.
+        """
+        if os.getpid() != self._pid:
+            self._pid = os.getpid()
+            self.client = OpenAI(base_url=self.args.base_url)
+
     def generate(self, messages, verbose=False, **chat_template_kwargs):
         global early_termination
+        self._ensure_client()
         try:
             completion = self.client.chat.completions.create(
                 model=self.args.model,
                 messages=messages,
                 temperature=self.args.temperature,
+                max_tokens=self.args.max_tokens,
             )
             new_message = completion.choices[0].message.content
             if verbose:
@@ -88,6 +102,7 @@ def generate(self, messages, verbose=False, **chat_template_kwargs):
 )
 parser.add_argument("--num-proc", type=int, default=32, help="number of processes (concurrency).")
 parser.add_argument("--temperature", type=float, default=0.0, help="temperature.")
+parser.add_argument("--max-tokens", type=int, default=None, help="maximum tokens to generate per response.")
 args = parser.parse_args()
 
 llm = LLM(args)
@@ -162,7 +177,11 @@ def synthesize(data):
     return {"conversations": current_messages}
 
 
-dataset = load_dataset(args.data, split=args.data_split)
+# Support both HF Hub repo IDs and local file paths (.jsonl, .json, .parquet, etc.)
+if os.path.isfile(args.data):
+    dataset = load_dataset("json", data_files=args.data, split=args.data_split)
+else:
+    dataset = load_dataset(args.data, split=args.data_split)
 
 if args.num_shards * 100 > len(dataset):
     args.num_shards = min(16, len(dataset) // 100)
```
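
The `_ensure_client` change is the heart of this fix. Below is a minimal, self-contained sketch of the same fork-detection pattern; `ForkSafeClient`, `_make_client`, and `worker` are hypothetical names for illustration, not code from this repo:

```python
import multiprocessing as mp
import os


class ForkSafeClient:
    """Illustrative stand-in for the LLM class; _make_client fakes OpenAI(...)."""

    def __init__(self):
        self._pid = os.getpid()
        self._client = self._make_client()

    def _make_client(self):
        # In the real code this is OpenAI(base_url=...); any object that owns
        # sockets or a connection pool has the same fork hazard.
        return object()

    def get(self):
        # os.getpid() changes after a fork, so a mismatch means we are in a
        # child that inherited the parent's client; rebuild it instead.
        if os.getpid() != self._pid:
            self._pid = os.getpid()
            self._client = self._make_client()
        return self._client


def worker(shared):
    # Each worker process rebuilds its own client on first use.
    print(f"pid={os.getpid()} client_id={id(shared.get())}")


if __name__ == "__main__":
    client = ForkSafeClient()
    procs = [mp.Process(target=worker, args=(client,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

On Linux, `datasets.map(num_proc>1)` forks its workers, so children inherit the parent's open sockets; the pid comparison is a cheap, dependency-free way to detect the fork and avoid ever reusing them.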

tools/launcher/common/vllm/query.sh

Lines changed: 2 additions & 1 deletion
```diff
@@ -118,7 +118,8 @@ while true; do
     sleep 10
 done
 
-cmd="python common/query.py http://localhost:8000/v1 ${MODEL} ${QUERY_ARGS[*]}"
+pip3 install -q datasets openai 2>/dev/null || true
+cmd="python3 common/query.py http://localhost:8000/v1 ${MODEL} ${QUERY_ARGS[*]}"
 echo "Running command: $cmd"
 eval $cmd
 echo "Main process exit"
```

tools/launcher/core.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -322,7 +322,7 @@ def build_docker_executor(
         ipc_mode="host",
         container_image=slurm_config.container,
         volumes=container_mounts,
-        additional_kwargs={"user": f"{os.getuid()}:{os.getgid()}"},
+        additional_kwargs={"user": f"{os.getuid()}:{os.getgid()}", "entrypoint": ""},
         packager=packager,
     )
     return executor
```
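
The `"entrypoint": ""` kwarg mirrors `docker run --entrypoint ""`: images such as vllm/vllm-openai ship with `vllm serve` as their ENTRYPOINT, which would otherwise be prepended to whatever command the launcher passes. A rough equivalent using docker-py directly, as a sketch only; the image tag and command are placeholders, and the launcher's executor may forward these kwargs differently:

```python
# Rough docker-py equivalent of clearing the ENTRYPOINT: without entrypoint="",
# the image would run `vllm serve <command>` instead of the command itself.
import docker

client = docker.from_env()
output = client.containers.run(
    "vllm/vllm-openai:latest",  # placeholder tag
    command=["python3", "-c", "print('entrypoint cleared')"],
    entrypoint="",              # mirrors additional_kwargs={"entrypoint": ""}
    remove=True,
)
print(output.decode())
```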
