-
Notifications
You must be signed in to change notification settings - Fork 1.2k
fix: ProcessingS3Output's s3_uri to be an optional field (5559)
#5730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1385,6 +1385,172 @@ def test_start_new_removes_tags_from_processing_job(self, mock_session): | |
| assert "tags" not in call_kwargs | ||
|
|
||
|
|
||
| class TestProcessingS3OutputOptionalS3Uri: | ||
| """Tests for ProcessingS3Output with optional s3_uri (issue #5559).""" | ||
|
|
||
| def test_processing_s3_output_with_none_s3_uri_is_valid(self): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing change to shapes file. The PR description states that
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. $context sagemaker-core/src/sagemaker/core/shapes/shapes.py |
||
| """Verify ProcessingS3Output can be instantiated with s3_uri=None.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| assert s3_output.s3_uri is None | ||
| assert s3_output.local_path == "/opt/ml/processing/output" | ||
| assert s3_output.s3_upload_mode == "EndOfJob" | ||
|
|
||
| def test_processing_s3_output_without_s3_uri_kwarg_is_valid(self): | ||
| """Verify ProcessingS3Output can be instantiated without passing s3_uri at all.""" | ||
| s3_output = ProcessingS3Output( | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| assert s3_output.s3_uri is None | ||
|
|
||
| def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session): | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """When s3_uri is None, _normalize_outputs should auto-generate an S3 path.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| generated_uri = result[0].s3_output.s3_uri | ||
| assert generated_uri.startswith("s3://") | ||
| assert "test-job" in generated_uri | ||
| assert "my-output" in generated_uri | ||
|
|
||
| def test_normalize_outputs_with_none_s3_uri_and_pipeline_config(self, mock_session): | ||
| """When s3_uri is None and pipeline_config is set, use pipeline-based path.""" | ||
| from sagemaker.core.workflow.functions import Join | ||
|
|
||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test may not catch the fall-through bug. This test verifies that the generated URI starts with |
||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config: | ||
| mock_config.pipeline_name = "test-pipeline" | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| mock_config.step_name = "test-step" | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| # The result should be a Join object (pipeline variable) when pipeline_config is set | ||
| assert isinstance(result[0].s3_output.s3_uri, Join) | ||
| # Verify the Join contains expected pipeline-related values | ||
| join_obj = result[0].s3_output.s3_uri | ||
| assert join_obj.on == "/" | ||
| assert "test-pipeline" in join_obj.values | ||
|
|
||
| def test_normalize_outputs_with_none_s3_uri_auto_generates_name(self, mock_session): | ||
| """When output_name is None and s3_uri is None, both should be auto-generated.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| assert result[0].output_name == "output-1" | ||
| generated_uri = result[0].s3_output.s3_uri | ||
| assert generated_uri.startswith("s3://") | ||
| assert "output-1" in generated_uri | ||
|
|
||
| def test_normalize_outputs_with_no_s3_output_at_all(self, mock_session): | ||
| """When s3_output is None entirely, a new ProcessingS3Output is created.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| outputs = [ProcessingOutput(output_name="my-output")] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| assert result[0].s3_output is not None | ||
| assert result[0].s3_output.s3_uri.startswith("s3://") | ||
| assert result[0].s3_output.local_path == "/opt/ml/processing/output" | ||
| assert result[0].s3_output.s3_upload_mode == "EndOfJob" | ||
| assert "my-output" in result[0].s3_output.s3_uri | ||
|
|
||
| def test_processing_output_to_request_dict_omits_s3_uri_when_none(self): | ||
| """Verify _processing_output_to_request_dict omits S3Uri when s3_uri is None.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) | ||
|
|
||
| result = _processing_output_to_request_dict(processing_output) | ||
|
|
||
| assert result["OutputName"] == "results" | ||
| assert "S3Output" in result | ||
| assert "S3Uri" not in result["S3Output"] | ||
| assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" | ||
| assert result["S3Output"]["S3UploadMode"] == "EndOfJob" | ||
|
|
||
| def test_processing_output_to_request_dict_includes_s3_uri_when_set(self): | ||
| """Regression test: S3Uri is included when s3_uri is provided.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri="s3://bucket/output", | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) | ||
|
|
||
| result = _processing_output_to_request_dict(processing_output) | ||
|
|
||
| assert result["OutputName"] == "results" | ||
| assert result["S3Output"]["S3Uri"] == "s3://bucket/output" | ||
| assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" | ||
| assert result["S3Output"]["S3UploadMode"] == "EndOfJob" | ||
|
|
||
|
|
||
| # Additional tests from test_processing_extended.py | ||
| class TestProcessorBasics: | ||
| """Test cases for basic Processor functionality""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.