-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathshell_worker.py
More file actions
121 lines (93 loc) · 3.98 KB
/
Copy pathshell_worker.py
File metadata and controls
121 lines (93 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncio
from typing import Dict
from conductor.asyncio_client.adapters import ApiClient
from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
from conductor.shared.worker.exception import NonRetryableException
@worker_task(task_definition_name="file_operation")
def file_operation(
operation: str, source: str, destination: str = None
) -> Dict[str, str]:
try:
import os
import shutil
if operation == "copy":
if not destination:
raise NonRetryableException("Destination required for copy operation")
shutil.copy2(source, destination)
result = f"Copied {source} to {destination}"
elif operation == "move":
if not destination:
raise NonRetryableException("Destination required for move operation")
shutil.move(source, destination)
result = f"Moved {source} to {destination}"
elif operation == "delete":
if os.path.isfile(source):
os.remove(source)
elif os.path.isdir(source):
shutil.rmtree(source)
else:
raise NonRetryableException(f"Path does not exist: {source}")
result = f"Deleted {source}"
elif operation == "mkdir":
os.makedirs(source, exist_ok=True)
result = f"Created directory {source}"
elif operation == "exists":
result = f"Path {source} exists: {os.path.exists(source)}"
else:
raise NonRetryableException(f"Unsupported operation: {operation}")
return {
"operation": operation,
"source": source,
"destination": destination,
"result": result,
"success": True,
}
except Exception as e:
raise NonRetryableException(f"File operation failed: {str(e)}")
async def create_shell_workflow(workflow_executor) -> AsyncConductorWorkflow:
workflow = AsyncConductorWorkflow(
name="async_shell_operations", version=1, executor=workflow_executor
)
create_dir = file_operation(
task_ref_name="create_temp_dir", operation="mkdir", source="./temp_workflow_dir"
)
cleanup = file_operation(
task_ref_name="cleanup_temp_dir",
operation="delete",
source="./temp_workflow_dir",
)
workflow >> create_dir >> cleanup
return workflow
async def main():
# Configuration - defaults to reading from environment variables:
# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
# CONDUCTOR_AUTH_KEY : API Authentication Key
# CONDUCTOR_AUTH_SECRET: API Auth Secret
api_config = Configuration()
api_config.apply_logging_config()
print("Starting async shell worker...")
task_handler = TaskHandler(
configuration=api_config, scan_for_annotated_workers=True
)
task_handler.start_processes()
async with ApiClient(api_config) as api_client:
clients = OrkesClients(api_client=api_client, configuration=api_config)
workflow_executor = clients.get_workflow_executor()
print("Creating shell workflow...")
workflow = await create_shell_workflow(workflow_executor)
print("Registering shell workflow...")
await workflow.register(True)
print("Executing shell workflow...")
workflow_run = await workflow.execute(workflow_input={})
print(f"Workflow ID: {workflow_run.workflow_id}")
print(f"Status: {workflow_run.status}")
print(
f"Execution URL: {api_config.ui_host}/execution/{workflow_run.workflow_id}"
)
task_handler.stop_processes()
if __name__ == "__main__":
asyncio.run(main())