Skip to content

Commit 7810082

Browse files
committed
Clear out task handlers for completed batches
1 parent 08794e8 commit 7810082

7 files changed

Lines changed: 425 additions & 16 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ These are some environment variables that can be used to customise the behaviour
177177
- `OTF_LOG_JSON` - Stderr logging will be in JSON format. Set to `1` to enable
178178
- `OTF_LOG_DIRECTORY` - Path under which log files are written
179179
- `OTF_NO_THREAD_SLEEP` - Disable the 1-second sleep between batch task thread creation. This sleep exists to prevent race conditions with concurrent protocol imports. Only disable this if you understand the implications. Set to `1` to disable.
180+
- `OTF_BATCH_POLL_INTERVAL` - Interval in seconds between batch status-check loop iterations (default: `5`). Reducing this makes batches respond faster when tasks complete, at the cost of higher CPU polling overhead. **Intended for testing only — do not change in production unless you fully understand the trade-offs.**
180181
- `OTF_RUN_ID` - (meant for internal use) An aggregator for log files. When set, all log files for a run will go under this sub directory. E.g. running a batch, all execution and transfer logs will be dropped into this sub directory, rather than a directory for each task name. This is equivalent to using `-r` or `--runId` command line arguments, which is generally preferred.
181182
- `OTF_SSH_KEY` - The private SSH key to use by default for all SSH connections. This is essential when using a basic docker container to trigger OTF. If not specified, it will default to use any private SSH keys available to the user executing the application.
182183
- `OTF_STAGING_DIR` - Staging base directory to place files before they're dropped into their final location. Default is `/tmp`

docs/usage.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,20 @@ When running a batch, setting `-r` (or `OTF_RUN_ID`) causes all sub-task logs to
114114

115115
The full list of supported environment variables:
116116

117-
| Variable | Description |
118-
| --------------------------- | ---------------------------------------------------------------------------- |
119-
| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
120-
| `OTF_LOG_DIRECTORY` | Base directory for log files |
121-
| `OTF_NO_LOG` | Set to `1` to disable file logging |
122-
| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
123-
| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
124-
| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
125-
| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
126-
| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
127-
| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
128-
| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
129-
| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
117+
| Variable | Description |
118+
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
119+
| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
120+
| `OTF_LOG_DIRECTORY` | Base directory for log files |
121+
| `OTF_NO_LOG` | Set to `1` to disable file logging |
122+
| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
123+
| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
124+
| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
125+
| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
126+
| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
127+
| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
128+
| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
129+
| `OTF_BATCH_POLL_INTERVAL` | Interval in seconds between batch status-check iterations (default: `5`). **Testing only** — do not lower in production without understanding the trade-offs |
130+
| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
130131

131132
## Overriding Variables at Runtime
132133

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dev = [
4343
"types-requests >=2.28",
4444
"types-paramiko >=3.0",
4545
"black == 26.3.1",
46+
"psutil >= 5.9",
4647
"isort",
4748
"pytest",
4849
"bumpver",

src/opentaskpy/cli/task_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def main() -> None:
3030
OTF_LOG_LEVEL - Equivalent to using -v
3131
OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
3232
OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
33+
OTF_BATCH_POLL_INTERVAL - Interval in seconds between batch status-check iterations (default: 5). Intended for testing only; do not lower in production without understanding the trade-offs
3334
OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
3435
OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
3536
OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format

src/opentaskpy/otflogging.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
403403
log_file_name = handler.baseFilename
404404

405405
log_handlers = []
406+
loggers_to_remove = []
406407

407408
if log_file_name:
408409
new_log_filename = None
@@ -413,7 +414,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
413414

414415
# Loop through every logger that exists and has a handler of this filename, and
415416
# call the close method on it. Only the last one should rename the file
416-
for logger_ in list(logging.Logger.manager.loggerDict.values()):
417+
for logger_name, logger_ in list(logging.Logger.manager.loggerDict.items()):
417418
if isinstance(logger_, logging.Logger):
418419
for handler in logger_.handlers:
419420
if (
@@ -422,6 +423,8 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
422423
):
423424
log_handlers.append(handler)
424425
handler.close()
426+
loggers_to_remove.append(logger_name)
427+
break
425428

426429
# Now everything is closed, we can rename the log file
427430
# If result is True, then rename the file and remove _running from the name
@@ -437,6 +440,16 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
437440
for handler in log_handlers:
438441
handler.baseFilename = new_log_filename
439442

443+
# Unregister all task-scoped loggers that wrote to this log file so they
444+
# can be garbage collected rather than accumulating in the process.
445+
for logger_name in loggers_to_remove:
446+
logging.Logger.manager.loggerDict.pop(logger_name, None)
447+
448+
# Always unregister the logger passed in directly — this handles the case
449+
# where OTF_NO_LOG is set and no file handler was attached, so the
450+
# loggers_to_remove list above would be empty.
451+
logging.Logger.manager.loggerDict.pop(logger__.name, None)
452+
440453

441454
def redact(log_message: str) -> str:
442455
"""Redact any sensitive information from the log message.

src/opentaskpy/taskhandlers/batch.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
DEFAULT_TASK_EXIT_CODE = 0
1919
TASK_TYPE = "B"
2020
BATCH_TASK_LOG_MARKER = "__OTF_BATCH_TASK_MARKER__"
21+
DEFAULT_BATCH_POLL_INTERVAL = 5
2122

2223

2324
class Batch(TaskHandler):
@@ -403,8 +404,11 @@ def run(self, kill_event: threading.Event | None = None) -> bool: # noqa: C901
403404
)
404405
break
405406

406-
# Sleep 5 seconds before checking again
407-
time.sleep(5)
407+
# Sleep before checking again
408+
poll_interval = float(
409+
environ.get("OTF_BATCH_POLL_INTERVAL", DEFAULT_BATCH_POLL_INTERVAL)
410+
)
411+
time.sleep(poll_interval)
408412

409413
# Check if we have been asked to kill the batch
410414
if kill_event and kill_event.is_set():
@@ -483,6 +487,10 @@ def task_runner(self, batch_task: dict, event: threading.Event) -> None:
483487
batch_task["batch_task_spec"]["order_id"],
484488
batch_task["task_id"],
485489
)
490+
# Release the task handler so that remote handler objects
491+
# (boto3 clients, SSH connections, etc.) can be garbage
492+
# collected rather than accumulating for the whole batch run.
493+
batch_task["task_handler"] = None
486494
break
487495

488496
# Check if we have been asked to kill the thread

0 commit comments

Comments
 (0)