# Lint every tracked Python file with pylint on each push.
name: Pylint

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # The Python modules added alongside this workflow import
        # `datetime.UTC` (added in Python 3.11) and use
        # `@dataclass(slots=True)` (added in Python 3.10), so the linted
        # interpreters must be 3.11 or newer.
        python-version: ["3.11", "3.12", "3.13"]
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pylint
      - name: Analysing the code with pylint
        run: |
          pylint $(git ls-files '*.py')
def _discover_repositories(root: Path, excluded: set[Path]) -> list[Path]:
    """Return every Git repository found under ``root``.

    A directory counts as a repository when it contains a ``.git``
    directory. The search does not descend into a repository once it has
    been found, and it never follows symbolic links.

    Parameters
    ----------
    root:
        Directory that should be scanned. The path is resolved to avoid
        surprises with symbolic links.
    excluded:
        Absolute paths that should not be traversed while searching. A
        directory is considered excluded when it is equal to one of the
        supplied paths or when it is a child of one.

    Returns
    -------
    list[Path]
        Repository directories in depth-first order, with the children of
        each directory visited in sorted order so that repeated runs
        process repositories in the same sequence.
    """

    def is_excluded(path: Path) -> bool:
        return any(path == item or item in path.parents for item in excluded)

    repositories: list[Path] = []
    # An explicit stack replaces recursion; children are pushed in reverse so
    # that they are popped (and therefore visited) in sorted order.
    pending: list[Path] = [root.resolve()]

    while pending:
        directory = pending.pop()
        if is_excluded(directory):
            logging.debug("Skipping excluded directory %s", directory)
            continue

        try:
            if (directory / ".git").is_dir():
                repositories.append(directory)
                logging.debug("Found git repository in %s", directory)
                continue

            children = sorted(
                child
                for child in directory.iterdir()
                if child.is_dir() and not child.is_symlink()
            )
        except OSError as error:
            # A directory that is unreadable or vanished mid-scan must not
            # abort the refresh of every other repository.
            logging.warning("Skipping unreadable directory %s: %s", directory, error)
            continue

        pending.extend(reversed(children))

    return repositories
+ """ + + commands: Iterable[Sequence[str]] = [("fetch", "--all", "--prune")] + if pull: + commands = (*commands, ("pull", "--ff-only")) + + for command in commands: + result = _run_git_command(repository, command, dry_run=dry_run) + if not result.succeeded: + logging.error( + "Failed to refresh %s with %s (exit code %s)", + repository, + shlex.join(result.command), + result.returncode, + ) + if result.stdout: + logging.error("stdout: %s", result.stdout) + if result.stderr: + logging.error("stderr: %s", result.stderr) + return False + + if result.stdout: + logging.debug("%s", result.stdout) + if result.stderr: + logging.debug("%s", result.stderr) + + return True + + +def _parse_arguments() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Refresh every git repository found under a directory.", + ) + parser.add_argument( + "root", + type=Path, + nargs="?", + default=Path.cwd(), + help="Directory that should be scanned for repositories.", + ) + parser.add_argument( + "-e", + "--exclude", + action="append", + default=[], + metavar="PATH", + help=( + "Directory names (relative to ROOT) or absolute paths that should " + "be skipped while searching for repositories. The option can be " + "provided multiple times." 
+ ), + ) + parser.add_argument( + "-p", + "--pull", + action="store_true", + help="Perform a fast-forward pull after fetching.", + ) + parser.add_argument( + "-n", + "--dry-run", + action="store_true", + help="Print the git commands without executing them.", + ) + parser.add_argument( + "-v", + "--verbose", + action="count", + default=0, + help="Increase logging verbosity (can be supplied multiple times).", + ) + parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="Silence informational output.", + ) + return parser.parse_args() + + +def _prepare_excluded(root: Path, entries: list[str]) -> set[Path]: + excluded: set[Path] = set() + for entry in entries: + path = Path(entry) + if not path.is_absolute(): + path = (root / path).resolve() + else: + path = path.resolve() + excluded.add(path) + return excluded + + +def main() -> int: + args = _parse_arguments() + + log_level = logging.INFO + if args.quiet: + log_level = logging.WARNING + elif args.verbose >= 2: + log_level = logging.DEBUG + + logging.basicConfig(level=log_level, format=LOG_FORMAT) + + root = args.root.resolve() + if not root.is_dir(): + logging.error("%s is not a directory", root) + return 1 + + excluded = _prepare_excluded(root, args.exclude) + repositories = _discover_repositories(root, excluded) + + if not repositories: + logging.warning("No Git repositories found under %s", root) + return 0 + + logging.info( + "Refreshing %s repositories under %s", len(repositories), root + ) + + failures = 0 + for repository in repositories: + if not refresh_repository(repository, pull=args.pull, dry_run=args.dry_run): + failures += 1 + + if failures: + logging.error("Failed to refresh %s repositories", failures) + return 1 + + logging.info("Successfully refreshed all repositories") + return 0 + + +if __name__ == "__main__": # pragma: no cover - CLI entry point. 
+ raise SystemExit(main()) diff --git a/sustainability/__init__.py b/sustainability/__init__.py new file mode 100644 index 000000000000..70713738e630 --- /dev/null +++ b/sustainability/__init__.py @@ -0,0 +1 @@ +"""Sustainability compliance tools for CTRL Environmental.""" diff --git a/sustainability/ctrl_compliance_dashboard.py b/sustainability/ctrl_compliance_dashboard.py new file mode 100644 index 000000000000..317739b80239 --- /dev/null +++ b/sustainability/ctrl_compliance_dashboard.py @@ -0,0 +1,178 @@ +"""Generate a sustainability compliance dashboard for CTRL Environmental. + +The script reads inspection data from CSV or Excel files, categorizes findings +into environmental topics, flags non-compliances with traffic light colours, +and exports the results to a styled HTML report. Optional PDF export is +attempted if a suitable backend is available. The layout follows a black, +red, and white palette reminiscent of Berlin poster aesthetics and includes +placeholders for the CTRL logo and report metadata. 
def build_table(df: pd.DataFrame) -> str:
    """Return *df* rendered as an HTML table with traffic light styling.

    Each cell of the ``Status`` column gets a background colour chosen by
    ``traffic_light``; every header and data cell gets a thin black border
    and a little padding. The index column is left out of the output.
    *df* must therefore contain a ``Status`` column.
    """

    styler = df.style

    # ``Styler.applymap`` has been deprecated since pandas 2.1 in favour of
    # ``Styler.map``; use the new name when available and fall back to the
    # old one on earlier releases.
    colour_cells = styler.map if hasattr(styler, "map") else styler.applymap
    styled = colour_cells(
        lambda v: f"background-color:{traffic_light(v)}", subset=["Status"]
    ).set_table_styles(
        [
            {
                "selector": "th, td",
                "props": [("border", "1px solid black"), ("padding", "4px")],
            }
        ]
    )

    # ``Styler.hide_index`` was removed in pandas 2.0; ``Styler.hide`` is its
    # replacement and exists since pandas 1.4.
    if hasattr(styled, "hide"):
        styled = styled.hide(axis="index")
    else:
        styled = styled.hide_index()
    return styled.to_html()
+ CTRL Logo +

Compliance Dashboard

+
+ Site: {{ meta.site }} | + Client: {{ meta.client }} | + Inspector: {{ meta.inspector }} | + Date: {{ meta.date }} +
+
+
+{{ table | safe }} +
def render_report(
    df: pd.DataFrame, meta: Metadata, logo: str, html_path: str, pdf_path: str | None
) -> None:
    """Render *df* to an HTML report and, optionally, a PDF copy.

    The table produced by ``build_table`` is inserted into ``TEMPLATE``
    together with *meta* and *logo*, and the result is written to
    *html_path* as UTF-8. Autoescaping is enabled, so metadata values and
    the logo path are escaped; only the generated table is inserted as
    markup.

    When *pdf_path* is given, pdfkit is tried first. If pdfkit raises an
    ``OSError`` (for instance because the wkhtmltopdf executable is missing)
    and WeasyPrint is importable, WeasyPrint is used instead; without a
    fallback the original error propagates. If neither backend is
    installed a notice is printed and no PDF is written.
    """

    table_html = build_table(df)
    env = Environment(autoescape=True)
    template = env.from_string(TEMPLATE)
    html_content = template.render(table=table_html, meta=meta, logo=logo)
    Path(html_path).write_text(html_content, encoding="utf-8")

    if not pdf_path:
        return

    if pdfkit is not None:
        try:
            pdfkit.from_string(html_content, pdf_path)  # type: ignore[no-untyped-call]
        except OSError as error:
            # pdfkit being importable does not guarantee that wkhtmltopdf is
            # usable. Only swallow the failure when another backend can take
            # over; otherwise keep the error visible to the caller.
            if HTML is None:
                raise
            print(f"pdfkit could not create the PDF ({error}); using WeasyPrint.")
        else:
            return

    if HTML is not None:
        HTML(string=html_content).write_pdf(pdf_path)  # type: ignore[no-untyped-call]
    else:
        print("PDF output requested but no PDF backend is installed.")