diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index bbc6492..5e13ef4 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -44,7 +44,7 @@ jobs:
           $GITHUB_OUTPUT
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           key: >-
@@ -64,7 +64,7 @@ jobs:
           hashFiles('.pre-commit-config.yaml') }}" >> $GITHUB_OUTPUT
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           key: >-
@@ -92,7 +92,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
        with:
           path: venv
           fail-on-cache-miss: true
@@ -100,7 +100,7 @@ jobs:
           needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -130,7 +130,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
@@ -138,7 +138,7 @@ jobs:
           needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -165,7 +165,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
@@ -173,7 +173,7 @@ jobs:
           needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -204,7 +204,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
@@ -212,7 +212,7 @@ jobs:
           needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -243,7 +243,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
@@ -251,7 +251,7 @@ jobs:
           needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -315,7 +315,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc4145c..c00bf33 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+# v26.18.1
+
+- Fix memory leak in batches by tidying up remote handler objects once a task has completed.
+- Fix a small potential memory leak in logging, where loggers were not removed from the logger dictionary after being closed.
+- Add `OTF_LOG_MEMORY_USAGE` environment variable to log RSS memory usage at each batch poll interval using the batch logger. This is useful for diagnosing memory growth in long-running batches. Adds `psutil` as an optional dev dependency; it must be installed manually, otherwise the log will report `-1.0` for the RSS memory usage.
+
 # v26.18.0
 
 - Disable logging of initialisation events by default, can be enabled by setting env var `OTG_LOG_INIT_EVENTS=1`
diff --git a/README.md b/README.md
index d6d91f0..3ecb352 100644
--- a/README.md
+++ b/README.md
@@ -177,6 +177,8 @@ These are some environment variables that can be used to customise the behaviour
 - `OTF_LOG_JSON` - Stderr logging will be in JSON format. Set to `1` to enable
 - `OTF_LOG_DIRECTORY` - Path under which log files are written
 - `OTF_NO_THREAD_SLEEP` - Disable the 1-second sleep between batch task thread creation. This sleep exists to prevent race conditions with concurrent protocol imports. Only disable this if you understand the implications. Set to `1` to disable.
+- `OTF_BATCH_POLL_INTERVAL` - Interval in seconds between batch status-check loop iterations (default: `5`). Reducing this makes batches respond faster when tasks complete, at the cost of higher CPU polling overhead. **Intended for testing only — do not change in production unless you fully understand the trade-offs.**
+- `OTF_LOG_MEMORY_USAGE` - Set to `1` to log the RSS memory usage of the OTF process at each batch poll interval using the batch logger. Useful for diagnosing memory growth in long-running batches. Requires `psutil` to be installed separately.
 - `OTF_RUN_ID` - (meant for internal use) An aggregator for log files. When set, all log files for a run will go under this sub directory. E.g. running a batch, all execution and transfer logs will be dropped into this sub directory, rather than a directory for each task name. This is equivalent to using `-r` or `--runId` command line arguments, which is generally preferred.
 - `OTF_SSH_KEY` - The private SSH key to use by default for all SSH connections. This is essential when using a basic docker container to trigger OTF. If not specified, it will default to use any private SSH keys available to the user executing the application.
 - `OTF_STAGING_DIR` - Staging base directory to place files before they're dropped into their final location. Default is `/tmp`
diff --git a/docs/usage.md b/docs/usage.md
index 9b94ece..14daaa9 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -114,19 +114,21 @@ When running a batch, setting `-r` (or `OTF_RUN_ID`) causes all sub-task logs to
 
 The full list of supported environment variables:
 
-| Variable                    | Description                                                                   |
-| --------------------------- | ----------------------------------------------------------------------------- |
-| `OTF_SSH_KEY`               | Default private SSH key path for all SSH connections                          |
-| `OTF_LOG_DIRECTORY`         | Base directory for log files                                                  |
-| `OTF_NO_LOG`                | Set to `1` to disable file logging                                            |
-| `OTF_LOG_JSON`              | Set to `1` for JSON-formatted stderr logging                                  |
-| `OTF_STAGING_DIR`           | Override the staging directory for file transfers (default: `/tmp`)           |
-| `OTF_RUN_ID`                | Log aggregation identifier; equivalent to `-r` flag                           |
-| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format)                  |
-| `OTF_VARIABLES_FILE`        | Override the default variables file path. Comma-separate multiple files      |
-| `OTF_LAZY_LOAD_VARIABLES`   | Set to `1` to only resolve variables that are referenced by the current task  |
-| `OTF_NO_THREAD_SLEEP`       | Set to `1` to disable the 1-second sleep between batch task thread creation   |
-| `OTF_PARAMIKO_ULTRA_DEBUG`  | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only)      |
+| Variable                    | Description                                                                                                                                                  |
+| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `OTF_SSH_KEY`               | Default private SSH key path for all SSH connections                                                                                                         |
+| `OTF_LOG_DIRECTORY`         | Base directory for log files                                                                                                                                 |
+| `OTF_NO_LOG`                | Set to `1` to disable file logging                                                                                                                           |
+| `OTF_LOG_JSON`              | Set to `1` for JSON-formatted stderr logging                                                                                                                 |
+| `OTF_STAGING_DIR`           | Override the staging directory for file transfers (default: `/tmp`)                                                                                          |
+| `OTF_RUN_ID`                | Log aggregation identifier; equivalent to `-r` flag                                                                                                          |
+| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format)                                                                                                 |
+| `OTF_VARIABLES_FILE`        | Override the default variables file path. Comma-separate multiple files                                                                                      |
+| `OTF_LAZY_LOAD_VARIABLES`   | Set to `1` to only resolve variables that are referenced by the current task                                                                                 |
+| `OTF_NO_THREAD_SLEEP`       | Set to `1` to disable the 1-second sleep between batch task thread creation                                                                                  |
+| `OTF_BATCH_POLL_INTERVAL`   | Interval in seconds between batch status-check iterations (default: `5`). **Testing only** — do not lower in production without understanding the trade-offs |
+| `OTF_LOG_MEMORY_USAGE`      | Set to `1` to log RSS memory usage at each batch poll interval via the batch logger. Requires `psutil` to be installed                                       |
+| `OTF_PARAMIKO_ULTRA_DEBUG`  | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only)                                                                                     |
 
 ## Overriding Variables at Runtime
 
diff --git a/pyproject.toml b/pyproject.toml
index 0c6e5fd..9316df8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "opentaskpy"
-version = "v26.18.0"
+version = "v26.18.1"
 authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
 license-files = [ "LICENSE" ]
@@ -43,6 +43,7 @@ dev = [
     "types-requests >=2.28",
     "types-paramiko >=3.0",
     "black == 26.3.1",
+    "psutil >= 5.9",
     "isort",
     "pytest",
     "bumpver",
@@ -71,7 +72,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
 profile = 'black'
 
 [tool.bumpver]
-current_version = "v26.18.0"
+current_version = "v26.18.1"
 version_pattern = "vYY.WW.PATCH[-TAG]"
 commit_message = "bump version {old_version} -> {new_version}"
 commit = true
diff --git a/src/opentaskpy/cli/task_run.py b/src/opentaskpy/cli/task_run.py
index 8c7dec9..1af46f5 100644
--- a/src/opentaskpy/cli/task_run.py
+++ b/src/opentaskpy/cli/task_run.py
@@ -30,6 +30,8 @@ def main() -> None:
     OTF_LOG_LEVEL - Equivalent to using -v
     OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
     OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
+    OTF_BATCH_POLL_INTERVAL - Interval in seconds between batch status-check iterations (default: 5). Intended for testing only; do not lower in production without understanding the trade-offs
+    OTF_LOG_MEMORY_USAGE - Set to 1 to log RSS memory usage at each batch poll interval via the batch logger. Requires psutil
     OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
     OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
     OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format
diff --git a/src/opentaskpy/otflogging.py b/src/opentaskpy/otflogging.py
index 9f10095..d14bf60 100644
--- a/src/opentaskpy/otflogging.py
+++ b/src/opentaskpy/otflogging.py
@@ -403,6 +403,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
             log_file_name = handler.baseFilename
 
     log_handlers = []
+    loggers_to_remove = []
 
     if log_file_name:
         new_log_filename = None
@@ -413,7 +414,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
 
         # Loop through every logger that exists and has a handler of this filename, and
         # call the close method on it. Only the last one should rename the file
-        for logger_ in list(logging.Logger.manager.loggerDict.values()):
+        for logger_name, logger_ in list(logging.Logger.manager.loggerDict.items()):
             if isinstance(logger_, logging.Logger):
                 for handler in logger_.handlers:
                     if (
@@ -422,6 +423,8 @@
                     ):
                         log_handlers.append(handler)
                         handler.close()
+                        loggers_to_remove.append(logger_name)
+                        break
 
         # Now everything is closed, we can rename the log file
         # If result is True, then rename the file and remove _running from the name
@@ -437,6 +440,16 @@
         for handler in log_handlers:
             handler.baseFilename = new_log_filename
 
+    # Unregister all task-scoped loggers that wrote to this log file so they
+    # can be garbage collected rather than accumulating in the process.
+    for logger_name in loggers_to_remove:
+        logging.Logger.manager.loggerDict.pop(logger_name, None)
+
+    # Always unregister the logger passed in directly — this handles the case
+    # where OTF_NO_LOG is set and no file handler was attached, so the
+    # loggers_to_remove list above would be empty.
+    logging.Logger.manager.loggerDict.pop(logger__.name, None)
+
 
 def redact(log_message: str) -> str:
     """Redact any sensitive information from the log message.
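Why the `loggerDict` cleanup above matters: Python's `logging` module keeps a strong reference to every named logger in the process-wide `logging.Logger.manager.loggerDict`, so per-task loggers (and any handlers still attached to them) live for the life of the process unless explicitly unregistered. A minimal sketch of the mechanism, independent of the patch (the `task-N` names are illustrative, and the printed counts assume a fresh interpreter with no other loggers registered):

```python
import logging

# Each getLogger() call registers the logger in the process-wide manager
# dict, which holds a strong reference for the life of the process.
for i in range(3):
    logging.getLogger(f"task-{i}")

print(len(logging.Logger.manager.loggerDict))  # 3

# Popping the name drops that long-lived reference, so the logger and its
# handlers become eligible for garbage collection; this is the same pattern
# close_log_file() now applies to task-scoped loggers.
for i in range(3):
    logging.Logger.manager.loggerDict.pop(f"task-{i}", None)

print(len(logging.Logger.manager.loggerDict))  # 0
```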
diff --git a/src/opentaskpy/taskhandlers/batch.py b/src/opentaskpy/taskhandlers/batch.py
index ecf65f3..4db473d 100644
--- a/src/opentaskpy/taskhandlers/batch.py
+++ b/src/opentaskpy/taskhandlers/batch.py
@@ -1,5 +1,6 @@
 """Batch task handler class."""
 
+import os
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
@@ -18,6 +19,17 @@ DEFAULT_TASK_EXIT_CODE = 0
 TASK_TYPE = "B"
 BATCH_TASK_LOG_MARKER = "__OTF_BATCH_TASK_MARKER__"
 
+DEFAULT_BATCH_POLL_INTERVAL = 5
+
+
+def _rss_mb() -> float:
+    """Return the RSS of the current process in MB, or -1 if psutil is unavailable."""
+    try:
+        import psutil  # pylint: disable=import-outside-toplevel
+
+        return float(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
+    except Exception:  # pylint: disable=broad-except
+        return -1.0
 
 
 class Batch(TaskHandler):
@@ -403,8 +415,15 @@ def run(self, kill_event: threading.Event | None = None) -> bool:  # noqa: C901
                 )
                 break
 
-            # Sleep 5 seconds before checking again
-            time.sleep(5)
+            # Sleep before checking again
+            poll_interval = float(
+                environ.get("OTF_BATCH_POLL_INTERVAL", DEFAULT_BATCH_POLL_INTERVAL)
+            )
+            time.sleep(poll_interval)
+
+            # Log memory usage if requested
+            if environ.get("OTF_LOG_MEMORY_USAGE"):
+                self.logger.info(f"[memory] RSS: {_rss_mb():.1f} MB")
 
             # Check if we have been asked to kill the batch
             if kill_event and kill_event.is_set():
@@ -483,6 +502,10 @@ def task_runner(self, batch_task: dict, event: threading.Event) -> None:
                     batch_task["batch_task_spec"]["order_id"],
                     batch_task["task_id"],
                 )
+                # Release the task handler so that remote handler objects
+                # (boto3 clients, SSH connections, etc.) can be garbage
+                # collected rather than accumulating for the whole batch run.
+                batch_task["task_handler"] = None
                 break
 
             # Check if we have been asked to kill the thread
diff --git a/tests/test_memory_large_file_transfer.py b/tests/test_memory_large_file_transfer.py
new file mode 100644
index 0000000..33b89d7
--- /dev/null
+++ b/tests/test_memory_large_file_transfer.py
@@ -0,0 +1,385 @@
+# pylint: skip-file
+# ruff: noqa
+"""Memory utilization test for OTF batch SFTP transfers.
+
+Creates a batch of 10 sequential SFTP transfers, each moving a unique 1 MB file
+from sftp_1 (172.16.0.21) to sftp_2 (172.16.0.22). RSS memory is sampled every
+MEMORY_SAMPLE_INTERVAL seconds in a background thread and written to:
+
+  * the console (printed via pytest -s)
+  * /tmp/otf_mem_<batch_id>.log (CSV: timestamp, elapsed_s, rss_mb)
+
+Memory growth is driven by Python handler object accumulation (SSH connections,
+loggers, etc.), not by file size, so small files are sufficient to catch
+regressions while keeping the test fast enough for regular CI runs.
+
+Run this test in isolation with output visible:
+    pytest tests/test_memory_large_file_transfer.py -v -s
+"""
+
+import ctypes
+import datetime
+import gc
+import json
+import os
+import re
+import threading
+import time
+import uuid
+
+import psutil
+import pytest
+
+from opentaskpy.config.loader import ConfigLoader
+from opentaskpy.taskhandlers import batch
+from tests.fixtures.ssh_clients import *  # noqa: F403
+
+os.environ["OTF_LOG_LEVEL"] = "INFO"
+os.environ["OTF_BATCH_POLL_INTERVAL"] = (
+    "0.1"  # don't wait 5s between batch status checks
+)
+os.environ["OTF_NO_THREAD_SLEEP"] = "1"  # don't wait 1s between task thread creation
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+
+NUM_TASKS = 10
+FILE_SIZE_BYTES = (
+    1 * 1024 * 1024
+)  # 1 MB — small enough for fast CI runs; memory growth is handler-driven, not file-size-driven
+MEMORY_SAMPLE_INTERVAL = 2  # seconds between RSS samples
+TRANSFER_TIMEOUT = 120  # seconds per task
+
+# Memory regression threshold. Current baseline is ~5 MB/task after fixes;
+# 15 MB/task gives generous headroom while catching any return of the original
+# ~24 MB/task leak caused by un-nulled task_handler references in batch.py.
+MAX_UNRECOVERABLE_GROWTH_PER_TASK_MB = 15
+
+# SFTP container IP addresses (match docker-compose network config)
+SFTP_1_HOST = "172.16.0.21"
+SFTP_2_HOST = "172.16.0.22"
+SFTP_SRC_DIR = "/home/application/testFiles/src"
+SFTP_DST_DIR = "/home/application/testFiles/dest"
+SFTP_USER = "application"
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _create_sparse_file(path: str, size: int) -> None:
+    """Create a sparse file at *path* of *size* bytes.
+
+    On Linux the OS allocates no real disk blocks for the gap; reading back
+    returns zeroes. SFTP will stream exactly *size* bytes of zeroes to the
+    destination, giving a realistic transfer workload without consuming real
+    disk space on the source side.
+    """
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    with open(path, "wb") as fh:
+        fh.seek(size - 1)
+        fh.write(b"\x00")
+
+
+class MemoryMonitor:
+    """Samples RSS memory of the current process in a background thread.
+
+    Usage::
+
+        monitor = MemoryMonitor("/tmp/mem.log")
+        initial = monitor.current_rss_mb()
+        monitor.start()
+        # ... run workload ...
+        monitor.stop()
+        summary = monitor.summary()
+    """
+
+    def __init__(self, log_file: str, sample_interval: float = 2.0) -> None:
+        self._log_file = log_file
+        self._sample_interval = sample_interval
+        self._stop_event = threading.Event()
+        self._samples: list[tuple[float, float]] = []  # (elapsed_s, rss_mb)
+        self._thread = threading.Thread(target=self._run, daemon=True)
+        self._process = psutil.Process(os.getpid())
+        self._start_time: float = 0.0
+
+    def start(self) -> None:
+        self._start_time = time.monotonic()
+        self._thread.start()
+
+    def stop(self) -> None:
+        self._stop_event.set()
+        self._thread.join()
+
+    def current_rss_mb(self) -> float:
+        return self._process.memory_info().rss / (1024 * 1024)
+
+    def _run(self) -> None:
+        with open(self._log_file, "w", encoding="utf-8") as fh:
+            fh.write("timestamp,elapsed_s,rss_mb\n")
+            while not self._stop_event.is_set():
+                elapsed = time.monotonic() - self._start_time
+                rss_mb = self.current_rss_mb()
+                ts = datetime.datetime.now().isoformat(timespec="seconds")
+                line = f"{ts},{elapsed:.1f},{rss_mb:.1f}\n"
+                fh.write(line)
+                fh.flush()
+                self._samples.append((elapsed, rss_mb))
+                self._stop_event.wait(self._sample_interval)
+
+    def summary(self) -> dict:
+        if not self._samples:
+            return {}
+        rss_values = [s[1] for s in self._samples]
+        return {
+            "min_mb": min(rss_values),
+            "max_mb": max(rss_values),
+            "final_mb": rss_values[-1],
+            "samples": len(rss_values),
+        }
+
+
+# ---------------------------------------------------------------------------
+# Task / batch config builders
+# ---------------------------------------------------------------------------
+
+
+def _transfer_task_definition(file_name: str) -> dict:
+    """Return a validated-compatible SFTP transfer task definition dict."""
+    return {
+        "type": "transfer",
+        "source": {
+            "hostname": SFTP_1_HOST,
+            "directory": SFTP_SRC_DIR,
+            "fileRegex": re.escape(file_name),
+            "protocol": {
+                "name": "sftp",
+                "credentials": {"username": SFTP_USER},
+                "timeout": TRANSFER_TIMEOUT,
+            },
+        },
+        "destination": [
+            {
+                "hostname": SFTP_2_HOST,
+                "directory": SFTP_DST_DIR,
+                "protocol": {
+                    "name": "sftp",
+                    "credentials": {"username": SFTP_USER},
+                    "timeout": TRANSFER_TIMEOUT,
+                },
+            }
+        ],
+    }
+
+
+def _batch_definition(task_ids: list) -> dict:
+    """Return a sequential batch definition where each task depends on the previous."""
+    tasks = []
+    for i, task_id in enumerate(task_ids, start=1):
+        entry: dict = {
+            "order_id": i,
+            "task_id": task_id,
+            "timeout": TRANSFER_TIMEOUT,
+        }
+        if i > 1:
+            entry["dependencies"] = [i - 1]
+        tasks.append(entry)
+    return {"type": "batch", "tasks": tasks}
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture(scope="function")
+def memory_test_config(tmp_path):
+    """Write unique task + batch JSON config files to a temporary directory.
+
+    Returns a 4-tuple:
+        (config_dir: str, batch_id: str, task_ids: list[str], file_names: list[str])
+    """
+    run_id = uuid.uuid4().hex[:8]
+    task_ids = [f"mem-sftp-{run_id}-{i}" for i in range(1, NUM_TASKS + 1)]
+    file_names = [f"mem_test_{run_id}_{i}.dat" for i in range(1, NUM_TASKS + 1)]
+    batch_id = f"mem-sftp-batch-{run_id}"
+
+    # ConfigLoader requires a variables.json to be present
+    (tmp_path / "variables.json").write_text("{}", encoding="utf-8")
+
+    # Write one transfer JSON file per task
+    for task_id, file_name in zip(task_ids, file_names):
+        task_def = _transfer_task_definition(file_name)
+        (tmp_path / f"{task_id}.json").write_text(
+            json.dumps(task_def, indent=2), encoding="utf-8"
+        )
+
+    # Write the batch JSON file
+    batch_def = _batch_definition(task_ids)
+    (tmp_path / f"{batch_id}.json").write_text(
+        json.dumps(batch_def, indent=2), encoding="utf-8"
+    )
+
+    return str(tmp_path), batch_id, task_ids, file_names
+
+
+@pytest.fixture(scope="function")
+def large_source_files(root_dir, memory_test_config):
+    """Create sparse source files of FILE_SIZE_BYTES each on the sftp_1 volume, clean up in teardown.
+
+    Both the source files on sftp_1 and the destination files on sftp_2 are
+    removed after the test, ensuring the disk is not filled.
+    """
+    _, _, _, file_names = memory_test_config
+    src_dir = os.path.join(root_dir, "testFiles", "sftp_1", "src")
+    dst_dir = os.path.join(root_dir, "testFiles", "sftp_2", "dest")
+
+    src_paths = []
+    for file_name in file_names:
+        path = os.path.join(src_dir, file_name)
+        _create_sparse_file(path, FILE_SIZE_BYTES)
+        src_paths.append(path)
+        print(
+            f"\n  [setup] Created sparse source: {path} "
+            f"({FILE_SIZE_BYTES / (1024 ** 2):.0f} MB apparent)"
+        )
+
+    yield src_paths
+
+    # --- teardown: remove source files ---
+    for path in src_paths:
+        if os.path.exists(path):
+            os.remove(path)
+            print(f"  [teardown] Removed source     : {path}")
+
+    # --- teardown: remove destination files ---
+    for file_name in file_names:
+        dst_path = os.path.join(dst_dir, file_name)
+        if os.path.exists(dst_path):
+            os.remove(dst_path)
+            print(f"  [teardown] Removed destination: {dst_path}")
+
+
+# ---------------------------------------------------------------------------
+# Test
+# ---------------------------------------------------------------------------
+
+
+def test_memory_usage_large_file_batch_sftp_transfer(
+    root_dir,
+    setup_sftp_keys,
+    memory_test_config,
+    large_source_files,
+):
+    """Run 10 sequential 1 MB SFTP transfers as a batch and monitor RSS memory.
+
+    The test:
+    1. Writes 10 unique task JSON configs + 1 batch JSON config to a tmp dir.
+    2. Creates a 1 MB sparse file on sftp_1 for each task.
+    3. Starts a background thread sampling RSS every MEMORY_SAMPLE_INTERVAL s.
+    4. Runs the batch via ConfigLoader + batch.Batch.run().
+    5. Prints a memory summary to the console and writes a CSV log to /tmp.
+    6. Asserts all transfers succeeded and all destination files exist.
+    7. Asserts unrecoverable RSS growth per task is below the regression threshold.
+    8. Cleans up source and destination files in fixture teardown.
+    """
+    config_dir, batch_id, task_ids, file_names = memory_test_config
+    log_file = f"/tmp/otf_mem_{batch_id}.log"
+
+    print(f"\n[memory-test] Memory log : {log_file}")
+    print(f"[memory-test] Config dir : {config_dir}")
+    print(f"[memory-test] Batch ID   : {batch_id}")
+    print(
+        f"[memory-test] {NUM_TASKS} tasks × {FILE_SIZE_BYTES / (1024 ** 2):.0f} MB each"
+    )
+
+    config_loader = ConfigLoader(config_dir)
+    batch_definition = config_loader.load_task_definition(batch_id)
+
+    # --- profiling: baseline before batch ---
+    gc.collect()
+
+    monitor = MemoryMonitor(log_file=log_file, sample_interval=MEMORY_SAMPLE_INTERVAL)
+    initial_rss = monitor.current_rss_mb()
+    print(f"[memory-test] Initial RSS : {initial_rss:.1f} MB")
+
+    monitor.start()
+    try:
+        batch_obj = batch.Batch(None, batch_id, batch_definition, config_loader)
+        result = batch_obj.run()
+    finally:
+        monitor.stop()
+
+    final_rss = monitor.current_rss_mb()
+    summary = monitor.summary()
+    growth = final_rss - initial_rss
+
+    print()
+    print("[memory-test] ========== Memory Usage Summary ==========")
+    print(f"[memory-test] Initial RSS : {initial_rss:.1f} MB")
+    print(f"[memory-test] Final RSS   : {final_rss:.1f} MB")
+    print(f"[memory-test] Peak RSS    : {summary.get('max_mb', 0):.1f} MB")
+    print(f"[memory-test] Min RSS     : {summary.get('min_mb', 0):.1f} MB")
+    print(f"[memory-test] Growth      : {growth:+.1f} MB")
+    print(f"[memory-test] Samples     : {summary.get('samples', 0)}")
+    print(f"[memory-test] Log file    : {log_file}")
+    print("[memory-test] =============================================")
+
+    # --- gc analysis ---
+    rss_before_gc = monitor.current_rss_mb()
+    collected = gc.collect()
+    rss_after_gc = monitor.current_rss_mb()
+    print()
+    print(f"[gc] Objects collected by gc.collect() : {collected}")
+    print(f"[gc] RSS before gc.collect()           : {rss_before_gc:.1f} MB")
+    print(f"[gc] RSS after gc.collect()            : {rss_after_gc:.1f} MB")
+    print(
+        f"[gc] RSS freed by gc                   : {rss_before_gc - rss_after_gc:+.1f} MB"
+    )
+
+    # --- malloc_trim: ask glibc to return freed arenas to the OS ---
+    try:
+        libc = ctypes.CDLL("libc.so.6")
+        libc.malloc_trim(0)
+        rss_after_trim = monitor.current_rss_mb()
+        print(f"[gc] RSS after malloc_trim(0)          : {rss_after_trim:.1f} MB")
+        print(
+            f"[gc] RSS freed by malloc_trim          : {rss_after_gc - rss_after_trim:+.1f} MB"
+        )
+        print(
+            f"[gc] Unrecoverable RSS growth          : {rss_after_trim - initial_rss:+.1f} MB"
+        )
+    except Exception as e:
+        print(f"[gc] malloc_trim not available: {e}")
+
+    # --- assert all transfers completed successfully ---
+    assert result, "Batch of large-file SFTP transfers reported failure"
+
+    dst_dir = os.path.join(root_dir, "testFiles", "sftp_2", "dest")
+    for file_name in file_names:
+        dst_path = os.path.join(dst_dir, file_name)
+        assert os.path.exists(
+            dst_path
+        ), f"Expected destination file not found after transfer: {dst_path}"
+
+    # --- memory regression assertion ---
+    # Use the post-malloc_trim RSS as the "true" retained memory; fall back to
+    # post-gc RSS when malloc_trim is unavailable (non-Linux environments).
+    try:
+        unrecoverable_mb = rss_after_trim - initial_rss
+    except NameError:
+        unrecoverable_mb = rss_after_gc - initial_rss
+
+    growth_per_task_mb = unrecoverable_mb / NUM_TASKS
+    print()
+    print(f"[assert] Unrecoverable growth per task : {growth_per_task_mb:.1f} MB")
+    print(
+        f"[assert] Threshold                     : {MAX_UNRECOVERABLE_GROWTH_PER_TASK_MB} MB/task"
+    )
+    assert growth_per_task_mb < MAX_UNRECOVERABLE_GROWTH_PER_TASK_MB, (
+        f"Memory regression detected: {growth_per_task_mb:.1f} MB/task retained after "
+        f"gc.collect() + malloc_trim (threshold: {MAX_UNRECOVERABLE_GROWTH_PER_TASK_MB} MB/task). "
+        f"Total unrecoverable growth: {unrecoverable_mb:.1f} MB over {NUM_TASKS} tasks."
+    )
diff --git a/tests/test_taskhandler_batch.py b/tests/test_taskhandler_batch.py
index 18dbcc1..c72eb04 100644
--- a/tests/test_taskhandler_batch.py
+++ b/tests/test_taskhandler_batch.py
@@ -676,3 +676,58 @@ def test_protocol_cache_reuse(env_vars, root_dir, clear_logs):
 
     # The second batch should use the cached protocol class
     # This can be verified through logs, but we're mainly ensuring no errors occur
+
+
+def test_batch_log_memory_usage(
+    env_vars, root_dir, clear_logs, no_thread_sleep, caplog
+):
+    """Test that OTF_LOG_MEMORY_USAGE causes RSS memory to be logged each poll cycle."""
+    import logging
+
+    old_value = os.environ.get("OTF_LOG_MEMORY_USAGE")
+    os.environ["OTF_LOG_MEMORY_USAGE"] = "1"
+    # Use a fast poll interval so the test doesn't wait on the default 5s
+    old_poll = os.environ.get("OTF_BATCH_POLL_INTERVAL")
+    os.environ["OTF_BATCH_POLL_INTERVAL"] = "0.1"
+    try:
+        mem_batch = {
+            "type": "batch",
+            "tasks": [{"order_id": i, "task_id": "df-local"} for i in range(1, 4)],
+        }
+        config_loader = ConfigLoader("test/cfg")
+        batch_obj = batch.Batch(None, f"mem-log-{RANDOM}", mem_batch, config_loader)
+
+        with caplog.at_level(logging.INFO):
+            result = batch_obj.run()
+
+        assert result, "Batch should complete successfully"
+
+        memory_log_lines = [
+            r.message for r in caplog.records if "[memory] RSS:" in r.message
+        ]
+        assert len(memory_log_lines) > 0, (
+            "Expected at least one '[memory] RSS:' log entry when OTF_LOG_MEMORY_USAGE=1, "
+            f"but none were found. All log messages: {[r.message for r in caplog.records]}"
+        )
+        # Each entry should contain a positive numeric MB value — a value of -1.0
+        # means psutil was not importable, which is a missing-dependency error.
+        import re
+
+        for line in memory_log_lines:
+            assert re.search(
+                r"\[memory\] RSS: -?\d+\.\d+ MB", line
+            ), f"Memory log line has unexpected format: {line!r}"
+            rss_value = float(
+                re.search(r"\[memory\] RSS: (-?[\d.]+) MB", line).group(1)
+            )
+            assert rss_value > 0, (
+                f"RSS value is {rss_value} MB — a value of -1.0 means psutil is not "
+                "installed. Add 'psutil' to the project dependencies."
+            )
+    finally:
+        if old_value is None:
+            os.environ.pop("OTF_LOG_MEMORY_USAGE", None)
+        else:
+            os.environ["OTF_LOG_MEMORY_USAGE"] = old_value
+        if old_poll is None:
+            os.environ.pop("OTF_BATCH_POLL_INTERVAL", None)
+        else:
+            os.environ["OTF_BATCH_POLL_INTERVAL"] = old_poll
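For a quick manual check of the new memory logging outside the test suite, the snippet below mirrors the behaviour of the `_rss_mb()` helper added in batch.py: it reports the current process RSS in MB, or `-1.0` when `psutil` is missing. This is an illustrative sketch, not part of the patch (it narrows the patch's broad `except Exception` to `ImportError` for clarity):

```python
import os

try:
    import psutil

    # Same computation as _rss_mb() in batch.py: resident set size in MB.
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
except ImportError:
    # Mirrors the patch's fallback: report -1.0 instead of raising when
    # psutil is not installed.
    rss_mb = -1.0

print(f"[memory] RSS: {rss_mb:.1f} MB")
```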