Skip to content

Commit 333e5b0

Browse files
committed
fix: make ProcessingS3Output's s3_uri an optional field (#5559)
1 parent 272fdbf commit 333e5b0

File tree

2 files changed

+183
-3
lines changed

2 files changed

+183
-3
lines changed

sagemaker-core/src/sagemaker/core/processing.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,48 @@ def _normalize_outputs(self, outputs=None):
483483
# Generate a name for the ProcessingOutput if it doesn't have one.
484484
if output.output_name is None:
485485
output.output_name = "output-{}".format(count)
486-
if output.s3_output and is_pipeline_variable(output.s3_output.s3_uri):
486+
if output.s3_output and output.s3_output.s3_uri is not None and is_pipeline_variable(output.s3_output.s3_uri):
487+
normalized_outputs.append(output)
488+
continue
489+
# If s3_output is None or s3_uri is None, auto-generate an S3 URI
490+
if not output.s3_output or output.s3_output.s3_uri is None:
491+
if _pipeline_config:
492+
s3_uri = Join(
493+
on="/",
494+
values=[
495+
"s3:/",
496+
self.sagemaker_session.default_bucket(),
497+
*(
498+
# don't include default_bucket_prefix if it is None or ""
499+
[self.sagemaker_session.default_bucket_prefix]
500+
if self.sagemaker_session.default_bucket_prefix
501+
else []
502+
),
503+
_pipeline_config.pipeline_name,
504+
ExecutionVariables.PIPELINE_EXECUTION_ID,
505+
_pipeline_config.step_name,
506+
"output",
507+
output.output_name,
508+
],
509+
)
510+
else:
511+
s3_uri = s3.s3_path_join(
512+
"s3://",
513+
self.sagemaker_session.default_bucket(),
514+
self.sagemaker_session.default_bucket_prefix,
515+
self._current_job_name,
516+
"output",
517+
output.output_name,
518+
)
519+
if output.s3_output:
520+
output.s3_output.s3_uri = s3_uri
521+
else:
522+
from sagemaker.core.shapes import ProcessingS3Output as _ProcessingS3Output
523+
output.s3_output = _ProcessingS3Output(
524+
s3_uri=s3_uri,
525+
local_path=output.s3_output.local_path if output.s3_output else "/opt/ml/processing/output",
526+
s3_upload_mode="EndOfJob",
527+
)
487528
normalized_outputs.append(output)
488529
continue
489530
# If the output's s3_uri is not an s3_uri, create one.
@@ -1421,11 +1462,13 @@ def _processing_output_to_request_dict(processing_output):
14211462
}
14221463

14231464
if processing_output.s3_output:
1424-
request_dict["S3Output"] = {
1425-
"S3Uri": processing_output.s3_output.s3_uri,
1465+
s3_output_dict = {
14261466
"LocalPath": processing_output.s3_output.local_path,
14271467
"S3UploadMode": processing_output.s3_output.s3_upload_mode,
14281468
}
1469+
if processing_output.s3_output.s3_uri is not None:
1470+
s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri
1471+
request_dict["S3Output"] = s3_output_dict
14291472

14301473
return request_dict
14311474

sagemaker-core/tests/unit/test_processing.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,143 @@ def test_start_new_removes_tags_from_processing_job(self, mock_session):
13851385
assert "tags" not in call_kwargs
13861386

13871387

1388+
class TestProcessingS3OutputOptionalS3Uri:
    """Exercise ProcessingS3Output handling when ``s3_uri`` is absent (issue #5559)."""

    # Values shared by every ProcessingS3Output built in this class.
    _LOCAL_PATH = "/opt/ml/processing/output"
    _UPLOAD_MODE = "EndOfJob"

    @classmethod
    def _s3_output(cls, **overrides):
        """Return a ProcessingS3Output using the shared local path and upload mode.

        Keyword arguments (e.g. ``s3_uri``) are forwarded unchanged, so a call
        without arguments omits ``s3_uri`` entirely.
        """
        return ProcessingS3Output(
            local_path=cls._LOCAL_PATH,
            s3_upload_mode=cls._UPLOAD_MODE,
            **overrides,
        )

    @staticmethod
    def _processor(session):
        """Return a Processor bound to ``session`` with its job name preset to "test-job"."""
        proc = Processor(
            role="arn:aws:iam::123456789012:role/SageMakerRole",
            image_uri="test-image:latest",
            instance_count=1,
            instance_type="ml.m5.xlarge",
            sagemaker_session=session,
        )
        proc._current_job_name = "test-job"
        return proc

    def test_processing_s3_output_with_none_s3_uri_is_valid(self):
        """An explicit ``s3_uri=None`` is accepted and the other fields are kept."""
        shape = self._s3_output(s3_uri=None)

        assert shape.s3_uri is None
        assert shape.local_path == "/opt/ml/processing/output"
        assert shape.s3_upload_mode == "EndOfJob"

    def test_processing_s3_output_without_s3_uri_kwarg_is_valid(self):
        """Leaving out the ``s3_uri`` keyword altogether yields ``s3_uri is None``."""
        shape = self._s3_output()

        assert shape.s3_uri is None

    def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session):
        """With ``s3_uri=None`` and no pipeline config, an S3 path is generated."""
        processor = self._processor(mock_session)
        pending = [
            ProcessingOutput(
                output_name="my-output",
                s3_output=self._s3_output(s3_uri=None),
            )
        ]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            normalized = processor._normalize_outputs(pending)

        assert len(normalized) == 1
        uri = normalized[0].s3_output.s3_uri
        assert uri.startswith("s3://")
        # Both the job name and the output name must appear in the generated path.
        assert "test-job" in uri
        assert "my-output" in uri

    def test_normalize_outputs_with_none_s3_uri_and_pipeline_config(self, mock_session):
        """With ``s3_uri=None`` and a pipeline config present, a URI is still filled in."""
        processor = self._processor(mock_session)
        pending = [
            ProcessingOutput(
                output_name="my-output",
                s3_output=self._s3_output(s3_uri=None),
            )
        ]

        with patch("sagemaker.core.workflow.utilities._pipeline_config") as fake_config:
            fake_config.pipeline_name = "test-pipeline"
            fake_config.step_name = "test-step"
            normalized = processor._normalize_outputs(pending)

        assert len(normalized) == 1
        # Expected to be a Join (pipeline variable) here; only its presence is checked.
        assert normalized[0].s3_output.s3_uri is not None

    def test_normalize_outputs_with_none_s3_uri_auto_generates_name(self, mock_session):
        """Without ``output_name`` or ``s3_uri``, both are generated during normalization."""
        processor = self._processor(mock_session)
        pending = [ProcessingOutput(s3_output=self._s3_output(s3_uri=None))]

        with patch("sagemaker.core.workflow.utilities._pipeline_config", None):
            normalized = processor._normalize_outputs(pending)

        assert len(normalized) == 1
        only = normalized[0]
        assert only.output_name == "output-1"
        uri = only.s3_output.s3_uri
        assert uri.startswith("s3://")
        assert "output-1" in uri

    def test_processing_output_to_request_dict_omits_s3_uri_when_none(self):
        """The request dict carries no ``S3Uri`` key when ``s3_uri`` is None."""
        processing_output = ProcessingOutput(
            output_name="results",
            s3_output=self._s3_output(s3_uri=None),
        )

        request = _processing_output_to_request_dict(processing_output)

        assert request["OutputName"] == "results"
        assert "S3Output" in request
        s3_section = request["S3Output"]
        assert "S3Uri" not in s3_section
        assert s3_section["LocalPath"] == "/opt/ml/processing/output"
        assert s3_section["S3UploadMode"] == "EndOfJob"

    def test_processing_output_to_request_dict_includes_s3_uri_when_set(self):
        """Regression check: a provided ``s3_uri`` is still emitted as ``S3Uri``."""
        processing_output = ProcessingOutput(
            output_name="results",
            s3_output=self._s3_output(s3_uri="s3://bucket/output"),
        )

        request = _processing_output_to_request_dict(processing_output)

        assert request["OutputName"] == "results"
        s3_section = request["S3Output"]
        assert s3_section["S3Uri"] == "s3://bucket/output"
        assert s3_section["LocalPath"] == "/opt/ml/processing/output"
        assert s3_section["S3UploadMode"] == "EndOfJob"
1523+
1524+
13881525
# Additional tests from test_processing_extended.py
13891526
class TestProcessorBasics:
13901527
"""Test cases for basic Processor functionality"""

0 commit comments

Comments
 (0)