diff --git a/garak/_config.py b/garak/_config.py index 9ee157795..e4bfb8e29 100644 --- a/garak/_config.py +++ b/garak/_config.py @@ -46,7 +46,7 @@ @dataclass class GarakSubConfig: - pass + """Base dataclass for garak configuration sub-objects.""" @dataclass @@ -132,11 +132,10 @@ def _key_exists(d: dict, key: str) -> bool: if not isinstance(d, dict) and not isinstance(d, list): return False if isinstance(d, list): - return any([_key_exists(item, key) for item in d]) + return any(_key_exists(item, key) for item in d) if isinstance(d, dict) and key in d.keys(): return True - else: - return any([_key_exists(val, key) for val in d.values()]) + return any(_key_exists(val, key) for val in d.values()) def _set_settings(config_obj, settings_obj: dict): @@ -187,7 +186,8 @@ def _load_config_files(settings_filenames) -> dict: print(f"⚠️ {msg}") else: logging.info( - f"API key found in {settings_filename}. Checking readability..." + "API key found in %s. Checking readability...", + settings_filename, ) res = os.stat(settings_filename) if res.st_mode & stat.S_IROTH or res.st_mode & stat.S_IRGRP: @@ -228,7 +228,8 @@ def _load_config_files(settings_filenames) -> dict: def _store_config(settings_files) -> None: - global system, run, plugins, reporting, version + """Load config files and apply settings to the global config objects.""" + global system, run, plugins, reporting, version # pylint: disable=global-statement settings = _load_config_files(settings_files) system = _set_settings(system, settings["system"]) run = _set_settings(run, settings["run"]) @@ -246,19 +247,24 @@ def _store_config(settings_files) -> None: REQUESTS_AGENT = "" -def _garak_user_agent(dummy=None): +def _garak_user_agent(_dummy=None): + """Return the current garak requests user-agent string. + + Accepts an ignored positional arg to match the ``requests`` UA callback signature. 
+ """ return str(REQUESTS_AGENT) def set_all_http_lib_agents(agent_string): + """Set the same user-agent string for all HTTP libraries (requests, httpx, aiohttp).""" set_http_lib_agents( {"requests": agent_string, "httpx": agent_string, "aiohttp": agent_string} ) def set_http_lib_agents(agent_strings: dict): - - global REQUESTS_AGENT + """Set per-library user-agent strings from a dict keyed by library name.""" + global REQUESTS_AGENT # pylint: disable=global-statement if "requests" in agent_strings: from requests import utils @@ -276,6 +282,7 @@ def set_http_lib_agents(agent_strings: dict): def get_http_lib_agents(): + """Return the current user-agent strings for requests, httpx, and aiohttp.""" from requests import utils import httpx import aiohttp @@ -289,7 +296,8 @@ def get_http_lib_agents(): def load_base_config() -> None: - global loaded + """Load garak.core.yaml — the minimal base configuration.""" + global loaded # pylint: disable=global-statement settings_files = [str(transient.package_dir / "resources" / "garak.core.yaml")] logging.debug("Loading configs from: %s", ",".join(settings_files)) _store_config(settings_files=settings_files) @@ -299,9 +307,10 @@ def load_base_config() -> None: def load_config( site_config_filename="garak.site.yaml", run_config_filename=None ) -> None: + """Load site and run config files on top of the base config.""" # would be good to bubble up things from run_config, e.g. generator, probe(s), detector(s) # and then not have cli be upset when these are not given as cli params - global loaded + global loaded # pylint: disable=global-statement settings_files = [str(transient.package_dir / "resources" / "garak.core.yaml")] @@ -318,7 +327,7 @@ def load_config( message = "Multiple site config files found (garak.site.json, garak.site.yaml, garak.site.yml). Please use only one site config format." 
logging.error(message) raise ValueError(message) - elif has_json: + if has_json: settings_files.append(site_config_json) elif has_yaml: settings_files.append(site_config_yaml) @@ -378,7 +387,9 @@ def load_config( if has_json and (has_yaml or has_yml): yaml_ext = ".yaml" if has_yaml else ".yml" logging.warning( - f"Both {run_config_filename}.json and {yaml_ext} found. Using .json" + "Both %s.json and %s found. Using .json", + run_config_filename, + yaml_ext, ) if has_json: settings_files.append(json_path) @@ -410,6 +421,7 @@ def load_config( def parse_plugin_spec( spec: str, category: str, probe_tag_filter: str = "" ) -> tuple[List[str], List[str]]: + """Expand a plugin spec string (e.g. 'all', 'dan', 'probes.dan.AntiDAN') into lists of known and unknown plugin names.""" from garak._plugins import enumerate_plugins if spec is None or spec.lower() in ("", "auto", "none"): @@ -453,7 +465,7 @@ def parse_plugin_spec( plugin_class_name = plugin_name.split(".")[-1] m = importlib.import_module(f"garak.{plugin_module_name}") c = getattr(m, plugin_class_name) - if not any([tag.startswith(probe_tag_filter) for tag in c.tags]): + if not any(tag.startswith(probe_tag_filter) for tag in c.tags): plugins_to_skip.append( plugin_name ) # using list.remove doesn't update for-loop position diff --git a/garak/command.py b/garak/command.py index cf0d3e83b..aafe9a8d1 100644 --- a/garak/command.py +++ b/garak/command.py @@ -11,6 +11,12 @@ def hint(msg, logging=None): + """Print a probabilistic hint message and optionally log it. + + Uses a global HINT_CHANCE probability so hints don't appear on every run. + The logging parameter is passed explicitly to avoid import-order issues with + the thin garak logging setup. + """ # sub-optimal, but because our logging setup is thin & uses the global # default, placing a top-level import can break logging - so we can't # assume `logging` is imported at this point. 
@@ -22,6 +28,7 @@ def hint(msg, logging=None): def deprecation_notice(deprecated_item: str, version: str, logging=None): + """Print and optionally log a deprecation notice for the given item.""" msg = f"DEPRECATION: {deprecated_item} is deprecated since version {version}" visible_msg = f"✋ {msg}" if logging is not None: @@ -30,6 +37,7 @@ def deprecation_notice(deprecated_item: str, version: str, logging=None): def start_logging(): + """Initialise logging and return the configured log filename.""" from garak import _config log_filename = _config.transient.log_filename @@ -40,6 +48,7 @@ def start_logging(): def start_run(): + """Set up the run UUID, reporting directory, and open the report file.""" import logging import os import uuid @@ -49,7 +58,7 @@ def start_run(): logging.info("run started at %s", _config.transient.starttime_iso) # print("ASSIGN UUID", args) - if _config.system.lite and "probes" not in _config.transient.cli_args and _config.transient.cli_args.list_probes is None and not _config.transient.cli_args.list_detectors and not _config.transient.cli_args.list_generators and not _config.transient.cli_args.list_buffs and not _config.transient.cli_args.list_config and not _config.transient.cli_args.plugin_info and not _config.run.interactive: # type: ignore + if _config.system.lite and "probes" not in _config.transient.cli_args and _config.transient.cli_args.list_probes is None and not _config.transient.cli_args.list_detectors and not _config.transient.cli_args.list_generators and not _config.transient.cli_args.list_buffs and not _config.transient.cli_args.list_config and not _config.transient.cli_args.plugin_info and not _config.run.interactive: # type: ignore # pylint: disable=no-member # cli_args attrs set dynamically by argparse hint( "The current/default config is optimised for speed rather than thoroughness. Try e.g. 
--config full for a stronger test, or specify some probes.", logging=logging, @@ -122,6 +131,7 @@ def start_run(): def end_run(): + """Close the report file, write a completion entry, and build the HTML digest.""" import datetime import logging @@ -146,9 +156,11 @@ def end_run(): digest_filename = _config.transient.report_filename.replace(".jsonl", ".html") print(f"📜 report html summary being written to {digest_filename}") + # pylint: disable=broad-exception-caught # report building must not crash the CLI run try: write_report_digest(_config.transient.report_filename, digest_filename) except Exception as e: + # pylint: enable=broad-exception-caught msg = "Didn't successfully build the report - JSON log preserved. " + repr(e) logging.exception(e) logging.info(msg) @@ -163,6 +175,7 @@ def _tier_name(tier_value): """Convert a tier int value to its enum name string.""" try: from garak.probes._tier import Tier + return Tier(int(tier_value)).name except (ValueError, TypeError): return "" @@ -171,7 +184,7 @@ def _tier_name(tier_value): def _truncate(text, max_len=80): """Truncate text to max_len, appending ellipsis if needed.""" if len(text) > max_len: - return text[:max_len - 1] + "…" + return text[: max_len - 1] + "…" return text @@ -180,7 +193,12 @@ def _truncate(text, max_len=80): # "name" and "active" are always included and handled separately. 
_PLUGIN_TABLE_COLUMNS = { "probes": [ - ("tier", lambda info: _tier_name(info.get("tier")) if info.get("tier") is not None else ""), + ( + "tier", + lambda info: ( + _tier_name(info.get("tier")) if info.get("tier") is not None else "" + ), + ), ("description", lambda info: _truncate(info.get("description", ""))), ], # Future plugin types can define their own extra columns here, e.g.: @@ -190,7 +208,7 @@ def _truncate(text, max_len=80): } -def print_plugins(prefix: str, color, selected_plugins=None, verbose: int=0): +def print_plugins(prefix: str, color, selected_plugins=None, verbose: int = 0): """ Print plugins for a category (probes/detectors/generators/buffs). @@ -201,7 +219,7 @@ def print_plugins(prefix: str, color, selected_plugins=None, verbose: int=0): verbose: Verbosity level. 0 = plain list, >=1 = markdown table with metadata. """ from colorama import Style - from garak._plugins import enumerate_plugins, plugin_info as get_plugin_info, PLUGIN_TYPES + from garak._plugins import enumerate_plugins, PLUGIN_TYPES if prefix not in PLUGIN_TYPES: raise ValueError(f"Requested prefix '{prefix}' is not a valid plugin type") @@ -217,7 +235,10 @@ def print_plugins(prefix: str, color, selected_plugins=None, verbose: int=0): print(f"No {prefix} match the provided filter") return - short = [(p.replace(f"{prefix}.", ""), a, p) for p, a, *_ in [(pn, ac, pn) for pn, ac in rows]] + short = [ + (p.replace(f"{prefix}.", ""), a, p) + for p, a, *_ in [(pn, ac, pn) for pn, ac in rows] + ] if selected_plugins is None: module_names = {(m.split(".")[0], True, None) for m, a, _ in short} short += module_names @@ -270,7 +291,12 @@ def _print_plugins_table(sorted_items, prefix): print(f"{prefix}:") print( markdown_table(table_data) - .set_params(row_sep="markdown", padding_width=1, padding_weight="centerleft", quote=False) + .set_params( + row_sep="markdown", + padding_width=1, + padding_weight="centerleft", + quote=False, + ) .get_markdown() ) @@ -288,18 +314,21 @@ def 
print_probes(selected_probes=None, verbose=0): def print_detectors(selected_detectors=None): + """Print available detectors, optionally filtered to selected_detectors.""" from colorama import Fore print_plugins("detectors", Fore.LIGHTBLUE_EX, selected_detectors) def print_generators(): + """Print all available generators.""" from colorama import Fore print_plugins("generators", Fore.LIGHTMAGENTA_EX) def print_buffs(): + """Print all available buffs.""" from colorama import Fore print_plugins("buffs", Fore.LIGHTGREEN_EX) @@ -307,6 +336,7 @@ def print_buffs(): # describe plugin def plugin_info(plugin_name): + """Print all known metadata for the named plugin.""" from garak._plugins import plugin_info info = plugin_info(plugin_name) @@ -333,6 +363,7 @@ def plugin_info(plugin_name): # do a run def probewise_run(generator, probe_names, evaluator, buffs): + """Run probes one-by-one through the probewise harness.""" import garak.harnesses.probewise probewise_h = garak.harnesses.probewise.ProbewiseHarness() @@ -340,6 +371,7 @@ def probewise_run(generator, probe_names, evaluator, buffs): def pxd_run(generator, probe_names, detector_names, evaluator, buffs): + """Run probes through the probe-x-detector (PxD) harness.""" import garak.harnesses.pxd pxd_h = garak.harnesses.pxd.PxD() @@ -359,6 +391,7 @@ def _enumerate_obj_values(o): def list_config(): + """Print all current garak config values to stdout.""" from garak import _config print("_config:") @@ -370,6 +403,7 @@ def list_config(): def write_report_digest(report_filename, html_report_filename): + """Build and write the HTML digest for the given JSONL report file.""" from garak.analyze import report_digest digest = report_digest.build_digest(report_filename) diff --git a/garak/configurable.py b/garak/configurable.py index f79c3edb9..b1f3ec473 100644 --- a/garak/configurable.py +++ b/garak/configurable.py @@ -1,6 +1,8 @@ # SPDX-FileCopyrightText: Portions Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. # SPDX-License-Identifier: Apache-2.0 +"""Base class providing config loading, dependency injection, and env-var API key handling.""" + import importlib import logging import inspect @@ -22,6 +24,13 @@ def _import_failed(absent_modules: List[str], calling_module: str): class Configurable: + """Mixin providing config-file loading, optional-dep injection, and API key resolution. + + Subclasses may define ``ENV_VAR`` (str) to enable automatic API key lookup from + the environment. This is a dynamic class-level attribute and is intentionally not + declared on the base class — pylint no-member suppressions are applied where needed. + """ + # list of strings naming modules required but not explicitly in garak by default extra_dependency_names = [] @@ -109,7 +118,9 @@ def _load_config(self, config_root=_config): self._apply_config(plugins_config[namespaced_klass]) self._apply_run_defaults() self._apply_missing_instance_defaults() + # pylint: disable=no-member # ENV_VAR is a dynamic class attr defined by subclasses if hasattr(self, "ENV_VAR") and self.ENV_VAR: + # pylint: enable=no-member if not hasattr(self, "key_env_var"): self.key_env_var = self.ENV_VAR self._validate_env_var() @@ -123,14 +134,16 @@ def _apply_config(self, config): # skip entries for more qualified items or any plugin type # should this be coupled to `_plugins`? continue + # pylint: disable=unsupported-membership-test # _supported_params is None or tuple; isinstance guard above ensures safety if ( isinstance(self._supported_params, tuple) and k not in self._supported_params ): + # pylint: enable=unsupported-membership-test # if the class has a set of supported params skip unknown params # should this pass signature arguments as supported? 
logging.warning( - f"Unknown configuration key for {classname}: '{k}' - skipping" + "Unknown configuration key for %s: '%s' - skipping", classname, k ) continue if hasattr(self, k): @@ -168,7 +181,9 @@ def _apply_missing_instance_defaults(self): def _validate_env_var(self): if hasattr(self, "key_env_var") and self.key_env_var: + # pylint: disable=access-member-before-definition # intentional: api_key is lazy-set below if absent if not hasattr(self, "api_key") or self.api_key is None: + # pylint: enable=access-member-before-definition self.api_key = os.getenv(self.key_env_var, default=None) if self.api_key is None: if hasattr( diff --git a/garak/exception.py b/garak/exception.py index d333d5793..222f1249a 100644 --- a/garak/exception.py +++ b/garak/exception.py @@ -1,6 +1,8 @@ # SPDX-FileCopyrightText: Portions Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +"""Custom exception types raised by garak components.""" + class GarakException(Exception): """Base class for all garak exceptions""" diff --git a/garak/interactive.py b/garak/interactive.py index 22d5f71a2..f86ff8db1 100644 --- a/garak/interactive.py +++ b/garak/interactive.py @@ -3,6 +3,8 @@ # SPDX-FileCopyrightText: Portions Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 +"""Interactive terminal mode for garak, built on cmd2.""" + import argparse from logging import getLogger import random @@ -52,9 +54,10 @@ def _get_list_parser(): def print_plugins(prefix, color): + """Print all plugins of the given category to stdout.""" plugin_names = enumerate_plugins(category=prefix) plugin_names = [(p.replace(f"{prefix}.", ""), a) for p, a in plugin_names] - module_names = set([(m.split(".")[0], True) for m, a in plugin_names]) + module_names = {(m.split(".")[0], True) for m, a in plugin_names} plugin_names += module_names for plugin_name, active in sorted(plugin_names): print(f"{Style.BRIGHT}{color}{prefix}: {Style.RESET_ALL}", end="") @@ -68,12 +71,15 @@ def print_plugins(prefix, color): @cmd2.with_default_category("Garak Commands") class GarakCommands(cmd2.CommandSet): + """cmd2 CommandSet exposing list, probe, and other interactive garak commands.""" + def __init__(self): """Initialize the Garak Commands object.""" super().__init__() @cmd2.with_argparser(list_parser) def do_list(self, args): + """List available probes, detectors, or generators.""" if not args.type: print("Choose probes, detectors, or generators.") @@ -97,14 +103,15 @@ def do_list(self, args): @cmd2.with_argparser(probe_parser) def do_probe(self, args): + """Run a probe against the configured target.""" if not self._cmd.target_type or not self._cmd.target_model: print( "Use the `set` command to set the target_type and target_model first." ) - return + return None # If probe is already set, overwrite it. if args.probe and self._cmd.probe: - logger.warning(f"Probe already set. Resetting probe to {args.probe}") + logger.warning("Probe already set. 
Resetting probe to %s", args.probe) print(f"Executing {args.probe}") self._cmd.probe = args.probe elif not args.probe and not self._cmd.probe: @@ -139,6 +146,7 @@ def do_probe(self, args): harness.run(generator, [self._cmd.probe], evaluator) logger.info("Run complete, ending") print("Run complete!") + return None class GarakTerminal(cmd2.Cmd): @@ -192,12 +200,11 @@ def __init__(self): self.remove_settable("editor") self.remove_settable("feedback_to_output") - def default(self, command: str) -> None: - """Execute when a command isn't recognized""" - print(f"Command does not exist.\n") - return None + def default(self, _command: str) -> None: + """Execute when a command isn't recognized.""" + print("Command does not exist.\n") - def postcmd(self, stop, line): + def postcmd(self, stop, _line): """Set the prompt to reflect interaction changes.""" target_type = self.target_type target_model = self.target_model @@ -220,14 +227,19 @@ def _load_garak(self): self.register_command_set(self._cmd) @cmd2.with_argument_list - def do_quit(self, args): + def do_quit( + self, _args + ): # pylint: disable=unused-argument # cmd2 interface requires the parameter + """Quit the interactive garak terminal.""" print(self.quit_message) sys.exit(0) def settings_ns_provider(self) -> argparse.Namespace: """Populate an argparse Namespace with current settings""" ns = argparse.Namespace() - ns.app_settings = self.settings + ns.app_settings = ( + self.settings + ) # pylint: disable=no-member # cmd2.Cmd sets `settings` dynamically return ns diff --git a/garak/payloads.py b/garak/payloads.py index 24d217e0a..65f9db3c8 100644 --- a/garak/payloads.py +++ b/garak/payloads.py @@ -6,11 +6,12 @@ from __future__ import annotations import json -import jsonschema import logging import pathlib from typing import Generator, List, Union +import jsonschema + import garak._config import garak.exception @@ -168,7 +169,7 @@ def _refresh_payloads(self) -> None: payload objects, and refresh self.payload_list""" 
self.__class__.payload_list = self._scan_payload_dir(PAYLOAD_DIR) - def search( + def search( # pylint: disable=not-an-iterable,unsubscriptable-object # payload_list is None at class level but always a dict at runtime after _refresh_payloads self, types: Union[List[str], None] = None, include_children=True ) -> Generator[str, None, None]: """Return list of payload names, optionally filtered by types""" @@ -176,20 +177,17 @@ def search( if types is None: yield payload else: + payload_types = self.__class__.payload_list[payload]["types"] if include_children is False: matches = [ payload_type == type_prefix - for payload_type in self.__class__.payload_list[payload][ - "types" - ] + for payload_type in payload_types for type_prefix in types ] else: matches = [ payload_type.startswith(type_prefix) - for payload_type in self.__class__.payload_list[payload][ - "types" - ] + for payload_type in payload_types for type_prefix in types ] if any(matches): @@ -206,7 +204,9 @@ def _load_payload( def load(self, name) -> PayloadGroup: """Return a PayloadGroup""" try: + # pylint: disable=unsubscriptable-object # payload_list is None at class level but always a dict at runtime path = self.__class__.payload_list[name]["path"] + # pylint: enable=unsubscriptable-object p = self._load_payload(name, path) # or raise KeyError except KeyError as ke: @@ -232,8 +232,10 @@ def __init__(self) -> None: def search( types: Union[List[str], None] = None, include_children=True ) -> Generator[str, None, None]: + """Module-level convenience: search payloads via a transient Director.""" return Director().search(types, include_children) def load(name: str) -> PayloadGroup: + """Module-level convenience: load a named payload via a transient Director.""" return Director().load(name) diff --git a/garak/report.py b/garak/report.py index 8e7dfa039..7ee239dce 100644 --- a/garak/report.py +++ b/garak/report.py @@ -2,10 +2,10 @@ import importlib import json +from datetime import date + import numpy as np 
import pandas as pd - -from datetime import date import avidtools.datamodels.report as ar import avidtools.datamodels.components as ac import avidtools.datamodels.enums as ae @@ -67,13 +67,13 @@ def get_evaluations(self): raise ValueError("No evaluations to report 🤷") # preprocess - for i in range(len(evals)): - module_name, plugin_class_name = evals[i]["probe"].split(".") + for eval_record in evals: + module_name, plugin_class_name = eval_record["probe"].split(".") mod = importlib.import_module(f"garak.probes.{module_name}") - evals[i]["probe"] = f"{module_name}.{plugin_class_name}" + eval_record["probe"] = f"{module_name}.{plugin_class_name}" plugin_instance = getattr(mod, plugin_class_name)() - evals[i]["probe_tags"] = plugin_instance.tags + eval_record["probe_tags"] = plugin_instance.tags self.evaluations = pd.DataFrame.from_dict(evals) self.evaluations["score"] = np.where( @@ -149,7 +149,9 @@ def export(self): # TODO: add html format ) ] all_tags = probe_data.iloc[0]["probe_tags"] - if all_tags == all_tags: # check for NaN + # pylint: disable=comparison-with-itself # NaN sentinel: NaN == NaN is False, so this is only true for non-NaN tags + if all_tags == all_tags: + # pylint: enable=comparison-with-itself tags_split = [ tag.split(":") for tag in all_tags if tag.startswith("avid") ] # supports only avid taxonomy for now diff --git a/pylintrc b/pylintrc index 771492bb8..57156dcad 100644 --- a/pylintrc +++ b/pylintrc @@ -93,9 +93,8 @@ py-version=3.10 # Discover python modules and packages in the file system subtree. recursive=no -# When enabled, pylint would attempt to guess common misconfiguration and emit -# user-friendly hints instead of false-positive error messages. -suggestion-mode=yes +# suggestion-mode is no longer recognized by recent pylint releases; option omitted to avoid +# "unrecognized-option" errors on modern pylint versions. # Allow loading of arbitrary C extensions. Extensions are imported into the # active Python interpreter and may run arbitrary code.
@@ -472,6 +471,7 @@ disable=invalid-name, too-many-return-statements, too-many-branches, too-many-arguments, + too-many-positional-arguments, too-many-locals, too-many-statements, too-many-boolean-expressions, @@ -928,7 +928,8 @@ ignored-checks-for-mixins=no-member, # List of class names for which member attributes should not be checked (useful # for classes with dynamically set attributes). This supports the use of # qualified names. -ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace, + Director # Show a hint with possible names when a member name was not found. The aspect # of finding the hint is based on edit distance.