-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathjobs.py
More file actions
119 lines (86 loc) · 2.59 KB
/
jobs.py
File metadata and controls
119 lines (86 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
__author__ = "Johannes Köster"
__copyright__ = "Copyright 2023, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
from abc import ABC, abstractmethod
import sys
from typing import Any, Iterable, Mapping, Optional, Sequence, Union, List
from snakemake_interface_common.rules import RuleInterface
from snakemake_interface_executor_plugins.utils import TargetSpec
class JobExecutorInterface(ABC):
HIGHEST_PRIORITY = sys.maxsize
@property
@abstractmethod
def name(self) -> str: ...
@property
@abstractmethod
def jobid(self) -> int: ...
@abstractmethod
def logfile_suggestion(self, prefix: str) -> str: ...
@abstractmethod
def is_group(self) -> bool: ...
@abstractmethod
def log_info(self, skip_dynamic: bool = False) -> None: ...
@abstractmethod
def log_error(self, msg: Optional[str] = None, **kwargs) -> None: ...
@abstractmethod
def properties(
self, omit_resources: Sequence[str] = ("_cores", "_nodes"), **aux_properties
) -> Mapping[str, Any]: ...
@property
@abstractmethod
def resources(self) -> Mapping[str, Union[int, str]]: ...
@property
@abstractmethod
def is_local(self) -> bool: ...
@property
@abstractmethod
def is_updated(self) -> bool: ...
@property
@abstractmethod
def output(self) -> Iterable[str]: ...
@abstractmethod
def register(self, external_jobid: Optional[str] = None) -> None: ...
@abstractmethod
def get_target_spec(self) -> List[TargetSpec]: ...
@abstractmethod
def rules(self) -> Iterable[RuleInterface]: ...
@property
@abstractmethod
def attempt(self) -> int: ...
@property
@abstractmethod
def input(self) -> Iterable[str]: ...
@property
@abstractmethod
def threads(self) -> int: ...
@property
@abstractmethod
def log(self) -> Iterable[str]: ...
@abstractmethod
def get_wait_for_files(self) -> Iterable[str]: ...
@abstractmethod
def format_wildcards(self, string, **variables) -> str: ...
@property
@abstractmethod
def is_containerized(self) -> bool: ...
class SingleJobExecutorInterface(ABC):
@property
@abstractmethod
def rule(self) -> RuleInterface: ...
@property
@abstractmethod
def benchmark(self) -> Optional[str]: ...
@property
@abstractmethod
def message(self): ...
class GroupJobExecutorInterface(ABC):
@property
@abstractmethod
def jobs(self): ...
@property
@abstractmethod
def groupid(self): ...
@property
@abstractmethod
def toposorted(self): ...