diff --git a/sagemaker-core/src/sagemaker/core/processing.py b/sagemaker-core/src/sagemaker/core/processing.py index 736eebf01c..bc70362361 100644 --- a/sagemaker-core/src/sagemaker/core/processing.py +++ b/sagemaker-core/src/sagemaker/core/processing.py @@ -16,14 +16,14 @@ data pre-processing, post-processing, feature engineering, data validation, and model evaluation, and interpretation on Amazon SageMaker. """ -from __future__ import absolute_import +from __future__ import annotations import json import logging import os import pathlib import re -from typing import Dict, List, Optional, Union +from typing import Dict, List, Literal, Optional, Union import time from copy import copy from textwrap import dedent @@ -84,6 +84,127 @@ logger = logging.getLogger(__name__) +DEFAULT_PROCESSING_INPUT_PATH = "/opt/ml/processing/input" + + +def processing_input_from_local( + input_name: str, + local_path: str, + destination: str, + s3_data_type: Literal["S3Prefix", "ManifestFile"] = "S3Prefix", + s3_input_mode: Literal["File", "Pipe"] = "File", +) -> ProcessingInput: + """Create a ProcessingInput from a local file or directory path. + + This is a convenience factory that makes it clear users can pass local + paths as processing job inputs. The local path is stored in + ``ProcessingS3Input.s3_uri`` and will be automatically uploaded to S3 + when the processor's ``run()`` method is called. + + :param input_name: The name for this processing input. + :param local_path: A local file or directory path to use as input. + This will be uploaded to S3 automatically before the + processing job starts. + :param destination: The container path where the input data will be + made available (e.g. ``'/opt/ml/processing/input/data'``). + :param s3_data_type: The S3 data type (default: ``'S3Prefix'``). + :param s3_input_mode: The input mode (default: ``'File'``). + :returns: A ``ProcessingInput`` configured with the local path. + The path will be uploaded to S3 during ``Processor.run()``. + :raises ValueError: If ``input_name``, ``local_path``, or ``destination`` is empty. + + Example:: + + >>> inp = processing_input_from_local( + ... input_name="my-data", + ... local_path="/home/user/data/", + ... destination="/opt/ml/processing/input/data", + ... ) + >>> processor.run(inputs=[inp]) + """ + if not input_name: + raise ValueError( + f"input_name must be a non-empty string, got: {input_name!r}" + ) + if not local_path: + raise ValueError( + f"local_path must be a non-empty string, got: {local_path!r}" + ) + if not destination: + raise ValueError( + f"destination must be a non-empty string, got: {destination!r}" + ) + return ProcessingInput( + input_name=input_name, + s3_input=ProcessingS3Input( + s3_uri=local_path, + local_path=destination, + s3_data_type=s3_data_type, + s3_input_mode=s3_input_mode, + ), + ) + + +def create_processing_input( + source: str, + destination: str, + input_name: str | None = None, + s3_data_type: Literal["S3Prefix", "ManifestFile"] = "S3Prefix", + s3_input_mode: Literal["File", "Pipe"] = "File", +) -> ProcessingInput: + """Create a ProcessingInput from a local path or S3 URI. + + This factory provides V2-like ergonomics where users pass a ``source`` + parameter that can be either a local file/directory path or an S3 URI. + If ``source`` is a local path (does not start with ``s3://``), it will + be automatically uploaded to S3 when the processor runs. + + :param source: A local file/directory path or S3 URI. Local paths will + be uploaded to S3 automatically before the processing job starts. + :param destination: The container path where the input data will be + made available (e.g. ``'/opt/ml/processing/input/data'``). + :param input_name: The name for this processing input. If ``None``, + a name will be auto-generated by ``Processor._normalize_inputs()`` + (default: ``None``). + :param s3_data_type: The S3 data type (default: ``'S3Prefix'``). + :param s3_input_mode: The input mode (default: ``'File'``). + :returns: A ``ProcessingInput`` object. If ``source`` is a local path, + it is stored in ``ProcessingS3Input.s3_uri`` and will be uploaded + to S3 during ``Processor.run()``. + :raises ValueError: If ``source`` or ``destination`` is empty. + + Example:: + + >>> # Using a local path + >>> inp = create_processing_input( + ... source="/home/user/data/", + ... destination="/opt/ml/processing/input/data", + ... input_name="my-data", + ... ) + >>> # Using an S3 URI + >>> inp = create_processing_input( + ... source="s3://my-bucket/data/", + ... destination="/opt/ml/processing/input/data", + ... input_name="my-data", + ... ) + >>> processor.run(inputs=[inp]) + """ + if not source: + raise ValueError(f"source must be a non-empty string, got: {source!r}") + if not destination: + raise ValueError( + f"destination must be a non-empty string, got: {destination!r}" + ) + return ProcessingInput( + input_name=input_name, + s3_input=ProcessingS3Input( + s3_uri=source, + local_path=destination, + s3_data_type=s3_data_type, + s3_input_mode=s3_input_mode, + ), + ) + class Processor(object): """Handles Amazon SageMaker Processing tasks.""" @@ -238,6 +359,14 @@ def run( inputs (list[:class:`~sagemaker.core.shapes.ProcessingInput`]): Input files for the processing job. These must be provided as :class:`~sagemaker.core.shapes.ProcessingInput` objects (default: None). + + .. note:: + ``ProcessingS3Input.s3_uri`` can accept local file paths in addition + to S3 URIs. Local paths will be automatically uploaded to S3 before + the processing job starts. For clearer intent when using local paths, + consider using the :func:`processing_input_from_local` or + :func:`create_processing_input` convenience factories. + outputs (list[:class:`~sagemaker.core.shapes.ProcessingOutput`]): Outputs for the processing job. These can be specified as either path strings or :class:`~sagemaker.core.shapes.ProcessingOutput` objects (default: None). @@ -401,6 +530,13 @@ def _normalize_inputs(self, inputs=None, kms_key=None): Raises: TypeError: if the inputs are not ``ProcessingInput`` objects. + + Note: + ``ProcessingS3Input.s3_uri`` can accept local file or directory paths + in addition to S3 URIs. Local paths will be automatically uploaded + to S3 before the processing job starts. For clearer intent when + using local paths, consider using :func:`processing_input_from_local` + or :func:`create_processing_input`. """ from sagemaker.core.workflow.utilities import _pipeline_config @@ -413,7 +549,7 @@ def _normalize_inputs(self, inputs=None, kms_key=None): raise TypeError("Your inputs must be provided as ProcessingInput objects.") # Generate a name for the ProcessingInput if it doesn't have one. if file_input.input_name is None: - file_input.input_name = "input-{}".format(count) + file_input.input_name = f"input-{count}" if file_input.dataset_definition: normalized_inputs.append(file_input) diff --git a/sagemaker-core/tests/unit/test_processing.py b/sagemaker-core/tests/unit/test_processing.py index dbe8d5f9ef..8b685a068a 100644 --- a/sagemaker-core/tests/unit/test_processing.py +++ b/sagemaker-core/tests/unit/test_processing.py @@ -23,6 +23,8 @@ _processing_output_to_request_dict, _get_process_request, logs_for_processing_job, + processing_input_from_local, + create_processing_input, ) from sagemaker.core.shapes import ( ProcessingInput, @@ -493,6 +495,257 @@ def test_multiple_outputs_with_s3_uris_preserved(self, session_local_mode_false) assert result[1].s3_output.s3_uri == "s3://my-bucket/second" +class TestProcessingInputFromLocal: + """Tests for the processing_input_from_local convenience factory.""" + + def test_processing_input_from_local_creates_valid_processing_input(self): + """processing_input_from_local should create a ProcessingInput with the local + path stored in s3_input.s3_uri.""" + result = processing_input_from_local( + input_name="my-data", + local_path="/home/user/data/", + destination="/opt/ml/processing/input/data", + ) + + assert isinstance(result, ProcessingInput) + assert result.input_name == "my-data" + assert result.s3_input.s3_uri == "/home/user/data/" + assert result.s3_input.local_path == "/opt/ml/processing/input/data" + assert result.s3_input.s3_data_type == "S3Prefix" + assert result.s3_input.s3_input_mode == "File" + + def test_processing_input_from_local_with_custom_data_type_and_mode(self): + """processing_input_from_local should accept custom s3_data_type and s3_input_mode.""" + result = processing_input_from_local( + input_name="manifest-input", + local_path="/tmp/manifest.json", + destination="/opt/ml/processing/input/manifest", + s3_data_type="ManifestFile", + s3_input_mode="Pipe", + ) + + assert result.s3_input.s3_data_type == "ManifestFile" + assert result.s3_input.s3_input_mode == "Pipe" + + @pytest.mark.parametrize("input_name", ["", None]) + def test_processing_input_from_local_invalid_input_name_raises(self, input_name): + """processing_input_from_local should raise ValueError for empty or None input_name.""" + with pytest.raises(ValueError, match="input_name must be a non-empty string"): + processing_input_from_local( + input_name=input_name, + local_path="/tmp/data", + destination="/opt/ml/processing/input", + ) + + @pytest.mark.parametrize("local_path", ["", None]) + def test_processing_input_from_local_invalid_local_path_raises(self, local_path): + """processing_input_from_local should raise ValueError for empty or None local_path.""" + with pytest.raises(ValueError, match="local_path must be a non-empty string"): + processing_input_from_local( + input_name="data", + local_path=local_path, + destination="/opt/ml/processing/input", + ) + + @pytest.mark.parametrize("destination", ["", None]) + def test_processing_input_from_local_invalid_destination_raises(self, destination): + """processing_input_from_local should raise ValueError for empty or None destination.""" + with pytest.raises(ValueError, match="destination must be a non-empty string"): + processing_input_from_local( + input_name="data", + local_path="/tmp/data", + destination=destination, + ) + + def test_processing_input_from_local_with_pipeline_config_uses_pipeline_s3_path( + self, mock_session + ): + """When used with a Processor, the local path should be uploaded using + pipeline config S3 path when pipeline config is set.""" + inp = processing_input_from_local( + input_name="my-data", + local_path="/tmp/local_data", + destination="/opt/ml/processing/input/data", + ) + + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + with patch("sagemaker.core.workflow.utilities._pipeline_config") as mock_config: + mock_config.pipeline_name = "test-pipeline" + mock_config.step_name = "test-step" + with patch( + "sagemaker.core.s3.S3Uploader.upload", + return_value="s3://test-bucket/sagemaker/test-pipeline/test-step/input/my-data", + ) as mock_upload: + result = processor._normalize_inputs([inp]) + mock_upload.assert_called_once() + # Verify the session is passed correctly to the upload call + call_kwargs = mock_upload.call_args + assert call_kwargs[1]["sagemaker_session"] == mock_session + assert result[0].s3_input.s3_uri.startswith("s3://") + + +class TestCreateProcessingInput: + """Tests for the create_processing_input convenience factory.""" + + def test_create_processing_input_with_local_path_stores_path_in_s3_uri(self): + """create_processing_input with a local path should store it in s3_input.s3_uri.""" + result = create_processing_input( + source="/home/user/data/", + destination="/opt/ml/processing/input/data", + input_name="my-data", + ) + + assert isinstance(result, ProcessingInput) + assert result.input_name == "my-data" + assert result.s3_input.s3_uri == "/home/user/data/" + assert result.s3_input.local_path == "/opt/ml/processing/input/data" + + def test_create_processing_input_with_s3_uri_stores_directly(self): + """create_processing_input with an S3 URI should store it directly.""" + result = create_processing_input( + source="s3://my-bucket/data/", + destination="/opt/ml/processing/input/data", + input_name="my-data", + ) + + assert result.s3_input.s3_uri == "s3://my-bucket/data/" + + def test_create_processing_input_default_s3_data_type_and_input_mode(self): + """create_processing_input should default to S3Prefix and File.""" + result = create_processing_input( + source="/tmp/data", + destination="/opt/ml/processing/input", + input_name="data", + ) + + assert result.s3_input.s3_data_type == "S3Prefix" + assert result.s3_input.s3_input_mode == "File" + + @pytest.mark.parametrize("source", ["", None]) + def test_create_processing_input_invalid_source_raises(self, source): + """create_processing_input should raise ValueError for empty or None source.""" + with pytest.raises(ValueError, match="source must be a non-empty string"): + create_processing_input( + source=source, + destination="/opt/ml/processing/input", + input_name="data", + ) + + @pytest.mark.parametrize("destination", ["", None]) + def test_create_processing_input_invalid_destination_raises(self, destination): + """create_processing_input should raise ValueError for empty or None destination.""" + with pytest.raises(ValueError, match="destination must be a non-empty string"): + create_processing_input( + source="/tmp/data", + destination=destination, + input_name="data", + ) + + def test_create_processing_input_with_none_input_name_succeeds(self): + """create_processing_input should succeed with input_name=None (auto-generated).""" + result = create_processing_input( + source="/tmp/data", + destination="/opt/ml/processing/input", + ) + assert result.input_name is None + assert result.s3_input.s3_uri == "/tmp/data" + + +class TestNormalizeInputsLocalPathUpload: + """Tests for _normalize_inputs handling of local paths in s3_uri.""" + + def test_normalize_inputs_with_local_path_in_s3_uri_uploads_to_s3(self, mock_session): + """A local file path in s3_input.s3_uri should be uploaded to S3.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + inp = create_processing_input( + source="/tmp/my_local_data", + destination="/opt/ml/processing/input/data", + input_name="local-data", + ) + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + with patch( + "sagemaker.core.s3.S3Uploader.upload", + return_value="s3://test-bucket/sagemaker/test-job/input/local-data", + ) as mock_upload: + result = processor._normalize_inputs([inp]) + + assert len(result) == 1 + assert result[0].s3_input.s3_uri == ( + "s3://test-bucket/sagemaker/test-job/input/local-data" + ) + mock_upload.assert_called_once() + + def test_normalize_inputs_with_local_directory_in_s3_uri_uploads_to_s3(self, mock_session): + """A local directory path in s3_input.s3_uri should be uploaded to S3.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + inp = processing_input_from_local( + input_name="dir-data", + local_path="/home/user/datasets/training/", + destination="/opt/ml/processing/input/train", + ) + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + with patch( + "sagemaker.core.s3.S3Uploader.upload", + return_value="s3://test-bucket/sagemaker/test-job/input/dir-data", + ) as mock_upload: + result = processor._normalize_inputs([inp]) + + assert len(result) == 1 + assert result[0].s3_input.s3_uri.startswith("s3://") + mock_upload.assert_called_once() + + def test_normalize_inputs_s3_uri_not_uploaded(self, mock_session): + """An S3 URI in s3_input.s3_uri should NOT trigger an upload.""" + processor = Processor( + role="arn:aws:iam::123456789012:role/SageMakerRole", + image_uri="test-image:latest", + instance_count=1, + instance_type="ml.m5.xlarge", + sagemaker_session=mock_session, + ) + processor._current_job_name = "test-job" + + inp = create_processing_input( + source="s3://my-bucket/data/", + destination="/opt/ml/processing/input/data", + input_name="s3-data", + ) + + with patch("sagemaker.core.workflow.utilities._pipeline_config", None): + with patch("sagemaker.core.s3.S3Uploader.upload") as mock_upload: + result = processor._normalize_inputs([inp]) + + assert len(result) == 1 + assert result[0].s3_input.s3_uri == "s3://my-bucket/data/" + mock_upload.assert_not_called() + + class TestProcessorStartNew: def test_start_new_with_pipeline_session(self, mock_session): from sagemaker.core.workflow.pipeline_context import PipelineSession