Considering real use cases, the goal is to run multiple workers in parallel. Because CPython's Global Interpreter Lock (GIL) prevents threads from executing Python code in parallel, a multiprocessing architecture was chosen to enable true parallelism.
You can write your workers independently and append them to a list. The TaskHandler class spawns a separate, independent process for each worker, and each process runs an infinite loop like this:
- Poll for a Task at the Conductor server
- Generate a TaskResult from the given Task
- Update the given Task with the TaskResult at the Conductor server
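The three steps above can be sketched as a plain loop. This is not the SDK's TaskHandler code. FakeTask, FakeTaskResult, InMemoryServer and run_worker_loop are hypothetical stand-ins, and max_iterations bounds what would otherwise be an infinite loop.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class FakeTask:
    # Hypothetical stand-in for the SDK's Task model
    task_id: str
    input_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeTaskResult:
    # Hypothetical stand-in for the SDK's TaskResult model
    task_id: str
    status: str = "IN_PROGRESS"
    output_data: Dict[str, Any] = field(default_factory=dict)


class InMemoryServer:
    """Hypothetical in-memory queue standing in for the Conductor server."""

    def __init__(self, tasks: List[FakeTask]):
        self.pending = list(tasks)
        self.results: List[FakeTaskResult] = []

    def poll(self) -> Optional[FakeTask]:
        return self.pending.pop(0) if self.pending else None

    def update(self, result: FakeTaskResult) -> None:
        self.results.append(result)


def run_worker_loop(server: InMemoryServer,
                    execute: Callable[[FakeTask], FakeTaskResult],
                    max_iterations: int) -> None:
    # The real handler loops forever; max_iterations bounds this sketch
    for _ in range(max_iterations):
        task = server.poll()        # 1. poll for a Task
        if task is None:
            continue                # nothing queued; a real loop would sleep for the polling interval
        result = execute(task)      # 2. generate a TaskResult from the Task
        server.update(result)       # 3. send the TaskResult back to the server


def double(task: FakeTask) -> FakeTaskResult:
    return FakeTaskResult(task.task_id, "COMPLETED", {"doubled": task.input_data["n"] * 2})


server = InMemoryServer([FakeTask("t1", {"n": 2}), FakeTask("t2", {"n": 5})])
run_worker_loop(server, double, max_iterations=3)
print([(r.task_id, r.output_data["doubled"]) for r in server.results])  # → [('t1', 4), ('t2', 10)]
```

In the SDK, each such loop runs in its own process, one per worker in the list.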
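Currently, there are three ways of writing a Python worker: as a function, as a class that implements WorkerInterface, or as a function annotated with the WorkerTask decorator.
To write a worker as a function, the function should follow this signature: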
ExecuteTaskFunction = Callable[
    [
        Union[Task, object]
    ],
    Union[TaskResult, object]
]
In other words:
- Input must be either a Task or an object
  - If it isn't a Task, the assumption is that you're expecting to receive Task.input_data as the object
- Output must be either a TaskResult or an object
  - If it isn't a TaskResult, the assumption is that you're expecting the object to be used as TaskResult.output_data
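The two rules above can be illustrated with a small adapter. This is a sketch, not the SDK's implementation. Task and TaskResult here are simplified stand-ins, and run_function_worker is a hypothetical helper. It decides what to pass by inspecting the first parameter's type annotation. It also assumes that a plain returned object yields a COMPLETED result.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class Task:
    # Simplified stand-in for the SDK's Task model
    task_id: str
    workflow_instance_id: str
    input_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class TaskResult:
    # Simplified stand-in for the SDK's TaskResult model
    task_id: str
    workflow_instance_id: str
    status: str = "IN_PROGRESS"
    output_data: Any = None


def run_function_worker(fn: Callable, task: Task) -> TaskResult:
    """Hypothetical adapter applying the input/output rules described above."""
    params = list(inspect.signature(fn).parameters.values())
    # Input rule: pass the Task only if the first parameter is annotated as Task,
    # otherwise pass task.input_data
    wants_task = bool(params) and params[0].annotation is Task
    output = fn(task if wants_task else task.input_data)
    # Output rule: a TaskResult is returned as-is...
    if isinstance(output, TaskResult):
        return output
    # ...anything else becomes the output_data of a (assumed) completed TaskResult
    return TaskResult(task.task_id, task.workflow_instance_id,
                      status="COMPLETED", output_data=output)


def greet(input_data: dict) -> dict:
    return {"greeting": "hello " + input_data["name"]}


def full(task: Task) -> TaskResult:
    return TaskResult(task.task_id, task.workflow_instance_id,
                      status="COMPLETED", output_data={"saw_id": task.task_id})


task = Task("t1", "wf1", {"name": "conductor"})
print(run_function_worker(greet, task).output_data)  # → {'greeting': 'hello conductor'}
print(run_function_worker(full, task).output_data)   # → {'saw_id': 't1'}
```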
Quick example below:
from conductor.client.http.models import Task, TaskResult
from conductor.shared.http.enums import TaskResultStatus

def execute(task: Task) -> TaskResult:
    task_result = TaskResult(
        task_id=task.task_id,
        workflow_instance_id=task.workflow_instance_id,
        worker_id='your_custom_id'
    )
    task_result.add_output_data('worker_style', 'function')
    task_result.status = TaskResultStatus.COMPLETED
    return task_result
For more details, you can look at all possible combinations of workers here.
To write a worker as a class, the class must implement the WorkerInterface class, which requires an execute method. The remaining methods are inherited but can easily be overridden. Here is an example with a custom polling interval:
from conductor.client.http.models import Task, TaskResult
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface

class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('worker_style', 'class')
        task_result.add_output_data('secret_number', 1234)
        task_result.add_output_data('is_it_true', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result

    def get_polling_interval_in_seconds(self) -> float:
        # poll every 500ms
        return 0.5
A worker can also be defined by adding the WorkerTask decorator, as shown in the example below. As long as the annotated worker is in a file inside the root folder of your worker application, the TaskHandler will pick it up when scan_for_annotated_workers is enabled; see Run Workers.
The arguments that can be passed when defining the decorated worker are:
- task_definition_name: The task definition name of the Conductor task to poll for.
- domain: Optional routing domain, so the worker executes only tasks with that domain
- worker_id: An optional worker ID used to identify the polling worker
- poll_interval: Polling interval in milliseconds. A default is used if it is not passed.
from conductor.client.worker.worker_task import WorkerTask

@WorkerTask(task_definition_name='python_annotated_task', worker_id='decorated', poll_interval=200.0)
def python_annotated_task(input) -> object:
    return {'message': 'python is so cool :)'}
Now you can run your workers using a TaskHandler, for example:
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.configuration import Configuration
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker import Worker
#### Add these lines if running on a mac####
from multiprocessing import set_start_method
set_start_method('fork')
############################################
SERVER_API_URL = 'http://localhost:8080/api'
KEY_ID = '<KEY_ID>'
KEY_SECRET = '<KEY_SECRET>'
configuration = Configuration(
    server_api_url=SERVER_API_URL,
    debug=True,
    authentication_settings=AuthenticationSettings(
        key_id=KEY_ID,
        key_secret=KEY_SECRET
    ),
)
# SimplePythonWorker and execute are the workers defined in the examples above
workers = [
    SimplePythonWorker(
        task_definition_name='python_task_example'
    ),
    Worker(
        task_definition_name='python_execute_function_task',
        execute_function=execute,
        poll_interval=250,
        domain='test'
    )
]
# If there are decorated workers in your application, set scan_for_annotated_workers to True
# (its default value is False)
with TaskHandler(workers, configuration, scan_for_annotated_workers=True) as task_handler:
    task_handler.start_processes()
    # Block until the worker processes exit; leaving the with block stops them
    task_handler.join_processes()
If you paste the above code into a file called main.py, you can launch the workers by running:
python3 main.py
Workers can be configured to poll only for tasks tagged with a given task domain. See more on domains here.
from conductor.client.worker.worker_task import WorkerTask

@WorkerTask(task_definition_name='python_annotated_task', domain='cool')
def python_annotated_task(input) -> object:
    return {'message': 'python is so cool :)'}
The above code runs a worker that polls for tasks of type python_annotated_task, but only for workflows whose task-to-domain mapping assigns the domain cool to this task:
"taskToDomain": {
    "python_annotated_task": "cool"
}
You can choose to provide a worker.ini file to specify worker arguments such as domain and polling_interval. This lets you configure workers dynamically and keeps the worker code cleaner. The file must be in the same directory as the main.py of your worker application.
[task_definition_name]
domain = <domain>
polling_interval = <polling-interval-in-ms>
You can also specify a common set of properties that applies to all workers by putting them in the DEFAULT section. Any worker that doesn't specify a domain and/or polling_interval defaults to these values.
[DEFAULT]
domain = <domain>
polling_interval = <polling-interval-in-ms>
For example:
[DEFAULT]
domain = nice
polling_interval = 2000
[python_annotated_task_1]
domain = cool
polling_interval = 500
[python_annotated_task_2]
domain = hot
polling_interval = 300
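Python's standard configparser module already treats [DEFAULT] as a fallback for every section. Resolving a worker's settings from a file like the sample above can therefore be sketched as follows. The helper worker_settings is hypothetical, and the SDK may read the file differently. The example parses the sample from a string; a real application would call parser.read("worker.ini") instead.

```python
import configparser

WORKER_INI = """
[DEFAULT]
domain = nice
polling_interval = 2000

[python_annotated_task_1]
domain = cool
polling_interval = 500

[python_annotated_task_2]
domain = hot
polling_interval = 300
"""


def worker_settings(parser: configparser.ConfigParser, task_definition_name: str) -> dict:
    """Hypothetical helper: look up a worker's domain and polling interval (ms)."""
    # A missing section falls back entirely to [DEFAULT]; an existing section
    # falls back to [DEFAULT] key by key
    if parser.has_section(task_definition_name):
        section = parser[task_definition_name]
    else:
        section = parser[parser.default_section]
    return {"domain": section["domain"], "polling_interval": int(section["polling_interval"])}


parser = configparser.ConfigParser()
parser.read_string(WORKER_INI)
for name in ["python_annotated_task_1", "python_annotated_task_2", "python_annotated_task_3"]:
    print(name, worker_settings(parser, name))
```

Here python_annotated_task_3 has no section of its own, so it gets the DEFAULT values (nice, 2000).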
With the above config file in place, you don't need to specify domain and poll_interval for any of the worker task types. Without the config file, the workers would be declared like this:
from conductor.client.worker.worker_task import WorkerTask

@WorkerTask(task_definition_name='python_annotated_task_1', domain='cool', poll_interval=500.0)
def python_annotated_task(input) -> object:
    return {'message': 'python is so cool :)'}

@WorkerTask(task_definition_name='python_annotated_task_2', domain='hot', poll_interval=300.0)
def python_annotated_task_2(input) -> object:
    return {'message': 'python is so hot :)'}

@WorkerTask(task_definition_name='python_annotated_task_3', domain='nice', poll_interval=2000.0)
def python_annotated_task_3(input) -> object:
    return {'message': 'python is so nice :)'}

@WorkerTask(task_definition_name='python_annotated_task_4', domain='nice', poll_interval=2000.0)
def python_annotated_task_4(input) -> object:
    return {'message': 'python is very nice :)'}
With the config file, the same workers can be declared without these arguments:
from conductor.client.worker.worker_task import WorkerTask

@WorkerTask(task_definition_name='python_annotated_task_1')
def python_annotated_task(input) -> object:
    return {'message': 'python is so cool :)'}

@WorkerTask(task_definition_name='python_annotated_task_2')
def python_annotated_task_2(input) -> object:
    return {'message': 'python is so hot :)'}

@WorkerTask(task_definition_name='python_annotated_task_3')
def python_annotated_task_3(input) -> object:
    return {'message': 'python is so nice :)'}

@WorkerTask(task_definition_name='python_annotated_task_4')
def python_annotated_task_4(input) -> object:
    return {'message': 'python is very nice :)'}
Workers can also be configured at run time using environment variables, which also override the configuration file.
conductor_worker_polling_interval=<polling-interval-in-ms>
conductor_worker_domain=<domain>
conductor_worker_<task_definition_name>_polling_interval=<polling-interval-in-ms>
conductor_worker_<task_definition_name>_domain=<domain>
For example:
conductor_worker_polling_interval=2000
conductor_worker_domain=nice
conductor_worker_python_annotated_task_1_polling_interval=500
conductor_worker_python_annotated_task_1_domain=cool
conductor_worker_python_annotated_task_2_polling_interval=300
conductor_worker_python_annotated_task_2_domain=hot
If the worker configuration is set through more than one of the mechanisms above, the following order of priority applies, from highest to lowest:
- Environment Variables
- Config File
- Worker Constructor Arguments
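The priority order above, combined with the environment variable naming scheme, can be sketched as a small resolver. resolve_setting is a hypothetical helper, not an SDK function. The sketch makes one assumption the text does not settle: any matching environment variable, task-specific or global, outranks the config file. It also checks the task-specific variable before the global one.

```python
import os
from typing import Any, Mapping, Optional


def resolve_setting(task_name: str, key: str, ctor_value: Any = None,
                    file_values: Optional[Mapping[str, str]] = None,
                    env: Mapping[str, str] = os.environ) -> Any:
    """Hypothetical resolver: environment > config file > constructor argument."""
    # 1. Environment variables: task-specific first, then the global one
    for var in (f"conductor_worker_{task_name}_{key}", f"conductor_worker_{key}"):
        if var in env:
            return env[var]
    # 2. Values from worker.ini (the task's section or [DEFAULT])
    if file_values and key in file_values:
        return file_values[key]
    # 3. Whatever was passed to the constructor or decorator
    return ctor_value


env = {
    "conductor_worker_domain": "nice",
    "conductor_worker_python_annotated_task_1_polling_interval": "500",
}
print(resolve_setting("python_annotated_task_1", "polling_interval", 200, {"polling_interval": "300"}, env))  # → 500
print(resolve_setting("python_annotated_task_1", "domain", "cool", None, env))                             # → nice
print(resolve_setting("python_annotated_task_2", "polling_interval", 200, {"polling_interval": "300"}, env))  # → 300
print(resolve_setting("python_annotated_task_2", "domain", "hot", None, {}))                               # → hot
```

Values coming from the environment or a file are strings, so a real resolver would also convert polling_interval to a number.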
See Using Conductor Playground for more details on how to use the Playground environment for testing.
If you need better performance (i.e. more workers of the same type), you can simply append more instances of the same worker, like this:
workers = [
    SimplePythonWorker(
        task_definition_name='python_task_example'
    ),
    SimplePythonWorker(
        task_definition_name='python_task_example'
    ),
    SimplePythonWorker(
        task_definition_name='python_task_example'
    ),
    ...
]
The same applies to function-based workers:
workers = [
    Worker(
        task_definition_name='python_task_example',
        execute_function=execute,
        poll_interval=250,
    ),
    Worker(
        task_definition_name='python_task_example',
        execute_function=execute,
        poll_interval=250,
    ),
    Worker(
        task_definition_name='python_task_example',
        execute_function=execute,
        poll_interval=250,
    ),
    ...
]
Python is great, but at times you need to call into native C/C++ code. Here is an example of how you can do that with the Conductor SDK.
- C++ function example (sums two integers):
#include <cstdint>

extern "C" int32_t get_sum(const int32_t A, const int32_t B) {
    return A + B;
}
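- C++ file name: simple_cpp_lib.cpp
- Library output name goal: lib.so
g++ -c -fPIC simple_cpp_lib.cpp -o simple_cpp_lib.o
g++ -shared -Wl,-install_name,lib.so -o lib.so simple_cpp_lib.o
The -install_name linker option is specific to macOS; with the GNU linker on Linux, use -Wl,-soname,lib.so instead.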
You can use Python's ctypes module to call native code written in C++. Here is an example that calls the native C++ library from a Python worker. See simple_cpp_lib.cpp and simple_cpp_worker.py for a complete working example.
from conductor.client.http.models.task import Task
from conductor.client.http.models.task_result import TaskResult
from conductor.shared.http.enums import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from ctypes import cdll, c_int32

class CppWrapper:
    def __init__(self, file_path='./lib.so'):
        self.cpp_lib = cdll.LoadLibrary(file_path)
        # Declare the C signature so ctypes converts the arguments and return value correctly
        self.cpp_lib.get_sum.argtypes = [c_int32, c_int32]
        self.cpp_lib.get_sum.restype = c_int32

    def get_sum(self, X: int, Y: int) -> int:
        return self.cpp_lib.get_sum(X, Y)

class SimpleCppWorker(WorkerInterface):
    cpp_wrapper = CppWrapper()

    def execute(self, task: Task) -> TaskResult:
        execution_result = self.cpp_wrapper.get_sum(1, 2)
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data(
            'sum', execution_result
        )
        task_result.status = TaskResultStatus.COMPLETED
        return task_result