-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtask_run.py
More file actions
151 lines (120 loc) · 5.55 KB
/
task_run.py
File metadata and controls
151 lines (120 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/bin/env python3
"""CLI script wrapper for handling env vars and triggering the TaskRun class."""
import argparse
import logging
import os
import sys
from datetime import datetime
from textwrap import dedent
import opentaskpy.otflogging
from opentaskpy import taskrun # type: ignore[attr-defined]
# Default task-configuration directory: "cfg" under the working directory at
# the time this module is loaded. main() rebinds it when -c/--configDir is given.
CONFIG_PATH = f"{os.getcwd()}/cfg"
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser used by main().

    Returns:
        A parser accepting ``--noop``, ``-t/--taskId`` (required),
        ``-r/--runId``, ``-v/--verbosity`` and ``-c/--configDir``. Its epilog
        lists the supported environment variables and the naming scheme for
        task definition overrides.
    """
    parser = argparse.ArgumentParser(
        # Raw formatter keeps the explicit line breaks in the epilog and in the
        # -v help text instead of re-wrapping them.
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=dedent("""\
        Environment Variables:
        There are several environment variables that can be used to impact the behaviour:
        OTF_RUN_ID - Equivalent to using --runId argument
        OTF_LOG_JSON - Change the log output format to structured JSON format
        OTF_NO_LOG - Prevent logging to any files, will log to stdout/err only
        OTF_LOG_DIRECTORY - Specify a particular log directory to write log files to
        OTF_LOG_LEVEL - Equivalent to using -v
        OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
        OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
        OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
        OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
        OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format
        OTF_VARIABLES_FILE - Override the default variables file location
        OTF_PARAMIKO_ULTRA_DEBUG - Enable Paramiko ultra_debug for verbose SSH communications. Set to 1 to enable
        OTF_LAZY_LOAD_VARIABLES - Only load variables used by the task definition. Set to 1 to enable
        OTF_STALE_RUNNING_LOG_SECONDS - Resume using stale _running logs after this many inactive seconds
        OTF_NOOP - Equivalent to using --noop argument
        Task Definition Overrides:
        To override task specific values, you can use the following format in the environment variable name:
        OTF_OVERRIDE_<TASK_TYPE>_<ATTRIBUTE>_<ATTRIBUTE>_<ATTRIBUTE>
        e.g. OTF_OVERRIDE_TRANSFER_SOURCE_HOSTNAME
        Case doesn't matter here. For attributes that are nested within an array, you can specify the array index
        e.g. OTF_OVERRIDE_TRANSFER_DESTINATION_0_PROTOCOL_CREDENTIALS_USERNAME
        """),
    )
    parser.add_argument(
        "--noop",
        help=(
            "Do not attempt to run anything. Only load the config files to validate"
            " that they're OK"
        ),
        action="store_true",
        default=False,
        required=False,
    )
    parser.add_argument(
        "-t", "--taskId", help="Name of the JSON config to run", type=str, required=True
    )
    parser.add_argument(
        "-r",
        "--runId",
        help=(
            "Unique identifier to correlate logs with. e.g. if being triggered by an"
            " external scheduler"
        ),
        type=str,
        required=False,
    )
    parser.add_argument(
        "-v",
        "--verbosity",
        help="Increase verbosity:\n3 - DEBUG\n2 - VERBOSE2\n1 - VERBOSE1",
        type=int,
    )
    parser.add_argument(
        "-c", "--configDir", help="Directory containing task configurations", type=str
    )
    return parser


def main() -> None:
    """Parse args and call TaskRun class.

    Side effects performed here:
        * ``--configDir`` rebinds the module-level ``CONFIG_PATH``.
        * ``OTF_RUN_ID``, ``OTF_TASK_ID`` and ``OTF_NOOP`` are written to
          ``os.environ`` when the matching argument is supplied, and
          ``OTF_LOG_RUN_PREFIX`` is always set to the current timestamp.
        * The custom log level names VERBOSE2 (11) and VERBOSE1 (12) are
          registered with the ``logging`` module.

    Raises:
        SystemExit: status 1 when the task raises or returns a falsy result;
            argparse also exits on ``--help`` or invalid arguments.
        Exception: whatever ``TaskRun.run`` raised, re-raised unchanged when
            the logger's effective level is VERBOSE1 (12) or lower.
    """
    global CONFIG_PATH  # pylint: disable=global-statement

    # Custom levels sitting between DEBUG (10) and INFO (20).
    verbose2_level = 11
    verbose1_level = 12

    args = _build_parser().parse_args()

    if args.configDir:
        CONFIG_PATH = args.configDir

    # Mirror the CLI arguments into the environment.
    if args.runId:
        os.environ["OTF_RUN_ID"] = args.runId
    if args.taskId:
        os.environ["OTF_TASK_ID"] = args.taskId
    if args.noop:
        os.environ["OTF_NOOP"] = "true"

    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    os.environ["OTF_LOG_RUN_PREFIX"] = datetime.now().strftime("%Y%m%d-%H%M%S.%f")[:-3]

    logging.addLevelName(verbose2_level, "VERBOSE2")
    logging.addLevelName(verbose1_level, "VERBOSE1")

    # Any verbosity other than 1, 2 or 3 (including "not given") means INFO.
    verbosity_to_level = {
        3: logging.DEBUG,
        2: verbose2_level,
        1: verbose1_level,
    }
    logging_level = verbosity_to_level.get(args.verbosity, logging.INFO)

    logger = opentaskpy.otflogging.init_logging(
        __name__, args.taskId, level=logging_level, override_root_logger=True
    )

    logger.log(verbose2_level, f"Log verbosity: {args.verbosity}")

    # Create the TaskRun object. noop is keyed on the mere presence of OTF_NOOP
    # in the environment, so a pre-set variable (any value) acts like --noop.
    task_run_obj = taskrun.TaskRun(
        args.taskId, CONFIG_PATH, noop="OTF_NOOP" in os.environ
    )

    try:
        result = task_run_obj.run()
    except Exception as ex:  # pylint: disable=broad-exception-caught
        logger.error(f"Error running task: {ex}")
        # Ensure that the log is closed
        # NOTE(review): the False flag presumably marks the run as failed - confirm
        opentaskpy.otflogging.close_log_file(logger, False)
        if logger.getEffectiveLevel() <= verbose1_level:
            # Verbose runs surface the original exception and its traceback.
            raise
        sys.exit(1)

    if not result:
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()