Skip to content

Commit 7db7ae4

Browse files
committed
fix test failures in gha
1 parent 22d189f commit 7db7ae4

5 files changed

Lines changed: 168 additions & 10 deletions

File tree

src/conductor/client/http/api/workflow_resource_api.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3051,4 +3051,135 @@ def update_workflow_and_task_state_with_http_info(self, body, request_id, workfl
30513051
_return_http_data_only=params.get('_return_http_data_only'),
30523052
_preload_content=params.get('_preload_content', True),
30533053
_request_timeout=params.get('_request_timeout'),
3054+
collection_formats=collection_formats)
3055+
3056+
def execute_workflow_cr(self, body, name, version, **kwargs): # noqa: E501
3057+
"""Execute a workflow synchronously with reactive response # noqa: E501
3058+
3059+
This method makes a synchronous HTTP request by default. To make an
3060+
asynchronous HTTP request, please pass async_req=True
3061+
>>> thread = api.execute_workflow_cr(body,name,version)
3062+
>>> result = thread.get()
3063+
3064+
:param async_req bool
3065+
:param StartWorkflowRequest body: (required)
3066+
:param str name: (required)
3067+
:param int version: (required)
3068+
:param str request_id:
3069+
:param str wait_until_task_ref:
3070+
:param int wait_for_seconds:
3071+
:param str consistency: DURABLE or EVENTUAL
3072+
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
3073+
:return: WorkflowRun
3074+
If the method is called asynchronously,
3075+
returns the request thread.
3076+
"""
3077+
kwargs['_return_http_data_only'] = True
3078+
if kwargs.get('async_req'):
3079+
return self.execute_workflow_reactive_with_http_info(body, name, version, **kwargs) # noqa: E501
3080+
else:
3081+
(data) = self.execute_workflow_reactive_with_http_info(body, name, version, **kwargs) # noqa: E501
3082+
return data
3083+
3084+
def execute_workflow_reactive_with_http_info(self, body, name, version, **kwargs): # noqa: E501
3085+
"""Execute a workflow synchronously with reactive response # noqa: E501
3086+
3087+
This method makes a synchronous HTTP request by default. To make an
3088+
asynchronous HTTP request, please pass async_req=True
3089+
>>> thread = api.execute_workflow_reactive_with_http_info(body, name, version, async_req=True)
3090+
>>> result = thread.get()
3091+
3092+
:param async_req bool
3093+
:param StartWorkflowRequest body: (required)
3094+
:param str name: (required)
3095+
:param int version: (required)
3096+
:param str request_id:
3097+
:param str wait_until_task_ref:
3098+
:param int wait_for_seconds:
3099+
:param str consistency: DURABLE or EVENTUAL
3100+
:param str return_strategy: TARGET_WORKFLOW or WAIT_WORKFLOW
3101+
:return: WorkflowRun
3102+
If the method is called asynchronously,
3103+
returns the request thread.
3104+
"""
3105+
3106+
all_params = ['body', 'name', 'version', 'request_id', 'wait_until_task_ref', 'wait_for_seconds', 'consistency',
3107+
'return_strategy', 'async_req', '_return_http_data_only', '_preload_content',
3108+
'_request_timeout'] # noqa: E501
3109+
3110+
params = locals()
3111+
for key, val in six.iteritems(params['kwargs']):
3112+
if key not in all_params:
3113+
raise TypeError(
3114+
"Got an unexpected keyword argument '%s'"
3115+
" to method execute_workflow" % key
3116+
)
3117+
params[key] = val
3118+
del params['kwargs']
3119+
# verify the required parameter 'body' is set
3120+
if ('body' not in params or
3121+
params['body'] is None):
3122+
raise ValueError("Missing the required parameter `body` when calling `execute_workflow`") # noqa: E501
3123+
# verify the required parameter 'name' is set
3124+
if ('name' not in params or
3125+
params['name'] is None):
3126+
raise ValueError("Missing the required parameter `name` when calling `execute_workflow`") # noqa: E501
3127+
# verify the required parameter 'version' is set
3128+
if ('version' not in params or
3129+
params['version'] is None):
3130+
raise ValueError("Missing the required parameter `version` when calling `execute_workflow`") # noqa: E501
3131+
3132+
collection_formats = {}
3133+
3134+
path_params = {}
3135+
if 'name' in params:
3136+
path_params['name'] = params['name'] # noqa: E501
3137+
if 'version' in params:
3138+
path_params['version'] = params['version'] # noqa: E501
3139+
3140+
query_params = []
3141+
if 'request_id' in params:
3142+
query_params.append(('requestId', params['request_id'])) # noqa: E501
3143+
if 'wait_until_task_ref' in params:
3144+
query_params.append(('waitUntilTaskRef', params['wait_until_task_ref'])) # noqa: E501
3145+
if 'wait_for_seconds' in params:
3146+
query_params.append(('waitForSeconds', params['wait_for_seconds'])) # noqa: E501
3147+
if 'consistency' in params:
3148+
query_params.append(('consistency', params['consistency'])) # noqa: E501
3149+
if 'return_strategy' in params:
3150+
query_params.append(('returnStrategy', params['return_strategy'])) # noqa: E501
3151+
3152+
header_params = {}
3153+
3154+
form_params = []
3155+
local_var_files = {}
3156+
3157+
body_params = None
3158+
if 'body' in params:
3159+
body_params = params['body']
3160+
# HTTP header `Accept`
3161+
header_params['Accept'] = self.api_client.select_header_accept(
3162+
['application/json']) # noqa: E501
3163+
3164+
# HTTP header `Content-Type`
3165+
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
3166+
['application/json']) # noqa: E501
3167+
3168+
# Authentication setting
3169+
auth_settings = ['api_key'] # noqa: E501
3170+
3171+
return self.api_client.call_api(
3172+
'/workflow/execute/{name}/{version}', 'POST',
3173+
path_params,
3174+
query_params,
3175+
header_params,
3176+
body=body_params,
3177+
post_params=form_params,
3178+
files=local_var_files,
3179+
response_type='WorkflowRun', # noqa: E501
3180+
auth_settings=auth_settings,
3181+
async_req=params.get('async_req'),
3182+
_return_http_data_only=params.get('_return_http_data_only'),
3183+
_preload_content=params.get('_preload_content', True),
3184+
_request_timeout=params.get('_request_timeout'),
30543185
collection_formats=collection_formats)

src/conductor/client/orkes/orkes_workflow_client.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,37 @@ def execute_workflow(
4747
start_workflow_request: StartWorkflowRequest,
4848
request_id: str = None,
4949
wait_until_task_ref: Optional[str] = None,
50-
wait_for_seconds: int = 30
50+
wait_for_seconds: int = 30,
51+
consistency: Optional[str] = None,
52+
return_strategy: Optional[str] = None
5153
) -> WorkflowRun:
52-
53-
return self.workflowResourceApi.execute_workflow(
54+
"""Execute a workflow synchronously with optional reactive features
55+
56+
Args:
57+
start_workflow_request: StartWorkflowRequest containing workflow details
58+
request_id: Optional request ID for tracking
59+
wait_until_task_ref: Wait until this task reference is reached
60+
wait_for_seconds: How long to wait for completion (default 30)
61+
consistency: Workflow consistency level - 'DURABLE' or 'EVENTUAL'
62+
return_strategy: Return strategy - 'TARGET_WORKFLOW' or 'WAIT_WORKFLOW'
63+
64+
Returns:
65+
WorkflowRun: The workflow execution result
66+
"""
67+
if consistency is None:
68+
consistency = 'DURABLE'
69+
if return_strategy is None:
70+
return_strategy = 'TARGET_WORKFLOW'
71+
72+
return self.workflowResourceApi.execute_workflow_cr(
5473
body=start_workflow_request,
5574
request_id=request_id,
5675
version=start_workflow_request.version,
5776
name=start_workflow_request.name,
5877
wait_until_task_ref=wait_until_task_ref,
5978
wait_for_seconds=wait_for_seconds,
79+
consistency=consistency,
80+
return_strategy=return_strategy
6081
)
6182

6283
def pause_workflow(self, workflow_id: str):

src/conductor/client/workflow/executor/workflow_executor.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ def start_workflows(self, *start_workflow_request: StartWorkflowRequest) -> List
4545
)
4646
return workflow_id_list
4747

48-
def execute_workflow(self, request: StartWorkflowRequest, wait_until_task_ref: str, wait_for_seconds: int = 10,
49-
request_id: str = None) -> WorkflowRun:
50-
"""Executes a workflow with StartWorkflowRequest and waits for the completion of the workflow or until a
51-
specific task in the workflow """
48+
def execute_workflow(self, request: StartWorkflowRequest, wait_until_task_ref: str = None,
49+
wait_for_seconds: int = 10, request_id: str = None,
50+
consistency: str = None,
51+
return_strategy: str = None) -> WorkflowRun: # todo change this to SignalResponse
52+
"""Execute a workflow synchronously with optional reactive features"""
5253
if request_id is None:
5354
request_id = str(uuid.uuid4())
5455

@@ -57,6 +58,8 @@ def execute_workflow(self, request: StartWorkflowRequest, wait_until_task_ref: s
5758
request_id=request_id,
5859
wait_until_task_ref=wait_until_task_ref,
5960
wait_for_seconds=wait_for_seconds,
61+
consistency=consistency,
62+
return_strategy=return_strategy
6063
)
6164

6265
def execute(self, name: str, version: Optional[int] = None, workflow_input: Any = {},

src/conductor/client/workflow_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ def execute_workflow(
4040
start_workflow_request: StartWorkflowRequest,
4141
request_id: str = None,
4242
wait_until_task_ref: Optional[str] = None,
43-
wait_for_seconds: int = 30
43+
wait_for_seconds: int = 30,
44+
consistency: Optional[str] = None,
45+
return_strategy: Optional[str] = None
4446
) -> WorkflowRun:
4547
pass
4648

tests/unit/orkes/test_workflow_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def test_startWorkflow(self, mock):
7575
mock.assert_called_with(startWorkflowReq)
7676
self.assertEqual(wfId, WORKFLOW_UUID)
7777

78-
@patch.object(WorkflowResourceApi, 'execute_workflow')
78+
@patch.object(WorkflowResourceApi, 'execute_workflow_cr')
7979
def test_executeWorkflow(self, mock):
8080
expectedWfRun = WorkflowRun()
8181
mock.return_value = expectedWfRun
@@ -86,7 +86,8 @@ def test_executeWorkflow(self, mock):
8686
startWorkflowReq, "request_id", None, 30
8787
)
8888
mock.assert_called_with(body=startWorkflowReq, request_id="request_id", name=WORKFLOW_NAME, version=1,
89-
wait_until_task_ref=None, wait_for_seconds=30)
89+
wait_until_task_ref=None, wait_for_seconds=30, consistency='DURABLE',
90+
return_strategy='TARGET_WORKFLOW')
9091
self.assertEqual(workflowRun, expectedWfRun)
9192

9293
@patch.object(WorkflowResourceApi, 'pause_workflow')

0 commit comments

Comments
 (0)