Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ test-force: ## Run tests in a Docker container while ignoring any stored state

test-debug: ## Spawn an interactive shell in the test container to debug
@docker compose build
@docker compose run --rm test /bin/bash
@docker compose run --rm with-sast /bin/bash

docs-serve: ## Serve the documentation locally
@mkdocs serve --livereload
6 changes: 3 additions & 3 deletions codesectools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ def get_downloadable() -> dict[str, DownloadableRequirement | Dataset]:


@cli.command(hidden=download_hidden)
def download(name: download_arg_type = download_arg_value) -> None:
"""Download any missing resources that are available for download."""
def download(name: download_arg_type = download_arg_value, test: bool = False) -> None:
"""Download and install any missing resources that are available for download."""
if name is None:
print("All downloadable resources have been retrieved.")
else:
Expand All @@ -174,7 +174,7 @@ def download(name: download_arg_type = download_arg_value) -> None:
if isinstance(downloadable, DownloadableRequirement):
downloadable.download()
else:
downloadable.download_dataset()
downloadable.download_dataset(test=test)


cli.add_typer(build_all_sast_cli())
Expand Down
21 changes: 19 additions & 2 deletions codesectools/datasets/BenchmarkJava/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import csv
import random
from pathlib import Path
from typing import Self

Expand Down Expand Up @@ -99,12 +100,28 @@ def __eq__(self, other: str | Self) -> bool:
else:
return False

def download_files(self: Self) -> None:
"""Download the dataset files from the official Git repository."""
def download_files(self: Self, test: bool = False) -> None:
    """Download the dataset files from the official Git repository.

    Clones the BenchmarkJava repository into `self.directory` and, if in test
    mode, prunes the test-code directory by deleting a random selection of
    files so that at most 50 of them remain.

    Args:
        test: If True, reduce the number of test files for faster testing.

    """
    git.Repo.clone_from(
        "https://github.com/OWASP-Benchmark/BenchmarkJava.git", self.directory
    )

    if not test:
        return

    testcode_dir = self.directory / "src/main/java/org/owasp/benchmark/testcode"
    # Only regular files can be removed with unlink(); skip anything else.
    testcodes = [entry for entry in testcode_dir.iterdir() if entry.is_file()]
    # random.sample() raises ValueError for a negative sample size, so clamp
    # the number of deletions at zero when 50 or fewer files are present.
    excess = max(0, len(testcodes) - 50)
    for to_delete_testcode in random.sample(testcodes, k=excess):
        to_delete_testcode.unlink()

def load_dataset(self) -> list[TestCode]:
"""Load the BenchmarkJava dataset from its source files.

Expand Down
2 changes: 1 addition & 1 deletion codesectools/datasets/CVEfixes/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, lang: str | None = None) -> None:
self.max_repo_size = 100e6
super().__init__(lang)

def download_files(self: Self) -> None:
def download_files(self: Self, test: bool = False) -> None:
"""Copy the dataset files from the package data directory to the user cache."""
self.directory.mkdir(exist_ok=True, parents=True)
license_file = DATA_DIR / self.name / "LICENSE"
Expand Down
27 changes: 22 additions & 5 deletions codesectools/datasets/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,33 @@ def prompt_license_agreement(self) -> None:
raise typer.Exit(code=1)

@abstractmethod
def download_files(self) -> None:
"""Download the raw dataset files."""
def download_files(self, test: bool = False) -> None:
    """Obtain the raw files that make up the dataset.

    The base implementation is intentionally empty: each subclass provides
    its own way of retrieving the files.

    Args:
        test: If True, download a smaller subset of the dataset for testing.

    """

def download_dataset(self) -> None:
"""Handle the full dataset download process, including license prompt and caching."""
def download_dataset(self, test: bool = False) -> None:
    """Run the complete download workflow for this dataset.

    The steps are, in order: ask the user to accept the license, fetch the
    raw files through `download_files` while a progress display is active,
    write a `.complete` marker file in the dataset directory, and report
    where the dataset was stored.

    Args:
        test: If True, download a smaller subset of the dataset for testing.

    """
    self.prompt_license_agreement()

    with Progress() as tracker:
        tracker.add_task(f"Downloading [b]{self.name}[/b]...", total=None)
        self.download_files(test=test)

    # The marker file flags the dataset as cached.
    completion_marker = self.directory / ".complete"
    completion_marker.write_bytes(b"\x42")
    print(f"[b]{self.name}[/b] has been downloaded at {self.directory}.")

Expand Down
79 changes: 54 additions & 25 deletions codesectools/sasts/all/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from codesectools.sasts import SASTS_ALL
from codesectools.sasts.all.graphics import ProjectGraphics
from codesectools.sasts.all.sast import AllSAST
from codesectools.sasts.core.sast import PrebuiltSAST
from codesectools.sasts.core.sast import PrebuiltBuildlessSAST, PrebuiltSAST
from codesectools.utils import group_successive


def build_cli() -> typer.Typer:
Expand Down Expand Up @@ -72,11 +73,11 @@ def analyze(
),
],
# Additional options
artifact_dir: Annotated[
artifacts: Annotated[
Path | None,
typer.Option(
help="Pre-built artifacts directory (for PrebuiltSAST only)",
metavar="ARTIFACT_DIR",
metavar="ARTIFACTS",
),
] = None,
# Common NOT REQUIRED option
Expand All @@ -90,27 +91,28 @@ def analyze(
) -> None:
"""Run analysis on the current project with all available SAST tools."""
for sast in all_sast.sasts_by_lang.get(lang, []):
if isinstance(sast, PrebuiltSAST) and artifact_dir is None:
print(f"{sast.name} required pre-built artifacts for analysis")
if isinstance(sast, PrebuiltBuildlessSAST) and artifacts is None:
print(
"Please provide the directory with artifacts (with --artifact-dir) to include this tool"
f"[i]{sast.name} can use pre-built artifacts ({sast.artefact_name} {sast.artefact_type}) for more accurate analysis"
)
print("[i]Use the flag --artifacts to provide the artifacts")
elif isinstance(sast, PrebuiltSAST) and artifacts is None:
print(
f"[b]Skipping {sast.name} because it requires pre-built artifacts ({sast.artefact_name} {sast.artefact_type})"
)
print("[b]Use the flag --artifacts to provide the artifacts")
continue

output_dir = sast.output_dir / Path.cwd().name
if output_dir.is_dir():
if overwrite:
shutil.rmtree(output_dir)
sast.run_analysis(
lang, Path.cwd(), output_dir, artifact_dir=artifact_dir
)
sast.run_analysis(lang, Path.cwd(), output_dir, artifacts=artifacts)
else:
print(f"Found existing analysis result at {output_dir}")
print("Use --overwrite to overwrite it")
else:
sast.run_analysis(
lang, Path.cwd(), output_dir, artifact_dir=artifact_dir
)
sast.run_analysis(lang, Path.cwd(), output_dir, artifacts=artifacts)

@cli.command(help="Benchmark a dataset using all SAST tools.")
def benchmark(
Expand Down Expand Up @@ -323,20 +325,43 @@ def report(
defect_table.add_column("SAST", justify="center")
defect_table.add_column("CWE", justify="center")
defect_table.add_column("Message")
for defect in sorted(set(defect_data["raw"]), key=lambda d: d.location[0]):
if location := defect.location:
start, end = location
shortcut = Text(f"{start}", style=Style(link=f"#L{start}"))
rows = []
for defect in defect_data["raw"]:
groups = group_successive(defect.lines)
if groups:
for group in groups:
start, end = group[0], group[-1]
shortcut = Text(f"{start}", style=Style(link=f"#L{start}"))
cwe_link = (
Text(
f"CWE-{defect.cwe.id}",
style=Style(
link=f"https://cwe.mitre.org/data/definitions/{defect.cwe.id}.html"
),
)
if defect.cwe.id != -1
else "None"
)
rows.append(
(start, shortcut, defect.sast, cwe_link, defect.message)
)
else:
shortcut = "None"
cwe_link = Text(
f"CWE-{defect.cwe.id}",
style=Style(
link=f"https://cwe.mitre.org/data/definitions/{defect.cwe.id}.html"
),
)
defect_table.add_row(shortcut, defect.sast, cwe_link, defect.message)
cwe_link = (
Text(
f"CWE-{defect.cwe.id}",
style=Style(
link=f"https://cwe.mitre.org/data/definitions/{defect.cwe.id}.html"
),
)
if defect.cwe.id != -1
else "None"
)
rows.append(
(float("inf"), "None", defect.sast, cwe_link, defect.message)
)

for row in sorted(rows, key=lambda r: r[0]):
defect_table.add_row(*row[1:])
defect_page.print(defect_table)

# Syntax
Expand All @@ -352,7 +377,11 @@ def report(
for location in defect_data["locations"]:
sast, cwe, message, (start, end) = location
for i in range(start, end + 1):
text = f"<b>{sast}</b>: <i>{message} (CWE-{cwe.id})</i>"
text = (
f"<b>{sast}</b>: <i>{message} (CWE-{cwe.id})</i>"
if cwe.id != -1
else f"<b>{sast}</b>: <i>{message}</i>"
)
if highlights.get(i):
highlights[i].add(text)
else:
Expand Down
22 changes: 10 additions & 12 deletions codesectools/sasts/all/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from codesectools.sasts import SASTS_ALL
from codesectools.sasts.core.parser import AnalysisResult
from codesectools.utils import group_successive

if TYPE_CHECKING:
from codesectools.sasts.core.sast import SAST
Expand Down Expand Up @@ -149,12 +150,10 @@ def stats_by_scores(self) -> dict:

defect_locations = {}
for defect in defects:
if any(defect.location):
start, end = defect.location
for line in range(start, end + 1):
if not defect_locations.get(line):
defect_locations[line] = []
defect_locations[line].append(defect)
for line in defect.lines:
if not defect_locations.get(line):
defect_locations[line] = []
defect_locations[line].append(defect)

defects_same_location = 0
defects_same_location_same_cwe = 0
Expand Down Expand Up @@ -202,12 +201,11 @@ def prepare_report_data(self) -> dict:

locations = []
for defect in defects:
if any(defect.location):
start, end = defect.location
if start and end:
locations.append(
(defect.sast, defect.cwe, defect.message, (start, end))
)
for group in group_successive(defect.lines):
start, end = group[0], group[-1]
locations.append(
(defect.sast, defect.cwe, defect.message, (start, end))
)

report["defects"][defect_file] = {
"score": scores[defect_file]["score"],
Expand Down
36 changes: 24 additions & 12 deletions codesectools/sasts/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
GitRepoDatasetGraphics,
ProjectGraphics,
)
from codesectools.sasts.core.sast import SAST, PrebuiltSAST
from codesectools.sasts.core.sast import SAST, PrebuiltBuildlessSAST, PrebuiltSAST


class CLIFactory:
Expand Down Expand Up @@ -139,17 +139,23 @@ def add_analyze(self: Self, help: str = "") -> None:

"""
# PrebuiltSAST additional options
if isinstance(self.sast, PrebuiltSAST):
artifact_dir_default = typer.Option(
help="Pre-built artifacts directory",
metavar="ARTIFACT_DIR",
if isinstance(self.sast, PrebuiltBuildlessSAST):
artifacts_default = typer.Option(
default=None,
help=f"Pre-built artifacts ({self.sast.artefact_name} {self.sast.artefact_type}) for more accurate analysis",
metavar="ARTIFACTS",
)
elif isinstance(self.sast, PrebuiltSAST):
artifacts_default = typer.Option(
help=f"Pre-built artifacts ({self.sast.artefact_name} {self.sast.artefact_type})",
metavar="ARTIFACTS",
)
else:
artifact_dir_default = typer.Option(
artifacts_default = typer.Option(
default=None,
hidden=True,
help="Pre-built artifacts directory (for PrebuiltSAST only)",
metavar="ARTIFACT_DIR",
help="Pre-built artifacts (for PrebuiltSAST only)",
metavar="ARTIFACTS",
)

@self.cli.command(help=help)
Expand All @@ -163,7 +169,7 @@ def analyze(
),
],
# Additional REQUIRED options
artifact_dir: Optional[Path] = artifact_dir_default,
artifacts: Optional[Path] = artifacts_default,
# Common NOT REQUIRED option
overwrite: Annotated[
bool,
Expand All @@ -177,23 +183,29 @@ def analyze(

Args:
lang: The source code language to analyze.
artifact_dir: The directory containing pre-built artifacts, required for PrebuiltSAST tools.
artifacts: The path to pre-built artifacts, required for PrebuiltSAST tools.
overwrite: If True, overwrite any existing analysis results for the project.

"""
if isinstance(self.sast, PrebuiltBuildlessSAST) and artifacts is None:
print(
f"[i]{self.sast.name} can use pre-built artifacts ({self.sast.artefact_name} {self.sast.artefact_type}) for more accurate analysis"
)
print("[i]Use the flag --artifacts to provide the artifacts")

output_dir = self.sast.output_dir / Path.cwd().name
if output_dir.is_dir():
if overwrite:
shutil.rmtree(output_dir)
self.sast.run_analysis(
lang, Path.cwd(), output_dir, artifact_dir=artifact_dir
lang, Path.cwd(), output_dir, artifacts=artifacts
)
else:
print(f"Found existing analysis result at {output_dir}")
print("Use --overwrite to overwrite it")
else:
self.sast.run_analysis(
lang, Path.cwd(), output_dir, artifact_dir=artifact_dir
lang, Path.cwd(), output_dir, artifacts=artifacts
)

def add_benchmark(self, help: str = "") -> None:
Expand Down
Loading