# shell_worker.py
"""
Shell Worker Example
====================
Demonstrates creating workers that execute shell commands.
What it does:
-------------
- Defines a worker that can execute shell commands with arguments
- Shows how to capture and return command output
- Uses subprocess module for safe command execution
Use Cases:
----------
- Running system commands from workflows (backups, file operations)
- Integrating with command-line tools
- Executing scripts as part of workflow tasks
- System administration automation
**Security Warning:**
--------------------
⚠️ This example is for educational purposes. In production:
- Never execute arbitrary shell commands from untrusted input
- Always validate and sanitize command inputs
- Use allowlists for permitted commands
- Consider security implications before deployment
- Review subprocess security best practices
Key Concepts:
-------------
- Worker tasks can execute any Python code
- subprocess module for command execution
- Capturing stdout for workflow results
- Type hints for worker inputs
"""
import subprocess
from typing import List
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.worker.worker_task import worker_task
# @worker_task(task_definition_name='shell')
def execute_shell(command: str, args: List[str]) -> str:
    """Run ``command`` with ``args`` and return what it wrote to stdout.

    The command is handed to ``subprocess.run`` as an argument list, so no
    shell is involved in launching it.

    Args:
        command: Executable to run.
        args: Arguments passed to the executable. ``None`` or an empty list
            runs the command with no arguments.

    Returns:
        The captured standard output decoded as text. Standard error is not
        captured and the exit status is not checked.
    """
    full_command = [command, *(args or [])]
    # Decode stdout as text: calling str() on the raw bytes would return the
    # bytes repr ("b'...'") rather than the command's actual output.
    # errors="replace" keeps non-decodable output from raising.
    result = subprocess.run(
        full_command,
        stdout=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    return result.stdout
@worker_task(task_definition_name='task_with_retries2')
def execute_shell() -> str:
    """Worker registered for the ``task_with_retries2`` task.

    Takes no inputs and always returns the string ``"hello"``.

    NOTE(review): this definition reuses the name ``execute_shell``, so it
    rebinds the module-level name of the earlier function that runs shell
    commands - confirm that is intended.
    """
    greeting = "hello"
    return greeting
def main():
    """Create a task handler, start its processes, and join them.

    ``Configuration()`` is built without arguments, so it defaults to reading
    its settings from the following environment variables:

    - ``CONDUCTOR_SERVER_URL``: conductor server,
      e.g. https://developer.orkescloud.com/api
    - ``CONDUCTOR_AUTH_KEY``: API Authentication Key
    - ``CONDUCTOR_AUTH_SECRET``: API Auth Secret
    """
    handler = TaskHandler(configuration=Configuration())
    handler.start_processes()
    handler.join_processes()
# Start the workers only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()