Skip to content

Commit 17d2210

Browse files
Refactoring
1 parent 005c31a commit 17d2210

9 files changed

Lines changed: 101 additions & 49 deletions

File tree

src/conductor/asyncio_client/adapters/api/user_resource_api.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ async def get_granted_permissions(
2323
_headers: Optional[Dict[StrictStr, Any]] = None,
2424
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
2525
) -> object:
26+
# Convert empty user_id to None to prevent sending invalid data to server
27+
if not user_id:
28+
user_id = None # type: ignore[assignment]
2629
return await super().get_granted_permissions(
2730
user_id,
2831
_request_timeout=_request_timeout,
@@ -45,6 +48,9 @@ async def get_user( # type: ignore[override]
4548
_headers: Optional[Dict[StrictStr, Any]] = None,
4649
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
4750
) -> ConductorUserAdapter:
51+
# Convert empty user id to None to prevent sending invalid data to server
52+
if not id:
53+
id = None # type: ignore[assignment]
4854
result = await super().get_user(
4955
id,
5056
_request_timeout=_request_timeout,
@@ -69,6 +75,9 @@ async def upsert_user( # type: ignore[override]
6975
_headers: Optional[Dict[StrictStr, Any]] = None,
7076
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
7177
) -> ConductorUserAdapter:
78+
# Convert empty user id to None to prevent sending invalid data to server
79+
if not id:
80+
id = None # type: ignore[assignment]
7281
result = await super().upsert_user(
7382
id,
7483
upsert_user_request,
@@ -116,6 +125,9 @@ async def delete_user(
116125
_headers: Optional[Dict[StrictStr, Any]] = None,
117126
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
118127
) -> object:
128+
# Convert empty user id to None to prevent sending invalid data to server
129+
if not id:
130+
id = None # type: ignore[assignment]
119131
return await super().delete_user(
120132
id,
121133
_request_timeout=_request_timeout,

src/conductor/asyncio_client/automator/task_handler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def __init__(
5353
self,
5454
workers: List[WorkerInterface],
5555
configuration: Configuration,
56-
metrics_settings: MetricsSettings,
56+
metrics_settings: Optional[MetricsSettings] = None,
5757
scan_for_annotated_workers: bool = True,
5858
import_modules: Optional[List[str]] = None,
5959
):
@@ -123,7 +123,9 @@ def join_processes(self) -> None:
123123
logger.info("KeyboardInterrupt: Stopping all processes")
124124
self.stop_processes()
125125

126-
def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -> None:
126+
def __create_metrics_provider_process(
127+
self, metrics_settings: Optional[MetricsSettings] = None
128+
) -> None:
127129
if metrics_settings is None:
128130
self.metrics_provider_process = None
129131
return
@@ -138,7 +140,7 @@ def __create_task_runner_processes(
138140
self,
139141
workers: List[WorkerInterface],
140142
configuration: Configuration,
141-
metrics_settings: MetricsSettings,
143+
metrics_settings: Optional[MetricsSettings] = None,
142144
) -> None:
143145
self.task_runner_processes: List[Process] = []
144146
for worker in workers:
@@ -148,7 +150,7 @@ def __create_task_runner_process(
148150
self,
149151
worker: WorkerInterface,
150152
configuration: Configuration,
151-
metrics_settings: MetricsSettings,
153+
metrics_settings: Optional[MetricsSettings] = None,
152154
) -> None:
153155
task_runner = AsyncTaskRunner(worker, configuration, metrics_settings)
154156
process = Process(target=self.coroutine_as_process_target, args=(task_runner.run,))

src/conductor/asyncio_client/orkes/orkes_authorization_client.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ async def update_user(
5151
"""Update an existing user"""
5252
return await self.user_api.upsert_user(id=user_id, upsert_user_request=upsert_user_request)
5353

54-
async def get_user(self, user_id: str) -> ConductorUserAdapter:
54+
async def get_user(self, user_id: str) -> Optional[ConductorUserAdapter]:
5555
"""Get user by ID"""
56-
return await self.user_api.get_user(id=user_id)
56+
result = await self.user_api.get_user(id=user_id)
57+
return ConductorUserAdapter.from_dict(result) # type: ignore[arg-type]
5758

5859
async def delete_user(self, user_id: str) -> None:
5960
"""Delete user by ID"""
@@ -66,25 +67,28 @@ async def list_users(self, include_apps: bool = False) -> List[ConductorUserAdap
6667
# Application Operations
6768
async def create_application(
6869
self, application: CreateOrUpdateApplicationRequestAdapter
69-
) -> ExtendedConductorApplicationAdapter:
70+
) -> Optional[ExtendedConductorApplicationAdapter]:
7071
"""Create a new application"""
71-
return await self.application_api.create_application(
72+
result = await self.application_api.create_application(
7273
create_or_update_application_request=application
7374
)
75+
return ExtendedConductorApplicationAdapter.from_dict(result) # type: ignore[arg-type]
7476

7577
async def update_application(
7678
self, application_id: str, application: CreateOrUpdateApplicationRequestAdapter
77-
) -> ExtendedConductorApplicationAdapter:
79+
) -> Optional[ExtendedConductorApplicationAdapter]:
7880
"""Update an existing application"""
79-
return await self.application_api.update_application(
81+
result = await self.application_api.update_application(
8082
id=application_id, create_or_update_application_request=application
8183
)
84+
return ExtendedConductorApplicationAdapter.from_dict(result) # type: ignore[arg-type]
8285

8386
async def get_application(
8487
self, application_id: str
8588
) -> Optional[ExtendedConductorApplicationAdapter]:
8689
"""Get application by ID"""
87-
return await self.application_api.get_application(id=application_id)
90+
result = await self.application_api.get_application(id=application_id)
91+
return ExtendedConductorApplicationAdapter.from_dict(result) # type: ignore[arg-type]
8892

8993
async def delete_application(self, application_id: str) -> None:
9094
"""Delete application by ID"""
@@ -105,15 +109,17 @@ async def create_group(
105109

106110
async def update_group(
107111
self, group_id: str, upsert_group_request: UpsertGroupRequest
108-
) -> GroupAdapter:
112+
) -> Optional[GroupAdapter]:
109113
"""Update an existing group"""
110-
return await self.group_api.upsert_group(
114+
result = await self.group_api.upsert_group(
111115
id=group_id, upsert_group_request=upsert_group_request
112116
)
117+
return GroupAdapter.from_dict(result) # type: ignore[arg-type]
113118

114-
async def get_group(self, group_id: str) -> GroupAdapter:
119+
async def get_group(self, group_id: str) -> Optional[GroupAdapter]:
115120
"""Get group by ID"""
116-
return await self.group_api.get_group(id=group_id)
121+
result = await self.group_api.get_group(id=group_id)
122+
return GroupAdapter.from_dict(result) # type: ignore[arg-type]
117123

118124
async def delete_group(self, group_id: str) -> None:
119125
"""Delete group by ID"""
@@ -170,15 +176,17 @@ async def get_group_permissions(self, group_id: str) -> object:
170176
# Convenience Methods
171177
async def upsert_user(
172178
self, user_id: str, upsert_user_request: UpsertUserRequest
173-
) -> ConductorUserAdapter:
179+
) -> Optional[ConductorUserAdapter]:
174180
"""Alias for create_user/update_user"""
175-
return await self.create_user(user_id, upsert_user_request)
181+
result = await self.create_user(user_id, upsert_user_request)
182+
return ConductorUserAdapter.from_dict(result) # type: ignore[arg-type]
176183

177184
async def upsert_group(
178185
self, group_id: str, upsert_group_request: UpsertGroupRequest
179-
) -> GroupAdapter:
186+
) -> Optional[GroupAdapter]:
180187
"""Alias for create_group/update_group"""
181-
return await self.create_group(group_id, upsert_group_request)
188+
result = await self.create_group(group_id, upsert_group_request)
189+
return GroupAdapter.from_dict(result) # type: ignore[arg-type]
182190

183191
async def set_application_tags(self, tags: List[TagAdapter], application_id: str):
184192
await self.application_api.put_tag_for_application(application_id, tags)
@@ -241,4 +249,5 @@ async def get_granted_permissions_for_user(self, user_id: str) -> List[GrantedAc
241249
async def get_app_by_access_key_id(
242250
self, access_key_id: str, *args, **kwargs
243251
) -> Optional[ExtendedConductorApplicationAdapter]:
244-
return await self.application_api.get_app_by_access_key_id(access_key_id, *args, **kwargs)
252+
result = await self.application_api.get_app_by_access_key_id(access_key_id, *args, **kwargs)
253+
return ExtendedConductorApplicationAdapter.from_dict(result) # type: ignore[arg-type]

src/conductor/asyncio_client/orkes/orkes_workflow_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,13 @@ async def skip_task_from_workflow(
228228
self,
229229
workflow_id: str,
230230
task_reference_name: str,
231-
skip_task_request: SkipTaskRequestAdapter,
231+
skip_task_request: Optional[SkipTaskRequestAdapter] = None,
232232
) -> None:
233233
"""Skip a task in a workflow"""
234234
await self.workflow_api.skip_task_from_workflow(
235235
workflow_id=workflow_id,
236236
task_reference_name=task_reference_name,
237-
skip_task_request=skip_task_request,
237+
skip_task_request=skip_task_request, # type: ignore[arg-type]
238238
)
239239

240240
async def jump_to_task(

src/conductor/asyncio_client/workflow/executor/workflow_executor.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ async def skip_task_from_workflow(
266266
self,
267267
workflow_id: str,
268268
task_reference_name: str,
269-
skip_task_request: SkipTaskRequestAdapter,
269+
skip_task_request: Optional[SkipTaskRequestAdapter] = None,
270270
) -> None:
271271
"""Skips a given task from a current running workflow"""
272272
return await self.workflow_client.skip_task_from_workflow(
@@ -307,18 +307,16 @@ async def update_task_by_ref_name_sync(
307307
status: str,
308308
) -> WorkflowAdapter:
309309
"""Update a task By Ref Name"""
310-
body = {"result": task_output}
311310
return await self.task_client.update_task_sync(
311+
request_body=task_output,
312312
workflow_id=workflow_id,
313313
task_ref_name=task_reference_name,
314314
status=status,
315-
request_body=body,
316315
)
317316

318317
async def get_task(self, task_id: str) -> Optional[TaskAdapter]:
319318
"""Get task by Id"""
320-
task = await self.task_client.get_task(task_id=task_id)
321-
return TaskAdapter.from_dict(task.to_dict())
319+
return await self.task_client.get_task(task_id=task_id)
322320

323321
def __get_task_result(
324322
self, task_id: str, workflow_id: str, task_output: Dict[str, Any], status: str

src/conductor/client/adapters/api/application_resource_api_adapter.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,67 @@
77

88
class ApplicationResourceApiAdapter(ApplicationResourceApi):
99
def create_access_key(self, id: str, **kwargs):
10+
# Convert empty application id to None to prevent sending invalid data to server
11+
if not id:
12+
id = None # type: ignore[assignment]
1013
return super().create_access_key(id, **kwargs)
1114

1215
def add_role_to_application_user(self, application_id: str, role: str, **kwargs):
16+
# Convert empty application_id and role to None to prevent sending invalid data to server
17+
if not application_id:
18+
application_id = None # type: ignore[assignment]
19+
if not role:
20+
role = None # type: ignore[assignment]
1321
return super().add_role_to_application_user(application_id, role, **kwargs)
1422

1523
def delete_access_key(self, application_id: str, key_id: str, **kwargs):
24+
# Convert empty application_id and key_id to None to prevent sending invalid data to server
25+
if not application_id:
26+
application_id = None # type: ignore[assignment]
27+
if not key_id:
28+
key_id = None # type: ignore[assignment]
1629
return super().delete_access_key(application_id, key_id, **kwargs)
1730

1831
def remove_role_from_application_user(self, application_id: str, role: str, **kwargs):
32+
# Convert empty application_id and role to None to prevent sending invalid data to server
33+
if not application_id:
34+
application_id = None # type: ignore[assignment]
35+
if not role:
36+
role = None # type: ignore[assignment]
1937
return super().remove_role_from_application_user(application_id, role, **kwargs)
2038

2139
def get_app_by_access_key_id(
2240
self, access_key_id: str, **kwargs
2341
) -> ExtendedConductorApplication:
42+
# Convert empty access_key_id to None to prevent sending invalid data to server
43+
if not access_key_id:
44+
access_key_id = None # type: ignore[assignment]
2445
return super().get_app_by_access_key_id(access_key_id, **kwargs)
2546

2647
def get_access_keys(self, id: str, **kwargs):
48+
# Convert empty application id to None to prevent sending invalid data to server
49+
if not id:
50+
id = None # type: ignore[assignment]
2751
return super().get_access_keys(id=id, **kwargs)
2852

2953
def toggle_access_key_status(self, application_id: str, key_id: str, **kwargs):
54+
# Convert empty application_id and key_id to None to prevent sending invalid data to server
55+
if not application_id:
56+
application_id = None # type: ignore[assignment]
57+
if not key_id:
58+
key_id = None # type: ignore[assignment]
3059
return super().toggle_access_key_status(application_id, key_id, **kwargs)
3160

3261
def get_tags_for_application(self, application_id: str, **kwargs) -> List[MetadataTag]: # type: ignore[override]
62+
# Convert empty application_id to None to prevent sending invalid data to server
63+
if not application_id:
64+
application_id = None # type: ignore[assignment]
3365
return super().get_tags_for_application(application_id, **kwargs)
3466

35-
def delete_tag_for_application(self, body: List[MetadataTag], id: str, **kwargs) -> None:
36-
return super().delete_tag_for_application(body, id, **kwargs)
67+
def delete_tag_for_application(self, tag: List[MetadataTag], id: str, **kwargs) -> None: # type: ignore[override]
68+
# Convert empty application id and tag list to None to prevent sending invalid data to server
69+
if not id:
70+
id = None # type: ignore[assignment]
71+
if not tag:
72+
tag = None # type: ignore[assignment]
73+
return super().delete_tag_for_application(tag, id, **kwargs)

tests/unit/asyncio_client/test_rest_adapter.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ def test_check_http2_support_success(mock_logger):
141141
result = adapter.check_http2_support("https://example.com")
142142

143143
assert result is True
144-
mock_logger.info.assert_called()
145144

146145

147146
@patch("conductor.client.adapters.rest_adapter.logger")
@@ -161,7 +160,6 @@ def test_check_http2_support_failure(mock_logger):
161160
result = adapter.check_http2_support("https://example.com")
162161

163162
assert result is False
164-
mock_logger.info.assert_called()
165163

166164

167165
@patch("conductor.client.adapters.rest_adapter.logger")

0 commit comments

Comments
 (0)