-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathtask_client.py
More file actions
73 lines (61 loc) · 2.02 KB
/
task_client.py
File metadata and controls
73 lines (61 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional, List
from conductor.client.http.models import PollData
from conductor.client.http.models.workflow import Workflow
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.http.models.task_exec_log import TaskExecLog
class TaskClient(ABC):
    """Abstract interface for task-related client operations.

    Every method here is an ``@abstractmethod`` with no behavior of its
    own; concrete subclasses must override all of them before they can be
    instantiated. The per-method notes are derived from the method names
    and type annotations only -- the authoritative semantics live in the
    concrete implementation (NOTE(review): confirm against it).
    """

    @abstractmethod
    def poll_task(
        self,
        task_type: str,
        worker_id: Optional[str] = None,
        domain: Optional[str] = None,
    ) -> Optional[Task]:
        """Poll for a single task of ``task_type``.

        ``worker_id`` and ``domain`` are optional and default to ``None``.
        Annotated to return a ``Task`` or ``None`` (presumably ``None``
        when nothing is available -- confirm with the implementation).
        """

    @abstractmethod
    def batch_poll_tasks(
        self,
        task_type: str,
        worker_id: Optional[str] = None,
        count: Optional[int] = None,
        timeout_in_millisecond: Optional[int] = None,
        domain: Optional[str] = None,
    ) -> List[Task]:
        """Poll for several tasks of ``task_type`` in one call.

        ``worker_id``, ``count``, ``timeout_in_millisecond`` and ``domain``
        are all optional and default to ``None``; how ``None`` is
        interpreted is up to the implementation. Annotated to return a
        list of ``Task`` objects.
        """

    @abstractmethod
    def get_task(self, task_id: str) -> Task:
        """Return the ``Task`` identified by ``task_id``."""

    @abstractmethod
    def update_task(self, task_result: TaskResult) -> str:
        """Submit ``task_result`` as a task update.

        Annotated to return a ``str``; its meaning is not defined here.
        """

    @abstractmethod
    def update_task_by_ref_name(
        self,
        workflow_id: str,
        task_ref_name: str,
        status: TaskResultStatus,
        output: object,
        worker_id: Optional[str] = None,
    ) -> str:
        """Update a task addressed by workflow id and task reference name.

        ``status`` and ``output`` carry the new state; ``worker_id`` is
        optional. Annotated to return a ``str``; its meaning is not
        defined here.
        """

    @abstractmethod
    def update_task_sync(
        self,
        workflow_id: str,
        task_ref_name: str,
        status: TaskResultStatus,
        output: object,
        worker_id: Optional[str] = None,
    ) -> Workflow:
        """Update a task by reference name and get a ``Workflow`` back.

        Takes the same parameters as ``update_task_by_ref_name`` but is
        annotated to return a ``Workflow`` instead of a ``str``
        (presumably the workflow state after the update -- confirm).
        """

    @abstractmethod
    def get_queue_size_for_task(self, task_type: str) -> int:
        """Return the queue size for ``task_type`` as an ``int``."""

    @abstractmethod
    def add_task_log(self, task_id: str, log_message: str):
        """Attach ``log_message`` to the task identified by ``task_id``.

        No return type is declared for this method.
        """

    @abstractmethod
    def get_task_logs(self, task_id: str) -> List[TaskExecLog]:
        """Return the ``TaskExecLog`` entries for ``task_id``."""

    @abstractmethod
    def get_task_poll_data(self, task_type: str) -> List[PollData]:
        """Return the ``PollData`` entries for ``task_type``."""