Commit a7ec5a5

Add memory usage logging env var
1 parent f7d582c commit a7ec5a5

6 files changed

Lines changed: 79 additions & 0 deletions

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -1,5 +1,11 @@
 # Changelog
 
+# v26.18.1
+
+- Fix memory leak in batches caused by not tidying up the remote handler objects after a task has completed.
+- Fix a potential but very small memory leak in the logger, where loggers were not removed from the logger dictionary after they were closed.
+- Add `OTF_LOG_MEMORY_USAGE` environment variable to allow logging of RSS memory usage at each batch poll interval using the batch logger. This is useful for diagnosing memory growth in long-running batches. Added optional dev dependency `psutil` to allow this. It needs to be installed manually; otherwise the log will just output `-1.0` for RSS memory usage.
+
 # v26.18.0
 
 - Disable logging of initialisation events by default; it can be enabled by setting env var `OTF_LOG_INIT_EVENTS=1`

README.md

Lines changed: 1 addition & 0 deletions
@@ -178,6 +178,7 @@ These are some environment variables that can be used to customise the behaviour
 - `OTF_LOG_DIRECTORY` - Path under which log files are written
 - `OTF_NO_THREAD_SLEEP` - Disable the 1-second sleep between batch task thread creation. This sleep exists to prevent race conditions with concurrent protocol imports. Only disable this if you understand the implications. Set to `1` to disable.
 - `OTF_BATCH_POLL_INTERVAL` - Interval in seconds between batch status-check loop iterations (default: `5`). Reducing this makes batches respond faster when tasks complete, at the cost of higher CPU polling overhead. **Intended for testing only — do not change in production unless you fully understand the trade-offs.**
+- `OTF_LOG_MEMORY_USAGE` - Set to `1` to log the RSS memory usage of the OTF process at each batch poll interval using the batch logger. Useful for diagnosing memory growth in long-running batches. Requires `psutil` to be installed separately.
 - `OTF_RUN_ID` - (meant for internal use) An aggregator for log files. When set, all log files for a run will go under this sub directory. E.g. running a batch, all execution and transfer logs will be dropped into this sub directory, rather than a directory for each task name. This is equivalent to using `-r` or `--runId` command line arguments, which is generally preferred.
 - `OTF_SSH_KEY` - The private SSH key to use by default for all SSH connections. This is essential when using a basic docker container to trigger OTF. If not specified, it will default to use any private SSH keys available to the user executing the application.
 - `OTF_STAGING_DIR` - Staging base directory to place files before they're dropped into their final location. Default is `/tmp`
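
Reviewer note: the README entry says the variable is useful for diagnosing memory growth. The sketch below shows one way the resulting log lines could be post-processed. It assumes only the `[memory] RSS: <value> MB` message format added in this commit; the helper names (`rss_samples`, `rss_growth_mb`) and the sample log prefix are hypothetical and not part of OTF.

```python
import re

# Mirrors the "[memory] RSS: <value> MB" message format added in this commit.
MEMORY_LINE = re.compile(r"\[memory\] RSS: (-?\d+(?:\.\d+)?) MB")


def rss_samples(log_lines):
    """Extract RSS readings in MB, skipping the -1.0 'psutil missing' sentinel."""
    samples = []
    for line in log_lines:
        match = MEMORY_LINE.search(line)
        if match:
            value = float(match.group(1))
            if value >= 0:
                samples.append(value)
    return samples


def rss_growth_mb(log_lines):
    """Return last-minus-first RSS in MB, or None with fewer than two samples."""
    samples = rss_samples(log_lines)
    if len(samples) < 2:
        return None
    return samples[-1] - samples[0]


# Hypothetical log excerpt; the timestamp/level prefix is made up for illustration.
example = [
    "2024-01-01 00:00:00 INFO [memory] RSS: 100.0 MB",
    "2024-01-01 00:00:05 INFO task df-local completed",
    "2024-01-01 00:00:10 INFO [memory] RSS: 112.5 MB",
]
print(rss_growth_mb(example))
```

A steadily positive difference across a long batch would be a hint worth investigating, not proof of a leak, since RSS also moves with normal allocator behaviour.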

docs/usage.md

Lines changed: 1 addition & 0 deletions
@@ -127,6 +127,7 @@ The full list of supported environment variables:
 | `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
 | `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
 | `OTF_BATCH_POLL_INTERVAL` | Interval in seconds between batch status-check iterations (default: `5`). **Testing only** — do not lower in production without understanding the trade-offs |
+| `OTF_LOG_MEMORY_USAGE` | Set to `1` to log RSS memory usage at each batch poll interval via the batch logger. Requires `psutil` to be installed |
 | `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
 
 ## Overriding Variables at Runtime

src/opentaskpy/cli/task_run.py

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ def main() -> None:
 OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
 OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
 OTF_BATCH_POLL_INTERVAL - Interval in seconds between batch status-check iterations (default: 5). Intended for testing only; do not lower in production without understanding the trade-offs
+OTF_LOG_MEMORY_USAGE - Set to 1 to log RSS memory usage at each batch poll interval via the batch logger. Requires psutil
 OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
 OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
 OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format

src/opentaskpy/taskhandlers/batch.py

Lines changed: 15 additions & 0 deletions
@@ -1,5 +1,6 @@
 """Batch task handler class."""
 
+import os
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
@@ -21,6 +22,16 @@
 DEFAULT_BATCH_POLL_INTERVAL = 5
 
 
+def _rss_mb() -> float:
+    """Return the RSS of the current process in MB, or -1 if psutil is unavailable."""
+    try:
+        import psutil  # pylint: disable=import-outside-toplevel
+
+        return float(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
+    except Exception:  # pylint: disable=broad-except
+        return -1.0
+
+
 class Batch(TaskHandler):
     """Batch task handler class."""
 
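
Reviewer note: `_rss_mb()` returns `-1.0` whenever `psutil` cannot be imported. As a point of comparison only, the sketch below uses the standard library `resource` module instead. It is a different technique, not what this commit does: `ru_maxrss` reports the peak RSS of the process, not the current RSS, and the module exists only on Unix-like systems. The function name `peak_rss_mb` is hypothetical.

```python
import sys


def peak_rss_mb() -> float:
    """Return the peak RSS of the current process in MB, or -1.0 if unavailable."""
    try:
        import resource  # Unix-only standard library module
    except ImportError:
        return -1.0
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


print(f"[memory] peak RSS: {peak_rss_mb():.1f} MB")
```

Because the peak never decreases, this value can show that memory grew but not whether it was later released, which is one reason a current-RSS reading such as the `psutil` one is more informative for leak hunting.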
@@ -410,6 +421,10 @@ def run(self, kill_event: threading.Event | None = None) -> bool:  # noqa: C901
 )
 time.sleep(poll_interval)
 
+# Log memory usage if requested
+if environ.get("OTF_LOG_MEMORY_USAGE"):
+    self.logger.info(f"[memory] RSS: {_rss_mb():.1f} MB")
+
 # Check if we have been asked to kill the batch
 if kill_event and kill_event.is_set():
     self.logger.info("Kill event received, stopping threads")
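
Reviewer note: the documentation says to set the variable to `1`, while the check above is a plain truthiness test on the string, so any non-empty value (including `0`) turns the logging on. If stricter semantics were wanted, a check along these lines could be used. This is a minimal sketch; `env_flag_enabled` is a hypothetical helper, not an existing OTF function.

```python
import os


def env_flag_enabled(name: str) -> bool:
    """Hypothetical helper: True only when the variable is set to exactly "1"."""
    return os.environ.get(name, "").strip() == "1"


os.environ["OTF_LOG_MEMORY_USAGE"] = "0"
# Truthiness check as used in the commit: a non-empty string is truthy.
print(bool(os.environ.get("OTF_LOG_MEMORY_USAGE")))
# Stricter check: only "1" enables the feature.
print(env_flag_enabled("OTF_LOG_MEMORY_USAGE"))
os.environ.pop("OTF_LOG_MEMORY_USAGE", None)
```

Whether the looser behaviour is intentional is not stated in the commit, so treat this as an option to consider and not a required change.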

tests/test_taskhandler_batch.py

Lines changed: 55 additions & 0 deletions
@@ -676,3 +676,58 @@ def test_protocol_cache_reuse(env_vars, root_dir, clear_logs):
 
     # The second batch should use the cached protocol class
     # This can be verified through logs, but we're mainly ensuring no errors occur
+
+
+def test_batch_log_memory_usage(
+    env_vars, root_dir, clear_logs, no_thread_sleep, caplog
+):
+    """Test that OTF_LOG_MEMORY_USAGE causes RSS memory to be logged each poll cycle."""
+    import logging
+
+    old_value = os.environ.get("OTF_LOG_MEMORY_USAGE")
+    os.environ["OTF_LOG_MEMORY_USAGE"] = "1"
+    # Use a fast poll interval so the test doesn't wait on the default 5s
+    old_poll = os.environ.get("OTF_BATCH_POLL_INTERVAL")
+    os.environ["OTF_BATCH_POLL_INTERVAL"] = "0.1"
+    try:
+        mem_batch = {
+            "type": "batch",
+            "tasks": [{"order_id": i, "task_id": "df-local"} for i in range(1, 4)],
+        }
+        config_loader = ConfigLoader("test/cfg")
+        batch_obj = batch.Batch(None, f"mem-log-{RANDOM}", mem_batch, config_loader)
+
+        with caplog.at_level(logging.INFO):
+            result = batch_obj.run()
+
+        assert result, "Batch should complete successfully"
+
+        memory_log_lines = [
+            r.message for r in caplog.records if "[memory] RSS:" in r.message
+        ]
+        assert len(memory_log_lines) > 0, (
+            "Expected at least one '[memory] RSS:' log entry when OTF_LOG_MEMORY_USAGE=1, "
+            f"but none were found. All log messages: {[r.message for r in caplog.records]}"
+        )
+        # Each entry should contain a positive numeric MB value — a value of -1.0
+        # means psutil was not importable, which is a missing-dependency error.
+        import re
+
+        for line in memory_log_lines:
+            assert re.search(
+                r"\[memory\] RSS: \d+\.\d+ MB", line
+            ), f"Memory log line has unexpected format: {line!r}"
+            rss_value = float(re.search(r"\[memory\] RSS: ([\d.]+) MB", line).group(1))
+            assert rss_value > 0, (
+                f"RSS value is {rss_value} MB — a value of -1.0 means psutil is not "
+                "installed. Add 'psutil' to the project dependencies."
+            )
+    finally:
+        if old_value is None:
+            os.environ.pop("OTF_LOG_MEMORY_USAGE", None)
+        else:
+            os.environ["OTF_LOG_MEMORY_USAGE"] = old_value
+        if old_poll is None:
+            os.environ.pop("OTF_BATCH_POLL_INTERVAL", None)
+        else:
+            os.environ["OTF_BATCH_POLL_INTERVAL"] = old_poll
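
Reviewer note: the test saves and restores both environment variables by hand in a `try`/`finally`. The standard library's `unittest.mock.patch.dict` can do the same save-and-restore automatically. The sketch below demonstrates the behaviour in isolation; `read_flags` is a hypothetical stand-in for the code under test and is not part of the OTF test suite.

```python
import os
from unittest import mock


def read_flags():
    """Stand-in for code under test that reads the two variables."""
    return (
        os.environ.get("OTF_LOG_MEMORY_USAGE"),
        os.environ.get("OTF_BATCH_POLL_INTERVAL"),
    )


overrides = {"OTF_LOG_MEMORY_USAGE": "1", "OTF_BATCH_POLL_INTERVAL": "0.1"}

before = read_flags()
with mock.patch.dict(os.environ, overrides):
    # Inside the block both variables carry the overridden values.
    inside = read_flags()
# On exit the previous environment is restored, even if the block raised.
after = read_flags()

print(inside)
print(after == before)
```

In a pytest suite, the built-in `monkeypatch` fixture's `setenv` method would achieve the same effect with per-test cleanup, which may fit better alongside the fixtures the test already uses.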
