Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ repos:
rev: v0.12.3
hooks:
- id: ruff
args: [""]
args: ["--exit-zero"]
2 changes: 1 addition & 1 deletion src/conductor/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.10"
__version__ = "1.1.10"
2 changes: 1 addition & 1 deletion src/conductor/client/automator/task_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import logging
import os
from multiprocessing import Process, freeze_support, Queue, set_start_method, get_context
from multiprocessing import Process, freeze_support, Queue, set_start_method
from sys import platform
from typing import List

Expand Down
5 changes: 2 additions & 3 deletions src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def run(self) -> None:
while True:
try:
self.run_once()
except Exception as e:
except Exception:
pass

def run_once(self) -> None:
Expand Down Expand Up @@ -229,14 +229,13 @@ def __set_worker_properties(self) -> None:
if polling_interval:
try:
self.worker.poll_interval = float(polling_interval)
except Exception as e:
except Exception:
logger.error(f'error reading and parsing the polling interval value {polling_interval}')
self.worker.poll_interval = self.worker.get_polling_interval_in_seconds()

if polling_interval:
try:
self.worker.poll_interval = float(polling_interval)
polling_interval_initialized = True
except Exception as e:
logger.error("Exception in reading polling interval from environment variable: {0}.".format(str(e)))

Expand Down
8 changes: 4 additions & 4 deletions src/conductor/client/automator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def convert_from_dict(cls: type, data: dict) -> object:
if data is None:
return data

if type(data) == cls:
if isinstance(data, cls):
return data

if dataclasses.is_dataclass(cls):
Expand All @@ -53,7 +53,7 @@ def convert_from_dict(cls: type, data: dict) -> object:
if not ((str(typ).startswith('dict[') or
str(typ).startswith('typing.Dict[') or
str(typ).startswith('requests.structures.CaseInsensitiveDict[') or
typ == dict or str(typ).startswith('OrderedDict['))):
isinstance(typ, dict) or str(typ).startswith('OrderedDict['))):
data = {}

members = inspect.signature(cls.__init__).parameters
Expand Down Expand Up @@ -81,7 +81,7 @@ def convert_from_dict(cls: type, data: dict) -> object:
elif (str(typ).startswith('dict[') or
str(typ).startswith('typing.Dict[') or
str(typ).startswith('requests.structures.CaseInsensitiveDict[') or
typ == dict or str(typ).startswith('OrderedDict[')):
isinstance(typ, dict) or str(typ).startswith('OrderedDict[')):

values = {}
generic_type = object
Expand Down Expand Up @@ -116,7 +116,7 @@ def get_value(typ: type, val: object) -> object:
values.append(converted)
return values
elif str(typ).startswith('dict[') or str(typ).startswith(
'typing.Dict[') or str(typ).startswith('requests.structures.CaseInsensitiveDict[') or typ == dict:
'typing.Dict[') or str(typ).startswith('requests.structures.CaseInsensitiveDict[') or isinstance(typ, dict):
values = {}
for k in val:
v = val[k]
Expand Down
10 changes: 5 additions & 5 deletions src/conductor/client/helpers/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __deserialize(self, data, klass):
if data is None:
return None

if type(klass) == str:
if isinstance(klass, str):
if klass.startswith('list['):
sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
return [self.__deserialize(sub_data, sub_kls)
Expand All @@ -90,11 +90,11 @@ def __deserialize(self, data, klass):

if klass in self.PRIMITIVE_TYPES:
return self.__deserialize_primitive(data, klass)
elif klass == object:
elif klass is object:
return self.__deserialize_object(data)
elif klass == datetime.date:
elif klass is datetime.date:
return self.__deserialize_date(data)
elif klass == datetime.datetime:
elif klass is datetime.datetime:
return self.__deserialize_datatime(data)
else:
return self.__deserialize_model(data, klass)
Expand All @@ -108,7 +108,7 @@ def __deserialize_primitive(self, data, klass):
:return: int, long, float, str, bool.
"""
try:
if klass == str and type(data) == bytes:
if isinstance(klass, str) and isinstance(data, bytes):
return self.__deserialize_bytes_to_str(data)
return klass(data)
except UnicodeEncodeError:
Expand Down
2 changes: 1 addition & 1 deletion src/conductor/client/metadata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ def set_workflow_tags(self, tags: List[MetadataTag], workflow_name: str):
pass

def delete_workflow_tag(self, tag: MetadataTag, workflow_name: str):
pass
pass
5 changes: 2 additions & 3 deletions src/conductor/client/orkes/orkes_schema_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import List, Optional
from typing import List

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.schema_def import SchemaDef
from conductor.client.http.rest import ApiException
from conductor.client.orkes.orkes_base_client import OrkesBaseClient
from conductor.client.schema_client import SchemaClient

Expand All @@ -24,4 +23,4 @@ def delete_schema(self, schema_name: str, version: int) -> None:
self.schemaApi.delete_schema_by_name_and_version(name=schema_name, version=version)

def delete_schema_by_name(self, schema_name: str) -> None:
self.schemaApi.delete_schema_by_name(name=schema_name)
self.schemaApi.delete_schema_by_name(name=schema_name)
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ def get_queue_sizes_for_all_tasks(self) -> dict:
def is_circuit_breaker_open(self, name: str) -> bool:
"""Check if circuit breaker is open for a service"""
status = self.get_circuit_breaker_status(name)
return status.current_state and status.current_state.upper() == "OPEN"
return status.current_state and status.current_state.upper() == "OPEN"
2 changes: 1 addition & 1 deletion src/conductor/client/orkes/orkes_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,4 @@ def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate,
if wait_for_seconds is not None:
kwargs['wait_for_seconds'] = wait_for_seconds

return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs)
return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs)
2 changes: 1 addition & 1 deletion src/conductor/client/orkes_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ def get_prompt_client(self) -> PromptClient:
return OrkesPromptClient(self.configuration)

def get_schema_client(self) -> SchemaClient:
return OrkesSchemaClient(self.configuration)
return OrkesSchemaClient(self.configuration)
4 changes: 1 addition & 3 deletions src/conductor/client/prompt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from typing import List

# python 2 and python 3 compatibility library
import six

from conductor.client.http.api_client import ApiClient
from conductor.client.http.models.prompt_template import PromptTemplate
from conductor.client.orkes.models.metadata_tag import MetadataTag

Expand Down Expand Up @@ -45,4 +43,4 @@ def delete_tag_for_prompt_template(self, prompt_name: str, tags: List[MetadataTa
@abstractmethod
def test_prompt(self, prompt_text: str, variables: dict, ai_integration: str, text_complete_model: str,
temperature : float = 0.1, top_p : float = 0.9, stop_words: List[str] = None) -> str:
pass
pass
6 changes: 1 addition & 5 deletions src/conductor/client/schema_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
from typing import List

# python 2 and python 3 compatibility library
import six

from conductor.client.http.api_client import ApiClient
from conductor.client.http.models.prompt_template import PromptTemplate
from conductor.client.http.models.schema_def import SchemaDef
from conductor.client.orkes.models.metadata_tag import MetadataTag


class SchemaClient(ABC):
Expand Down Expand Up @@ -48,4 +44,4 @@ def delete_schema_by_name(self, schema_name: str) -> None:
"""
Delete all the versions of a schema by its name
"""
pass
pass
2 changes: 1 addition & 1 deletion src/conductor/client/service_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ def get_all_protos(self, registry_name: str) -> List[ProtoRegistryEntry]:

@abstractmethod
def discover(self, name: str, create: Optional[bool] = False) -> List[ServiceMethod]:
pass
pass
2 changes: 1 addition & 1 deletion src/conductor/client/task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ def get_task_logs(self, task_id: str) -> List[TaskExecLog]:

@abstractmethod
def get_task_poll_data(self, task_type: str) -> List[PollData]:
pass
pass
4 changes: 2 additions & 2 deletions src/conductor/client/telemetry/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class MetricsCollector:
must_collect_metrics = False

def __init__(self, settings: MetricsSettings):
if settings != None:
if settings is not None:
os.environ["PROMETHEUS_MULTIPROC_DIR"] = settings.directory
MultiProcessCollector(self.registry)
self.must_collect_metrics = True

@staticmethod
def provide_metrics(settings: MetricsSettings) -> None:
if settings == None:
if settings is None:
return
OUTPUT_FILE_PATH = os.path.join(
settings.directory,
Expand Down
4 changes: 2 additions & 2 deletions src/conductor/client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def is_callable_input_parameter_a_task(callable: ExecuteTaskFunction, object_typ
if len(parameters) != 1:
return False
parameter = parameters[list(parameters.keys())[0]]
return parameter.annotation == object_type or parameter.annotation == parameter.empty or parameter.annotation == object
return parameter.annotation == object_type or parameter.annotation == parameter.empty or parameter.annotation is object


def is_callable_return_value_of_type(callable: ExecuteTaskFunction, object_type: Any) -> bool:
Expand Down Expand Up @@ -93,7 +93,7 @@ def execute(self, task: Task) -> TaskResult:
task_input[input_name] = None
task_output = self.execute_function(**task_input)

if type(task_output) == TaskResult:
if isinstance(task_output, TaskResult):
task_output.task_id = task.task_id
task_output.workflow_instance_id = task.workflow_instance_id
return task_output
Expand Down
24 changes: 14 additions & 10 deletions src/conductor/client/workflow/conductor_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
from typing import Any, Dict, List, Union

from shortuuid import uuid
from typing import Dict
from typing_extensions import Self

from conductor.client.http.models import *
from conductor.client.http.models import (
StartWorkflowRequest,
WorkflowDef,
WorkflowRun,
WorkflowTask,
SubWorkflowParams,
)
from conductor.client.http.models.start_workflow_request import IdempotencyStrategy
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.task.fork_task import ForkTask
Expand Down Expand Up @@ -56,7 +61,7 @@ def version(self) -> int:

@version.setter
def version(self, version: int) -> None:
if version != None and not isinstance(version, int):
if version is not None and not isinstance(version, int):
raise Exception('invalid type')
self._version = deepcopy(version)

Expand All @@ -66,7 +71,7 @@ def description(self) -> str:

@description.setter
def description(self, description: str) -> None:
if description != None and not isinstance(description, str):
if description is not None and not isinstance(description, str):
raise Exception('invalid type')
self._description = deepcopy(description)

Expand Down Expand Up @@ -115,7 +120,7 @@ def disable_status_listener(self) -> Self:
# Workflow output follows similar structure as task input
# See https://conductor.netflix.com/how-tos/Tasks/task-inputs.html for more details
def output_parameters(self, output_parameters: Dict[str, Any]) -> Self:
if output_parameters == None:
if output_parameters is None:
self._output_parameters = {}
return
if not isinstance(output_parameters, dict):
Expand All @@ -135,7 +140,7 @@ def output_parameter(self, key: str, value: Any) -> Self:

# InputTemplate template input to the workflow. Can have combination of variables (e.g. ${workflow.input.abc}) and static values
def input_template(self, input_template: Dict[str, Any]) -> Self:
if input_template == None:
if input_template is None:
self._input_template = {}
return
if not isinstance(input_template, dict):
Expand All @@ -149,7 +154,7 @@ def input_template(self, input_template: Dict[str, Any]) -> Self:
# Variables are set using SET_VARIABLE task. Excellent way to maintain business state
# e.g. Variables can maintain business/user specific states which can be queried and inspected to find out the state of the workflow
def variables(self, variables: Dict[str, Any]) -> Self:
if variables == None:
if variables is None:
self._variables = {}
return
if not isinstance(variables, dict):
Expand All @@ -174,7 +179,6 @@ def input_parameters(self, input_parameters: List[str]) -> Self:
return self

def workflow_input(self, input: dict) -> Self:
keys = list(input.keys())
self.input_template(input)
return self

Expand Down Expand Up @@ -353,13 +357,13 @@ def __call__(self, **kwargs) -> WorkflowRun:

def input(self, json_path: str) -> str:
if json_path is None:
return '${' + f'workflow.input' + '}'
return '${' + 'workflow.input' + '}'
else:
return '${' + f'workflow.input.{json_path}' + '}'

def output(self, json_path: str = None) -> str:
if json_path is None:
return '${' + f'workflow.output' + '}'
return '${' + 'workflow.output' + '}'
else:
return '${' + f'workflow.output.{json_path}' + '}'

Expand Down
17 changes: 15 additions & 2 deletions src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,21 @@
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api_client import ApiClient
from conductor.client.http.models import *
from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest
from conductor.client.http.models import (
TaskResult,
Workflow,
WorkflowDef,
WorkflowRun,
WorkflowStatus,
ScrollableSearchResultWorkflowSummary,
StartWorkflowRequest,
SkipTaskRequest,
RerunWorkflowRequest,
SignalResponse,
)
from conductor.client.http.models.correlation_ids_search_request import (
CorrelationIdsSearchRequest,
)
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient


Expand Down
2 changes: 1 addition & 1 deletion src/conductor/client/workflow/task/do_while_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import deepcopy
from typing import List, Dict, Any
from typing import List

from typing_extensions import Self

Expand Down
2 changes: 1 addition & 1 deletion src/conductor/client/workflow/task/dynamic_fork_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ def to_workflow_task(self) -> WorkflowTask:
tasks = [
wf_task,
]
if self._join_task != None:
if self._join_task is not None:
tasks.append(self._join_task.to_workflow_task())
return tasks
5 changes: 2 additions & 3 deletions src/conductor/client/workflow/task/http_poll_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from copy import deepcopy
from enum import Enum
from typing import Any, Dict, List, Union
from typing import Any, Dict, List

from typing_extensions import Self

from conductor.client.workflow.task.http_task import HttpTask, HttpInput, HttpMethod
from conductor.client.workflow.task.http_task import HttpMethod
from conductor.client.workflow.task.task import TaskInterface
from conductor.client.workflow.task.task_type import TaskType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ def prompt_variables(self, variables: Dict[str, object]) -> Self:

def prompt_variable(self, variable: str, value: object) -> Self:
self.input_parameters['promptVariables'][variable] = value
return self
return self
Loading
Loading