Skip to content

Commit 1eefa1a

Browse files
author
Rishabh Devnani
committed
feat(pipeline): Make _PipelineExecution a public class
Renames _PipelineExecution to PipelineExecution and exports it from the workflow module. All internal references updated. Closes #4391 --- X-AI-Prompt: Make _PipelineExecution public per GitHub issue 4391 X-AI-Tool: kiro-cli
1 parent 02e864d commit 1eefa1a

File tree

4 files changed

+176
-21
lines changed

4 files changed

+176
-21
lines changed

sagemaker-mlops/src/sagemaker/mlops/workflow/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
__version__ = "0.1.0"
2020

2121
# Pipeline and configuration
22-
from sagemaker.mlops.workflow.pipeline import Pipeline, PipelineGraph
22+
from sagemaker.mlops.workflow.pipeline import Pipeline, PipelineGraph, PipelineExecution
2323
from sagemaker.mlops.workflow.pipeline_experiment_config import (
2424
PipelineExperimentConfig,
2525
PipelineExperimentConfigProperty,
@@ -74,6 +74,7 @@
7474
__all__ = [
7575
# Pipeline and configuration
7676
"Pipeline",
77+
"PipelineExecution",
7778
"PipelineGraph",
7879
"PipelineExperimentConfig",
7980
"PipelineExperimentConfigProperty",

sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ def start(
406406
specified, uses the latest version ID.
407407
408408
Returns:
409-
A `_PipelineExecution` instance, if successful.
409+
A `PipelineExecution` instance, if successful.
410410
"""
411411
if selective_execution_config is not None:
412412
if (
@@ -438,7 +438,7 @@ def start(
438438
lambda: self.sagemaker_session.sagemaker_client.start_pipeline_execution(**kwargs),
439439
botocore_client_error_code="AccessDeniedException",
440440
)
441-
return _PipelineExecution(
441+
return PipelineExecution(
442442
arn=response["PipelineExecutionArn"],
443443
sagemaker_session=self.sagemaker_session,
444444
)
@@ -602,7 +602,7 @@ def _get_parameters_for_execution(self, pipeline_execution_arn: str) -> Dict[str
602602
Returns:
603603
A parameter dict from the execution.
604604
"""
605-
pipeline_execution = _PipelineExecution(
605+
pipeline_execution = PipelineExecution(
606606
arn=pipeline_execution_arn,
607607
sagemaker_session=self.sagemaker_session,
608608
)
@@ -950,8 +950,21 @@ def _generate_step_map(steps: Sequence[Step], step_map: dict):
950950

951951

952952
@attr.s
953-
class _PipelineExecution:
954-
"""Internal class for encapsulating pipeline execution instances.
953+
class PipelineExecution:
954+
"""Encapsulates a pipeline execution instance.
955+
956+
This class can be used to interact with pipeline executions that were
957+
started from any source (Python SDK, Studio UI, console, etc.).
958+
959+
Example::
960+
961+
execution = PipelineExecution(
962+
arn="arn:aws:sagemaker:us-west-2:123456789012:pipeline/my-pipeline/execution/abc123",
963+
sagemaker_session=sagemaker_session,
964+
)
965+
execution.describe()
966+
execution.wait()
967+
execution.list_steps()
955968
956969
Attributes:
957970
arn (str): The arn of the pipeline execution.

sagemaker-mlops/tests/unit/workflow/test_pipeline.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def test_pipeline_get_latest_execution_arn_none(mock_session, mock_step):
175175

176176

177177
def test_pipeline_build_parameters_from_execution(mock_session, mock_step):
178-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
178+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
179179
pipeline = Pipeline(name="test-pipeline", steps=[mock_step], sagemaker_session=mock_session)
180180

181181
mock_session.sagemaker_client.list_pipeline_parameters_for_execution.return_value = {
@@ -268,43 +268,43 @@ def test_pipeline_delete_triggers_not_found(mock_session, mock_step):
268268

269269

270270
def test_pipeline_execution_stop(mock_session):
271-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
271+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
272272

273-
execution = _PipelineExecution(arn="arn", sagemaker_session=mock_session)
273+
execution = PipelineExecution(arn="arn", sagemaker_session=mock_session)
274274
execution.stop()
275275
mock_session.sagemaker_client.stop_pipeline_execution.assert_called_once()
276276

277277

278278
def test_pipeline_execution_describe(mock_session):
279-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
279+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
280280

281-
execution = _PipelineExecution(arn="arn", sagemaker_session=mock_session)
281+
execution = PipelineExecution(arn="arn", sagemaker_session=mock_session)
282282
execution.describe()
283283
mock_session.sagemaker_client.describe_pipeline_execution.assert_called_once()
284284

285285

286286
def test_pipeline_execution_list_steps(mock_session):
287-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
287+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
288288

289289
mock_session.sagemaker_client.list_pipeline_execution_steps.return_value = {"PipelineExecutionSteps": []}
290-
execution = _PipelineExecution(arn="arn", sagemaker_session=mock_session)
290+
execution = PipelineExecution(arn="arn", sagemaker_session=mock_session)
291291
result = execution.list_steps()
292292
assert result == []
293293

294294

295295
def test_pipeline_execution_list_parameters(mock_session):
296-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
296+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
297297

298-
execution = _PipelineExecution(arn="arn", sagemaker_session=mock_session)
298+
execution = PipelineExecution(arn="arn", sagemaker_session=mock_session)
299299
execution.list_parameters(max_results=10, next_token="token")
300300
mock_session.sagemaker_client.list_pipeline_parameters_for_execution.assert_called_once()
301301

302302

303303
def test_pipeline_execution_wait(mock_session):
304-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
304+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
305305
import botocore.waiter
306306

307-
execution = _PipelineExecution(arn="arn", sagemaker_session=mock_session)
307+
execution = PipelineExecution(arn="arn", sagemaker_session=mock_session)
308308
with patch("botocore.waiter.create_waiter_with_client") as mock_waiter:
309309
mock_waiter.return_value.wait = Mock()
310310
execution.wait(delay=10, max_attempts=5)
@@ -476,22 +476,22 @@ def test_pipeline_list_versions(mock_session, mock_step):
476476

477477

478478
def test_pipeline_execution_result_waiter_error(mock_session):
479-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
479+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
480480
from botocore.exceptions import WaiterError
481481

482-
execution = _PipelineExecution(arn="arn:aws:sagemaker:us-west-2:123456789012:pipeline/test/execution/exec-id", sagemaker_session=mock_session)
482+
execution = PipelineExecution(arn="arn:aws:sagemaker:us-west-2:123456789012:pipeline/test/execution/exec-id", sagemaker_session=mock_session)
483483

484484
with patch.object(execution, "wait", side_effect=WaiterError("name", "reason", {})):
485485
with pytest.raises(WaiterError):
486486
execution.result("step1")
487487

488488

489489
def test_pipeline_execution_result_terminal_failure(mock_session):
490-
from sagemaker.mlops.workflow.pipeline import _PipelineExecution
490+
from sagemaker.mlops.workflow.pipeline import PipelineExecution
491491
from botocore.exceptions import WaiterError
492492
from sagemaker.core.remote_function.job import JOBS_CONTAINER_ENTRYPOINT
493493

494-
execution = _PipelineExecution(arn="arn:aws:sagemaker:us-west-2:123456789012:pipeline/test/execution/exec-id", sagemaker_session=mock_session)
494+
execution = PipelineExecution(arn="arn:aws:sagemaker:us-west-2:123456789012:pipeline/test/execution/exec-id", sagemaker_session=mock_session)
495495
mock_session.sagemaker_client.list_pipeline_execution_steps.return_value = {
496496
"PipelineExecutionSteps": [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}]
497497
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"id": "a1b2c3d4",
6+
"metadata": {},
7+
"source": [
8+
"# Interact with an Existing Pipeline Execution\n",
9+
"Use `PipelineExecution` to monitor and control pipeline executions started from any source (SDK, Studio UI, console, etc.)"
10+
]
11+
},
12+
{
13+
"cell_type": "code",
14+
"execution_count": null,
15+
"id": "b1c2d3e4",
16+
"metadata": {},
17+
"outputs": [],
18+
"source": [
19+
"from sagemaker.mlops.workflow.pipeline import PipelineExecution\n",
20+
"from sagemaker.core.helper.session_helper import Session"
21+
]
22+
},
23+
{
24+
"cell_type": "code",
25+
"execution_count": null,
26+
"id": "c1d2e3f4",
27+
"metadata": {},
28+
"outputs": [],
29+
"source": [
30+
"sagemaker_session = Session()"
31+
]
32+
},
33+
{
34+
"cell_type": "markdown",
35+
"id": "d1e2f3a4",
36+
"metadata": {},
37+
"source": [
38+
"## Attach to an existing execution\n",
39+
"Replace the ARN below with a pipeline execution ARN from your account. You can find this in the Studio UI or by calling `list_pipeline_executions`."
40+
]
41+
},
42+
{
43+
"cell_type": "code",
44+
"execution_count": null,
45+
"id": "e1f2a3b4",
46+
"metadata": {},
47+
"outputs": [],
48+
"source": [
49+
"execution = PipelineExecution(\n",
50+
" arn=\"arn:aws:sagemaker:<region>:<account-id>:pipeline/<pipeline-name>/execution/<execution-id>\",\n",
51+
" sagemaker_session=sagemaker_session,\n",
52+
")"
53+
]
54+
},
55+
{
56+
"cell_type": "markdown",
57+
"id": "f1a2b3c4",
58+
"metadata": {},
59+
"source": [
60+
"## Describe the execution"
61+
]
62+
},
63+
{
64+
"cell_type": "code",
65+
"execution_count": null,
66+
"id": "a2b3c4d5",
67+
"metadata": {},
68+
"outputs": [],
69+
"source": [
70+
"execution.describe()"
71+
]
72+
},
73+
{
74+
"cell_type": "markdown",
75+
"id": "b2c3d4e5",
76+
"metadata": {},
77+
"source": [
78+
"## List execution steps"
79+
]
80+
},
81+
{
82+
"cell_type": "code",
83+
"execution_count": null,
84+
"id": "c2d3e4f5",
85+
"metadata": {},
86+
"outputs": [],
87+
"source": [
88+
"execution.list_steps()"
89+
]
90+
},
91+
{
92+
"cell_type": "markdown",
93+
"id": "d2e3f4a5",
94+
"metadata": {},
95+
"source": [
96+
"## Wait for completion"
97+
]
98+
},
99+
{
100+
"cell_type": "code",
101+
"execution_count": null,
102+
"id": "e2f3a4b5",
103+
"metadata": {},
104+
"outputs": [],
105+
"source": [
106+
"execution.wait()"
107+
]
108+
},
109+
{
110+
"cell_type": "markdown",
111+
"id": "f2a3b4c5",
112+
"metadata": {},
113+
"source": [
114+
"## Stop an execution"
115+
]
116+
},
117+
{
118+
"cell_type": "code",
119+
"execution_count": null,
120+
"id": "a3b4c5d6",
121+
"metadata": {},
122+
"outputs": [],
123+
"source": [
124+
"# execution.stop()"
125+
]
126+
}
127+
],
128+
"metadata": {
129+
"kernelspec": {
130+
"display_name": "Python 3 (ipykernel)",
131+
"language": "python",
132+
"name": "python3"
133+
},
134+
"language_info": {
135+
"name": "python",
136+
"version": "3.10.14"
137+
}
138+
},
139+
"nbformat": 4,
140+
"nbformat_minor": 5
141+
}

0 commit comments

Comments
 (0)