Skip to content

Commit c4fb6b8

Browse files
feat: UN-2166 Passed optional container name from backend to runner (#1158)
* UN-2166 Passed optional container name from backend to runner
* Removed tool-sandbox from lockfile automation script - since core is a private lib that's added, the lockfile can't be resolved and pushed
* minor: Addressed review comments for renaming a variable
1 parent 22f3757 commit c4fb6b8

13 files changed

Lines changed: 99 additions & 60 deletions

File tree

backend/workflow_manager/workflow_v2/execution.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,7 @@ def build(self) -> None:
231231
)
232232
raise WorkflowExecutionError(self.compilation_result["problems"][0])
233233

234-
def execute(
235-
self, file_execution_id: str, file_name: str, single_step: bool = False
236-
) -> None:
234+
def execute(self, file_execution_id: str, single_step: bool = False) -> None:
237235
execution_type = ExecutionType.COMPLETE
238236
if single_step:
239237
execution_type = ExecutionType.STEP
@@ -256,7 +254,6 @@ def execute(
256254
try:
257255
self.execute_workflow(
258256
file_execution_id=file_execution_id,
259-
file_name=file_name,
260257
execution_type=execution_type,
261258
)
262259
end_time = time.time()
@@ -356,7 +353,13 @@ def execute_input_file(
356353
)
357354
workflow_file_execution.update_status(ExecutionStatus.EXECUTING)
358355

359-
self.execute(file_execution_id, file_name, single_step)
356+
logger.info(
357+
f"Running execution: '{self.execution_id}', "
358+
f"file_execution_id: '{file_execution_id}', "
359+
f"file '{file_name}'"
360+
)
361+
362+
self.execute(file_execution_id, single_step)
360363
self.publish_log(f"Tool executed successfully for '{file_name}'")
361364
self._handle_execution_type(execution_type)
362365

docker/scripts/pdm-lock-gen/pdm-lock.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ directories=(
7979
"platform-service"
8080
"x2text-service"
8181
"unstract/connectors"
82-
"unstract/tool-sandbox"
8382
)
8483

8584
# If directories are passed as arguments, override the default

runner/src/unstract/runner/clients/docker.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,24 @@ def get_image(self) -> str:
158158
def get_container_run_config(
159159
self,
160160
command: list[str],
161-
run_id: str,
161+
file_execution_id: str,
162+
container_name: Optional[str] = None,
162163
envs: Optional[dict[str, Any]] = None,
163164
auto_remove: bool = False,
164165
) -> dict[str, Any]:
165166
if envs is None:
166167
envs = {}
167168
mounts = []
169+
170+
if not container_name:
171+
container_name = UnstractUtils.build_tool_container_name(
172+
tool_image=self.image_name,
173+
tool_version=self.image_tag,
174+
file_execution_id=file_execution_id,
175+
)
176+
168177
return {
169-
"name": UnstractUtils.build_tool_container_name(
170-
tool_image=self.image_name, tool_version=self.image_tag, run_id=run_id
171-
),
178+
"name": container_name,
172179
"image": self.get_image(),
173180
"command": command,
174181
"detach": True,

runner/src/unstract/runner/clients/interface.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ def get_image(self) -> str:
6262
def get_container_run_config(
6363
self,
6464
command: list[str],
65-
run_id: str,
65+
file_execution_id: str,
66+
container_name: Optional[str] = None,
6667
envs: Optional[dict[str, Any]] = None,
6768
auto_remove: bool = False,
6869
) -> dict[str, Any]:

runner/src/unstract/runner/clients/test_docker.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,19 +109,21 @@ def test_get_image(docker_client, mocker):
109109
def test_get_container_run_config(docker_client, mocker):
110110
"""Test the get_container_run_config method."""
111111
command = ["echo", "hello"]
112-
run_id = "run123"
112+
file_execution_id = "run123"
113113

114114
mocker.patch.object(docker_client, "_Client__image_exists", return_value=True)
115115
mocker_normalize = mocker.patch(
116116
"unstract.core.utilities.UnstractUtils.build_tool_container_name",
117117
return_value="test-image",
118118
)
119119
config = docker_client.get_container_run_config(
120-
command, run_id, envs={"KEY": "VALUE"}, auto_remove=True
120+
command, file_execution_id, envs={"KEY": "VALUE"}, auto_remove=True
121121
)
122122

123123
mocker_normalize.assert_called_once_with(
124-
tool_image="test-image", tool_version="latest", run_id=run_id
124+
tool_image="test-image",
125+
tool_version="latest",
126+
file_execution_id=file_execution_id,
125127
)
126128
assert config["name"] == "test-image"
127129
assert config["image"] == "test-image:latest"
@@ -134,17 +136,21 @@ def test_get_container_run_config_without_mount(docker_client, mocker):
134136
"""Test the get_container_run_config method."""
135137
os.environ[Env.EXECUTION_DATA_DIR] = "/source"
136138
command = ["echo", "hello"]
137-
run_id = "run123"
139+
file_execution_id = "run123"
138140

139141
mocker.patch.object(docker_client, "_Client__image_exists", return_value=True)
140142
mocker_normalize = mocker.patch(
141143
"unstract.core.utilities.UnstractUtils.build_tool_container_name",
142144
return_value="test-image",
143145
)
144-
config = docker_client.get_container_run_config(command, run_id, auto_remove=True)
146+
config = docker_client.get_container_run_config(
147+
command, file_execution_id, auto_remove=True
148+
)
145149

146150
mocker_normalize.assert_called_once_with(
147-
tool_image="test-image", tool_version="latest", run_id=run_id
151+
tool_image="test-image",
152+
tool_version="latest",
153+
file_execution_id=file_execution_id,
148154
)
149155
assert config["name"] == "test-image"
150156
assert config["image"] == "test-image:latest"

runner/src/unstract/runner/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,19 @@ def run_container() -> Optional[Any]:
2828
organization_id = data["organization_id"]
2929
workflow_id = data["workflow_id"]
3030
execution_id = data["execution_id"]
31-
run_id = data["run_id"]
31+
file_execution_id = data["file_execution_id"]
32+
container_name = data["container_name"]
3233
settings = data["settings"]
3334
envs = data["envs"]
3435
messaging_channel = data["messaging_channel"]
3536

3637
runner = UnstractRunner(image_name, image_tag, app)
3738
result = runner.run_container(
39+
container_name=container_name,
3840
organization_id=organization_id,
3941
workflow_id=workflow_id,
4042
execution_id=execution_id,
41-
run_id=run_id,
43+
file_execution_id=file_execution_id,
4244
settings=settings,
4345
envs=envs,
4446
messaging_channel=messaging_channel,

runner/src/unstract/runner/runner.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def stream_logs(
3939
tool_instance_id: str,
4040
execution_id: str,
4141
organization_id: str,
42-
run_id: str,
42+
file_execution_id: str,
4343
channel: Optional[str] = None,
4444
) -> None:
4545
for line in container.logs(follow=True):
@@ -51,7 +51,7 @@ def stream_logs(
5151
channel=channel,
5252
execution_id=execution_id,
5353
organization_id=organization_id,
54-
run_id=run_id,
54+
file_execution_id=file_execution_id,
5555
)
5656

5757
def get_valid_log_message(self, log_message: str) -> Optional[dict[str, Any]]:
@@ -76,7 +76,7 @@ def process_log_message(
7676
tool_instance_id: str,
7777
execution_id: str,
7878
organization_id: str,
79-
run_id: str,
79+
file_execution_id: str,
8080
channel: Optional[str] = None,
8181
) -> Optional[dict[str, Any]]:
8282
log_dict = self.get_valid_log_message(log_message)
@@ -99,7 +99,7 @@ def process_log_message(
9999
log_dict[LogFieldName.EXECUTION_ID] = execution_id
100100
log_dict[LogFieldName.ORGANIZATION_ID] = organization_id
101101
log_dict[LogFieldName.TIMESTAMP] = datetime.now(timezone.utc).timestamp()
102-
log_dict[LogFieldName.FILE_EXECUTION_ID] = run_id
102+
log_dict[LogFieldName.FILE_EXECUTION_ID] = file_execution_id
103103

104104
# Publish to channel of socket io
105105
LogPublisher.publish(channel, log_dict)
@@ -139,7 +139,7 @@ def run_command(self, command: str) -> Optional[Any]:
139139
"""
140140
command = command.upper()
141141
container_config = self.client.get_container_run_config(
142-
command=["--command", command], run_id="", auto_remove=True
142+
command=["--command", command], file_execution_id="", auto_remove=True
143143
)
144144
container = None
145145

@@ -163,10 +163,11 @@ def run_container(
163163
organization_id: str,
164164
workflow_id: str,
165165
execution_id: str,
166-
run_id: str,
166+
file_execution_id: str,
167167
settings: dict[str, Any],
168168
envs: dict[str, Any],
169169
messaging_channel: Optional[str] = None,
170+
container_name: Optional[str] = None,
170171
) -> Optional[Any]:
171172
"""RUN container With RUN Command.
172173
@@ -200,7 +201,8 @@ def run_container(
200201
"--log-level",
201202
"DEBUG",
202203
],
203-
run_id=run_id,
204+
file_execution_id=file_execution_id,
205+
container_name=container_name,
204206
envs=envs,
205207
)
206208
# Add labels to container for logging with Loki.
@@ -217,7 +219,7 @@ def run_container(
217219
try:
218220
self.logger.info(
219221
f"Execution ID: {execution_id}, running docker "
220-
f"container: {container_config.get('name')}"
222+
f"container: {container_name}"
221223
)
222224
container: ContainerInterface = self.client.run_container(container_config)
223225
tool_instance_id = str(settings.get(ToolKey.TOOL_INSTANCE_ID))
@@ -228,7 +230,7 @@ def run_container(
228230
channel=messaging_channel,
229231
execution_id=execution_id,
230232
organization_id=organization_id,
231-
run_id=run_id,
233+
file_execution_id=file_execution_id,
232234
)
233235
except ToolRunException as te:
234236
self.logger.error(

unstract/core/src/unstract/core/utilities.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,21 @@ def get_env(env_key: str, default: Optional[str] = None, raise_err=False) -> str
3030

3131
@staticmethod
3232
def build_tool_container_name(
33-
tool_image: str, tool_version: str, run_id: str
33+
tool_image: str,
34+
tool_version: str,
35+
file_execution_id: str,
36+
retry_count: Optional[int] = None,
3437
) -> str:
3538
tool_name = tool_image.split("/")[-1]
36-
# TODO: Add execution attempt to better track instead of uuid
37-
short_uuid = uuid.uuid4().hex[:6] # To avoid duplicate name collision
38-
container_name = f"{tool_name}-{tool_version}-{short_uuid}-{run_id}"
39+
# To avoid duplicate name collision
40+
if retry_count:
41+
unique_suffix = retry_count
42+
else:
43+
unique_suffix = uuid.uuid4().hex[:6]
44+
45+
container_name = (
46+
f"{tool_name}-{tool_version}-{unique_suffix}-{file_execution_id}"
47+
)
3948

4049
# To support limits of container clients like K8s
4150
if len(container_name) > 63:

unstract/tool-sandbox/pyproject.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,18 @@ authors = [
1010
{name = "Zipstack Inc.", email = "devsupport@zipstack.com"},
1111
]
1212
dependencies = [
13-
"requests==2.31.0"
13+
"requests==2.31.0",
14+
# ! IMPORTANT!
15+
# Local dependencies usually need to be added as:
16+
# https://pdm-project.org/latest/usage/dependency/#local-dependencies
17+
#
18+
# However, local dependencies which are not direct dependencies of main project
19+
# appear as absolute paths in pdm.lock of main project, making it impossible
20+
# to check in the lock file.
21+
#
22+
# Hence instead, add the dependencies without version constraints where the
23+
# assumption is they are added as direct dependencies in main project itself.
24+
"unstract-core",
1425
]
1526
requires-python = ">=3.9"
1627
readme = "README.md"

unstract/tool-sandbox/src/unstract/tool_sandbox/helper.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import requests
77
from unstract.tool_sandbox.constants import UnstractRunner
88

9+
from unstract.core.utilities import UnstractUtils
10+
911
logger = logging.getLogger(__name__)
1012

1113

@@ -69,10 +71,11 @@ def make_get_request(
6971

7072
def call_tool_handler(
7173
self,
72-
run_id: str,
74+
file_execution_id: str,
7375
image_name: str,
7476
image_tag: str,
7577
settings: dict[str, Any],
78+
retry_count: Optional[int] = None,
7679
) -> Optional[dict[str, Any]]:
7780
"""Calling unstract runner to run the required tool.
7881
@@ -87,10 +90,7 @@ def call_tool_handler(
8790
"""
8891
url = f"{self.base_url}{UnstractRunner.RUN_API_ENDPOINT}"
8992
data = self.create_tool_request_data(
90-
run_id,
91-
image_name,
92-
image_tag,
93-
settings,
93+
file_execution_id, image_name, image_tag, settings, retry_count
9494
)
9595

9696
response = requests.post(url, json=data)
@@ -110,18 +110,26 @@ def call_tool_handler(
110110

111111
def create_tool_request_data(
112112
self,
113-
run_id: str,
113+
file_execution_id: str,
114114
image_name: str,
115115
image_tag: str,
116116
settings: dict[str, Any],
117+
retry_count: Optional[int] = None,
117118
) -> dict[str, Any]:
119+
container_name = UnstractUtils.build_tool_container_name(
120+
tool_image=image_name,
121+
tool_version=image_tag,
122+
file_execution_id=file_execution_id,
123+
retry_count=retry_count,
124+
)
118125
data = {
119126
"image_name": image_name,
120127
"image_tag": image_tag,
121128
"organization_id": self.organization_id,
122129
"workflow_id": self.workflow_id,
123130
"execution_id": self.execution_id,
124-
"run_id": run_id,
131+
"file_execution_id": file_execution_id,
132+
"container_name": container_name,
125133
"settings": settings,
126134
"envs": self.envs,
127135
"messaging_channel": self.messaging_channel,

0 commit comments

Comments
 (0)