-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuild_oss_fuzz.py
More file actions
138 lines (117 loc) · 4.42 KB
/
build_oss_fuzz.py
File metadata and controls
138 lines (117 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
build_oss_fuzz.py
Parallel build of OSS-Fuzz projects (Docker images and Fuzzer compilation).
Uses multiprocessing.Pool to distribute projects across multiple CPU cores for concurrent processing.
Usage: python3 build_oss_fuzz.py [project_list_file] [--sanitizer type] [--workers N]
Example: python3 fuzz/build_oss_fuzz.py data/valid_projects.txt \
--sanitizer address \
--workers 8
"""
import os
import sys
import subprocess
import argparse
from pathlib import Path
from typing import List, Optional, Tuple
from multiprocessing import Pool, cpu_count
import logging
# --- Global configuration ---
HOME_DIR = Path.home()
OSS_FUZZ_DIR = HOME_DIR / "FuzzAug" / "fuzz" / "oss-fuzz"
def run_command(
cmd: str,
log_msg: str,
allowed_exit_codes: Optional[List[int]] = None
) -> bool:
"""Execute a shell command and stream output to console"""
allowed_exit_codes = allowed_exit_codes or []
logging.info(f"▶️ {log_msg}")
logging.debug(f"$ {cmd}")
try:
process = subprocess.Popen(
f"yes | {cmd}",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace"
)
except FileNotFoundError:
logging.error(f"Command not found: {cmd}")
return False
except OSError as e:
logging.error(f"OS error while executing command: {e}")
return False
except ValueError as e:
logging.error(f"Invalid arguments to Popen: {e}")
return False
try:
if process.stdout:
for line in iter(process.stdout.readline, ""):
sys.stdout.write(line)
sys.stdout.flush()
process.wait()
exit_code = process.returncode
if exit_code in [0, *allowed_exit_codes]:
logging.info("✅ Command completed successfully")
return True
logging.error(f"❌ Command failed (exit code: {exit_code})")
return False
except KeyboardInterrupt:
logging.warning("⛔️ Command interrupted by user")
process.terminate()
return False
except Exception as e:
logging.exception(f"Unexpected error during process execution: {e}")
return False
def build_project(project_name: str, sanitizer: str) -> Tuple[bool, str]:
"""Build workflow for a single project"""
os.chdir(OSS_FUZZ_DIR)
logging.info("=" * 60)
logging.info(f"🔨 Starting build for project: {project_name}")
logging.info("=" * 60)
if not run_command(
f"python3 infra/helper.py build_image {project_name}",
"Step 1/2: Building Docker image"
):
return (False, project_name)
if not run_command(
f"python3 infra/helper.py build_fuzzers --sanitizer {sanitizer} {project_name}",
f"Step 2/2: Compiling Fuzzers (sanitizer={sanitizer})"
):
return (False, project_name)
logging.info(f"✅ Project {project_name} build completed")
return (True, project_name)
def main():
parser = argparse.ArgumentParser(description="OSS-Fuzz Parallel Build Tool")
parser.add_argument("project_list", help="Project list file path")
parser.add_argument("--sanitizer", default="address", choices=["address", "memory", "undefined"])
parser.add_argument("--workers", type=int, default=cpu_count())
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] [PID:%(process)d] %(message)s'
)
if not os.path.isfile(args.project_list):
logging.error(f"Project list file not found: {args.project_list}")
sys.exit(1)
try:
with open(args.project_list, "r", encoding="utf-8") as f:
projects = [line.strip() for line in f if line.strip()]
except OSError as e:
logging.error(f"OS error while reading project list: {e}")
sys.exit(1)
except UnicodeDecodeError as e:
logging.error(f"Encoding error while reading file: {e}")
sys.exit(1)
with Pool(args.workers) as pool:
results = pool.starmap(build_project, [(p, args.sanitizer) for p in projects])
failed = [p for success, p in results if not success]
logging.info(f"\n📊 Build completed: Success {len(projects) - len(failed)}/{len(projects)}")
if failed:
logging.warning("❌ Failed projects: " + ", ".join(failed))
if __name__ == "__main__":
main()