11#!/usr/bin/env python3
22"""
3- 分布式批量转码至 H.265( HEVC),支持通过 DeepSpeed/torchrun 等启动方式进行 rank 级分片:
4- - 传入一个或多个 list 文件,每行一个路径(绝对或相对 source_root)
5- - 在构建任务之前,先对“视频条目”按 RANK/WORLD_SIZE 做全局均匀分片,每个 rank 仅处理自身片段,降低重复解析成本
6- - 每个 rank 会保存:
7- - 本 rank 的“目标路径列表”文件( targets.rank{RANK}.txt):该 rank 对应的目标输出路径(包含已存在和将要生成的目标,剔除源缺失项)
8- - 每个 rank 可继续使用本地多进程池进行并行(默认会按 WORLD_SIZE 降低并行度)
3+ Distributed batch transcoding to H.265 (HEVC), with rank-level sharding when launched via DeepSpeed, torchrun, or a similar launcher:
4+ - Pass one or more list files, one path per line (absolute or relative to source_root)
5+ - Before any tasks are built, the video entries are sharded evenly across ranks by RANK/WORLD_SIZE; each rank processes only its own shard, which avoids redundant parsing work
6+ - Each rank will save:
7+ - A per-rank "target path list" file under target_root (targets.rank{RANK:03d}.txt, i.e. the rank zero-padded to three digits): the target output paths for this rank (both already-existing and to-be-generated targets; entries whose source is missing are excluded)
8+ - Each rank can still parallelize locally with a multiprocessing pool (NOTE(review): the pool size comes from PROCESSES, which is hard-coded to 8; the WORLD_SIZE-scaled _auto_proc does not appear to feed into it - confirm)
99
1010deepspeed \
1111 --hostfile hosts_14 \
3030import time
3131from tqdm import tqdm
3232
33- # ===== 分布式环境变量(与原代码一致) =====
33+ # ===== Distributed environment variables (consistent with original code) =====
3434RANK = int (os .environ .get ("RANK" , "0" ))
3535LOCAL_RANK = int (os .environ .get ("LOCAL_RANK" , "0" ))
3636WORLD_SIZE = int (os .environ .get ("WORLD_SIZE" , "1" ))
3737# ========================================
3838
39- time .sleep (10 * RANK ) # 避免多进程启动时日志混乱
39+ time .sleep (10 * RANK ) # Stagger startup by 10 s per rank so logs from simultaneously starting processes do not interleave
4040
41- # ===== 可调参数(环境变量) =====
42- GOP_SIZE = int (os .getenv ("GOP_SIZE" , "16" )) # 固定 GOP=16
43- CRF = int (os .getenv ("CRF" , "23" )) # 画质/码率平衡( libx265)或 hevc_nvenc 的 -cq
44- SKIP_AUDIO = os .getenv ("SKIP_AUDIO" , "1" ) == "1" # 默认去音频
41+ # ===== Adjustable parameters (environment variables) =====
42+ GOP_SIZE = int (os .getenv ("GOP_SIZE" , "16" )) # GOP size (keyframe interval) passed to the encoder; defaults to 16
43+ CRF = int (os .getenv ("CRF" , "23" )) # Quality/bitrate balance ( libx265) or hevc_nvenc's -cq
44+ SKIP_AUDIO = os .getenv ("SKIP_AUDIO" , "1" ) == "1" # True by default (audio is dropped); any value other than "1" turns this off
4545
46- # 根据 WORLD_SIZE 自动下调每个 rank 的并行度;若显式设置 NPROC 则以 NPROC 为准
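46+ # Intended: scale per-rank parallelism down by WORLD_SIZE, with an explicit NPROC override. NOTE(review): PROCESSES is hard-coded to 8 below and NPROC is not read here, so _auto_proc appears unused - confirm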
4747_auto_proc = max (1 , (os .cpu_count () or 1 ) // max (1 , WORLD_SIZE ))
4848PROCESSES = 8
4949# =================================
5050
5151def _resolve_paths (relative_src_path : str , source_root : str , target_root : str ) -> Tuple [str , str ]:
5252 rel = relative_src_path .strip ()
5353
54- # 1) 解析源路径(支持绝对或相对)
54+ # 1) Parse source path (supports absolute or relative)
5555 if os .path .isabs (rel ):
5656 src_path = os .path .normpath (rel )
5757 else :
5858 src_path = os .path .normpath (os .path .join (source_root , rel .lstrip ('/' )))
5959
60- # 2) 在 target_root 下镜像目录结构
60+ # 2) Mirror directory structure under target_root
6161 if os .path .commonpath ([os .path .abspath (src_path ), os .path .abspath (source_root )]) == os .path .abspath (source_root ):
6262 rel_from_src = os .path .relpath (src_path , start = source_root )
6363 else :
@@ -67,7 +67,7 @@ def _resolve_paths(relative_src_path: str, source_root: str, target_root: str) -
6767 target_dir = os .path .join (target_root , rel_dir )
6868 os .makedirs (target_dir , exist_ok = True )
6969
70- # 统一输出为 .mp4( HEVC)
70+ # Unified output as .mp4 ( HEVC)
7171 file_stem = os .path .splitext (os .path .basename (src_path ))[0 ]
7272 mp4_name = file_stem + ".mp4"
7373 mp4_path = os .path .join (target_dir , mp4_name )
@@ -96,16 +96,16 @@ def _build_ffmpeg_cmd(
9696 src_path : str ,
9797 dst_path : str ,
9898 encoder : str ,
99- ff_log : str = "error" , # FFmpeg 的日志级别: quiet|panic|fatal|error|warning|info|verbose|debug|trace
100- x265_log : str = "error" # libx265 的日志级别: none|error|warning|info|debug|full
99+ ff_log : str = "error" , # FFmpeg log level: quiet|panic|fatal|error|warning|info|verbose|debug|trace
100+ x265_log : str = "error" # libx265 log level: none|error|warning|info|debug|full
101101) -> List [str ]:
102102 cmd = [
103103 "ffmpeg" ,
104104 "-y" ,
105105 "-nostdin" ,
106- "-hide_banner" , # 不打印横幅
107- "-nostats" , # 不打印进度 /stat 行
108- "-loglevel" , ff_log , # 控制 FFmpeg 自身日志
106+ "-hide_banner" , # Do not print banner
107+ "-nostats" , # Do not print progress /stat lines
108+ "-loglevel" , ff_log , # Control FFmpeg's own logs
109109 "-i" , src_path ,
110110 "-vf" , "scale=trunc(iw/2)*2:trunc(ih/2)*2" ,
111111 "-c:v" , encoder ,
@@ -116,7 +116,7 @@ def _build_ffmpeg_cmd(
116116 x265_params = (
117117 f"keyint={ GOP_SIZE } :min-keyint={ GOP_SIZE } :scenecut=0:"
118118 f"bframes=0:ref=1:repeat-headers=1:"
119- f"log-level={ x265_log } " # 控制 libx265 的日志
119+ f"log-level={ x265_log } " # Control libx265 logs
120120 )
121121 cmd += [
122122 "-preset" , "fast" ,
@@ -125,7 +125,7 @@ def _build_ffmpeg_cmd(
125125 "-x265-params" , x265_params ,
126126 ]
127127 else :
128- # hevc_nvenc 或其它硬编参数
128+ # hevc_nvenc or other hardware encoding parameters
129129 cmd += [
130130 "-preset" , "p5" ,
131131 "-cq" , str (CRF ),
@@ -191,11 +191,11 @@ def _extract_field(line: str, field_index: int) -> Optional[str]:
191191 try :
192192 return parts [field_index ]
193193 except Exception :
194- # 若索引非法,尝试找第一个包含 '/' 的 token
194+ # If the index is invalid, fall back to the first token that looks like a path (contains '/' or a backslash)
195195 for tok in parts :
196196 if "/" in tok or "\\ " in tok :
197197 return tok
198- # 否则退回第 0 个
198+ # Otherwise fall back to token 0
199199 return parts [0 ]
200200
201201def _expand_lists (list_args : List [str ]) -> List [str ]:
@@ -205,10 +205,10 @@ def _expand_lists(list_args: List[str]) -> List[str]:
205205 if matches :
206206 files .extend (sorted (matches ))
207207 else :
208- # 如果没有 glob 匹配且是现有文件,就直接加
208+ # No glob match: if the pattern itself names an existing file, add it as-is
209209 if os .path .isfile (pat ):
210210 files .append (pat )
211- # 去重同时保序
211+ # Deduplicate while preserving order
212212 seen = set ()
213213 deduped = []
214214 for f in files :
@@ -222,8 +222,8 @@ def parse_items_from_lists(
222222 strip_prefix : str ,
223223) -> List [str ]:
224224 """
225- 解析 list 文件,提取并规范化“原始条目”(已应用 field_index 和 strip_prefix),不做文件存在性检查。
226- 返回按输入顺序去重后的条目列表。
225+ Parse the list files and extract normalized "raw entries" (with field_index and strip_prefix applied); file existence is not checked here.
226+ Returns the entries deduplicated, in input order.
227227 """
228228 items : List [str ] = []
229229 seen = set ()
@@ -255,14 +255,14 @@ def build_tasks_from_items(
255255 log_path : Optional [str ] = None
256256) -> Tuple [List [Tuple [str , str , str , str ]], List [str ]]:
257257 """
258- 基于“分片后的原始条目”构建任务与目标列表(仅对当前 rank 的条目进行处理)。
259- 返回:
260- - tasks: 需要处理的任务列表 (src_path, dst_path, encoder, log_path)
261- - targets_rank: 本 rank 的所有“源存在”的目标输出路径(含已存在与待生成,已去重)
258+ Build task and target lists based on "sharded raw entries" (only process entries for current rank).
259+ Returns:
260+ - tasks: List of tasks to process (src_path, dst_path, encoder, log_path)
261+ - targets_rank: All "source exists" target output paths for this rank (including existing and to-be-generated, deduplicated)
262262 """
263263 encoder = _pick_encoder ()
264264 if not encoder :
265- print ("[warn] 未能确定可用 HEVC 编码器。请设置环境变量 HEVC_ENCODER=libx265 或 hevc_nvenc。 " )
265+ print ("[warn] Unable to determine available HEVC encoder. Please set environment variable HEVC_ENCODER=libx265 or hevc_nvenc. " )
266266
267267 tasks : List [Tuple [str , str , str , str ]] = []
268268 targets_rank : List [str ] = []
@@ -302,18 +302,18 @@ def shard_list(seq: List[str], world_size: int, rank: int) -> List[str]:
302302
303303def main ():
304304 ap = argparse .ArgumentParser (description = "Distributed H.265 batch converter (DeepSpeed compatible via env RANK/WORLD_SIZE)." )
305- ap .add_argument ("--file" , required = True , help = "一个或多个 list 文件或 glob(例: worker-*.txt my.list) " )
306- # 同时支持短横线和下划线参数名
307- ap .add_argument ("--source-root" , "--source_root" , dest = "source_root" , required = True , help = "源视频根目录 " )
308- ap .add_argument ("--target-root" , "--target_root" , dest = "target_root" , required = True , help = "输出根目录(将镜像源目录结构,统一输出 .mp4) " )
309- ap .add_argument ("--log-dir" , "--log_dir" , dest = "log_dir" , default = "logs" , help = "日志目录(默认: logs) " )
310- ap .add_argument ("--field-index" , "--field_index" , dest = "field_index" , type = int , default = 0 , help = "每行取第几个字段作为路径(默认 0) " )
311- ap .add_argument ("--strip-prefix" , "--strip_prefix" , dest = "strip_prefix" , default = "" , help = "从行内路径头部去掉的前缀(可选) " )
305+ ap .add_argument ("--file" , required = True , help = "One or more list files or glob (e.g.: worker-*.txt my.list) " )
306+ # Support both hyphen and underscore parameter names
307+ ap .add_argument ("--source-root" , "--source_root" , dest = "source_root" , required = True , help = "Source video root directory " )
308+ ap .add_argument ("--target-root" , "--target_root" , dest = "target_root" , required = True , help = "Output root directory (will mirror source directory structure, unified output .mp4) " )
309+ ap .add_argument ("--log-dir" , "--log_dir" , dest = "log_dir" , default = "logs" , help = "Log directory (default: logs) " )
310+ ap .add_argument ("--field-index" , "--field_index" , dest = "field_index" , type = int , default = 0 , help = "Which field per line to use as path (default 0) " )
311+ ap .add_argument ("--strip-prefix" , "--strip_prefix" , dest = "strip_prefix" , default = "" , help = "Prefix to remove from line path head (optional) " )
312312 ap .add_argument ("--local_rank" )
313313
314314 args = ap .parse_args ()
315315
316- # 准备日志(每个 rank 单独文件,避免并发写冲突)
316+ # Prepare logs (separate file per rank, avoid concurrent write conflicts)
317317 os .makedirs (args .log_dir , exist_ok = True )
318318 log_path = os .path .join (args .log_dir , f"failed.rank{ RANK } .txt" )
319319
@@ -326,7 +326,7 @@ def main():
326326 items_rank = shard_list (list_files , WORLD_SIZE , RANK )
327327 print (f"[rank { RANK } ] Items assigned to this rank: { len (items_rank )} " )
328328
329- # 2) 基于“当前 rank 的条目”构建任务与目标清单
329+ # 2) Build task and target list based on "current rank entries"
330330 tasks_rank , targets_rank = build_tasks_from_items (
331331 items = items_rank ,
332332 source_root = args .source_root ,
@@ -336,7 +336,7 @@ def main():
336336 print (f"[rank { RANK } ] Targets in this rank (dedup & src exists): { len (targets_rank )} " )
337337 print (f"[rank { RANK } ] Tasks to process in this rank: { len (tasks_rank )} " )
338338
339- # 保存本 rank 的目标路径列表
339+ # Save target path list for this rank
340340 targets_list_path = os .path .join (args .target_root , f"targets.rank{ RANK :03d} .txt" )
341341 try :
342342 with open (targets_list_path , "w" , encoding = "utf-8" ) as f :
@@ -346,7 +346,7 @@ def main():
346346 except Exception as e :
347347 print (f"[rank { RANK } ] Failed to save targets list { targets_list_path } : { e } " )
348348
349- # 3) 执行本 rank 的任务
349+ # 3) Execute tasks for this rank
350350 if tasks_rank :
351351 with Pool (processes = PROCESSES , maxtasksperchild = 64 ) as pool :
352352 pool .map (convert_to_h265 , tasks_rank )
0 commit comments