Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fuzzing/tools/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,10 @@ py_test(
srcs = ["dict_validation_test.py"],
deps = [":dict_validation"],
)

# End-to-end test for make_corpus_dir.py. The test launches the script as a
# subprocess instead of importing it, so the script is shipped to the test
# through `data`.
# NOTE(review): absl-py is presumably needed because the launched script uses
# absl flags - confirm against make_corpus_dir.py's imports.
py_test(
name = "make_corpus_dir_test",
srcs = ["make_corpus_dir_test.py"],
data = ["make_corpus_dir.py"],
deps = [requirement("absl-py")],
)
77 changes: 75 additions & 2 deletions fuzzing/tools/make_corpus_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,36 @@

flags.mark_flag_as_required("output_dir")

def flatten_corpus_path(corpus):
    """Collapse a corpus path into a single directory-free name.

    Path components are joined with "-". A leading "./" (or ".\\" when the
    platform separator is a backslash) is rendered as a "dot-" prefix, a
    drive is rendered as its label without the trailing colon/separators,
    and a path rooted at the separator keeps a leading "-".

    The mapping is lossy (for example "a/b" and "a-b" flatten to the same
    name), so callers must still deal with collisions.

    Args:
      corpus: the corpus path as a string.

    Returns:
      The flattened name as a string.
    """
    backslash_platform = os.sep == "\\"

    marker = ""
    has_dot_prefix = corpus.startswith("./") or (
        backslash_platform and corpus.startswith(".\\"))
    if has_dot_prefix:
        marker = "dot-"
        corpus = corpus[2:]

    # Normalize to the native separator so a single split handles both forms.
    if backslash_platform:
        corpus = corpus.replace("/", "\\")

    drive, tail = os.path.splitdrive(corpus)

    # Empty components (leading, trailing or doubled separators) are dropped.
    joined = "-".join(filter(None, tail.split(os.sep)))

    if drive:
        drive_label = drive.rstrip(":\\/").replace("\\", "-").replace("/", "-")
        if joined:
            joined = drive_label + "-" + joined
        else:
            joined = drive_label
    elif tail.startswith(os.sep):
        # Keep rooted paths distinguishable from relative ones.
        joined = "-" + joined

    return marker + joined

def expand_corpus_to_file_list(corpus, file_list):
    """Append the files denoted by `corpus` to `file_list`.

    A non-directory path is appended as is. A directory is searched
    recursively and only the entries for which os.path.isfile() is true are
    appended, so the directory itself and any nested directories are left
    out.

    Args:
      corpus: path of a corpus file or corpus directory.
      file_list: list that is extended in place with the resulting paths.

    Raises:
      FileNotFoundError: if `corpus` does not exist.
    """
    if not os.path.exists(corpus):
        raise FileNotFoundError("file " + corpus + " doesn't exist")

    if not os.path.isdir(corpus):
        file_list.append(corpus)
        return

    # glob("dir/**", recursive=True) also yields "dir/" and every nested
    # directory. Filtering on isfile() removes all of them without relying on
    # the position of "dir/" in the result, and each file is added only once.
    pattern = os.path.join(corpus, "**")
    file_list.extend(
        path for path in glob.glob(pattern, recursive=True)
        if os.path.isfile(path))

Expand All @@ -59,8 +83,57 @@ def main(argv):
corpus_line.rstrip("\n"), expanded_file_list)

if expanded_file_list:
max_flattened_length = 200
flattened_names = {}
flattened_name_counts = {}
needs_suffix = set()

for corpus in expanded_file_list:
flattened = flatten_corpus_path(corpus)
flattened_names[corpus] = flattened
flattened_key = flattened.lower() if os.name == "nt" else flattened
flattened_name_counts[flattened_key] = (
flattened_name_counts.get(flattened_key, 0) + 1)
if len(flattened) > max_flattened_length:
needs_suffix.add(corpus)

for corpus in expanded_file_list:
flattened = flattened_names[corpus]
flattened_key = flattened.lower() if os.name == "nt" else flattened
if flattened_name_counts[flattened_key] > 1:
needs_suffix.add(corpus)

suffix_map = {}
if needs_suffix:
suffix_width = len(str(len(needs_suffix)))
for index, corpus in enumerate(sorted(needs_suffix), start=1):
suffix_map[corpus] = f"{index:0{suffix_width}d}"

final_name_map = {}
final_name_counts = {}
for corpus in expanded_file_list:
flattened = flattened_names[corpus]
suffix = suffix_map.get(corpus)
if suffix:
prefix_budget = max_flattened_length - len(suffix) - 2
flattened = flattened[:max(1, prefix_budget)] + "--" + suffix
final_name_map[corpus] = flattened
flattened_key = flattened.lower() if os.name == "nt" else flattened
final_name_counts[flattened_key] = (
final_name_counts.get(flattened_key, 0) + 1)

if any(count > 1 for count in final_name_counts.values()):
unique_corpora = sorted(set(expanded_file_list))
alias_width = len(str(len(unique_corpora)))
alias_map = {
corpus: f"entry-{index:0{alias_width}d}"
for index, corpus in enumerate(unique_corpora, start=1)
}
for corpus in expanded_file_list:
final_name_map[corpus] = alias_map[corpus]

for corpus in expanded_file_list:
dest = os.path.join(FLAGS.output_dir, corpus.replace("/", "-"))
dest = os.path.join(FLAGS.output_dir, final_name_map[corpus])
# Whatever the separator we choose, there is a chance that
# the dest name conflicts with another file
if os.path.exists(dest):
Expand Down
186 changes: 186 additions & 0 deletions fuzzing/tools/make_corpus_dir_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""Unit tests for make_corpus_dir.py."""

import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path


def resolve_script_path():
    """Locate make_corpus_dir.py for this test.

    Lookup order:
      1. A file named make_corpus_dir.py next to this test file.
      2. When TEST_WORKSPACE is set:
         $TEST_SRCDIR/$TEST_WORKSPACE/fuzzing/tools/make_corpus_dir.py, then
         $RUNFILES_DIR/$TEST_WORKSPACE/fuzzing/tools/make_corpus_dir.py.
      3. The manifest named by RUNFILES_MANIFEST_FILE: the first existing
         entry whose logical path lies under "$TEST_WORKSPACE/", otherwise
         the first existing entry under "_main/".

    Returns:
      A pathlib.Path pointing at an existing make_corpus_dir.py.

    Raises:
      FileNotFoundError: if none of the lookups yields an existing file.
    """
    test_workspace = os.environ.get("TEST_WORKSPACE")

    candidates = [Path(__file__).with_name("make_corpus_dir.py")]
    if test_workspace:
        # Both variables name a runfiles root; the layout below it is the same.
        for root_variable in ("TEST_SRCDIR", "RUNFILES_DIR"):
            runfiles_root = os.environ.get(root_variable)
            if runfiles_root:
                candidates.append(
                    Path(runfiles_root) / test_workspace / "fuzzing" /
                    "tools" / "make_corpus_dir.py")

    for candidate in candidates:
        if candidate.is_file():
            return candidate

    manifest_file = os.environ.get("RUNFILES_MANIFEST_FILE")
    if manifest_file:
        workspace_prefix = f"{test_workspace}/" if test_workspace else None
        try:
            main_match = None
            with open(manifest_file, "r", encoding="utf-8") as manifest:
                for line in manifest:
                    # Each usable entry is "<logical path> <real path>"; blank
                    # lines and lines without a space are ignored.
                    logical_path, separator, real_path = (
                        line.rstrip("\n").partition(" "))
                    if not separator:
                        continue
                    normalized_path = logical_path.replace("\\", "/")
                    if not normalized_path.endswith(
                            "fuzzing/tools/make_corpus_dir.py"):
                        continue
                    candidate = Path(real_path)
                    if not candidate.is_file():
                        continue
                    # An entry under the test workspace wins immediately.
                    if workspace_prefix and normalized_path.startswith(
                            workspace_prefix):
                        return candidate
                    # Otherwise remember the first "_main/" entry as fallback.
                    if main_match is None and normalized_path.startswith(
                            "_main/"):
                        main_match = candidate
            if main_match is not None:
                return main_match
        except OSError:
            # Best effort: an unreadable manifest falls through to the
            # FileNotFoundError below.
            pass

    raise FileNotFoundError(
        "could not resolve make_corpus_dir.py in test runfiles")


SCRIPT_PATH = resolve_script_path()


class MakeCorpusDirTest(unittest.TestCase):
    """Runs make_corpus_dir.py as a subprocess and inspects its output dir."""

    @staticmethod
    def _regular_files(directory):
        # Only regular files count as copied corpus entries.
        return [entry for entry in directory.iterdir() if entry.is_file()]

    def run_tool(self, args, cwd):
        # check=False: a failing run is reported through the returncode
        # assertion (with stderr as the message) instead of an exception.
        command = [sys.executable, str(SCRIPT_PATH)] + args
        return subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            cwd=str(cwd),
        )

    def test_copies_nested_corpus_directory(self):
        # A corpus directory contributes its files from every nesting level.
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            corpus_dir = root / "corpus"
            nested_dir = corpus_dir / "nested"
            nested_dir.mkdir(parents=True)
            (corpus_dir / "a.txt").write_text("A", encoding="utf-8")
            (nested_dir / "b.txt").write_text("B", encoding="utf-8")

            completed = self.run_tool(
                ["--corpus_list=corpus", "--output_dir=out"], cwd=root)

            self.assertEqual(completed.returncode, 0, msg=completed.stderr)
            outputs = self._regular_files(root / "out")
            self.assertEqual(len(outputs), 2)
            contents = [entry.read_text(encoding="utf-8") for entry in outputs]
            contents.sort()
            self.assertEqual(contents, ["A", "B"])

    def test_copies_absolute_corpus_file(self):
        # Absolute corpus and output paths are accepted.
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            source = root / "corpus-input.txt"
            source.write_text("payload", encoding="utf-8")
            destination = root / "out"

            completed = self.run_tool(
                [f"--corpus_list={source}", f"--output_dir={destination}"],
                cwd=root,
            )

            self.assertEqual(completed.returncode, 0, msg=completed.stderr)
            outputs = self._regular_files(destination)
            self.assertEqual(len(outputs), 1)
            self.assertEqual(
                outputs[0].read_text(encoding="utf-8"), "payload")

    def test_distinguishes_dot_prefix_from_plain_relative_path(self):
        # "./a.txt" and "a.txt" must end up as two separate output files.
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            (root / "a.txt").write_text("payload", encoding="utf-8")

            completed = self.run_tool(
                ["--corpus_list=./a.txt,a.txt", "--output_dir=out"],
                cwd=root,
            )

            self.assertEqual(completed.returncode, 0, msg=completed.stderr)
            self.assertEqual(len(self._regular_files(root / "out")), 2)

    def test_distinguishes_parent_navigation_from_plain_relative_path(self):
        # "dir/../a.txt" and "a.txt" must end up as two separate output files.
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            (root / "dir").mkdir()
            (root / "a.txt").write_text("payload", encoding="utf-8")

            completed = self.run_tool(
                ["--corpus_list=dir/../a.txt,a.txt", "--output_dir=out"],
                cwd=root,
            )

            self.assertEqual(completed.returncode, 0, msg=completed.stderr)
            self.assertEqual(len(self._regular_files(root / "out")), 2)

    def test_distinguishes_dot_prefix_from_literal_dot_filename(self):
        # "./a.txt" and a file literally named "dot-a.txt" must both be
        # copied, each keeping its own content.
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            (root / "a.txt").write_text("from-a", encoding="utf-8")
            (root / "dot-a.txt").write_text("from-dot-a", encoding="utf-8")

            completed = self.run_tool(
                ["--corpus_list=./a.txt,dot-a.txt", "--output_dir=out"],
                cwd=root,
            )

            self.assertEqual(completed.returncode, 0, msg=completed.stderr)
            outputs = self._regular_files(root / "out")
            self.assertEqual(len(outputs), 2)
            contents = [entry.read_text(encoding="utf-8") for entry in outputs]
            contents.sort()
            self.assertEqual(contents, ["from-a", "from-dot-a"])


if __name__ == "__main__":
unittest.main()
Loading