Skip to content

Commit 2e346e1

Browse files
Tests: automator, kafka_publish input
1 parent fd825b4 commit 2e346e1

15 files changed

Lines changed: 651 additions & 83 deletions

File tree

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1+
from __future__ import annotations
2+
13
from conductor.asyncio_client.http.models import TaskExecLog
4+
from typing import Optional, Any
5+
from pydantic import Field
26

37

4-
class TaskExecLogAdapter(TaskExecLog): ...
8+
class TaskExecLogAdapter(TaskExecLog):
9+
created_time: Optional[Any] = Field(default=None, alias="createdTime")

src/conductor/asyncio_client/automator/task_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from conductor.asyncio_client.adapters.models.task_result_adapter import \
1515
TaskResultAdapter
1616
from conductor.asyncio_client.configuration import Configuration
17-
from conductor.asyncio_client.http.api.task_resource_api import TaskResourceApi
17+
from conductor.asyncio_client.adapters.api.task_resource_api import TaskResourceApiAdapter
1818
from conductor.asyncio_client.http.api_client import ApiClient
1919
from conductor.asyncio_client.http.exceptions import UnauthorizedException
2020
from conductor.asyncio_client.telemetry.metrics_collector import \
@@ -43,7 +43,7 @@ def __init__(
4343
self.metrics_collector = None
4444
if metrics_settings is not None:
4545
self.metrics_collector = MetricsCollector(metrics_settings)
46-
self.task_client = TaskResourceApi(ApiClient(configuration=self.configuration))
46+
self.task_client = TaskResourceApiAdapter(ApiClient(configuration=self.configuration))
4747

4848
async def run(self) -> None:
4949
if self.configuration is not None:

src/conductor/asyncio_client/configuration/configuration.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,15 @@ def __init__(
154154
**kwargs,
155155
)
156156

157+
# Debug switch and logging setup
158+
self.__debug = debug
159+
if self.__debug:
160+
self.__log_level = logging.DEBUG
161+
else:
162+
self.__log_level = logging.INFO
163+
# Log format
164+
self.__logger_format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
165+
157166
# Setup logging
158167
self.logger = logging.getLogger(__name__)
159168
if debug:
@@ -336,8 +345,10 @@ def debug(self, value: bool) -> None:
336345
self._http_config.debug = value
337346
if value:
338347
self.logger.setLevel(logging.DEBUG)
348+
self.__log_level = logging.DEBUG
339349
else:
340350
self.logger.setLevel(logging.WARNING)
351+
self.__log_level = logging.INFO
341352

342353
@property
343354
def api_key(self) -> Dict[str, str]:
@@ -420,6 +431,21 @@ def retries(self, value: Optional[int]) -> None:
420431
"""Set number of retries."""
421432
self._http_config.retries = value
422433

434+
@property
435+
def logger_format(self) -> str:
436+
"""Get logger format."""
437+
return self.__logger_format
438+
439+
@logger_format.setter
440+
def logger_format(self, value: str) -> None:
441+
"""Set logger format."""
442+
self.__logger_format = value
443+
444+
@property
445+
def log_level(self) -> int:
446+
"""Get log level."""
447+
return self.__log_level
448+
423449
def apply_logging_config(self, log_format : Optional[str] = None, level = None):
424450
"""Apply logging configuration for the application."""
425451
if log_format is None:

src/conductor/shared/workflow/models/chat_message.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ class ChatMessage(BaseModel):
66
message: str = Field(..., alias="message")
77

88
class Config:
9-
allow_population_by_field_name = True
9+
validate_by_name = True

src/conductor/shared/workflow/models/embedding_model.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ class EmbeddingModel(BaseModel):
66
model: str = Field(..., alias="embeddingModel")
77

88
class Config:
9-
allow_population_by_field_name = True
9+
validate_by_name = True

src/conductor/shared/workflow/models/http_input.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ class HttpInput(BaseModel):
1818
body: Optional[Any] = Field(None, alias="body")
1919

2020
class Config:
21-
allow_population_by_field_name = True
21+
validate_by_name = True
2222
use_enum_values = True
2323
arbitrary_types_allowed = True

src/conductor/shared/workflow/models/http_poll_input.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class HttpPollInput(BaseModel):
2323
polling_strategy: str = Field("FIXED", alias="pollingStrategy")
2424

2525
class Config:
26-
allow_population_by_field_name = True
26+
validate_by_name = True
2727
use_enum_values = True
2828
arbitrary_types_allowed = True
2929
json_encoders: ClassVar[Dict[Type[Any], Callable[[Any], Any]]] = {

src/conductor/shared/workflow/models/kafka_publish_input.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ class KafkaPublishInput(BaseModel):
1616
topic: Optional[str] = Field(None, alias="topic")
1717

1818
class Config:
19-
allow_population_by_field_name = True
19+
validate_by_name = True
2020
arbitrary_types_allowed = True

src/conductor/shared/workflow/models/prompt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ class Prompt(BaseModel):
88
variables: Dict[str, Any] = Field(..., alias="promptVariables")
99

1010
class Config:
11-
allow_population_by_field_name = True
11+
validate_by_name = True
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import multiprocessing
2+
3+
import pytest
4+
5+
from conductor.asyncio_client.automator.task_handler import TaskHandler
6+
from conductor.asyncio_client.automator.task_runner import AsyncTaskRunner
7+
from conductor.asyncio_client.configuration.configuration import Configuration
8+
from tests.unit.resources.workers import ClassWorker2
9+
10+
11+
def test_initialization_with_invalid_workers(mocker):
12+
mocker.patch(
13+
"conductor.asyncio_client.automator.task_handler._setup_logging_queue",
14+
return_value=(None, None),
15+
)
16+
with pytest.raises(Exception, match="Invalid worker"):
17+
TaskHandler(
18+
configuration=Configuration("http://localhost:8080/api"),
19+
workers=["invalid-worker"],
20+
)
21+
22+
23+
def test_start_processes(mocker, valid_task_handler):
24+
mocker.patch.object(AsyncTaskRunner, "run", return_value=None)
25+
with valid_task_handler as task_handler:
26+
task_handler.start_processes()
27+
assert len(task_handler.task_runner_processes) == 1
28+
for process in task_handler.task_runner_processes:
29+
assert isinstance(process, multiprocessing.Process)
30+
31+
32+
@pytest.fixture
33+
def valid_task_handler():
34+
return TaskHandler(configuration=Configuration(), workers=[ClassWorker2("task")])

0 commit comments

Comments
 (0)