-
Notifications
You must be signed in to change notification settings - Fork 1.2k
fix: ProcessingS3Output's s3_uri to be an optional field (5559)
#5755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,7 +51,12 @@ | |
| ) | ||
| from sagemaker.core.local.local_session import LocalSession | ||
| from sagemaker.core.helper.session_helper import Session | ||
| from sagemaker.core.shapes import ProcessingInput, ProcessingOutput, ProcessingS3Input | ||
| from sagemaker.core.shapes import ( | ||
| ProcessingInput, | ||
| ProcessingOutput, | ||
| ProcessingS3Input, | ||
| ProcessingS3Output, | ||
| ) | ||
| from sagemaker.core.resources import ProcessingJob | ||
| from sagemaker.core.workflow.pipeline_context import PipelineSession | ||
| from sagemaker.core.common_utils import ( | ||
|
|
@@ -86,6 +91,11 @@ | |
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Default values used when creating a ProcessingS3Output for outputs | ||
| # that don't have an explicit s3_output configured. | ||
| DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH = "/opt/ml/processing/output" | ||
| DEFAULT_S3_UPLOAD_MODE = "EndOfJob" | ||
|
|
||
|
|
||
| class Processor(object): | ||
| """Handles Amazon SageMaker Processing tasks.""" | ||
|
|
@@ -483,13 +493,25 @@ def _normalize_outputs(self, outputs=None): | |
| # Generate a name for the ProcessingOutput if it doesn't have one. | ||
| if output.output_name is None: | ||
| output.output_name = "output-{}".format(count) | ||
| # If s3_output is None, create a default one with None s3_uri. | ||
| # The s3_uri will be auto-generated below based on job/pipeline context. | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The default output.s3_output = ProcessingS3Output(
s3_upload_mode=DEFAULT_S3_UPLOAD_MODE,
local_path=DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH,
)Also, the |
||
| if output.s3_output is None: | ||
| output.s3_output = ProcessingS3Output( | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| s3_uri=None, | ||
| local_path=DEFAULT_PROCESSING_LOCAL_OUTPUT_PATH, | ||
| s3_upload_mode=DEFAULT_S3_UPLOAD_MODE, | ||
| ) | ||
| # is_pipeline_variable handles None gracefully (returns False) | ||
| if output.s3_output and is_pipeline_variable(output.s3_output.s3_uri): | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| normalized_outputs.append(output) | ||
| continue | ||
| # If the output's s3_uri is not an s3_uri, create one. | ||
| parse_result = urlparse(output.s3_output.s3_uri) | ||
| if parse_result.scheme != "s3": | ||
| if getattr(self.sagemaker_session, "local_mode", False) and parse_result.scheme == "file": | ||
| # If the output's s3_uri is None or not an s3_uri, create one. | ||
| if output.s3_output.s3_uri is None: | ||
| parse_result_scheme = "" | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line length likely exceeds 100 characters: if getattr(self.sagemaker_session, "local_mode", False) and parse_result_scheme == "file":This should be wrapped to stay within the 100-character limit. |
||
| else: | ||
| parse_result_scheme = urlparse(output.s3_output.s3_uri).scheme | ||
| if parse_result_scheme != "s3": | ||
| if getattr(self.sagemaker_session, "local_mode", False) and parse_result_scheme == "file": | ||
| normalized_outputs.append(output) | ||
| continue | ||
| if _pipeline_config: | ||
|
|
@@ -1421,11 +1443,17 @@ def _processing_output_to_request_dict(processing_output): | |
| } | ||
|
|
||
| if processing_output.s3_output: | ||
| request_dict["S3Output"] = { | ||
| "S3Uri": processing_output.s3_output.s3_uri, | ||
| s3_output_dict = { | ||
| "LocalPath": processing_output.s3_output.local_path, | ||
| "S3UploadMode": processing_output.s3_output.s3_upload_mode, | ||
| } | ||
| # After _normalize_outputs, s3_uri should always be populated. | ||
| # If it is still None at serialization time, omit S3Uri from the dict | ||
| # rather than sending None to the API. This is a defensive guard; | ||
| # _normalize_outputs is expected to fill in s3_uri before we reach here. | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The defensive guard to omit if processing_output.s3_output.s3_uri is not None:
s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri
else:
logger.warning(
"s3_uri is None for output '%s' at serialization time; "
"expected _normalize_outputs to have populated it.",
processing_output.output_name,
) |
||
| if processing_output.s3_output.s3_uri is not None: | ||
| s3_output_dict["S3Uri"] = processing_output.s3_output.s3_uri | ||
| request_dict["S3Output"] = s3_output_dict | ||
|
|
||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return request_dict | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10370,12 +10370,12 @@ class ProcessingS3Output(Base): | |
|
|
||
| Attributes | ||
| ---------------------- | ||
| s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job. | ||
| s3_uri: A URI that identifies the Amazon S3 bucket where you want Amazon SageMaker to save the results of a processing job. When set to None, the SDK auto-generates an S3 path based on the job name and output name. | ||
| local_path: The local path of a directory where you want Amazon SageMaker to upload its contents to Amazon S3. LocalPath is an absolute path to a directory containing output files. This directory will be created by the platform and exist when your container's entrypoint is invoked. | ||
| s3_upload_mode: Whether to upload the results of the processing job continuously or after the job completes. | ||
| """ | ||
|
|
||
| s3_uri: StrPipeVar | ||
| s3_uri: Optional[StrPipeVar] = None | ||
| s3_upload_mode: StrPipeVar | ||
| local_path: Optional[StrPipeVar] = Unassigned() | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical concern: The The correct fix should be made in the shapes definition/generation configuration (e.g., a JSON/YAML model or codegen template) so that Please confirm whether this file is safe to edit manually or if the change needs to go into the code generation source.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The field ordering changed here: You need to reorder so that s3_upload_mode: StrPipeVar
s3_uri: Optional[StrPipeVar] = None
local_path: Optional[StrPipeVar] = Unassigned()Alternatively, if using |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -493,6 +493,159 @@ def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false) | |
| assert result[1].s3_output.s3_uri == "s3://my-bucket/second" | ||
|
|
||
|
|
||
| class TestProcessingS3OutputOptionalS3Uri: | ||
| """Tests for ProcessingS3Output with optional s3_uri (issue #5559).""" | ||
|
|
||
| def test_processing_s3_output_with_none_s3_uri_creates_successfully(self): | ||
| """Verify ProcessingS3Output can be created with s3_uri=None.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| assert s3_output.s3_uri is None | ||
| assert s3_output.local_path == "/opt/ml/processing/output" | ||
| assert s3_output.s3_upload_mode == "EndOfJob" | ||
|
|
||
| def test_processing_s3_output_without_s3_uri_param_creates_successfully(self): | ||
| """Verify ProcessingS3Output works with default None for s3_uri.""" | ||
| s3_output = ProcessingS3Output( | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| assert s3_output.s3_uri is None | ||
|
|
||
| def test_processing_s3_output_with_explicit_s3_uri_preserves_value(self): | ||
| """Regression test: explicit s3_uri string is preserved in the model.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri="s3://my-bucket/my-output", | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| assert s3_output.s3_uri == "s3://my-bucket/my-output" | ||
| assert s3_output.local_path == "/opt/ml/processing/output" | ||
| assert s3_output.s3_upload_mode == "EndOfJob" | ||
|
|
||
| def test_normalize_outputs_with_none_s3_uri_generates_s3_path(self, mock_session): | ||
| """When s3_uri is None, _normalize_outputs should auto-generate an S3 URI.""" | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good test coverage! However, the |
||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| assert result[0].s3_output.s3_uri is not None | ||
| assert result[0].s3_output.s3_uri.startswith("s3://") | ||
| assert "test-job" in result[0].s3_output.s3_uri | ||
| assert "my-output" in result[0].s3_output.s3_uri | ||
|
|
||
| def test_normalize_outputs_with_none_s3_uri_and_pipeline_config_generates_join(self, mock_session): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line exceeds 100 characters. Please wrap this line: def test_normalize_outputs_with_none_s3_uri_and_pipeline_config_generates_join(
self, mock_session
): |
||
| """When in pipeline context with s3_uri=None, should generate a Join expression.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
aviruthen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config: | ||
| mock_config.pipeline_name = "test-pipeline" | ||
| mock_config.step_name = "test-step" | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| # In pipeline context, the s3_uri should be a Join object | ||
| from sagemaker.core.workflow.functions import Join | ||
| assert isinstance(result[0].s3_output.s3_uri, Join) | ||
|
|
||
| def test_normalize_outputs_with_none_s3_output_generates_s3_path(self, mock_session): | ||
| """When s3_output is None, _normalize_outputs should create s3_output and auto-generate URI.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import of |
||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| outputs = [ProcessingOutput(output_name="my-output")] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| assert result[0].s3_output is not None | ||
| assert result[0].s3_output.s3_uri is not None | ||
| assert result[0].s3_output.s3_uri.startswith("s3://") | ||
| assert result[0].s3_output.local_path == "/opt/ml/processing/output" | ||
| assert result[0].s3_output.s3_upload_mode == "EndOfJob" | ||
|
|
||
| def test_processing_output_to_request_dict_with_none_s3_uri_omits_key(self): | ||
| """When s3_uri is None, S3Uri should be omitted from the request dict.""" | ||
| s3_output = ProcessingS3Output( | ||
| s3_uri=None, | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| processing_output = ProcessingOutput(output_name="results", s3_output=s3_output) | ||
|
|
||
| result = _processing_output_to_request_dict(processing_output) | ||
|
|
||
| assert result["OutputName"] == "results" | ||
| assert "S3Output" in result | ||
| assert "S3Uri" not in result["S3Output"] | ||
| assert result["S3Output"]["LocalPath"] == "/opt/ml/processing/output" | ||
| assert result["S3Output"]["S3UploadMode"] == "EndOfJob" | ||
|
|
||
| def test_normalize_outputs_with_explicit_s3_uri_unchanged(self, mock_session): | ||
| """Regression test: explicit s3:// URIs should be preserved.""" | ||
| processor = Processor( | ||
| role="arn:aws:iam::123456789012:role/SageMakerRole", | ||
| image_uri="test-image:latest", | ||
| instance_count=1, | ||
| instance_type="ml.m5.xlarge", | ||
| sagemaker_session=mock_session, | ||
| ) | ||
| processor._current_job_name = "test-job" | ||
|
|
||
| s3_output = ProcessingS3Output( | ||
| s3_uri="s3://my-bucket/my-output", | ||
| local_path="/opt/ml/processing/output", | ||
| s3_upload_mode="EndOfJob", | ||
| ) | ||
| outputs = [ProcessingOutput(output_name="my-output", s3_output=s3_output)] | ||
|
|
||
| with patch("sagemaker.core.workflow.utilities._pipeline_config", None): | ||
| result = processor._normalize_outputs(outputs) | ||
|
|
||
| assert len(result) == 1 | ||
| assert result[0].s3_output.s3_uri == "s3://my-bucket/my-output" | ||
|
|
||
|
|
||
| class TestProcessorStartNew: | ||
| def test_start_new_with_pipeline_session(self, mock_session): | ||
| from sagemaker.core.workflow.pipeline_context import PipelineSession | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
DEFAULT_S3_UPLOAD_MODEshould use the enum or constant from the shapes/API model if one exists, rather than a raw string"EndOfJob". This avoids drift if the API model changes the accepted values.