From f7fee61c5f6b73ddb0f92d520ba5a081854af49a Mon Sep 17 00:00:00 2001 From: harshilraval Date: Mon, 9 Jun 2025 15:20:06 +0530 Subject: [PATCH 1/7] Add support for signal api changes --- .../client/http/api/task_resource_api.py | 233 ++++++++++++ src/conductor/client/http/models/__init__.py | 4 +- .../client/http/models/signal_response.py | 343 ++++++++++++++++++ src/conductor/client/http/models/task_run.py | 117 ++++++ .../client/http/models/workflow_run.py | 129 ++++++- 5 files changed, 818 insertions(+), 8 deletions(-) create mode 100644 src/conductor/client/http/models/signal_response.py create mode 100644 src/conductor/client/http/models/task_run.py diff --git a/src/conductor/client/http/api/task_resource_api.py b/src/conductor/client/http/api/task_resource_api.py index f81a7cee2..e58c1c9b7 100644 --- a/src/conductor/client/http/api/task_resource_api.py +++ b/src/conductor/client/http/api/task_resource_api.py @@ -7,6 +7,7 @@ import six from conductor.client.http.api_client import ApiClient +from conductor.client.http.models.signal_response import SignalResponse class TaskResourceApi(object): @@ -1730,3 +1731,235 @@ def update_task_sync_with_http_info(self, body, workflow_id, task_ref_name, stat _preload_content=params.get('_preload_content', True), _request_timeout=params.get('_request_timeout'), collection_formats=collection_formats) + + def signal_workflow_task_async(self, workflow_id, status, body, **kwargs): # noqa: E501 + """Update running task in the workflow with given status and output asynchronously # noqa: E501 + + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.signal_workflow_task_async(workflow_id, status, body, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param str workflow_id: (required) + :param str status: (required) + :param dict(str, object) body: (required) + :return: None + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.signal_workflow_task_async_with_http_info(workflow_id, status, body, **kwargs) # noqa: E501 + else: + (data) = self.signal_workflow_task_async_with_http_info(workflow_id, status, body, **kwargs) # noqa: E501 + return data + + def signal_workflow_task_async_with_http_info(self, workflow_id, status, body, **kwargs): # noqa: E501 + """Update running task in the workflow with given status and output asynchronously # noqa: E501 + + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.signal_workflow_task_async_with_http_info(workflow_id, status, body, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param str workflow_id: (required) + :param str status: (required) + :param dict(str, object) body: (required) + :return: None + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['workflow_id', 'status', 'body'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method signal_workflow_task_async" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'workflow_id' is set + if ('workflow_id' not in params or + params['workflow_id'] is None): + raise ValueError( + "Missing the required parameter `workflow_id` when calling `signal_workflow_task_async`") # noqa: E501 + # verify the required parameter 'status' is set + if ('status' not in params or + params['status'] is None): + raise ValueError( + "Missing the required parameter `status` when calling `signal_workflow_task_async`") # noqa: E501 + # verify the required parameter 'body' is set + if ('body' not in params or + params['body'] is None): + raise ValueError( + "Missing the required parameter `body` when calling `signal_workflow_task_async`") # noqa: E501 + + collection_formats = {} + + path_params = {} + if 'workflow_id' in params: + path_params['workflowId'] = params['workflow_id'] # noqa: E501 + if 'status' in params: + path_params['status'] = params['status'] # noqa: E501 + + query_params = [] + + header_params = {} + + form_params = [] + local_var_files = {} + + body_params = None + if 'body' in params: + body_params = params['body'] + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['application/json']) # noqa: E501 + + # Authentication setting + auth_settings = [] # noqa: E501 + + return self.api_client.call_api( + '/tasks/{workflowId}/{status}/signal', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type=None, # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + + def signal_workflow_task_sync(self, workflow_id, status, body, **kwargs): # noqa: E501 + """Update running task in the workflow with given status and output synchronously and return back updated workflow # noqa: E501 + + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.signal_workflow_task_sync(workflow_id, status, body, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param str workflow_id: (required) + :param str status: (required) + :param dict(str, object) body: (required) + :param str return_strategy: + :return: SignalResponse + If the method is called asynchronously, + returns the request thread. + """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.signal_workflow_task_sync_with_http_info(workflow_id, status, body, **kwargs) # noqa: E501 + else: + (data) = self.signal_workflow_task_sync_with_http_info(workflow_id, status, body, **kwargs) # noqa: E501 + return data + + def signal_workflow_task_sync_with_http_info(self, workflow_id, status, body, **kwargs): # noqa: E501 + """Update running task in the workflow with given status and output synchronously and return back updated workflow # noqa: E501 + + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.signal_workflow_task_sync_with_http_info(workflow_id, status, body, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param str workflow_id: (required) + :param str status: (required) + :param dict(str, object) body: (required) + :param str return_strategy: + :return: SignalResponse + If the method is called asynchronously, + returns the request thread. + """ + + all_params = ['workflow_id', 'status', 'body', 'return_strategy'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method signal_workflow_task_sync" % key + ) + params[key] = val + del params['kwargs'] + # verify the required parameter 'workflow_id' is set + if ('workflow_id' not in params or + params['workflow_id'] is None): + raise ValueError( + "Missing the required parameter `workflow_id` when calling `signal_workflow_task_sync`") # noqa: E501 + # verify the required parameter 'status' is set + if ('status' not in params or + params['status'] is None): + raise ValueError( + "Missing the required parameter `status` when calling `signal_workflow_task_sync`") # noqa: E501 + # verify the required parameter 'body' is set + if ('body' not in params or + params['body'] is None): + raise ValueError( + "Missing the required parameter `body` when calling `signal_workflow_task_sync`") # noqa: E501 + + collection_formats = {} + + path_params = {} + if 'workflow_id' in params: + path_params['workflowId'] = params['workflow_id'] # noqa: E501 + if 'status' in params: + path_params['status'] = params['status'] # noqa: E501 + + query_params = [] + if 'return_strategy' in params: + query_params.append(('returnStrategy', params['return_strategy'])) # noqa: E501 + + header_params = {} + + form_params = [] + local_var_files = {} + + body_params = None + if 'body' in params: + body_params = params['body'] + # HTTP header `Accept` + header_params['Accept'] = self.api_client.select_header_accept( + ['application/json']) # noqa: E501 + + # HTTP header `Content-Type` + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['application/json']) # noqa: E501 + + # Authentication setting + auth_settings = [] # noqa: E501 + + return self.api_client.call_api( + '/tasks/{workflowId}/{status}/signal/sync', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='SignalResponse', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) \ No newline at end of file diff --git a/src/conductor/client/http/models/__init__.py b/src/conductor/client/http/models/__init__.py index 0777d1493..cdb03eac0 100644 --- a/src/conductor/client/http/models/__init__.py +++ b/src/conductor/client/http/models/__init__.py @@ -56,4 +56,6 @@ from conductor.client.http.models.state_change_event import StateChangeEvent, StateChangeConfig, StateChangeEventType from conductor.client.http.models.workflow_task import CacheConfig from conductor.client.http.models.schema_def import SchemaDef -from conductor.client.http.models.schema_def import SchemaType \ No newline at end of file +from conductor.client.http.models.schema_def import SchemaType +from conductor.client.http.models.signal_response import SignalResponse +from conductor.client.http.models.task_run import TaskRun \ No newline at end of file diff --git a/src/conductor/client/http/models/signal_response.py b/src/conductor/client/http/models/signal_response.py new file mode 100644 index 000000000..36bd4c3f7 --- /dev/null +++ b/src/conductor/client/http/models/signal_response.py @@ -0,0 +1,343 @@ +import pprint +import re # noqa: F401 +import six +from dataclasses import dataclass, field, InitVar +from typing import Dict, Any, Optional +from enum import Enum + + +class WorkflowSignalReturnStrategy(Enum): + """Enum for workflow signal return strategy""" + TARGET_WORKFLOW = "TARGET_WORKFLOW" + BLOCKING_WORKFLOW = "BLOCKING_WORKFLOW" + BLOCKING_TASK = "BLOCKING_TASK" + BLOCKING_TASK_INPUT = "BLOCKING_TASK_INPUT" + + +@dataclass +class SignalResponse: + """NOTE: This class is auto generated by the swagger code generator program. + + Do not edit the class manually. + """ + """ + Attributes: + swagger_types (dict): The key is attribute name + and the value is attribute type. + attribute_map (dict): The key is attribute name + and the value is json key in definition. + """ + _response_type: Optional[str] = field(default=None, init=False) + _target_workflow_id: Optional[str] = field(default=None, init=False) + _target_workflow_status: Optional[str] = field(default=None, init=False) + _request_id: Optional[str] = field(default=None, init=False) + _workflow_id: Optional[str] = field(default=None, init=False) + _correlation_id: Optional[str] = field(default=None, init=False) + _input: Optional[Dict[str, Any]] = field(default=None, init=False) + _output: Optional[Dict[str, Any]] = field(default=None, init=False) + + response_type: InitVar[Optional[str]] = None + target_workflow_id: InitVar[Optional[str]] = None + target_workflow_status: InitVar[Optional[str]] = None + request_id: InitVar[Optional[str]] = None + workflow_id: InitVar[Optional[str]] = None + correlation_id: InitVar[Optional[str]] = None + input: InitVar[Optional[Dict[str, Any]]] = None + output: InitVar[Optional[Dict[str, Any]]] = None + + swagger_types = { + 'response_type': 'str', + 'target_workflow_id': 'str', + 'target_workflow_status': 'str', + 'request_id': 'str', + 'workflow_id': 'str', + 'correlation_id': 'str', + 'input': 'dict(str, object)', + 'output': 'dict(str, object)' + } + + attribute_map = { + 'response_type': 'responseType', + 'target_workflow_id': 'targetWorkflowId', + 'target_workflow_status': 'targetWorkflowStatus', + 'request_id': 'requestId', + 'workflow_id': 'workflowId', + 'correlation_id': 'correlationId', + 'input': 'input', + 'output': 'output' + } + + def __init__(self, response_type=None, target_workflow_id=None, target_workflow_status=None, + request_id=None, workflow_id=None, correlation_id=None, input=None, output=None): # noqa: E501 + """SignalResponse - a model defined in Swagger""" # noqa: E501 + self._response_type = None + self._target_workflow_id = None + self._target_workflow_status = None + self._request_id = None + self._workflow_id = None + self._correlation_id = None + self._input = None + self._output = None + self.discriminator = None + + if response_type is not None: + self.response_type = response_type + if target_workflow_id is not None: + self.target_workflow_id = target_workflow_id + if target_workflow_status is not None: + self.target_workflow_status = target_workflow_status + if request_id is not None: + self.request_id = request_id + if workflow_id is not None: + self.workflow_id = workflow_id + if correlation_id is not None: + self.correlation_id = correlation_id + if input is not None: + self.input = input + if output is not None: + self.output = output + + def __post_init__(self, response_type, target_workflow_id, target_workflow_status, request_id, + workflow_id, correlation_id, input, output): + if response_type is not None: + self.response_type = response_type + if target_workflow_id is not None: + self.target_workflow_id = target_workflow_id + if target_workflow_status is not None: + self.target_workflow_status = target_workflow_status + if request_id is not None: + self.request_id = request_id + if workflow_id is not None: + self.workflow_id = workflow_id + if correlation_id is not None: + self.correlation_id = correlation_id + if input is not None: + self.input = input + if output is not None: + self.output = output + + @property + def response_type(self): + """Gets the response_type of this SignalResponse. # noqa: E501 + + + :return: The response_type of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._response_type + + @response_type.setter + def response_type(self, response_type): + """Sets the response_type of this SignalResponse. + + + :param response_type: The response_type of this SignalResponse. # noqa: E501 + :type: str + """ + allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] # noqa: E501 + if response_type is not None and response_type not in allowed_values: + raise ValueError( + "Invalid value for `response_type` ({0}), must be one of {1}" # noqa: E501 + .format(response_type, allowed_values) + ) + + self._response_type = response_type + + @property + def target_workflow_id(self): + """Gets the target_workflow_id of this SignalResponse. # noqa: E501 + + + :return: The target_workflow_id of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._target_workflow_id + + @target_workflow_id.setter + def target_workflow_id(self, target_workflow_id): + """Sets the target_workflow_id of this SignalResponse. + + + :param target_workflow_id: The target_workflow_id of this SignalResponse. # noqa: E501 + :type: str + """ + + self._target_workflow_id = target_workflow_id + + @property + def target_workflow_status(self): + """Gets the target_workflow_status of this SignalResponse. # noqa: E501 + + + :return: The target_workflow_status of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._target_workflow_status + + @target_workflow_status.setter + def target_workflow_status(self, target_workflow_status): + """Sets the target_workflow_status of this SignalResponse. + + + :param target_workflow_status: The target_workflow_status of this SignalResponse. # noqa: E501 + :type: str + """ + allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] # noqa: E501 + if target_workflow_status is not None and target_workflow_status not in allowed_values: + raise ValueError( + "Invalid value for `target_workflow_status` ({0}), must be one of {1}" # noqa: E501 + .format(target_workflow_status, allowed_values) + ) + + self._target_workflow_status = target_workflow_status + + @property + def request_id(self): + """Gets the request_id of this SignalResponse. # noqa: E501 + + + :return: The request_id of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._request_id + + @request_id.setter + def request_id(self, request_id): + """Sets the request_id of this SignalResponse. + + + :param request_id: The request_id of this SignalResponse. # noqa: E501 + :type: str + """ + + self._request_id = request_id + + @property + def workflow_id(self): + """Gets the workflow_id of this SignalResponse. # noqa: E501 + + + :return: The workflow_id of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._workflow_id + + @workflow_id.setter + def workflow_id(self, workflow_id): + """Sets the workflow_id of this SignalResponse. + + + :param workflow_id: The workflow_id of this SignalResponse. # noqa: E501 + :type: str + """ + + self._workflow_id = workflow_id + + @property + def correlation_id(self): + """Gets the correlation_id of this SignalResponse. # noqa: E501 + + + :return: The correlation_id of this SignalResponse. # noqa: E501 + :rtype: str + """ + return self._correlation_id + + @correlation_id.setter + def correlation_id(self, correlation_id): + """Sets the correlation_id of this SignalResponse. + + + :param correlation_id: The correlation_id of this SignalResponse. # noqa: E501 + :type: str + """ + + self._correlation_id = correlation_id + + @property + def input(self): + """Gets the input of this SignalResponse. # noqa: E501 + + + :return: The input of this SignalResponse. # noqa: E501 + :rtype: dict(str, object) + """ + return self._input + + @input.setter + def input(self, input): + """Sets the input of this SignalResponse. + + + :param input: The input of this SignalResponse. # noqa: E501 + :type: dict(str, object) + """ + + self._input = input + + @property + def output(self): + """Gets the output of this SignalResponse. # noqa: E501 + + + :return: The output of this SignalResponse. # noqa: E501 + :rtype: dict(str, object) + """ + return self._output + + @output.setter + def output(self, output): + """Sets the output of this SignalResponse. + + + :param output: The output of this SignalResponse. # noqa: E501 + :type: dict(str, object) + """ + + self._output = output + + def to_dict(self): + """Returns the model properties as a dict""" + result = {} + + for attr, _ in six.iteritems(self.swagger_types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(SignalResponse, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + """Returns the string representation of the model""" + return pprint.pformat(self.to_dict()) + + def __repr__(self): + """For `print` and `pprint`""" + return self.to_str() + + def __eq__(self, other): + """Returns true if both objects are equal""" + if not isinstance(other, SignalResponse): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + """Returns true if both objects are not equal""" + return not self == other \ No newline at end of file diff --git a/src/conductor/client/http/models/task_run.py b/src/conductor/client/http/models/task_run.py new file mode 100644 index 000000000..2ff04f106 --- /dev/null +++ b/src/conductor/client/http/models/task_run.py @@ -0,0 +1,117 @@ +from dataclasses import dataclass, asdict +from typing import Dict, Any, Optional, List +from enum import Enum +from .signal_response import SignalResponse + + +class TaskStatus(Enum): + """Enum for task status""" + IN_PROGRESS = "IN_PROGRESS" + CANCELED = "CANCELED" + FAILED = "FAILED" + FAILED_WITH_TERMINAL_ERROR = "FAILED_WITH_TERMINAL_ERROR" + COMPLETED = "COMPLETED" + COMPLETED_WITH_ERRORS = "COMPLETED_WITH_ERRORS" + SCHEDULED = "SCHEDULED" + TIMED_OUT = "TIMED_OUT" + READY_FOR_RERUN = "READY_FOR_RERUN" + SKIPPED = "SKIPPED" + + +@dataclass +class TaskRun(SignalResponse): + """Task run model extending SignalResponse""" + + task_type: Optional[str] = None + task_id: Optional[str] = None + reference_task_name: Optional[str] = None + retry_count: int = 0 + task_def_name: Optional[str] = None + retried_task_id: Optional[str] = None + workflow_type: Optional[str] = None + reason_for_incompletion: Optional[str] = None + priority: int = 0 + variables: Optional[Dict[str, Any]] = None + tasks: Optional[List[Any]] = None # List of Task objects + created_by: Optional[str] = None + create_time: int = 0 + update_time: int = 0 + status: Optional[TaskStatus] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary with camelCase keys for JSON serialization""" + data = asdict(self) + camel_case_data = super().to_dict() + + field_mapping = { + 'task_type': 'taskType', + 'task_id': 'taskId', + 'reference_task_name': 'referenceTaskName', + 'retry_count': 'retryCount', + 'task_def_name': 'taskDefName', + 'retried_task_id': 'retriedTaskId', + 'workflow_type': 'workflowType', + 'reason_for_incompletion': 'reasonForIncompletion', + 'priority': 'priority', + 'variables': 'variables', + 'tasks': 'tasks', + 'created_by': 'createdBy', + 'create_time': 'createTime', + 'update_time': 'updateTime', + 'status': 'status' + } + + for snake_key, camel_key in field_mapping.items(): + if snake_key in data and data[snake_key] is not None: + if isinstance(data[snake_key], TaskStatus): + camel_case_data[camel_key] = data[snake_key].value + else: + camel_case_data[camel_key] = data[snake_key] + + return camel_case_data + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'TaskRun': + """Create instance from dictionary with camelCase keys""" + snake_case_data = {} + + # Handle parent class fields + parent_mapping = { + 'responseType': 'response_type', + 'targetWorkflowId': 'target_workflow_id', + 'targetWorkflowStatus': 'target_workflow_status', + 'requestId': 'request_id', + 'workflowId': 'workflow_id', + 'correlationId': 'correlation_id', + 'input': 'input', + 'output': 'output' + } + + field_mapping = { + 'taskType': 'task_type', + 'taskId': 'task_id', + 'referenceTaskName': 'reference_task_name', + 'retryCount': 'retry_count', + 'taskDefName': 'task_def_name', + 'retriedTaskId': 'retried_task_id', + 'workflowType': 'workflow_type', + 'reasonForIncompletion': 'reason_for_incompletion', + 'priority': 'priority', + 'variables': 'variables', + 'tasks': 'tasks', + 'createdBy': 'created_by', + 'createTime': 'create_time', + 'updateTime': 'update_time', + 'status': 'status' + } + + all_mappings = {**parent_mapping, **field_mapping} + + for camel_key, snake_key in all_mappings.items(): + if camel_key in data: + if snake_key == 'status' and data[camel_key]: + snake_case_data[snake_key] = TaskStatus(data[camel_key]) + else: + snake_case_data[snake_key] = data[camel_key] + + return cls(**snake_case_data) \ No newline at end of file diff --git a/src/conductor/client/http/models/workflow_run.py b/src/conductor/client/http/models/workflow_run.py index d5853c27d..21be96597 100644 --- a/src/conductor/client/http/models/workflow_run.py +++ b/src/conductor/client/http/models/workflow_run.py @@ -4,6 +4,7 @@ from dataclasses import dataclass, field, InitVar from typing import Dict, List, Optional, Any from deprecated import deprecated +from enum import Enum from conductor.client.http.models import Task @@ -12,6 +13,14 @@ running_status = ('RUNNING', 'PAUSED') +class WorkflowSignalReturnStrategy(Enum): + """Enum for workflow signal return strategy""" + TARGET_WORKFLOW = "TARGET_WORKFLOW" + BLOCKING_WORKFLOW = "BLOCKING_WORKFLOW" + BLOCKING_TASK = "BLOCKING_TASK" + BLOCKING_TASK_INPUT = "BLOCKING_TASK_INPUT" + + @dataclass class WorkflowRun: """NOTE: This class is auto generated by the swagger code generator program. @@ -38,7 +47,12 @@ class WorkflowRun: _variables: Optional[Dict[str, Any]] = field(default=None, init=False) _workflow_id: Optional[str] = field(default=None, init=False) _reason_for_incompletion: Optional[str] = field(default=None, init=False) - + + # New SignalResponse fields + _response_type: Optional[str] = field(default=None, init=False) + _target_workflow_id: Optional[str] = field(default=None, init=False) + _target_workflow_status: Optional[str] = field(default=None, init=False) + correlation_id: InitVar[Optional[str]] = None create_time: InitVar[Optional[int]] = None created_by: InitVar[Optional[str]] = None @@ -52,7 +66,10 @@ class WorkflowRun: variables: InitVar[Optional[Dict[str, Any]]] = None workflow_id: InitVar[Optional[str]] = None reason_for_incompletion: InitVar[Optional[str]] = None - + response_type: InitVar[Optional[str]] = None + target_workflow_id: InitVar[Optional[str]] = None + target_workflow_status: InitVar[Optional[str]] = None + swagger_types = { 'correlation_id': 'str', 'create_time': 'int', @@ -65,7 +82,10 @@ class WorkflowRun: 'tasks': 'list[Task]', 'update_time': 'int', 'variables': 'dict(str, object)', - 'workflow_id': 'str' + 'workflow_id': 'str', + 'response_type': 'str', + 'target_workflow_id': 'str', + 'target_workflow_status': 'str' } attribute_map = { @@ -80,12 +100,16 @@ class WorkflowRun: 'tasks': 'tasks', 'update_time': 'updateTime', 'variables': 'variables', - 'workflow_id': 'workflowId' + 'workflow_id': 'workflowId', + 'response_type': 'responseType', + 'target_workflow_id': 'targetWorkflowId', + 'target_workflow_status': 'targetWorkflowStatus' } def __init__(self, correlation_id=None, create_time=None, created_by=None, input=None, output=None, priority=None, request_id=None, status=None, tasks=None, update_time=None, variables=None, workflow_id=None, - reason_for_incompletion: str = None): # noqa: E501 + reason_for_incompletion: str = None, response_type=None, target_workflow_id=None, + target_workflow_status=None): # noqa: E501 """WorkflowRun - a model defined in Swagger""" # noqa: E501 self._correlation_id = None self._create_time = None @@ -99,6 +123,9 @@ def __init__(self, correlation_id=None, create_time=None, created_by=None, input self._update_time = None self._variables = None self._workflow_id = None + self._response_type = None + self._target_workflow_id = None + self._target_workflow_status = None self.discriminator = None if correlation_id is not None: self.correlation_id = correlation_id @@ -124,10 +151,17 @@ def __init__(self, correlation_id=None, create_time=None, created_by=None, input self.variables = variables if workflow_id is not None: self.workflow_id = workflow_id + if response_type is not None: + self.response_type = response_type + if target_workflow_id is not None: + self.target_workflow_id = target_workflow_id + if target_workflow_status is not None: + self.target_workflow_status = target_workflow_status self._reason_for_incompletion = reason_for_incompletion - def __post_init__(self, correlation_id, create_time, created_by, input, output, priority, request_id, status, - tasks, update_time, variables, workflow_id, reason_for_incompletion): + def __post_init__(self, correlation_id, create_time, created_by, input, output, priority, request_id, status, + tasks, update_time, variables, workflow_id, reason_for_incompletion, response_type, + target_workflow_id, target_workflow_status): if correlation_id is not None: self.correlation_id = correlation_id if create_time is not None: @@ -152,6 +186,12 @@ def __post_init__(self, correlation_id, create_time, created_by, input, output, self.variables = variables if workflow_id is not None: self.workflow_id = workflow_id + if response_type is not None: + self.response_type = response_type + if target_workflow_id is not None: + self.target_workflow_id = target_workflow_id + if target_workflow_status is not None: + self.target_workflow_status = target_workflow_status if reason_for_incompletion is not None: self._reason_for_incompletion = reason_for_incompletion @@ -302,6 +342,81 @@ def request_id(self, request_id): self._request_id = request_id + @property + def response_type(self): + """Gets the response_type of this WorkflowRun. # noqa: E501 + + + :return: The response_type of this WorkflowRun. # noqa: E501 + :rtype: str + """ + return self._response_type + + @response_type.setter + def response_type(self, response_type): + """Sets the response_type of this WorkflowRun. + + + :param response_type: The response_type of this WorkflowRun. # noqa: E501 + :type: str + """ + allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] # noqa: E501 + if response_type is not None and response_type not in allowed_values: + raise ValueError( + "Invalid value for `response_type` ({0}), must be one of {1}" # noqa: E501 + .format(response_type, allowed_values) + ) + + self._response_type = response_type + + @property + def target_workflow_id(self): + """Gets the target_workflow_id of this WorkflowRun. # noqa: E501 + + + :return: The target_workflow_id of this WorkflowRun. # noqa: E501 + :rtype: str + """ + return self._target_workflow_id + + @target_workflow_id.setter + def target_workflow_id(self, target_workflow_id): + """Sets the target_workflow_id of this WorkflowRun. + + + :param target_workflow_id: The target_workflow_id of this WorkflowRun. # noqa: E501 + :type: str + """ + + self._target_workflow_id = target_workflow_id + + @property + def target_workflow_status(self): + """Gets the target_workflow_status of this WorkflowRun. # noqa: E501 + + + :return: The target_workflow_status of this WorkflowRun. # noqa: E501 + :rtype: str + """ + return self._target_workflow_status + + @target_workflow_status.setter + def target_workflow_status(self, target_workflow_status): + """Sets the target_workflow_status of this WorkflowRun. + + + :param target_workflow_status: The target_workflow_status of this WorkflowRun. # noqa: E501 + :type: str + """ + allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] # noqa: E501 + if target_workflow_status is not None and target_workflow_status not in allowed_values: + raise ValueError( + "Invalid value for `target_workflow_status` ({0}), must be one of {1}" # noqa: E501 + .format(target_workflow_status, allowed_values) + ) + + self._target_workflow_status = target_workflow_status + @property def status(self): """Gets the status of this WorkflowRun. # noqa: E501 From b4adf1cb34f9a210a6a285b406935214ac1c46eb Mon Sep 17 00:00:00 2001 From: harshilraval Date: Mon, 9 Jun 2025 22:37:46 +0530 Subject: [PATCH 2/7] Add support for signal api changes --- src/conductor/client/http/models/__init__.py | 3 +- .../client/http/models/signal_response.py | 484 ++++++++++++------ src/conductor/client/http/models/task_run.py | 117 ----- .../client/http/models/workflow_run.py | 124 +---- 4 files changed, 328 insertions(+), 400 deletions(-) delete mode 100644 src/conductor/client/http/models/task_run.py diff --git a/src/conductor/client/http/models/__init__.py b/src/conductor/client/http/models/__init__.py index cdb03eac0..a01bfd456 100644 --- a/src/conductor/client/http/models/__init__.py +++ b/src/conductor/client/http/models/__init__.py @@ -57,5 +57,4 @@ from conductor.client.http.models.workflow_task import CacheConfig from conductor.client.http.models.schema_def import SchemaDef from conductor.client.http.models.schema_def import SchemaType -from conductor.client.http.models.signal_response import SignalResponse -from conductor.client.http.models.task_run import TaskRun \ No newline at end of file +from conductor.client.http.models.signal_response import SignalResponse, TaskStatus \ No newline at end of file diff --git a/src/conductor/client/http/models/signal_response.py b/src/conductor/client/http/models/signal_response.py index 36bd4c3f7..b866d6891 100644 --- a/src/conductor/client/http/models/signal_response.py +++ b/src/conductor/client/http/models/signal_response.py @@ -1,8 +1,8 @@ import pprint import re # noqa: F401 import six -from dataclasses import dataclass, field, InitVar -from typing import Dict, Any, Optional +from dataclasses import dataclass, field, InitVar, asdict +from typing import Dict, Any, Optional, List from enum import Enum @@ -14,10 +14,25 @@ class WorkflowSignalReturnStrategy(Enum): BLOCKING_TASK_INPUT = "BLOCKING_TASK_INPUT" +class TaskStatus(Enum): + """Enum for task status""" + IN_PROGRESS = "IN_PROGRESS" + CANCELED = "CANCELED" + FAILED = "FAILED" + FAILED_WITH_TERMINAL_ERROR = "FAILED_WITH_TERMINAL_ERROR" + COMPLETED = "COMPLETED" + COMPLETED_WITH_ERRORS = "COMPLETED_WITH_ERRORS" + SCHEDULED = "SCHEDULED" + TIMED_OUT = "TIMED_OUT" + READY_FOR_RERUN = "READY_FOR_RERUN" + SKIPPED = "SKIPPED" + + @dataclass class SignalResponse: - """NOTE: This class is auto generated by the swagger code generator program. + """Merged SignalResponse class containing both signal and task run fields. + NOTE: This class is auto generated by the swagger code generator program. Do not edit the class manually. """ """ @@ -27,6 +42,7 @@ class SignalResponse: attribute_map (dict): The key is attribute name and the value is json key in definition. """ + # Signal Response fields _response_type: Optional[str] = field(default=None, init=False) _target_workflow_id: Optional[str] = field(default=None, init=False) _target_workflow_status: Optional[str] = field(default=None, init=False) @@ -36,6 +52,24 @@ class SignalResponse: _input: Optional[Dict[str, Any]] = field(default=None, init=False) _output: Optional[Dict[str, Any]] = field(default=None, init=False) + # Task Run fields + _task_type: Optional[str] = field(default=None, init=False) + _task_id: Optional[str] = field(default=None, init=False) + _reference_task_name: Optional[str] = field(default=None, init=False) + _retry_count: int = field(default=0, init=False) + _task_def_name: Optional[str] = field(default=None, init=False) + _retried_task_id: Optional[str] = field(default=None, init=False) + _workflow_type: Optional[str] = field(default=None, init=False) + _reason_for_incompletion: Optional[str] = field(default=None, init=False) + _priority: int = field(default=0, init=False) + _variables: Optional[Dict[str, Any]] = field(default=None, init=False) + _tasks: Optional[List[Any]] = field(default=None, init=False) + _created_by: Optional[str] = field(default=None, init=False) + _create_time: int = field(default=0, init=False) + _update_time: int = field(default=0, init=False) + _status: Optional[TaskStatus] = field(default=None, init=False) + + # InitVar fields for initialization response_type: InitVar[Optional[str]] = None target_workflow_id: InitVar[Optional[str]] = None target_workflow_status: InitVar[Optional[str]] = None @@ -44,6 +78,21 @@ class SignalResponse: correlation_id: InitVar[Optional[str]] = None input: InitVar[Optional[Dict[str, Any]]] = None output: InitVar[Optional[Dict[str, Any]]] = None + task_type: InitVar[Optional[str]] = None + task_id: InitVar[Optional[str]] = None + reference_task_name: InitVar[Optional[str]] = None + retry_count: InitVar[int] = 0 + task_def_name: InitVar[Optional[str]] = None + retried_task_id: InitVar[Optional[str]] = None + workflow_type: InitVar[Optional[str]] = None + reason_for_incompletion: InitVar[Optional[str]] = None + priority: InitVar[int] = 0 + variables: InitVar[Optional[Dict[str, Any]]] = None + tasks: InitVar[Optional[List[Any]]] = None + created_by: InitVar[Optional[str]] = None + create_time: InitVar[int] = 0 + update_time: InitVar[int] = 0 + status: InitVar[Optional[TaskStatus]] = None swagger_types = { 'response_type': 'str', @@ -53,7 +102,22 @@ class SignalResponse: 'workflow_id': 'str', 'correlation_id': 'str', 'input': 'dict(str, object)', - 'output': 'dict(str, object)' + 'output': 'dict(str, object)', + 'task_type': 'str', + 'task_id': 'str', + 'reference_task_name': 'str', + 'retry_count': 'int', + 'task_def_name': 'str', + 'retried_task_id': 'str', + 'workflow_type': 'str', + 'reason_for_incompletion': 'str', + 'priority': 'int', + 'variables': 'dict(str, object)', + 'tasks': 'list[object]', + 'created_by': 'str', + 'create_time': 'int', + 'update_time': 'int', + 'status': 'str' } attribute_map = { @@ -64,41 +128,30 @@ class SignalResponse: 'workflow_id': 'workflowId', 'correlation_id': 'correlationId', 'input': 'input', - 'output': 'output' + 'output': 'output', + 'task_type': 'taskType', + 'task_id': 'taskId', + 'reference_task_name': 'referenceTaskName', + 'retry_count': 'retryCount', + 'task_def_name': 'taskDefName', + 'retried_task_id': 'retriedTaskId', + 'workflow_type': 'workflowType', + 'reason_for_incompletion': 'reasonForIncompletion', + 'priority': 'priority', + 'variables': 'variables', + 'tasks': 'tasks', + 'created_by': 'createdBy', + 'create_time': 'createTime', + 'update_time': 'updateTime', + 'status': 'status' } - def __init__(self, response_type=None, target_workflow_id=None, target_workflow_status=None, - request_id=None, workflow_id=None, correlation_id=None, input=None, output=None): # noqa: E501 - """SignalResponse - a model defined in Swagger""" # noqa: E501 - self._response_type = None - self._target_workflow_id = None - self._target_workflow_status = None - self._request_id = None - self._workflow_id = None - self._correlation_id = None - self._input = None - self._output = None - self.discriminator = None - - if response_type is not None: - self.response_type = response_type - if target_workflow_id is not None: - self.target_workflow_id = target_workflow_id - if target_workflow_status is not None: - self.target_workflow_status = target_workflow_status - if request_id is not None: - self.request_id = request_id - if workflow_id is not None: - self.workflow_id = workflow_id - if correlation_id is not None: - self.correlation_id = correlation_id - if input is not None: - self.input = input - if output is not None: - self.output = output - def __post_init__(self, response_type, target_workflow_id, target_workflow_status, request_id, - workflow_id, correlation_id, input, output): + workflow_id, correlation_id, input, output, task_type, task_id, + reference_task_name, retry_count, task_def_name, retried_task_id, + workflow_type, reason_for_incompletion, priority, variables, tasks, + created_by, create_time, update_time, status): + # Signal Response fields if response_type is not None: self.response_type = response_type if target_workflow_id is not None: @@ -116,212 +169,321 @@ def __post_init__(self, response_type, target_workflow_id, target_workflow_statu if output is not None: self.output = output - @property - def response_type(self): - """Gets the response_type of this SignalResponse. # noqa: E501 + # Task Run fields + if task_type is not None: + self.task_type = task_type + if task_id is not None: + self.task_id = task_id + if reference_task_name is not None: + self.reference_task_name = reference_task_name + self.retry_count = retry_count + if task_def_name is not None: + self.task_def_name = task_def_name + if retried_task_id is not None: + self.retried_task_id = retried_task_id + if workflow_type is not None: + self.workflow_type = workflow_type + if reason_for_incompletion is not None: + self.reason_for_incompletion = reason_for_incompletion + self.priority = priority + if variables is not None: + self.variables = variables + if tasks is not None: + self.tasks = tasks + if created_by is not None: + self.created_by = created_by + self.create_time = create_time + self.update_time = update_time + if status is not None: + self.status = status + self.discriminator = None - :return: The response_type of this SignalResponse. # noqa: E501 - :rtype: str - """ + # Signal Response Properties + @property + def response_type(self): return self._response_type @response_type.setter def response_type(self, response_type): - """Sets the response_type of this SignalResponse. - - - :param response_type: The response_type of this SignalResponse. # noqa: E501 - :type: str - """ - allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] # noqa: E501 + allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] if response_type is not None and response_type not in allowed_values: raise ValueError( - "Invalid value for `response_type` ({0}), must be one of {1}" # noqa: E501 + "Invalid value for `response_type` ({0}), must be one of {1}" .format(response_type, allowed_values) ) - self._response_type = response_type @property def target_workflow_id(self): - """Gets the target_workflow_id of this SignalResponse. # noqa: E501 - - - :return: The target_workflow_id of this SignalResponse. # noqa: E501 - :rtype: str - """ return self._target_workflow_id @target_workflow_id.setter def target_workflow_id(self, target_workflow_id): - """Sets the target_workflow_id of this SignalResponse. - - - :param target_workflow_id: The target_workflow_id of this SignalResponse. # noqa: E501 - :type: str - """ - self._target_workflow_id = target_workflow_id @property def target_workflow_status(self): - """Gets the target_workflow_status of this SignalResponse. # noqa: E501 - - - :return: The target_workflow_status of this SignalResponse. # noqa: E501 - :rtype: str - """ return self._target_workflow_status @target_workflow_status.setter def target_workflow_status(self, target_workflow_status): - """Sets the target_workflow_status of this SignalResponse. - - - :param target_workflow_status: The target_workflow_status of this SignalResponse. # noqa: E501 - :type: str - """ - allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] # noqa: E501 + allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] if target_workflow_status is not None and target_workflow_status not in allowed_values: raise ValueError( - "Invalid value for `target_workflow_status` ({0}), must be one of {1}" # noqa: E501 + "Invalid value for `target_workflow_status` ({0}), must be one of {1}" .format(target_workflow_status, allowed_values) ) - self._target_workflow_status = target_workflow_status @property def request_id(self): - """Gets the request_id of this SignalResponse. # noqa: E501 - - - :return: The request_id of this SignalResponse. # noqa: E501 - :rtype: str - """ return self._request_id @request_id.setter def request_id(self, request_id): - """Sets the request_id of this SignalResponse. - - - :param request_id: The request_id of this SignalResponse. # noqa: E501 - :type: str - """ - self._request_id = request_id @property def workflow_id(self): - """Gets the workflow_id of this SignalResponse. # noqa: E501 - - - :return: The workflow_id of this SignalResponse. # noqa: E501 - :rtype: str - """ return self._workflow_id @workflow_id.setter def workflow_id(self, workflow_id): - """Sets the workflow_id of this SignalResponse. - - - :param workflow_id: The workflow_id of this SignalResponse. # noqa: E501 - :type: str - """ - self._workflow_id = workflow_id @property def correlation_id(self): - """Gets the correlation_id of this SignalResponse. # noqa: E501 - - - :return: The correlation_id of this SignalResponse. # noqa: E501 - :rtype: str - """ return self._correlation_id @correlation_id.setter def correlation_id(self, correlation_id): - """Sets the correlation_id of this SignalResponse. - - - :param correlation_id: The correlation_id of this SignalResponse. # noqa: E501 - :type: str - """ - self._correlation_id = correlation_id @property def input(self): - """Gets the input of this SignalResponse. # noqa: E501 - - - :return: The input of this SignalResponse. # noqa: E501 - :rtype: dict(str, object) - """ return self._input @input.setter def input(self, input): - """Sets the input of this SignalResponse. - - - :param input: The input of this SignalResponse. # noqa: E501 - :type: dict(str, object) - """ - self._input = input @property def output(self): - """Gets the output of this SignalResponse. # noqa: E501 - - - :return: The output of this SignalResponse. # noqa: E501 - :rtype: dict(str, object) - """ return self._output @output.setter def output(self, output): - """Sets the output of this SignalResponse. + self._output = output + # Task Run Properties + @property + def task_type(self): + return self._task_type - :param output: The output of this SignalResponse. # noqa: E501 - :type: dict(str, object) - """ + @task_type.setter + def task_type(self, task_type): + self._task_type = task_type - self._output = output + @property + def task_id(self): + return self._task_id + + @task_id.setter + def task_id(self, task_id): + self._task_id = task_id + + @property + def reference_task_name(self): + return self._reference_task_name + + @reference_task_name.setter + def reference_task_name(self, reference_task_name): + self._reference_task_name = reference_task_name + + @property + def retry_count(self): + return self._retry_count + + @retry_count.setter + def retry_count(self, retry_count): + self._retry_count = retry_count + + @property + def task_def_name(self): + return self._task_def_name + + @task_def_name.setter + def task_def_name(self, task_def_name): + self._task_def_name = task_def_name + + @property + def retried_task_id(self): + return self._retried_task_id + + @retried_task_id.setter + def retried_task_id(self, retried_task_id): + self._retried_task_id = retried_task_id + + @property + def workflow_type(self): + return self._workflow_type + + @workflow_type.setter + def workflow_type(self, workflow_type): + self._workflow_type = workflow_type + + @property + def reason_for_incompletion(self): + return self._reason_for_incompletion + + @reason_for_incompletion.setter + def reason_for_incompletion(self, reason_for_incompletion): + self._reason_for_incompletion = reason_for_incompletion + + @property + def priority(self): + return self._priority + + @priority.setter + def priority(self, priority): + self._priority = priority + + @property + def variables(self): + return self._variables + + @variables.setter + def variables(self, variables): + self._variables = variables + + @property + def tasks(self): + return self._tasks + + @tasks.setter + def tasks(self, tasks): + self._tasks = tasks + + @property + def created_by(self): + return self._created_by + + @created_by.setter + def created_by(self, created_by): + self._created_by = created_by + + @property + def create_time(self): + return self._create_time + + @create_time.setter + def create_time(self, create_time): + self._create_time = create_time + + @property + def update_time(self): + return self._update_time + + @update_time.setter + def update_time(self, update_time): + self._update_time = update_time + + @property + def status(self): + return self._status + + @status.setter + def status(self, status): + self._status = status def to_dict(self): """Returns the model properties as a dict""" result = {} for attr, _ in six.iteritems(self.swagger_types): - value = getattr(self, attr) - if isinstance(value, list): - result[attr] = list(map( - lambda x: x.to_dict() if hasattr(x, "to_dict") else x, - value - )) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict(map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") else item, - value.items() - )) - else: - result[attr] = value + try: + # Get the actual property value by accessing the private attribute + # or calling the property getter + if hasattr(self, f'_{attr}'): + # Access the private attribute directly to get the actual value + value = getattr(self, f'_{attr}') + else: + # Fallback to the property getter + value = getattr(self, attr) + + # Skip None values and property objects + if value is None or isinstance(value, property): + continue + + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + elif isinstance(value, TaskStatus): + result[attr] = value.value + else: + result[attr] = value + + except (AttributeError, TypeError): + # Skip attributes that can't be accessed + continue + + # Convert to camelCase using attribute_map + camel_case_result = {} + for snake_key, camel_key in self.attribute_map.items(): + if snake_key in result and result[snake_key] is not None: + camel_case_result[camel_key] = result[snake_key] + if issubclass(SignalResponse, dict): for key, value in self.items(): - result[key] = value - - return result + camel_case_result[key] = value + + return camel_case_result + + # Alternative approach - override __str__ for better printing + def __str__(self): + """Returns a more readable string representation""" + return f"""SignalResponse( + response_type='{self.response_type}', + target_workflow_id='{self.target_workflow_id}', + target_workflow_status='{self.target_workflow_status}', + workflow_id='{self.workflow_id}', + task_id='{self.task_id}', + task_type='{self.task_type}', + status='{self.status}', + reference_task_name='{self.reference_task_name}', + retry_count={self.retry_count}, + input={self.input}, + output={self.output} + )""" + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'SignalResponse': + """Create instance from dictionary with camelCase keys""" + snake_case_data = {} + + # Reverse mapping from camelCase to snake_case + reverse_mapping = {v: k for k, v in cls.attribute_map.items()} + + for camel_key, value in data.items(): + if camel_key in reverse_mapping: + snake_key = reverse_mapping[camel_key] + if snake_key == 'status' and value: + snake_case_data[snake_key] = TaskStatus(value) + else: + snake_case_data[snake_key] = value + + return cls(**snake_case_data) def to_str(self): """Returns the string representation of the model""" diff --git a/src/conductor/client/http/models/task_run.py b/src/conductor/client/http/models/task_run.py deleted file mode 100644 index 2ff04f106..000000000 --- a/src/conductor/client/http/models/task_run.py +++ /dev/null @@ -1,117 +0,0 @@ -from dataclasses import dataclass, asdict -from typing import Dict, Any, Optional, List -from enum import Enum -from .signal_response import SignalResponse - - -class TaskStatus(Enum): - """Enum for task status""" - IN_PROGRESS = "IN_PROGRESS" - CANCELED = "CANCELED" - FAILED = "FAILED" - FAILED_WITH_TERMINAL_ERROR = "FAILED_WITH_TERMINAL_ERROR" - COMPLETED = "COMPLETED" - COMPLETED_WITH_ERRORS = "COMPLETED_WITH_ERRORS" - SCHEDULED = "SCHEDULED" - TIMED_OUT = "TIMED_OUT" - READY_FOR_RERUN = "READY_FOR_RERUN" - SKIPPED = "SKIPPED" - - -@dataclass -class TaskRun(SignalResponse): - """Task run model extending SignalResponse""" - - task_type: Optional[str] = None - task_id: Optional[str] = None - reference_task_name: Optional[str] = None - retry_count: int = 0 - task_def_name: Optional[str] = None - retried_task_id: Optional[str] = None - workflow_type: Optional[str] = None - reason_for_incompletion: Optional[str] = None - priority: int = 0 - variables: Optional[Dict[str, Any]] = None - tasks: Optional[List[Any]] = None # List of Task objects - created_by: Optional[str] = None - create_time: int = 0 - update_time: int = 0 - status: Optional[TaskStatus] = None - - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary with camelCase keys for JSON serialization""" - data = asdict(self) - camel_case_data = super().to_dict() - - field_mapping = { - 'task_type': 'taskType', - 'task_id': 'taskId', - 'reference_task_name': 'referenceTaskName', - 'retry_count': 'retryCount', - 'task_def_name': 'taskDefName', - 'retried_task_id': 'retriedTaskId', - 'workflow_type': 'workflowType', - 'reason_for_incompletion': 'reasonForIncompletion', - 'priority': 'priority', - 'variables': 'variables', - 'tasks': 'tasks', - 'created_by': 'createdBy', - 'create_time': 'createTime', - 'update_time': 'updateTime', - 'status': 'status' - } - - for snake_key, camel_key in field_mapping.items(): - if snake_key in data and data[snake_key] is not None: - if isinstance(data[snake_key], TaskStatus): - camel_case_data[camel_key] = data[snake_key].value - else: - camel_case_data[camel_key] = data[snake_key] - - return camel_case_data - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'TaskRun': - """Create instance from dictionary with camelCase keys""" - snake_case_data = {} - - # Handle parent class fields - parent_mapping = { - 'responseType': 'response_type', - 'targetWorkflowId': 'target_workflow_id', - 'targetWorkflowStatus': 'target_workflow_status', - 'requestId': 'request_id', - 'workflowId': 'workflow_id', - 'correlationId': 'correlation_id', - 'input': 'input', - 'output': 'output' - } - - field_mapping = { - 'taskType': 'task_type', - 'taskId': 'task_id', - 'referenceTaskName': 'reference_task_name', - 'retryCount': 'retry_count', - 'taskDefName': 'task_def_name', - 'retriedTaskId': 'retried_task_id', - 'workflowType': 'workflow_type', - 'reasonForIncompletion': 'reason_for_incompletion', - 'priority': 'priority', - 'variables': 'variables', - 'tasks': 'tasks', - 'createdBy': 'created_by', - 'createTime': 'create_time', - 'updateTime': 'update_time', - 'status': 'status' - } - - all_mappings = {**parent_mapping, **field_mapping} - - for camel_key, snake_key in all_mappings.items(): - if camel_key in data: - if snake_key == 'status' and data[camel_key]: - snake_case_data[snake_key] = TaskStatus(data[camel_key]) - else: - snake_case_data[snake_key] = data[camel_key] - - return cls(**snake_case_data) \ No newline at end of file diff --git a/src/conductor/client/http/models/workflow_run.py b/src/conductor/client/http/models/workflow_run.py index 21be96597..fb0410f0d 100644 --- a/src/conductor/client/http/models/workflow_run.py +++ b/src/conductor/client/http/models/workflow_run.py @@ -4,7 +4,6 @@ from dataclasses import dataclass, field, InitVar from typing import Dict, List, Optional, Any from deprecated import deprecated -from enum import Enum from conductor.client.http.models import Task @@ -12,15 +11,6 @@ successful_status = ('PAUSED', 'COMPLETED') running_status = ('RUNNING', 'PAUSED') - -class WorkflowSignalReturnStrategy(Enum): - """Enum for workflow signal return strategy""" - TARGET_WORKFLOW = "TARGET_WORKFLOW" - BLOCKING_WORKFLOW = "BLOCKING_WORKFLOW" - BLOCKING_TASK = "BLOCKING_TASK" - BLOCKING_TASK_INPUT = "BLOCKING_TASK_INPUT" - - @dataclass class WorkflowRun: """NOTE: This class is auto generated by the swagger code generator program. @@ -48,11 +38,6 @@ class WorkflowRun: _workflow_id: Optional[str] = field(default=None, init=False) _reason_for_incompletion: Optional[str] = field(default=None, init=False) - # New SignalResponse fields - _response_type: Optional[str] = field(default=None, init=False) - _target_workflow_id: Optional[str] = field(default=None, init=False) - _target_workflow_status: Optional[str] = field(default=None, init=False) - correlation_id: InitVar[Optional[str]] = None create_time: InitVar[Optional[int]] = None created_by: InitVar[Optional[str]] = None @@ -66,9 +51,6 @@ class WorkflowRun: variables: InitVar[Optional[Dict[str, Any]]] = None workflow_id: InitVar[Optional[str]] = None reason_for_incompletion: InitVar[Optional[str]] = None - response_type: InitVar[Optional[str]] = None - target_workflow_id: InitVar[Optional[str]] = None - target_workflow_status: InitVar[Optional[str]] = None swagger_types = { 'correlation_id': 'str', @@ -82,10 +64,7 @@ class WorkflowRun: 'tasks': 'list[Task]', 'update_time': 'int', 'variables': 'dict(str, object)', - 'workflow_id': 'str', - 'response_type': 'str', - 'target_workflow_id': 'str', - 'target_workflow_status': 'str' + 'workflow_id': 'str' } attribute_map = { @@ -100,16 +79,12 @@ class WorkflowRun: 'tasks': 'tasks', 'update_time': 'updateTime', 'variables': 'variables', - 'workflow_id': 'workflowId', - 'response_type': 'responseType', - 'target_workflow_id': 'targetWorkflowId', - 'target_workflow_status': 'targetWorkflowStatus' + 'workflow_id': 'workflowId' } def __init__(self, correlation_id=None, create_time=None, created_by=None, input=None, output=None, priority=None, request_id=None, status=None, tasks=None, update_time=None, variables=None, workflow_id=None, - reason_for_incompletion: str = None, response_type=None, target_workflow_id=None, - target_workflow_status=None): # noqa: E501 + reason_for_incompletion: str = None): # noqa: E501 """WorkflowRun - a model defined in Swagger""" # noqa: E501 self._correlation_id = None self._create_time = None @@ -123,9 +98,6 @@ def __init__(self, correlation_id=None, create_time=None, created_by=None, input self._update_time = None self._variables = None self._workflow_id = None - self._response_type = None - self._target_workflow_id = None - self._target_workflow_status = None self.discriminator = None if correlation_id is not None: self.correlation_id = correlation_id @@ -151,17 +123,10 @@ def __init__(self, correlation_id=None, create_time=None, created_by=None, input self.variables = variables if workflow_id is not None: self.workflow_id = workflow_id - if response_type is not None: - self.response_type = response_type - if target_workflow_id is not None: - self.target_workflow_id = target_workflow_id - if target_workflow_status is not None: - self.target_workflow_status = target_workflow_status self._reason_for_incompletion = reason_for_incompletion def __post_init__(self, correlation_id, create_time, created_by, input, output, priority, request_id, status, - tasks, update_time, variables, workflow_id, reason_for_incompletion, response_type, - target_workflow_id, target_workflow_status): + tasks, update_time, variables, workflow_id, reason_for_incompletion): if correlation_id is not None: self.correlation_id = correlation_id if create_time is not None: @@ -186,12 +151,6 @@ def __post_init__(self, correlation_id, create_time, created_by, input, output, self.variables = variables if workflow_id is not None: self.workflow_id = workflow_id - if response_type is not None: - self.response_type = response_type - if target_workflow_id is not None: - self.target_workflow_id = target_workflow_id - if target_workflow_status is not None: - self.target_workflow_status = target_workflow_status if reason_for_incompletion is not None: self._reason_for_incompletion = reason_for_incompletion @@ -342,81 +301,6 @@ def request_id(self, request_id): self._request_id = request_id - @property - def response_type(self): - """Gets the response_type of this WorkflowRun. # noqa: E501 - - - :return: The response_type of this WorkflowRun. # noqa: E501 - :rtype: str - """ - return self._response_type - - @response_type.setter - def response_type(self, response_type): - """Sets the response_type of this WorkflowRun. - - - :param response_type: The response_type of this WorkflowRun. # noqa: E501 - :type: str - """ - allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] # noqa: E501 - if response_type is not None and response_type not in allowed_values: - raise ValueError( - "Invalid value for `response_type` ({0}), must be one of {1}" # noqa: E501 - .format(response_type, allowed_values) - ) - - self._response_type = response_type - - @property - def target_workflow_id(self): - """Gets the target_workflow_id of this WorkflowRun. # noqa: E501 - - - :return: The target_workflow_id of this WorkflowRun. # noqa: E501 - :rtype: str - """ - return self._target_workflow_id - - @target_workflow_id.setter - def target_workflow_id(self, target_workflow_id): - """Sets the target_workflow_id of this WorkflowRun. - - - :param target_workflow_id: The target_workflow_id of this WorkflowRun. # noqa: E501 - :type: str - """ - - self._target_workflow_id = target_workflow_id - - @property - def target_workflow_status(self): - """Gets the target_workflow_status of this WorkflowRun. # noqa: E501 - - - :return: The target_workflow_status of this WorkflowRun. # noqa: E501 - :rtype: str - """ - return self._target_workflow_status - - @target_workflow_status.setter - def target_workflow_status(self, target_workflow_status): - """Sets the target_workflow_status of this WorkflowRun. - - - :param target_workflow_status: The target_workflow_status of this WorkflowRun. # noqa: E501 - :type: str - """ - allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] # noqa: E501 - if target_workflow_status is not None and target_workflow_status not in allowed_values: - raise ValueError( - "Invalid value for `target_workflow_status` ({0}), must be one of {1}" # noqa: E501 - .format(target_workflow_status, allowed_values) - ) - - self._target_workflow_status = target_workflow_status - @property def status(self): """Gets the status of this WorkflowRun. # noqa: E501 From e44c77f7419604158ac01fe071a7a2a70674e227 Mon Sep 17 00:00:00 2001 From: harshilraval Date: Mon, 9 Jun 2025 22:40:13 +0530 Subject: [PATCH 3/7] remove unwanted changes --- src/conductor/client/http/models/workflow_run.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/conductor/client/http/models/workflow_run.py b/src/conductor/client/http/models/workflow_run.py index fb0410f0d..072305cfb 100644 --- a/src/conductor/client/http/models/workflow_run.py +++ b/src/conductor/client/http/models/workflow_run.py @@ -11,6 +11,7 @@ successful_status = ('PAUSED', 'COMPLETED') running_status = ('RUNNING', 'PAUSED') + @dataclass class WorkflowRun: """NOTE: This class is auto generated by the swagger code generator program. @@ -126,7 +127,7 @@ def __init__(self, correlation_id=None, create_time=None, created_by=None, input self._reason_for_incompletion = reason_for_incompletion def __post_init__(self, correlation_id, create_time, created_by, input, output, priority, request_id, status, - tasks, update_time, variables, workflow_id, reason_for_incompletion): + tasks, update_time, variables, workflow_id, reason_for_incompletion): if correlation_id is not None: self.correlation_id = correlation_id if create_time is not None: From 2de60209ec10b326ab60f59cced317ecf3fbc777 Mon Sep 17 00:00:00 2001 From: harshilraval Date: Tue, 10 Jun 2025 15:44:32 +0530 Subject: [PATCH 4/7] Working signal_response.py for all WorkflowSignalReturnStrategy --- .../client/http/models/signal_response.py | 818 ++++++++++-------- 1 file changed, 444 insertions(+), 374 deletions(-) diff --git a/src/conductor/client/http/models/signal_response.py b/src/conductor/client/http/models/signal_response.py index b866d6891..8f97cb305 100644 --- a/src/conductor/client/http/models/signal_response.py +++ b/src/conductor/client/http/models/signal_response.py @@ -1,7 +1,6 @@ import pprint import re # noqa: F401 import six -from dataclasses import dataclass, field, InitVar, asdict from typing import Dict, Any, Optional, List from enum import Enum @@ -28,72 +27,7 @@ class TaskStatus(Enum): SKIPPED = "SKIPPED" -@dataclass class SignalResponse: - """Merged SignalResponse class containing both signal and task run fields. - - NOTE: This class is auto generated by the swagger code generator program. - Do not edit the class manually. - """ - """ - Attributes: - swagger_types (dict): The key is attribute name - and the value is attribute type. - attribute_map (dict): The key is attribute name - and the value is json key in definition. - """ - # Signal Response fields - _response_type: Optional[str] = field(default=None, init=False) - _target_workflow_id: Optional[str] = field(default=None, init=False) - _target_workflow_status: Optional[str] = field(default=None, init=False) - _request_id: Optional[str] = field(default=None, init=False) - _workflow_id: Optional[str] = field(default=None, init=False) - _correlation_id: Optional[str] = field(default=None, init=False) - _input: Optional[Dict[str, Any]] = field(default=None, init=False) - _output: Optional[Dict[str, Any]] = field(default=None, init=False) - - # Task Run fields - _task_type: Optional[str] = field(default=None, init=False) - _task_id: Optional[str] = field(default=None, init=False) - _reference_task_name: Optional[str] = field(default=None, init=False) - _retry_count: int = field(default=0, init=False) - _task_def_name: Optional[str] = field(default=None, init=False) - _retried_task_id: Optional[str] = field(default=None, init=False) - _workflow_type: Optional[str] = field(default=None, init=False) - _reason_for_incompletion: Optional[str] = field(default=None, init=False) - _priority: int = field(default=0, init=False) - _variables: Optional[Dict[str, Any]] = field(default=None, init=False) - _tasks: Optional[List[Any]] = field(default=None, init=False) - _created_by: Optional[str] = field(default=None, init=False) - _create_time: int = field(default=0, init=False) - _update_time: int = field(default=0, init=False) - _status: Optional[TaskStatus] = field(default=None, init=False) - - # InitVar fields for initialization - response_type: InitVar[Optional[str]] = None - target_workflow_id: InitVar[Optional[str]] = None - target_workflow_status: InitVar[Optional[str]] = None - request_id: InitVar[Optional[str]] = None - workflow_id: InitVar[Optional[str]] = None - correlation_id: InitVar[Optional[str]] = None - input: InitVar[Optional[Dict[str, Any]]] = None - output: InitVar[Optional[Dict[str, Any]]] = None - task_type: InitVar[Optional[str]] = None - task_id: InitVar[Optional[str]] = None - reference_task_name: InitVar[Optional[str]] = None - retry_count: InitVar[int] = 0 - task_def_name: InitVar[Optional[str]] = None - retried_task_id: InitVar[Optional[str]] = None - workflow_type: InitVar[Optional[str]] = None - reason_for_incompletion: InitVar[Optional[str]] = None - priority: InitVar[int] = 0 - variables: InitVar[Optional[Dict[str, Any]]] = None - tasks: InitVar[Optional[List[Any]]] = None - created_by: InitVar[Optional[str]] = None - create_time: InitVar[int] = 0 - update_time: InitVar[int] = 0 - status: InitVar[Optional[TaskStatus]] = None - swagger_types = { 'response_type': 'str', 'target_workflow_id': 'str', @@ -146,326 +80,438 @@ class SignalResponse: 'status': 'status' } - def __post_init__(self, response_type, target_workflow_id, target_workflow_status, request_id, - workflow_id, correlation_id, input, output, task_type, task_id, - reference_task_name, retry_count, task_def_name, retried_task_id, - workflow_type, reason_for_incompletion, priority, variables, tasks, - created_by, create_time, update_time, status): - # Signal Response fields - if response_type is not None: - self.response_type = response_type - if target_workflow_id is not None: - self.target_workflow_id = target_workflow_id - if target_workflow_status is not None: - self.target_workflow_status = target_workflow_status - if request_id is not None: - self.request_id = request_id - if workflow_id is not None: - self.workflow_id = workflow_id - if correlation_id is not None: - self.correlation_id = correlation_id - if input is not None: - self.input = input - if output is not None: - self.output = output - - # Task Run fields - if task_type is not None: - self.task_type = task_type - if task_id is not None: - self.task_id = task_id - if reference_task_name is not None: - self.reference_task_name = reference_task_name - self.retry_count = retry_count - if task_def_name is not None: - self.task_def_name = task_def_name - if retried_task_id is not None: - self.retried_task_id = retried_task_id - if workflow_type is not None: - self.workflow_type = workflow_type - if reason_for_incompletion is not None: - self.reason_for_incompletion = reason_for_incompletion - self.priority = priority - if variables is not None: - self.variables = variables - if tasks is not None: - self.tasks = tasks - if created_by is not None: - self.created_by = created_by - self.create_time = create_time - self.update_time = update_time - if status is not None: - self.status = status - + def __init__(self, **kwargs): + """Initialize with API response data, handling both camelCase and snake_case""" + + # Initialize all attributes with default values + self.response_type = None + self.target_workflow_id = None + self.target_workflow_status = None + self.request_id = None + self.workflow_id = None + self.correlation_id = None + self.input = {} + self.output = {} + self.task_type = None + self.task_id = None + self.reference_task_name = None + self.retry_count = 0 + self.task_def_name = None + self.retried_task_id = None + self.workflow_type = None + self.reason_for_incompletion = None + self.priority = 0 + self.variables = {} + self.tasks = [] + self.created_by = None + self.create_time = 0 + self.update_time = 0 + self.status = None self.discriminator = None - # Signal Response Properties - @property - def response_type(self): - return self._response_type - - @response_type.setter - def response_type(self, response_type): - allowed_values = ["TARGET_WORKFLOW", "BLOCKING_WORKFLOW", "BLOCKING_TASK", "BLOCKING_TASK_INPUT"] - if response_type is not None and response_type not in allowed_values: - raise ValueError( - "Invalid value for `response_type` ({0}), must be one of {1}" - .format(response_type, allowed_values) - ) - self._response_type = response_type - - @property - def target_workflow_id(self): - return self._target_workflow_id - - @target_workflow_id.setter - def target_workflow_id(self, target_workflow_id): - self._target_workflow_id = target_workflow_id - - @property - def target_workflow_status(self): - return self._target_workflow_status - - @target_workflow_status.setter - def target_workflow_status(self, target_workflow_status): - allowed_values = ["RUNNING", "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED", "PAUSED"] - if target_workflow_status is not None and target_workflow_status not in allowed_values: - raise ValueError( - "Invalid value for `target_workflow_status` ({0}), must be one of {1}" - .format(target_workflow_status, allowed_values) - ) - self._target_workflow_status = target_workflow_status - - @property - def request_id(self): - return self._request_id - - @request_id.setter - def request_id(self, request_id): - self._request_id = request_id - - @property - def workflow_id(self): - return self._workflow_id - - @workflow_id.setter - def workflow_id(self, workflow_id): - self._workflow_id = workflow_id - - @property - def correlation_id(self): - return self._correlation_id - - @correlation_id.setter - def correlation_id(self, correlation_id): - self._correlation_id = correlation_id - - @property - def input(self): - return self._input - - @input.setter - def input(self, input): - self._input = input - - @property - def output(self): - return self._output - - @output.setter - def output(self, output): - self._output = output - - # Task Run Properties - @property - def task_type(self): - return self._task_type - - @task_type.setter - def task_type(self, task_type): - self._task_type = task_type - - @property - def task_id(self): - return self._task_id - - @task_id.setter - def task_id(self, task_id): - self._task_id = task_id - - @property - def reference_task_name(self): - return self._reference_task_name - - @reference_task_name.setter - def reference_task_name(self, reference_task_name): - self._reference_task_name = reference_task_name - - @property - def retry_count(self): - return self._retry_count - - @retry_count.setter - def retry_count(self, retry_count): - self._retry_count = retry_count - - @property - def task_def_name(self): - return self._task_def_name - - @task_def_name.setter - def task_def_name(self, task_def_name): - self._task_def_name = task_def_name - - @property - def retried_task_id(self): - return self._retried_task_id - - @retried_task_id.setter - def retried_task_id(self, retried_task_id): - self._retried_task_id = retried_task_id - - @property - def workflow_type(self): - return self._workflow_type - - @workflow_type.setter - def workflow_type(self, workflow_type): - self._workflow_type = workflow_type - - @property - def reason_for_incompletion(self): - return self._reason_for_incompletion - - @reason_for_incompletion.setter - def reason_for_incompletion(self, reason_for_incompletion): - self._reason_for_incompletion = reason_for_incompletion - - @property - def priority(self): - return self._priority - - @priority.setter - def priority(self, priority): - self._priority = priority - - @property - def variables(self): - return self._variables - - @variables.setter - def variables(self, variables): - self._variables = variables - - @property - def tasks(self): - return self._tasks - - @tasks.setter - def tasks(self, tasks): - self._tasks = tasks - - @property - def created_by(self): - return self._created_by - - @created_by.setter - def created_by(self, created_by): - self._created_by = created_by - - @property - def create_time(self): - return self._create_time - - @create_time.setter - def create_time(self, create_time): - self._create_time = create_time - - @property - def update_time(self): - return self._update_time - - @update_time.setter - def update_time(self, update_time): - self._update_time = update_time - - @property - def status(self): - return self._status + # Handle both camelCase (from API) and snake_case keys + reverse_mapping = {v: k for k, v in self.attribute_map.items()} + + for key, value in kwargs.items(): + if key in reverse_mapping: + # Convert camelCase to snake_case + snake_key = reverse_mapping[key] + if snake_key == 'status' and isinstance(value, str): + try: + setattr(self, snake_key, TaskStatus(value)) + except ValueError: + setattr(self, snake_key, value) + else: + setattr(self, snake_key, value) + elif hasattr(self, key): + # Direct snake_case assignment + if key == 'status' and isinstance(value, str): + try: + setattr(self, key, TaskStatus(value)) + except ValueError: + setattr(self, key, value) + else: + setattr(self, key, value) + + # Extract task information from the first IN_PROGRESS task if available + if self.response_type == "TARGET_WORKFLOW" and self.tasks: + in_progress_task = None + for task in self.tasks: + if isinstance(task, dict) and task.get('status') == 'IN_PROGRESS': + in_progress_task = task + break + + # If no IN_PROGRESS task, get the last task + if not in_progress_task and self.tasks: + in_progress_task = self.tasks[-1] if isinstance(self.tasks[-1], dict) else None + + if in_progress_task: + # Map task fields if they weren't already set + if self.task_id is None: + self.task_id = in_progress_task.get('taskId') + if self.task_type is None: + self.task_type = in_progress_task.get('taskType') + if self.reference_task_name is None: + self.reference_task_name = in_progress_task.get('referenceTaskName') + if self.task_def_name is None: + self.task_def_name = in_progress_task.get('taskDefName') + if self.retry_count == 0: + self.retry_count = in_progress_task.get('retryCount', 0) - @status.setter - def status(self, status): - self._status = status + def __str__(self): + """Returns a detailed string representation similar to Swagger response""" + + def format_dict(d, indent=12): + if not d: + return "{}" + items = [] + for k, v in d.items(): + if isinstance(v, dict): + formatted_v = format_dict(v, indent + 4) + items.append(f"{' ' * indent}'{k}': {formatted_v}") + elif isinstance(v, list): + formatted_v = format_list(v, indent + 4) + items.append(f"{' ' * indent}'{k}': {formatted_v}") + elif isinstance(v, str): + items.append(f"{' ' * indent}'{k}': '{v}'") + else: + items.append(f"{' ' * indent}'{k}': {v}") + return "{\n" + ",\n".join(items) + f"\n{' ' * (indent - 4)}}}" + + def format_list(lst, indent=12): + if not lst: + return "[]" + items = [] + for item in lst: + if isinstance(item, dict): + formatted_item = format_dict(item, indent + 4) + items.append(f"{' ' * indent}{formatted_item}") + elif isinstance(item, str): + items.append(f"{' ' * indent}'{item}'") + else: + items.append(f"{' ' * indent}{item}") + return "[\n" + ",\n".join(items) + f"\n{' ' * (indent - 4)}]" + + # Format input and output + input_str = format_dict(self.input) if self.input else "{}" + output_str = format_dict(self.output) if self.output else "{}" + variables_str = format_dict(self.variables) if self.variables else "{}" + + # Handle different response types + if self.response_type == "TARGET_WORKFLOW": + # Workflow response - show tasks array + tasks_str = format_list(self.tasks, 12) if self.tasks else "[]" + return f"""SignalResponse( + responseType='{self.response_type}', + targetWorkflowId='{self.target_workflow_id}', + targetWorkflowStatus='{self.target_workflow_status}', + workflowId='{self.workflow_id}', + input={input_str}, + output={output_str}, + priority={self.priority}, + variables={variables_str}, + tasks={tasks_str}, + createdBy='{self.created_by}', + createTime={self.create_time}, + updateTime={self.update_time}, + status='{self.status}' +)""" + + elif self.response_type == "BLOCKING_TASK": + # Task response - show task-specific fields + status_str = self.status.value if hasattr(self.status, 'value') else str(self.status) + return f"""SignalResponse( + responseType='{self.response_type}', + targetWorkflowId='{self.target_workflow_id}', + targetWorkflowStatus='{self.target_workflow_status}', + workflowId='{self.workflow_id}', + input={input_str}, + output={output_str}, + taskType='{self.task_type}', + taskId='{self.task_id}', + referenceTaskName='{self.reference_task_name}', + retryCount={self.retry_count}, + taskDefName='{self.task_def_name}', + workflowType='{self.workflow_type}', + priority={self.priority}, + createTime={self.create_time}, + updateTime={self.update_time}, + status='{status_str}' +)""" + + else: + # Generic response - show all available fields + status_str = self.status.value if hasattr(self.status, 'value') else str(self.status) + result = f"""SignalResponse( + responseType='{self.response_type}', + targetWorkflowId='{self.target_workflow_id}', + targetWorkflowStatus='{self.target_workflow_status}', + workflowId='{self.workflow_id}', + input={input_str}, + output={output_str}, + priority={self.priority}""" + + # Add task fields if they exist + if self.task_type: + result += f",\n taskType='{self.task_type}'" + if self.task_id: + result += f",\n taskId='{self.task_id}'" + if self.reference_task_name: + result += f",\n referenceTaskName='{self.reference_task_name}'" + if self.retry_count > 0: + result += f",\n retryCount={self.retry_count}" + if self.task_def_name: + result += f",\n taskDefName='{self.task_def_name}'" + if self.workflow_type: + result += f",\n workflowType='{self.workflow_type}'" + + # Add workflow fields if they exist + if self.variables: + result += f",\n variables={variables_str}" + if self.tasks: + tasks_str = format_list(self.tasks, 12) + result += f",\n tasks={tasks_str}" + if self.created_by: + result += f",\n createdBy='{self.created_by}'" + + result += f",\n createTime={self.create_time}" + result += f",\n updateTime={self.update_time}" + result += f",\n status='{status_str}'" + result += "\n)" + + return result + + def get_task_by_reference_name(self, ref_name: str) -> Optional[Dict]: + """Get a specific task by its reference name""" + if not self.tasks: + return None + + for task in self.tasks: + if isinstance(task, dict) and task.get('referenceTaskName') == ref_name: + return task + return None + + def get_tasks_by_status(self, status: str) -> List[Dict]: + """Get all tasks with a specific status""" + if not self.tasks: + return [] + + return [task for task in self.tasks + if isinstance(task, dict) and task.get('status') == status] + + def get_in_progress_task(self) -> Optional[Dict]: + """Get the current IN_PROGRESS task""" + in_progress_tasks = self.get_tasks_by_status('IN_PROGRESS') + return in_progress_tasks[0] if in_progress_tasks else None + + def get_all_tasks(self) -> List[Dict]: + """Get all tasks in the workflow""" + return self.tasks if self.tasks else [] + + def get_completed_tasks(self) -> List[Dict]: + """Get all completed tasks""" + return self.get_tasks_by_status('COMPLETED') + + def get_failed_tasks(self) -> List[Dict]: + """Get all failed tasks""" + return self.get_tasks_by_status('FAILED') + + def get_task_chain(self) -> List[str]: + """Get the sequence of task reference names in execution order""" + if not self.tasks: + return [] + + # Sort by seq number if available, otherwise by the order in the list + sorted_tasks = sorted(self.tasks, key=lambda t: t.get('seq', 0) if isinstance(t, dict) else 0) + return [task.get('referenceTaskName', f'task_{i}') + for i, task in enumerate(sorted_tasks) if isinstance(task, dict)] + + # ===== HELPER METHODS (Following Go SDK Pattern) ===== + + def is_target_workflow(self) -> bool: + """Returns True if the response contains target workflow details""" + return self.response_type == "TARGET_WORKFLOW" + + def is_blocking_workflow(self) -> bool: + """Returns True if the response contains blocking workflow details""" + return self.response_type == "BLOCKING_WORKFLOW" + + def is_blocking_task(self) -> bool: + """Returns True if the response contains blocking task details""" + return self.response_type == "BLOCKING_TASK" + + def is_blocking_task_input(self) -> bool: + """Returns True if the response contains blocking task input""" + return self.response_type == "BLOCKING_TASK_INPUT" + + def get_workflow(self) -> Optional[Dict]: + """ + Extract workflow details from a SignalResponse. + Returns None if the response type doesn't contain workflow details. + """ + if not (self.is_target_workflow() or self.is_blocking_workflow()): + return None + + return { + 'workflowId': self.workflow_id, + 'status': self.status.value if hasattr(self.status, 'value') else str(self.status), + 'tasks': self.tasks or [], + 'createdBy': self.created_by, + 'createTime': self.create_time, + 'updateTime': self.update_time, + 'input': self.input or {}, + 'output': self.output or {}, + 'variables': self.variables or {}, + 'priority': self.priority, + 'targetWorkflowId': self.target_workflow_id, + 'targetWorkflowStatus': self.target_workflow_status + } + + def get_blocking_task(self) -> Optional[Dict]: + """ + Extract task details from a SignalResponse. + Returns None if the response type doesn't contain task details. + """ + if not (self.is_blocking_task() or self.is_blocking_task_input()): + return None + + return { + 'taskId': self.task_id, + 'taskType': self.task_type, + 'taskDefName': self.task_def_name, + 'workflowType': self.workflow_type, + 'referenceTaskName': self.reference_task_name, + 'retryCount': self.retry_count, + 'status': self.status.value if hasattr(self.status, 'value') else str(self.status), + 'workflowId': self.workflow_id, + 'input': self.input or {}, + 'output': self.output or {}, + 'priority': self.priority, + 'createTime': self.create_time, + 'updateTime': self.update_time + } + + def get_task_input(self) -> Optional[Dict]: + """ + Extract task input from a SignalResponse. + Only valid for BLOCKING_TASK_INPUT responses. + """ + if not self.is_blocking_task_input(): + return None + + return self.input or {} + + def print_summary(self): + """Print a concise summary for quick overview""" + status_str = self.status.value if hasattr(self.status, 'value') else str(self.status) + + print(f""" +=== Signal Response Summary === +Response Type: {self.response_type} +Workflow ID: {self.workflow_id} +Workflow Status: {self.target_workflow_status} +""") + + if self.is_target_workflow() or self.is_blocking_workflow(): + print(f"Total Tasks: {len(self.tasks) if self.tasks else 0}") + print(f"Workflow Status: {status_str}") + if self.created_by: + print(f"Created By: {self.created_by}") + + if self.is_blocking_task() or self.is_blocking_task_input(): + print(f"Task Info:") + print(f" Task ID: {self.task_id}") + print(f" Task Type: {self.task_type}") + print(f" Reference Name: {self.reference_task_name}") + print(f" Status: {status_str}") + print(f" Retry Count: {self.retry_count}") + if self.workflow_type: + print(f" Workflow Type: {self.workflow_type}") + + def get_response_summary(self) -> str: + """Get a quick text summary of the response type and key info""" + status_str = self.status.value if hasattr(self.status, 'value') else str(self.status) + + if self.is_target_workflow(): + return f"TARGET_WORKFLOW: {self.workflow_id} ({self.target_workflow_status}) - {len(self.tasks) if self.tasks else 0} tasks" + elif self.is_blocking_workflow(): + return f"BLOCKING_WORKFLOW: {self.workflow_id} ({status_str}) - {len(self.tasks) if self.tasks else 0} tasks" + elif self.is_blocking_task(): + return f"BLOCKING_TASK: {self.task_type} ({self.reference_task_name}) - {status_str}" + elif self.is_blocking_task_input(): + return f"BLOCKING_TASK_INPUT: {self.task_type} ({self.reference_task_name}) - Input data available" + else: + return f"UNKNOWN_RESPONSE_TYPE: {self.response_type}" + + def print_tasks_summary(self): + """Print a detailed summary of all tasks""" + if not self.tasks: + print("No tasks found in the response.") + return + + print(f"\n=== Tasks Summary ({len(self.tasks)} tasks) ===") + for i, task in enumerate(self.tasks, 1): + if isinstance(task, dict): + print(f"\nTask {i}:") + print(f" Type: {task.get('taskType', 'UNKNOWN')}") + print(f" Reference Name: {task.get('referenceTaskName', 'UNKNOWN')}") + print(f" Status: {task.get('status', 'UNKNOWN')}") + print(f" Task ID: {task.get('taskId', 'UNKNOWN')}") + print(f" Sequence: {task.get('seq', 'N/A')}") + if task.get('startTime'): + print(f" Start Time: {task.get('startTime')}") + if task.get('endTime'): + print(f" End Time: {task.get('endTime')}") + if task.get('inputData'): + print(f" Input Data: {task.get('inputData')}") + if task.get('outputData'): + print(f" Output Data: {task.get('outputData')}") + if task.get('workerId'): + print(f" Worker ID: {task.get('workerId')}") + + def get_full_json(self) -> str: + """Get the complete response as JSON string (like Swagger)""" + import json + return json.dumps(self.to_dict(), indent=2) + + def save_to_file(self, filename: str): + """Save the complete response to a JSON file""" + import json + with open(filename, 'w') as f: + json.dump(self.to_dict(), f, indent=2) + print(f"Response saved to {filename}") def to_dict(self): - """Returns the model properties as a dict""" + """Returns the model properties as a dict with camelCase keys""" result = {} - for attr, _ in six.iteritems(self.swagger_types): - try: - # Get the actual property value by accessing the private attribute - # or calling the property getter - if hasattr(self, f'_{attr}'): - # Access the private attribute directly to get the actual value - value = getattr(self, f'_{attr}') - else: - # Fallback to the property getter - value = getattr(self, attr) + for snake_key, value in self.__dict__.items(): + if value is None or snake_key == 'discriminator': + continue - # Skip None values and property objects - if value is None or isinstance(value, property): + # Convert to camelCase using attribute_map + camel_key = self.attribute_map.get(snake_key, snake_key) + + if isinstance(value, TaskStatus): + result[camel_key] = value.value + elif snake_key == 'tasks' and not value: + # For BLOCKING_TASK responses, don't include empty tasks array + if self.response_type != "BLOCKING_TASK": + result[camel_key] = value + elif snake_key in ['task_type', 'task_id', 'reference_task_name', 'task_def_name', + 'workflow_type'] and not value: + # For TARGET_WORKFLOW responses, don't include empty task fields + if self.response_type == "BLOCKING_TASK": continue - - if isinstance(value, list): - result[attr] = list(map( - lambda x: x.to_dict() if hasattr(x, "to_dict") else x, - value - )) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict(map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") else item, - value.items() - )) - elif isinstance(value, TaskStatus): - result[attr] = value.value else: - result[attr] = value - - except (AttributeError, TypeError): - # Skip attributes that can't be accessed + result[camel_key] = value + elif snake_key in ['variables', 'created_by'] and not value: + # Don't include empty variables or None created_by continue + else: + result[camel_key] = value - # Convert to camelCase using attribute_map - camel_case_result = {} - for snake_key, camel_key in self.attribute_map.items(): - if snake_key in result and result[snake_key] is not None: - camel_case_result[camel_key] = result[snake_key] - - if issubclass(SignalResponse, dict): - for key, value in self.items(): - camel_case_result[key] = value - - return camel_case_result - - # Alternative approach - override __str__ for better printing - def __str__(self): - """Returns a more readable string representation""" - return f"""SignalResponse( - response_type='{self.response_type}', - target_workflow_id='{self.target_workflow_id}', - target_workflow_status='{self.target_workflow_status}', - workflow_id='{self.workflow_id}', - task_id='{self.task_id}', - task_type='{self.task_type}', - status='{self.status}', - reference_task_name='{self.reference_task_name}', - retry_count={self.retry_count}, - input={self.input}, - output={self.output} - )""" + return result @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'SignalResponse': @@ -485,6 +531,30 @@ def from_dict(cls, data: Dict[str, Any]) -> 'SignalResponse': return cls(**snake_case_data) + @classmethod + def from_api_response(cls, data: Dict[str, Any]) -> 'SignalResponse': + """Create instance from API response dictionary with proper field mapping""" + if not isinstance(data, dict): + return cls() + + kwargs = {} + + # Reverse mapping from camelCase to snake_case + reverse_mapping = {v: k for k, v in cls.attribute_map.items()} + + for camel_key, value in data.items(): + if camel_key in reverse_mapping: + snake_key = reverse_mapping[camel_key] + if snake_key == 'status' and value and isinstance(value, str): + try: + kwargs[snake_key] = TaskStatus(value) + except ValueError: + kwargs[snake_key] = value + else: + kwargs[snake_key] = value + + return cls(**kwargs) + def to_str(self): """Returns the string representation of the model""" return pprint.pformat(self.to_dict()) From 6cecbafa0766d8ffa41e7d4261df5bef98e5ef8f Mon Sep 17 00:00:00 2001 From: harshilraval Date: Tue, 10 Jun 2025 19:16:46 +0530 Subject: [PATCH 5/7] test suit for signal and signalAsync wrapper for signal and signalAsync --- .../client/http/api/task_resource_api.py | 2 +- .../workflow/executor/workflow_executor.py | 18 + .../test_data/complex_wf_signal_test.json | 65 +++ .../complex_wf_signal_test_subworkflow_1.json | 82 ++++ .../complex_wf_signal_test_subworkflow_2.json | 61 +++ .../workflow/test_workflow_execution.py | 429 ++++++++++++++++++ 6 files changed, 656 insertions(+), 1 deletion(-) create mode 100644 tests/integration/resources/test_data/complex_wf_signal_test.json create mode 100644 tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_1.json create mode 100644 tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_2.json diff --git a/src/conductor/client/http/api/task_resource_api.py b/src/conductor/client/http/api/task_resource_api.py index e58c1c9b7..0515cc89e 100644 --- a/src/conductor/client/http/api/task_resource_api.py +++ b/src/conductor/client/http/api/task_resource_api.py @@ -1926,7 +1926,7 @@ def signal_workflow_task_sync_with_http_info(self, workflow_id, status, body, ** path_params['status'] = params['status'] # noqa: E501 query_params = [] - if 'return_strategy' in params: + if 'return_strategy' in params and params['return_strategy'] is not None: query_params.append(('returnStrategy', params['return_strategy'])) # noqa: E501 header_params = {} diff --git a/src/conductor/client/workflow/executor/workflow_executor.py b/src/conductor/client/workflow/executor/workflow_executor.py index feed11684..17f56ff9d 100644 --- a/src/conductor/client/workflow/executor/workflow_executor.py +++ b/src/conductor/client/workflow/executor/workflow_executor.py @@ -235,6 +235,24 @@ def get_task(self, task_id: str) -> str: task_id=task_id ) + def signal(self, workflow_id: str, status: str, body: Dict[str, Any], + return_strategy: str = None) -> SignalResponse: + """Update running task in the workflow with given status and output synchronously and return back updated workflow""" + return self.task_client.signal_workflow_task_sync( + workflow_id=workflow_id, + status=status, + body=body, + return_strategy=return_strategy + ) + + def signal_async(self, workflow_id: str, status: str, body: Dict[str, Any]) -> None: + """Update running task in the workflow with given status and output asynchronously""" + return self.task_client.signal_workflow_task_async( + workflow_id=workflow_id, + status=status, + body=body + ) + def __get_task_result(self, task_id: str, workflow_id: str, task_output: Dict[str, Any], status: str) -> TaskResult: return TaskResult( workflow_instance_id=workflow_id, diff --git a/tests/integration/resources/test_data/complex_wf_signal_test.json b/tests/integration/resources/test_data/complex_wf_signal_test.json new file mode 100644 index 000000000..ecf3d55f4 --- /dev/null +++ b/tests/integration/resources/test_data/complex_wf_signal_test.json @@ -0,0 +1,65 @@ +{ + "createTime": 1744299182957, + "updateTime": 1744299435683, + "name": "complex_wf_signal_test", + "description": "http_yield_signal_test", + "version": 1, + "tasks": [ + { + "name": "http", + "taskReferenceName": "http_ref", + "inputParameters": { + "uri": "http://httpbin:8081/api/hello?name=test1", + "method": "GET", + "accept": "application/json", + "contentType": "application/json", + "encode": true + }, + "type": "HTTP", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + }, + { + "name": "sub_workflow", + "taskReferenceName": "sub_workflow_ref", + "inputParameters": {}, + "type": "SUB_WORKFLOW", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "subWorkflowParam": { + "name": "complex_wf_signal_test_subworkflow_1", + "version": 1 + }, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + } + ], + "inputParameters": [], + "outputParameters": {}, + "failureWorkflow": "", + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "shailesh.padave@orkes.io", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {}, + "enforceSchema": true +} \ No newline at end of file diff --git a/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_1.json b/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_1.json new file mode 100644 index 000000000..f26a7dbbf --- /dev/null +++ b/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_1.json @@ -0,0 +1,82 @@ +{ + "createTime": 1744299356718, + "updateTime": 1744287643769, + "name": "complex_wf_signal_test_subworkflow_1", + "description": "complex_wf_signal_test_subworkflow_1", + "version": 1, + "tasks": [ + { + "name": "http", + "taskReferenceName": "http_ref", + "inputParameters": { + "uri": "http://httpbin:8081/api/hello?name=test1", + "method": "GET", + "accept": "application/json", + "contentType": "application/json", + "encode": true + }, + "type": "HTTP", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + }, + { + "name": "yield", + "taskReferenceName": "simple_ref_1", + "inputParameters": {}, + "type": "YIELD", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + }, + { + "name": "sub_workflow", + "taskReferenceName": "sub_workflow_ref", + "inputParameters": {}, + "type": "SUB_WORKFLOW", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "subWorkflowParam": { + "name": "complex_wf_signal_test_subworkflow_2", + "version": 1 + }, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + } + ], + "inputParameters": [], + "outputParameters": {}, + "failureWorkflow": "", + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "shailesh.padave@orkes.io", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {}, + "enforceSchema": true +} \ No newline at end of file diff --git a/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_2.json b/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_2.json new file mode 100644 index 000000000..1bc322979 --- /dev/null +++ b/tests/integration/resources/test_data/complex_wf_signal_test_subworkflow_2.json @@ -0,0 +1,61 @@ +{ + "createTime": 1744299371396, + "updateTime": 0, + "name": "complex_wf_signal_test_subworkflow_2", + "description": "complex_wf_signal_test_subworkflow_2", + "version": 1, + "tasks": [ + { + "name": "http", + "taskReferenceName": "http_ref", + "inputParameters": { + "uri": "http://httpbin:8081/api/hello?name=test1", + "method": "GET", + "accept": "application/json", + "contentType": "application/json", + "encode": true + }, + "type": "HTTP", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + }, + { + "name": "yield", + "taskReferenceName": "simple_ref_1", + "inputParameters": {}, + "type": "YIELD", + "decisionCases": {}, + "defaultCase": [], + "forkTasks": [], + "startDelay": 0, + "joinOn": [], + "optional": false, + "defaultExclusiveJoinTask": [], + "asyncComplete": false, + "loopOver": [], + "onStateChange": {}, + "permissive": false + } + ], + "inputParameters": [], + "outputParameters": {}, + "failureWorkflow": "", + "schemaVersion": 2, + "restartable": true, + "workflowStatusListenerEnabled": false, + "ownerEmail": "shailesh.padave@orkes.io", + "timeoutPolicy": "ALERT_ONLY", + "timeoutSeconds": 0, + "variables": {}, + "inputTemplate": {}, + "enforceSchema": true +} \ No newline at end of file diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index d2cf07a48..f414f1f39 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -1,4 +1,5 @@ import logging +import time from multiprocessing import set_start_method from time import sleep @@ -18,6 +19,9 @@ TASK_NAME = "python_integration_test_task" WORKFLOW_VERSION = 1234 WORKFLOW_OWNER_EMAIL = "test@test" +COMPLEX_WF_NAME = 'complex_wf_signal_test' +SUB_WF_1_NAME = 'complex_wf_signal_test_subworkflow_1' +SUB_WF_2_NAME = 'complex_wf_signal_test_subworkflow_2' logger = logging.getLogger( Configuration.get_logging_formatted_name( @@ -55,6 +59,11 @@ def run_workflow_execution_tests(configuration: Configuration, workflow_executor workflow_completion_timeout=5.0 ) test_decorated_workers(workflow_executor) + + # Add signal tests here + run_signal_tests(configuration, workflow_executor) + logger.debug('finished signal API tests') + except Exception as e: task_handler.stop_processes() raise Exception(f'failed integration tests, reason: {e}') @@ -218,3 +227,423 @@ def _run_with_retry_attempt(f, params, retries=4) -> None: if attempt == retries - 1: raise e sleep(1 << attempt) + + +# ===== SIGNAL TESTS ===== + +def run_signal_tests(configuration: Configuration, workflow_executor: WorkflowExecutor): + """Run all signal API tests using WorkflowExecutor methods""" + logger.info('START: Signal API tests using WorkflowExecutor') + + try: + # Register signal test workflows (same as original test) + _register_signal_test_workflows(workflow_executor) + + # Test sync signal with different return strategies + test_signal_target_workflow(workflow_executor) + test_signal_blocking_workflow(workflow_executor) + test_signal_blocking_task(workflow_executor) + test_signal_blocking_task_input(workflow_executor) + + # Test default return strategy + test_signal_default_strategy(workflow_executor) + + # Test async signal + test_signal_async(workflow_executor) + + # Test to_dict fix + test_signal_to_dict_fix(workflow_executor) + + logger.info('All signal tests completed successfully') + + except Exception as e: + logger.error(f'Signal tests failed: {e}') + raise + finally: + # Cleanup + try: + workflow_executor.metadata_client.unregister_workflow_def( + COMPLEX_WF_NAME, 1 + ) + workflow_executor.metadata_client.unregister_workflow_def( + SUB_WF_1_NAME, 1 + ) + workflow_executor.metadata_client.unregister_workflow_def( + SUB_WF_2_NAME, 1 + ) + except Exception as cleanup_error: + logger.warning(f'Cleanup failed: {cleanup_error}') + + logger.info('END: Signal API tests using WorkflowExecutor') + + +def _register_signal_test_workflows(workflow_executor: WorkflowExecutor): + """Register the complex signal test workflows from JSON files""" + import json + import os + + def _get_workflow_definition(path): + """Get workflow definition from JSON file, following existing pattern""" + # Get directory of current script + current_dir = os.path.dirname(os.path.abspath(__file__)) + + # Path to project root (adjust based on your structure) + project_root = os.path.abspath(os.path.join(current_dir, '..', '..', '..')) + + # Construct path from project root + actual_path = os.path.join(project_root, path) + + # For debugging + logger.info(f"Attempting to load workflow from: {actual_path}") + + try: + with open(actual_path, "r") as f: + workflow_json = json.loads(f.read()) + # Convert to WorkflowDef object + api_client = workflow_executor.workflow_client.api_client + workflow_def = api_client.deserialize_class(workflow_json, "WorkflowDef") + return workflow_def + except FileNotFoundError: + logger.error(f"Workflow definition file not found: {actual_path}") + raise + except Exception as e: + logger.error(f"Error loading workflow definition: {e}") + raise + + try: + # Register main workflow + complex_wf_def = _get_workflow_definition(f'tests/integration/resources/test_data/{COMPLEX_WF_NAME}.json') + workflow_executor.metadata_client.update1(body=[complex_wf_def], overwrite=True) + logger.info(f'Registered workflow: {COMPLEX_WF_NAME}') + + # Register subworkflows + sub_wf1_def = _get_workflow_definition(f'tests/integration/resources/test_data/{SUB_WF_1_NAME}.json') + workflow_executor.metadata_client.update1(body=[sub_wf1_def], overwrite=True) + logger.info(f'Registered workflow: {SUB_WF_1_NAME}') + + sub_wf2_def = _get_workflow_definition(f'tests/integration/resources/test_data/{SUB_WF_2_NAME}.json') + workflow_executor.metadata_client.update1(body=[sub_wf2_def], overwrite=True) + logger.info(f'Registered workflow: {SUB_WF_2_NAME}') + + except Exception as e: + logger.warning(f'Some workflows may already be registered: {e}') + + # Give time for workflow registration + time.sleep(1.0) + + +def _start_complex_workflow(workflow_executor: WorkflowExecutor) -> str: + """Start complex workflow and return workflow ID""" + try: + start_request = StartWorkflowRequest( + name=COMPLEX_WF_NAME, + version=1, + input={} + ) + + # Use the workflow_executor.start_workflow method + workflow_id = workflow_executor.start_workflow(start_request) + + assert workflow_id is not None, "Failed to start workflow" + + # Brief wait for workflow initialization + time.sleep(0.5) + logger.info(f'Started workflow {workflow_id}') + return workflow_id + + except Exception as e: + logger.error(f'Failed to start workflow: {e}') + raise + + +def _complete_workflow(workflow_executor: WorkflowExecutor, workflow_id: str): + """Complete workflow by sending required signals""" + try: + # Send completion signals using signal_async method + time.sleep(5) + workflow_executor.signal_async( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "signal1"} + ) + workflow_executor.signal_async( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "signal2"} + ) + + # Wait for completion with timeout + # max_wait_iterations = 500 # 5 seconds max + # for i in range(max_wait_iterations): + # workflow = workflow_executor.get_workflow(workflow_id, include_tasks=True) + # if workflow.status == "COMPLETED": + # logger.info(f'Workflow {workflow_id} completed successfully') + # return + # time.sleep(0.1) + # + # raise TimeoutError(f'Workflow {workflow_id} did not complete within timeout') + + except Exception as e: + logger.error(f'Failed to complete workflow {workflow_id}: {e}') + raise + + +def test_signal_target_workflow(workflow_executor: WorkflowExecutor): + """Test signal with TARGET_WORKFLOW return strategy""" + logger.info('Testing signal with TARGET_WORKFLOW strategy...') + + # Start workflow + workflow_id = _start_complex_workflow(workflow_executor) + + # Wait and check workflow status + time.sleep(1.0) + + # Debug: Check workflow status before signaling + try: + workflow = workflow_executor.get_workflow(workflow_id, include_tasks=True) + logger.info(f"Workflow status before signal: {workflow.status}") + logger.info(f"Workflow tasks: {len(workflow.tasks) if workflow.tasks else 0}") + if workflow.tasks: + for task in workflow.tasks: + logger.info(f"Task: {task.task_type}, Status: {task.status}, Ref: {task.reference_task_name}") + except Exception as e: + logger.warning(f"Could not get workflow status: {e}") + + # Send signal with TARGET_WORKFLOW strategy + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_target_workflow"}, + return_strategy="TARGET_WORKFLOW" + ) + + # Debug: Print response details + logger.info(f"Response object: {response}") + logger.info(f"Response type: {type(response)}") + if hasattr(response, '__dict__'): + logger.info(f"Response attributes: {response.__dict__}") + if hasattr(response, 'to_dict'): + try: + logger.info(f"Response to_dict: {response.to_dict()}") + except Exception as e: + logger.warning(f"to_dict failed: {e}") + + # Validate response + assert response is not None, "Signal response is None" + assert hasattr(response, 'response_type'), "Response missing response_type attribute" + assert response.response_type == "TARGET_WORKFLOW", f"Expected TARGET_WORKFLOW, got {response.response_type}" + assert response.target_workflow_id == workflow_id, "Target workflow ID mismatch" + + # Test helper methods + assert response.is_target_workflow(), "is_target_workflow() should return True" + + workflow_data = response.get_workflow() + assert workflow_data is not None, "get_workflow() should return data" + assert workflow_data.get('workflowId') == workflow_id, "Workflow data should contain correct workflow_id" + + # Wait for workflow completion + _complete_workflow(workflow_executor, workflow_id) + + logger.info('TARGET_WORKFLOW strategy test completed') + + # Test helper methods + assert response.is_target_workflow(), "is_target_workflow() should return True" + + workflow_data = response.get_workflow() + assert workflow_data is not None, "get_workflow() should return data" + assert workflow_data.get('workflowId') == workflow_id, "Workflow data should contain correct workflow_id" + + # Wait for workflow completion + _wait_for_workflow_completion(workflow_executor, workflow_id) + + logger.info('TARGET_WORKFLOW strategy test completed') + + +def test_signal_blocking_workflow(workflow_executor: WorkflowExecutor): + """Test signal with BLOCKING_WORKFLOW return strategy""" + logger.info('Testing signal with BLOCKING_WORKFLOW strategy...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_blocking_workflow"}, + return_strategy="BLOCKING_WORKFLOW" + ) + + # Validate response + assert response is not None, "Signal response is None" + assert response.response_type == "BLOCKING_WORKFLOW", f"Expected BLOCKING_WORKFLOW, got {response.response_type}" + assert response.target_workflow_id == workflow_id, "Target workflow ID mismatch" + + # Test helper methods + assert response.is_blocking_workflow(), "is_blocking_workflow() should return True" + + workflow_data = response.get_workflow() + assert workflow_data is not None, "get_workflow() should return data" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('BLOCKING_WORKFLOW strategy test completed') + + +def test_signal_blocking_task(workflow_executor: WorkflowExecutor): + """Test signal with BLOCKING_TASK return strategy""" + logger.info('Testing signal with BLOCKING_TASK strategy...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_blocking_task"}, + return_strategy="BLOCKING_TASK" + ) + + # Validate response + assert response is not None, "Signal response is None" + assert response.response_type == "BLOCKING_TASK", f"Expected BLOCKING_TASK, got {response.response_type}" + assert response.target_workflow_id == workflow_id, "Target workflow ID mismatch" + + # Test helper methods + assert response.is_blocking_task(), "is_blocking_task() should return True" + + task_data = response.get_blocking_task() + assert task_data is not None, "get_blocking_task() should return data" + assert task_data.get('taskId') is not None, "Task data should contain task_id" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('BLOCKING_TASK strategy test completed') + + +def test_signal_blocking_task_input(workflow_executor: WorkflowExecutor): + """Test signal with BLOCKING_TASK_INPUT return strategy""" + logger.info('Testing signal with BLOCKING_TASK_INPUT strategy...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_blocking_task_input"}, + return_strategy="BLOCKING_TASK_INPUT" + ) + + # Validate response + assert response is not None, "Signal response is None" + assert response.response_type == "BLOCKING_TASK_INPUT", f"Expected BLOCKING_TASK_INPUT, got {response.response_type}" + assert response.target_workflow_id == workflow_id, "Target workflow ID mismatch" + + # Test helper methods + assert response.is_blocking_task_input(), "is_blocking_task_input() should return True" + + task_data = response.get_blocking_task() + task_input = response.get_task_input() + assert task_data is not None, "get_blocking_task() should return data" + assert task_input is not None, "get_task_input() should return data" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('BLOCKING_TASK_INPUT strategy test completed') + + +def test_signal_default_strategy(workflow_executor: WorkflowExecutor): + """Test signal with default return strategy""" + logger.info('Testing signal with default strategy...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + # Don't specify return_strategy - should default to TARGET_WORKFLOW + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_default"} + ) + + # Validate response + assert response is not None, "Signal response is None" + assert response.response_type == "TARGET_WORKFLOW", f"Expected TARGET_WORKFLOW (default), got {response.response_type}" + assert response.target_workflow_id == workflow_id, "Target workflow ID mismatch" + assert response.is_target_workflow(), "Default should be TARGET_WORKFLOW" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('Default strategy test completed') + + +def test_signal_async(workflow_executor: WorkflowExecutor): + """Test async signal""" + logger.info('Testing async signal...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + # Send async signal (should not return response) + result = workflow_executor.signal_async( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_async"} + ) + + # Async signal should return None + assert result is None, "Async signal should return None" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('Async signal test completed') + + +def test_signal_to_dict_fix(workflow_executor: WorkflowExecutor): + """Test that to_dict() returns actual values, not property objects""" + logger.info('Testing to_dict() method fix...') + + workflow_id = _start_complex_workflow(workflow_executor) + time.sleep(0.5) + + response = workflow_executor.signal( + workflow_id=workflow_id, + status="COMPLETED", + body={"result": "test_to_dict"}, + return_strategy="BLOCKING_TASK" + ) + + response_dict = response.to_dict() + + # Ensure no property objects in the output + response_str = str(response_dict) + assert 'property object' not in response_str, f"Found property objects in response: {response_str}" + + # Verify actual string values + assert isinstance(response_dict.get('responseType'), str), "responseType should be string" + assert response_dict['responseType'] == 'BLOCKING_TASK', "responseType value incorrect" + assert isinstance(response_dict.get('taskId'), str), "taskId should be string" + assert isinstance(response_dict.get('targetWorkflowId'), str), "targetWorkflowId should be string" + + _complete_workflow(workflow_executor, workflow_id) + logger.info('to_dict() method test completed') + dict['responseType'] == 'BLOCKING_TASK', "responseType value incorrect" + assert isinstance(response_dict.get('taskId'), str), "taskId should be string" + assert isinstance(response_dict.get('targetWorkflowId'), str), "targetWorkflowId should be string" + + _wait_for_workflow_completion(workflow_executor, workflow_id) + + logger.info('to_dict() method test completed') + + +def _wait_for_workflow_completion(workflow_executor: WorkflowExecutor, workflow_id: str, timeout: int = 10): + """Wait for workflow to complete with timeout""" + max_iterations = timeout * 10 # Check every 0.1 seconds + + for i in range(max_iterations): + try: + workflow = workflow_executor.get_workflow(workflow_id, include_tasks=False) + if workflow.status in ["COMPLETED", "FAILED", "TERMINATED"]: + logger.debug(f'Workflow {workflow_id} completed with status: {workflow.status}') + return + except Exception as e: + logger.warning(f'Error checking workflow status: {e}') + + time.sleep(0.1) + + logger.warning(f'Workflow {workflow_id} did not complete within {timeout} seconds') \ No newline at end of file From f96b8648002741a918c7ddcb301bc992d9c1326a Mon Sep 17 00:00:00 2001 From: harshilraval Date: Wed, 11 Jun 2025 12:50:35 +0530 Subject: [PATCH 6/7] fix test failures in gha --- tests/integration/workflow/test_workflow_execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index f414f1f39..a87e86a33 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -622,7 +622,7 @@ def test_signal_to_dict_fix(workflow_executor: WorkflowExecutor): _complete_workflow(workflow_executor, workflow_id) logger.info('to_dict() method test completed') - dict['responseType'] == 'BLOCKING_TASK', "responseType value incorrect" + # dict['responseType'] == 'BLOCKING_TASK', "responseType value incorrect" assert isinstance(response_dict.get('taskId'), str), "taskId should be string" assert isinstance(response_dict.get('targetWorkflowId'), str), "targetWorkflowId should be string" From 0359215c064085818e36e7346b43d91a17ad06af Mon Sep 17 00:00:00 2001 From: harshilraval Date: Fri, 13 Jun 2025 12:25:35 +0530 Subject: [PATCH 7/7] remove unwanted comments --- .../integration/workflow/test_workflow_execution.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tests/integration/workflow/test_workflow_execution.py b/tests/integration/workflow/test_workflow_execution.py index a87e86a33..5d6594770 100644 --- a/tests/integration/workflow/test_workflow_execution.py +++ b/tests/integration/workflow/test_workflow_execution.py @@ -371,18 +371,6 @@ def _complete_workflow(workflow_executor: WorkflowExecutor, workflow_id: str): status="COMPLETED", body={"result": "signal2"} ) - - # Wait for completion with timeout - # max_wait_iterations = 500 # 5 seconds max - # for i in range(max_wait_iterations): - # workflow = workflow_executor.get_workflow(workflow_id, include_tasks=True) - # if workflow.status == "COMPLETED": - # logger.info(f'Workflow {workflow_id} completed successfully') - # return - # time.sleep(0.1) - # - # raise TimeoutError(f'Workflow {workflow_id} did not complete within timeout') - except Exception as e: logger.error(f'Failed to complete workflow {workflow_id}: {e}') raise