Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 139 additions & 3 deletions sagemaker-core/src/sagemaker/core/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
data pre-processing, post-processing, feature engineering, data validation, and model evaluation,
and interpretation on Amazon SageMaker.
"""
from __future__ import absolute_import
from __future__ import annotations

import json
import logging
import os
import pathlib
import re
from typing import Dict, List, Optional, Union
from typing import Dict, List, Literal, Optional, Union
import time
from copy import copy
from textwrap import dedent
Expand Down Expand Up @@ -84,6 +84,127 @@

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): presumably the default in-container directory for processing
# inputs; no use of this constant is visible in this section -- confirm.
DEFAULT_PROCESSING_INPUT_PATH = "/opt/ml/processing/input"


def processing_input_from_local(
    input_name: str,
    local_path: str,
    destination: str,
    s3_data_type: Literal["S3Prefix", "ManifestFile"] = "S3Prefix",
    s3_input_mode: Literal["File", "Pipe"] = "File",
) -> ProcessingInput:
    """Create a ProcessingInput from a local file or directory path.

    This is a convenience factory that makes it clear users can pass local
    paths as processing job inputs. The local path is stored in
    ``ProcessingS3Input.s3_uri`` and will be automatically uploaded to S3
    when the processor's ``run()`` method is called.

    Args:
        input_name (str): The name for this processing input.
        local_path (str): A local file or directory path to use as input.
            This will be uploaded to S3 automatically before the processing
            job starts.
        destination (str): The container path where the input data will be
            made available (e.g. ``'/opt/ml/processing/input/data'``).
        s3_data_type (str): The S3 data type, either ``'S3Prefix'`` or
            ``'ManifestFile'`` (default: ``'S3Prefix'``).
        s3_input_mode (str): The input mode, either ``'File'`` or ``'Pipe'``
            (default: ``'File'``).

    Returns:
        ProcessingInput: An input configured with the local path. The path
        will be uploaded to S3 during ``Processor.run()``.

    Raises:
        ValueError: If ``input_name``, ``local_path``, or ``destination``
            is empty.

    Example::

        >>> inp = processing_input_from_local(
        ...     input_name="my-data",
        ...     local_path="/home/user/data/",
        ...     destination="/opt/ml/processing/input/data",
        ... )
        >>> processor.run(inputs=[inp])
    """
    # Validate in signature order so the first offending argument is reported.
    if not input_name:
        raise ValueError(
            f"input_name must be a non-empty string, got: {input_name!r}"
        )
    if not local_path:
        raise ValueError(
            f"local_path must be a non-empty string, got: {local_path!r}"
        )
    if not destination:
        raise ValueError(
            f"destination must be a non-empty string, got: {destination!r}"
        )

    # Field naming is counter-intuitive here: the *host* path goes into
    # ``s3_uri`` (it is swapped for a real S3 URI once uploaded), while the
    # shape's ``local_path`` field holds the *container* destination.
    return ProcessingInput(
        input_name=input_name,
        s3_input=ProcessingS3Input(
            s3_uri=local_path,
            local_path=destination,
            s3_data_type=s3_data_type,
            s3_input_mode=s3_input_mode,
        ),
    )


def create_processing_input(
    source: str,
    destination: str,
    input_name: str | None = None,
    s3_data_type: Literal["S3Prefix", "ManifestFile"] = "S3Prefix",
    s3_input_mode: Literal["File", "Pipe"] = "File",
) -> ProcessingInput:
    """Create a ProcessingInput from a local path or S3 URI.

    This factory provides V2-like ergonomics where users pass a ``source``
    parameter that can be either a local file/directory path or an S3 URI.
    If ``source`` is a local path (does not start with ``s3://``), it will
    be automatically uploaded to S3 when the processor runs.

    Args:
        source (str): A local file/directory path or S3 URI. Local paths
            will be uploaded to S3 automatically before the processing job
            starts.
        destination (str): The container path where the input data will be
            made available (e.g. ``'/opt/ml/processing/input/data'``).
        input_name (str): The name for this processing input. If ``None``,
            a name will be auto-generated by
            ``Processor._normalize_inputs()`` (default: None).
        s3_data_type (str): The S3 data type, either ``'S3Prefix'`` or
            ``'ManifestFile'`` (default: ``'S3Prefix'``).
        s3_input_mode (str): The input mode, either ``'File'`` or ``'Pipe'``
            (default: ``'File'``).

    Returns:
        ProcessingInput: The configured input. If ``source`` is a local
        path, it is stored in ``ProcessingS3Input.s3_uri`` and will be
        uploaded to S3 during ``Processor.run()``.

    Raises:
        ValueError: If ``source`` or ``destination`` is empty.

    Example::

        >>> # Using a local path
        >>> inp = create_processing_input(
        ...     source="/home/user/data/",
        ...     destination="/opt/ml/processing/input/data",
        ...     input_name="my-data",
        ... )
        >>> # Using an S3 URI
        >>> inp = create_processing_input(
        ...     source="s3://my-bucket/data/",
        ...     destination="/opt/ml/processing/input/data",
        ...     input_name="my-data",
        ... )
        >>> processor.run(inputs=[inp])
    """
    if not source:
        raise ValueError(f"source must be a non-empty string, got: {source!r}")
    if not destination:
        raise ValueError(
            f"destination must be a non-empty string, got: {destination!r}"
        )

    # ``input_name`` is deliberately not validated: ``None`` is a legal value
    # that defers naming to the processor's input normalization.
    # ``s3_uri`` carries the source verbatim (local path or S3 URI); the
    # shape's ``local_path`` field is the in-container destination.
    return ProcessingInput(
        input_name=input_name,
        s3_input=ProcessingS3Input(
            s3_uri=source,
            local_path=destination,
            s3_data_type=s3_data_type,
            s3_input_mode=s3_input_mode,
        ),
    )


class Processor(object):
"""Handles Amazon SageMaker Processing tasks."""
Expand Down Expand Up @@ -238,6 +359,14 @@ def run(
inputs (list[:class:`~sagemaker.core.shapes.ProcessingInput`]): Input files for
the processing job. These must be provided as
:class:`~sagemaker.core.shapes.ProcessingInput` objects (default: None).

.. note::
``ProcessingS3Input.s3_uri`` can accept local file paths in addition
to S3 URIs. Local paths will be automatically uploaded to S3 before
the processing job starts. For clearer intent when using local paths,
consider using the :func:`processing_input_from_local` or
:func:`create_processing_input` convenience factories.

outputs (list[:class:`~sagemaker.core.shapes.ProcessingOutput`]): Outputs for
the processing job. These can be specified as either path strings or
:class:`~sagemaker.core.shapes.ProcessingOutput` objects (default: None).
Expand Down Expand Up @@ -401,6 +530,13 @@ def _normalize_inputs(self, inputs=None, kms_key=None):

Raises:
TypeError: if the inputs are not ``ProcessingInput`` objects.

Note:
``ProcessingS3Input.s3_uri`` can accept local file or directory paths
in addition to S3 URIs. Local paths will be automatically uploaded
to S3 before the processing job starts. For clearer intent when
using local paths, consider using :func:`processing_input_from_local`
or :func:`create_processing_input`.
"""
from sagemaker.core.workflow.utilities import _pipeline_config

Expand All @@ -413,7 +549,7 @@ def _normalize_inputs(self, inputs=None, kms_key=None):
raise TypeError("Your inputs must be provided as ProcessingInput objects.")
# Generate a name for the ProcessingInput if it doesn't have one.
if file_input.input_name is None:
Comment thread
aviruthen marked this conversation as resolved.
file_input.input_name = "input-{}".format(count)
file_input.input_name = f"input-{count}"

if file_input.dataset_definition:
normalized_inputs.append(file_input)
Expand Down
Loading
Loading