Commit 65c4824

Fix/memory leak fixes (#158)
* Clear out task handlers for completed batches
* Fix formatting
* Add memory usage logging env var
* Bump cache version
* bump version v26.18.0 -> v26.18.1
1 parent 08794e8 commit 65c4824

10 files changed

Lines changed: 520 additions & 31 deletions

.github/workflows/test.yml

Lines changed: 13 additions & 13 deletions
@@ -44,7 +44,7 @@ jobs:
             $GITHUB_OUTPUT
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           key: >-
@@ -64,7 +64,7 @@ jobs:
             hashFiles('.pre-commit-config.yaml') }}" >> $GITHUB_OUTPUT
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           key: >-
@@ -92,15 +92,15 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
           key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
             needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -130,15 +130,15 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
           key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
             needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -165,15 +165,15 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
           key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
             needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -204,15 +204,15 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
           key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
             needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -243,15 +243,15 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true
           key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
             needs.prepare-base.outputs.python-key }}
       - name: Restore pre-commit environment
         id: cache-precommit
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: ${{ env.PRE_COMMIT_CACHE }}
           fail-on-cache-miss: true
@@ -315,7 +315,7 @@ jobs:
           check-latest: true
       - name: Restore Python virtual environment
         id: cache-venv
-        uses: actions/cache@v4.2.0
+        uses: actions/cache@v5
         with:
           path: venv
           fail-on-cache-miss: true

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -1,5 +1,11 @@
 # Changelog
 
+# v26.18.1
+
+- Fix memory leak in batches caused by not tidying up the remote handler objects after a task has completed.
+- Fix a small potential memory leak in the logger, where loggers were not removed from the logger dictionary after they were closed.
+- Add `OTF_LOG_MEMORY_USAGE` environment variable to allow logging of RSS memory usage at each batch poll interval using the batch logger. This is useful for diagnosing memory growth in long-running batches. Added `psutil` as an optional dev dependency to support this. It must be installed manually; otherwise the log reports `-1.0` for RSS memory usage.
+
 # v26.18.0
 
 - Disable logging of initialisation events by default, can be enabled by setting env var `OTF_LOG_INIT_EVENTS=1`

README.md

Lines changed: 2 additions & 0 deletions
@@ -177,6 +177,8 @@ These are some environment variables that can be used to customise the behaviour
 - `OTF_LOG_JSON` - Stderr logging will be in JSON format. Set to `1` to enable
 - `OTF_LOG_DIRECTORY` - Path under which log files are written
 - `OTF_NO_THREAD_SLEEP` - Disable the 1-second sleep between batch task thread creation. This sleep exists to prevent race conditions with concurrent protocol imports. Only disable this if you understand the implications. Set to `1` to disable.
+- `OTF_BATCH_POLL_INTERVAL` - Interval in seconds between batch status-check loop iterations (default: `5`). Reducing this makes batches respond faster when tasks complete, at the cost of higher CPU polling overhead. **Intended for testing only; do not change in production unless you fully understand the trade-offs.**
+- `OTF_LOG_MEMORY_USAGE` - Set to `1` to log the RSS memory usage of the OTF process at each batch poll interval using the batch logger. Useful for diagnosing memory growth in long-running batches. Requires `psutil` to be installed separately.
 - `OTF_RUN_ID` - (meant for internal use) An aggregator for log files. When set, all log files for a run will go under this sub directory. E.g. running a batch, all execution and transfer logs will be dropped into this sub directory, rather than a directory for each task name. This is equivalent to using `-r` or `--runId` command line arguments, which is generally preferred.
 - `OTF_SSH_KEY` - The private SSH key to use by default for all SSH connections. This is essential when using a basic docker container to trigger OTF. If not specified, it will default to use any private SSH keys available to the user executing the application.
 - `OTF_STAGING_DIR` - Staging base directory to place files before they're dropped into their final location. Default is `/tmp`

docs/usage.md

Lines changed: 15 additions & 13 deletions
@@ -114,19 +114,21 @@ When running a batch, setting `-r` (or `OTF_RUN_ID`) causes all sub-task logs to
 
 The full list of supported environment variables:
 
-| Variable | Description |
-| --------------------------- | ---------------------------------------------------------------------------- |
-| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
-| `OTF_LOG_DIRECTORY` | Base directory for log files |
-| `OTF_NO_LOG` | Set to `1` to disable file logging |
-| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
-| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
-| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
-| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
-| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
-| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
-| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
-| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
+| Variable | Description |
+| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
+| `OTF_LOG_DIRECTORY` | Base directory for log files |
+| `OTF_NO_LOG` | Set to `1` to disable file logging |
+| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
+| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
+| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
+| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
+| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
+| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
+| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
+| `OTF_BATCH_POLL_INTERVAL` | Interval in seconds between batch status-check iterations (default: `5`). **Testing only**; do not lower in production without understanding the trade-offs |
+| `OTF_LOG_MEMORY_USAGE` | Set to `1` to log RSS memory usage at each batch poll interval via the batch logger. Requires `psutil` to be installed |
+| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
 
 ## Overriding Variables at Runtime

pyproject.toml

Lines changed: 3 additions & 2 deletions
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "opentaskpy"
-version = "v26.18.0"
+version = "v26.18.1"
 authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
 license-files = [ "LICENSE" ]
 
@@ -43,6 +43,7 @@ dev = [
     "types-requests >=2.28",
     "types-paramiko >=3.0",
     "black == 26.3.1",
+    "psutil >= 5.9",
     "isort",
     "pytest",
     "bumpver",
@@ -71,7 +72,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
 profile = 'black'
 
 [tool.bumpver]
-current_version = "v26.18.0"
+current_version = "v26.18.1"
 version_pattern = "vYY.WW.PATCH[-TAG]"
 commit_message = "bump version {old_version} -> {new_version}"
 commit = true

src/opentaskpy/cli/task_run.py

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,8 @@ def main() -> None:
     OTF_LOG_LEVEL - Equivalent to using -v
     OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
     OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
+    OTF_BATCH_POLL_INTERVAL - Interval in seconds between batch status-check iterations (default: 5). Intended for testing only; do not lower in production without understanding the trade-offs
+    OTF_LOG_MEMORY_USAGE - Set to 1 to log RSS memory usage at each batch poll interval via the batch logger. Requires psutil
     OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
     OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
     OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format

src/opentaskpy/otflogging.py

Lines changed: 14 additions & 1 deletion
@@ -403,6 +403,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
             log_file_name = handler.baseFilename
 
     log_handlers = []
+    loggers_to_remove = []
 
     if log_file_name:
         new_log_filename = None
@@ -413,7 +414,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
 
         # Loop through every logger that exists and has a handler of this filename, and
         # call the close method on it. Only the last one should rename the file
-        for logger_ in list(logging.Logger.manager.loggerDict.values()):
+        for logger_name, logger_ in list(logging.Logger.manager.loggerDict.items()):
             if isinstance(logger_, logging.Logger):
                 for handler in logger_.handlers:
                     if (
@@ -422,6 +423,8 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
                     ):
                         log_handlers.append(handler)
                         handler.close()
+                        loggers_to_remove.append(logger_name)
+                        break
 
         # Now everything is closed, we can rename the log file
         # If result is True, then rename the file and remove _running from the name
@@ -437,6 +440,16 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
             for handler in log_handlers:
                 handler.baseFilename = new_log_filename
 
+    # Unregister all task-scoped loggers that wrote to this log file so they
+    # can be garbage collected rather than accumulating in the process.
+    for logger_name in loggers_to_remove:
+        logging.Logger.manager.loggerDict.pop(logger_name, None)
+
+    # Always unregister the logger passed in directly; this handles the case
+    # where OTF_NO_LOG is set and no file handler was attached, so the
+    # loggers_to_remove list above would be empty.
+    logging.Logger.manager.loggerDict.pop(logger__.name, None)
+
 
 def redact(log_message: str) -> str:
     """Redact any sensitive information from the log message.

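The logger clean-up in `close_log_file` relies on the fact that the standard `logging` module keeps every named logger in `logging.Logger.manager.loggerDict` for the life of the process. The following is a minimal standalone sketch of that idea, not the OTF implementation: `release_logger` and the logger name `otf_demo_task_1` are hypothetical names chosen for illustration.

```python
import gc
import logging
import weakref


def release_logger(name: str) -> bool:
    """Close a logger's handlers and drop it from the logging registry.

    Hypothetical helper for illustration. Returns True if the name was registered.
    """
    registry = logging.Logger.manager.loggerDict
    logger = registry.get(name)
    if isinstance(logger, logging.Logger):
        # Iterate over a copy because removeHandler mutates the handler list
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)
    # pop() with a default does not raise if the name is absent
    return registry.pop(name, None) is not None


# A dot-free name is used here; dotted names can also be referenced by
# parent placeholder entries in the registry.
task_logger = logging.getLogger("otf_demo_task_1")
task_logger.addHandler(logging.NullHandler())
ref = weakref.ref(task_logger)

was_registered = release_logger("otf_demo_task_1")
del task_logger
gc.collect()
still_alive = ref() is not None
print(was_registered, still_alive)
```

The weak reference is only there to observe whether the logger object was actually freed once the registry entry and the local name were gone.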
src/opentaskpy/taskhandlers/batch.py

Lines changed: 25 additions & 2 deletions
@@ -1,5 +1,6 @@
 """Batch task handler class."""
 
+import os
 import threading
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
@@ -18,6 +19,17 @@
 DEFAULT_TASK_EXIT_CODE = 0
 TASK_TYPE = "B"
 BATCH_TASK_LOG_MARKER = "__OTF_BATCH_TASK_MARKER__"
+DEFAULT_BATCH_POLL_INTERVAL = 5
+
+
+def _rss_mb() -> float:
+    """Return the RSS of the current process in MB, or -1 if psutil is unavailable."""
+    try:
+        import psutil  # pylint: disable=import-outside-toplevel
+
+        return float(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
+    except Exception:  # pylint: disable=broad-except
+        return -1.0
 
 
 class Batch(TaskHandler):
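To see how a helper like `_rss_mb` behaves in isolation, here is a small standalone sketch that takes a reading before and after allocating some memory. It is an illustration only: `rss_mb` is a renamed copy for this example, and it catches `ImportError` rather than the broad `Exception` used in the diff. Without `psutil` installed, both readings are `-1.0`.

```python
import os


def rss_mb() -> float:
    """Return the current process RSS in MB, or -1.0 if psutil is missing."""
    try:
        import psutil  # optional dependency, imported lazily

        return psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    except ImportError:
        return -1.0


before = rss_mb()
# Allocate roughly 50 MB; the OS decides how much of it counts towards RSS
retained = [bytearray(1024 * 1024) for _ in range(50)]
after = rss_mb()
print(f"[memory] RSS before: {before:.1f} MB, after: {after:.1f} MB")
```

Comparing successive readings over many poll intervals, rather than any single value, is what makes this kind of logging useful for spotting growth.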
@@ -403,8 +415,15 @@ def run(self, kill_event: threading.Event | None = None) -> bool:  # noqa: C901
                 )
                 break
 
-            # Sleep 5 seconds before checking again
-            time.sleep(5)
+            # Sleep before checking again
+            poll_interval = float(
+                environ.get("OTF_BATCH_POLL_INTERVAL", DEFAULT_BATCH_POLL_INTERVAL)
+            )
+            time.sleep(poll_interval)
+
+            # Log memory usage if requested
+            if environ.get("OTF_LOG_MEMORY_USAGE"):
+                self.logger.info(f"[memory] RSS: {_rss_mb():.1f} MB")
 
             # Check if we have been asked to kill the batch
             if kill_event and kill_event.is_set():
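The hunk above passes the environment value straight to `float()`, so a non-numeric `OTF_BATCH_POLL_INTERVAL` would raise `ValueError` inside the polling loop. The sketch below shows one possible defensive variant that falls back to the default instead. It is not what the commit does: `get_poll_interval` and its `env` parameter are hypothetical and exist only for this example.

```python
import math
import os

DEFAULT_BATCH_POLL_INTERVAL = 5.0


def get_poll_interval(env=None) -> float:
    """Read OTF_BATCH_POLL_INTERVAL, falling back to the default if unusable."""
    env = os.environ if env is None else env
    raw = env.get("OTF_BATCH_POLL_INTERVAL")
    if raw is None:
        return DEFAULT_BATCH_POLL_INTERVAL
    try:
        value = float(raw)
    except ValueError:
        # Non-numeric input such as "abc"
        return DEFAULT_BATCH_POLL_INTERVAL
    # time.sleep() rejects negative values; NaN and infinity are also unusable
    if not math.isfinite(value) or value < 0:
        return DEFAULT_BATCH_POLL_INTERVAL
    return value


print(get_poll_interval({"OTF_BATCH_POLL_INTERVAL": "0.5"}))
print(get_poll_interval({"OTF_BATCH_POLL_INTERVAL": "abc"}))
```

Whether to fall back silently or fail loudly is a design choice; for a setting documented as testing-only, failing fast on bad input may be equally reasonable.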
@@ -483,6 +502,10 @@ def task_runner(self, batch_task: dict, event: threading.Event) -> None:
                         batch_task["batch_task_spec"]["order_id"],
                         batch_task["task_id"],
                     )
+                    # Release the task handler so that remote handler objects
+                    # (boto3 clients, SSH connections, etc.) can be garbage
+                    # collected rather than accumulating for the whole batch run.
+                    batch_task["task_handler"] = None
                     break
 
             # Check if we have been asked to kill the thread
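The effect of setting `batch_task["task_handler"] = None` can be illustrated with a standalone sketch. It assumes the dictionary entry is the only remaining reference to the handler; `FakeTaskHandler` is a hypothetical stand-in, not an OTF class.

```python
import gc
import weakref


class FakeTaskHandler:
    """Hypothetical stand-in for a task handler that holds remote clients."""

    def __init__(self) -> None:
        self.payload = bytearray(1024)


batch_task = {"task_id": "demo", "task_handler": FakeTaskHandler()}
# A weak reference lets us observe the object without keeping it alive
handler_ref = weakref.ref(batch_task["task_handler"])

alive_while_referenced = handler_ref() is not None
batch_task["task_handler"] = None  # drop the only strong reference
gc.collect()
alive_after_release = handler_ref() is not None
print(alive_while_referenced, alive_after_release)
```

If anything else still holds the handler (another dictionary, a closure, a thread-local), clearing this one entry is not enough, which is why a leak like this can take several fixes to close.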
