-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathreal.py
More file actions
185 lines (160 loc) · 6.36 KB
/
real.py
File metadata and controls
185 lines (160 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# Module metadata: author, copyright notice, contact address and license.
__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
from abc import abstractmethod
from typing import Dict, Optional
from snakemake_interface_common import at_least_snakemake_version
from snakemake_interface_executor_plugins.executors.base import (
AbstractExecutor,
SubmittedJobInfo,
)
from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase
from snakemake_interface_executor_plugins.utils import (
encode_target_jobs_cli_args,
format_cli_arg,
join_cli_args,
)
from snakemake_interface_executor_plugins.jobs import JobExecutorInterface
from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
class RealExecutor(AbstractExecutor):
    """Abstract executor that builds a ``python -m snakemake`` command per job.

    Subclasses must provide ``cores``, ``get_python_executable`` and
    ``get_exec_mode``. They may override the various hooks
    (``__post_init__``, ``handle_job_success``, ``handle_job_error``,
    ``additional_general_args``, ``get_job_exec_prefix``,
    ``get_job_exec_suffix``) to customize behavior.
    """

    def __init__(
        self,
        workflow: WorkflowExecutorInterface,
        logger: LoggerExecutorInterface,
        executor_settings: Optional[ExecutorSettingsBase],
        post_init: bool = True,
    ):
        """Initialize the executor.

        Args:
            workflow: The workflow this executor runs jobs for.
            logger: Logger used for user-facing messages.
            executor_settings: Plugin-specific settings, or ``None``.
            post_init: If true, call ``__post_init__`` at the end of
                construction.
        """
        super().__init__(
            workflow,
            logger,
            executor_settings,
        )
        self.executor_settings = executor_settings
        self.snakefile = workflow.main_snakefile

        if post_init:
            self.__post_init__()

    def __post_init__(self):
        """This method is called after the constructor. By default, it does nothing."""
        pass

    @property
    @abstractmethod
    def cores(self):
        """Value passed to ``--cores`` of the spawned job command."""
        # return "all" in case of remote executors,
        # otherwise self.workflow.resource_settings.cores
        ...

    def report_job_submission(
        self, job_info: SubmittedJobInfo, register_job: bool = True
    ):
        """Report a submitted job and optionally register it.

        Delegates to the parent implementation and then, if ``register_job``
        is true, calls ``job_info.job.register`` with the external job id.
        An ``IOError`` raised by the registration is not propagated; it is
        logged at info level instead.

        Args:
            job_info: Information about the submitted job.
            register_job: Whether to call ``register`` on the job.
        """
        super().report_job_submission(job_info)

        if register_job:
            try:
                job_info.job.register(external_jobid=job_info.external_jobid)
            except IOError as e:
                # Every fragment that contains a placeholder needs its own
                # ``f`` prefix; adjacent literals do not inherit it.
                self.logger.info(
                    f"Failed to set marker file for job started ({e}). "
                    "Snakemake will work, but cannot ensure that output files "
                    "are complete in case of a kill signal or power loss. "
                    "Please ensure write permissions for the "
                    f"directory {self.workflow.persistence.path}."
                )

    def handle_job_success(self, job: JobExecutorInterface):
        """Hook for a successful job. Does nothing by default."""
        pass

    def handle_job_error(self, job: JobExecutorInterface):
        """Hook for a failed job. Does nothing by default."""
        pass

    def additional_general_args(self):
        """Inherit this method to add stuff to the general args.

        A list must be returned.
        """
        return []

    def get_job_args(self, job: JobExecutorInterface, **kwargs):
        """Build the job-specific command line arguments.

        Args:
            job: The job to build arguments for.
            **kwargs: If ``cores`` is given, it overrides ``self.cores`` for
                the ``--cores`` argument.

        Returns:
            The result of ``join_cli_args`` over the formatted arguments.
        """
        unneeded_temp_files = list(self.workflow.dag.get_unneeded_temp_files(job))
        arg_list = [
            format_cli_arg(
                "--target-jobs", encode_target_jobs_cli_args(job.get_target_spec())
            ),
            # Restrict considered rules for faster DAG computation.
            format_cli_arg(
                "--allowed-rules",
                job.rules,
                quote=False,
                # Via this fix: https://github.com/snakemake/snakemake/pull/3640
                # --allowed-rules can be always used. The fix is released in
                # snakemake 9.6.2. Before, --allowed-rules had to be skipped
                # for jobs that have been updated after checkpoint evaluation.
                skip=job.is_updated and not at_least_snakemake_version("9.6.2"),
            ),
            # Ensure that a group uses its proper local groupid.
            format_cli_arg("--local-groupid", job.jobid, skip=not job.is_group()),
            format_cli_arg("--cores", kwargs.get("cores", self.cores)),
            format_cli_arg("--attempt", job.attempt),
            format_cli_arg("--force-use-threads", not job.is_group()),
            format_cli_arg(
                "--unneeded-temp-files",
                unneeded_temp_files,
                skip=not unneeded_temp_files,
            ),
        ]

        return join_cli_args(arg_list)

    @property
    def job_specific_local_groupid(self):
        """Whether the local groupid is set per job.

        When true, ``format_job_exec`` skips the workflow-wide
        ``--local-groupid`` argument.
        """
        return True

    def get_snakefile(self):
        """Return the snakefile stored at construction time."""
        return self.snakefile

    @abstractmethod
    def get_python_executable(self): ...

    @abstractmethod
    def get_exec_mode(self) -> ExecMode: ...

    @property
    def common_settings(self):
        """Common settings of the workflow's executor plugin."""
        return self.workflow.executor_plugin.common_settings

    def get_envvar_declarations(self):
        """Return an ``export ... &&`` prefix declaring environment variables.

        The prefix is only produced if the common settings request passing
        envvar declarations to the command and ``envvars()`` is non-empty;
        otherwise an empty string is returned.
        """
        declaration = ""
        envars = self.envvars()
        if self.common_settings.pass_envvar_declarations_to_cmd and envars:
            defs = " ".join(f"{var}={value!r}" for var, value in envars.items())
            declaration = f"export {defs} &&"
        return declaration

    def get_job_exec_prefix(self, job: JobExecutorInterface):
        """Command to run before the job command. Empty by default."""
        return ""

    def get_job_exec_suffix(self, job: JobExecutorInterface):
        """Command to run after the job command. Empty by default."""
        return ""

    def format_job_exec(self, job: JobExecutorInterface) -> str:
        """Assemble the full command that executes the given job.

        The parts are joined via ``join_cli_args`` in this order: prefix,
        envvar declarations, precommand, python executable,
        ``-m snakemake``, ``--snakefile``, job args, general args,
        additional general args, ``--mode``, the workflow-wide
        ``--local-groupid`` (skipped if ``job_specific_local_groupid``),
        and suffix. Non-empty prefix, precommand and suffix are chained
        with ``&&``.

        Args:
            job: The job to build the command for.

        Returns:
            The assembled command.
        """
        prefix = self.get_job_exec_prefix(job)
        if prefix:
            prefix += " &&"
        suffix = self.get_job_exec_suffix(job)
        if suffix:
            suffix = f"&& {suffix}"
        general_args = self.workflow.spawned_job_args_factory.general_args(
            executor_common_settings=self.common_settings
        )
        precommand = self.workflow.spawned_job_args_factory.precommand(
            executor_common_settings=self.common_settings
        )
        if precommand:
            precommand += " &&"

        args = join_cli_args(
            [
                prefix,
                self.get_envvar_declarations(),
                precommand,
                self.get_python_executable(),
                "-m snakemake",
                format_cli_arg("--snakefile", self.get_snakefile()),
                self.get_job_args(job),
                general_args,
                # NOTE(review): additional_general_args() is documented to
                # return a list, yet it is passed here as a single item;
                # confirm join_cli_args handles a non-empty nested list.
                self.additional_general_args(),
                format_cli_arg("--mode", self.get_exec_mode().item_to_choice()),
                format_cli_arg(
                    "--local-groupid",
                    self.workflow.group_settings.local_groupid,
                    skip=self.job_specific_local_groupid,
                ),
                suffix,
            ]
        )
        return args

    def envvars(self) -> Dict[str, str]:
        """Return the environment variables provided by the workflow's
        spawned-job args factory."""
        return self.workflow.spawned_job_args_factory.envvars()