Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 163 additions & 1 deletion sagemaker-core/src/sagemaker/core/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,137 @@
logger = logging.getLogger(__name__)


def processing_input_from_local(
Comment thread
aviruthen marked this conversation as resolved.
input_name: str,
local_path: str,
Comment thread
aviruthen marked this conversation as resolved.
Comment thread
aviruthen marked this conversation as resolved.
destination: str,
s3_data_type: str = "S3Prefix",
s3_input_mode: str = "File",
) -> ProcessingInput:
"""Creates a ProcessingInput from a local file or directory path.

This is a convenience factory that makes it clear users can pass local
paths as processing job inputs. The local path is stored in
``ProcessingS3Input.s3_uri`` and will be automatically uploaded to S3
when the processor's ``run()`` method is called.

Args:
input_name: The name for this processing input.
local_path: A local file or directory path to use as input.
This will be uploaded to S3 automatically before the
processing job starts.
destination: The container path where the input data will be
made available (e.g. ``/opt/ml/processing/input/data``).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing validation for destination parameter. processing_input_from_local validates input_name and local_path but not destination, while create_processing_input validates all three. This inconsistency could confuse users. Add:

if not destination:
    raise ValueError(f"destination must be a non-empty string, got: {destination!r}")

s3_data_type: The S3 data type. Valid values: ``'S3Prefix'``,
``'ManifestFile'`` (default: ``'S3Prefix'``).
s3_input_mode: The input mode. Valid values: ``'File'``,
``'Pipe'`` (default: ``'File'``).

Returns:
ProcessingInput: A ``ProcessingInput`` object configured with the
local path. The path will be uploaded to S3 during
``Processor.run()``.

Raises:
ValueError: If ``input_name`` or ``local_path`` is empty.

Example:
>>> inp = processing_input_from_local(
... input_name="my-data",
... local_path="/home/user/data/",
... destination="/opt/ml/processing/input/data",
... )
>>> processor.run(inputs=[inp])
"""
if not input_name:
raise ValueError(
f"input_name must be a non-empty string, got: {input_name!r}"
)
if not local_path:
raise ValueError(
f"local_path must be a non-empty string, got: {local_path!r}"
)
return ProcessingInput(
input_name=input_name,
s3_input=ProcessingS3Input(
s3_uri=local_path,
local_path=destination,
s3_data_type=s3_data_type,
s3_input_mode=s3_input_mode,
),
)


def create_processing_input(
source: str,
destination: str,
input_name: str,
s3_data_type: str = "S3Prefix",
s3_input_mode: str = "File",
) -> ProcessingInput:
Comment thread
aviruthen marked this conversation as resolved.
Comment thread
aviruthen marked this conversation as resolved.
"""Creates a ProcessingInput from a local path or S3 URI.

This factory provides V2-like ergonomics where users pass a ``source``
parameter that can be either a local file/directory path or an S3 URI.
If ``source`` is a local path (does not start with ``s3://``), it will
be automatically uploaded to S3 when the processor runs.

Args:
source: A local file/directory path or S3 URI. Local paths will
be uploaded to S3 automatically before the processing job
starts.
destination: The container path where the input data will be
made available (e.g. ``/opt/ml/processing/input/data``).
input_name: The name for this processing input.
s3_data_type: The S3 data type. Valid values: ``'S3Prefix'``,
``'ManifestFile'`` (default: ``'S3Prefix'``).
s3_input_mode: The input mode. Valid values: ``'File'``,
``'Pipe'`` (default: ``'File'``).

Returns:
ProcessingInput: A ``ProcessingInput`` object. If ``source`` is a
local path, it is stored in ``ProcessingS3Input.s3_uri`` and
will be uploaded to S3 during ``Processor.run()``.

Raises:
ValueError: If ``source``, ``destination``, or ``input_name`` is empty.

Example:
>>> # Using a local path
>>> inp = create_processing_input(
... source="/home/user/data/",
... destination="/opt/ml/processing/input/data",
... input_name="my-data",
... )
>>> # Using an S3 URI
>>> inp = create_processing_input(
... source="s3://my-bucket/data/",
... destination="/opt/ml/processing/input/data",
... input_name="my-data",
... )
>>> processor.run(inputs=[inp])
"""
if not source:
raise ValueError(f"source must be a non-empty string, got: {source!r}")
if not destination:
raise ValueError(
f"destination must be a non-empty string, got: {destination!r}"
)
if not input_name:
raise ValueError(
f"input_name must be a non-empty string, got: {input_name!r}"
)
return ProcessingInput(
input_name=input_name,
s3_input=ProcessingS3Input(
s3_uri=source,
local_path=destination,
s3_data_type=s3_data_type,
s3_input_mode=s3_input_mode,
),
)


class Processor(object):
"""Handles Amazon SageMaker Processing tasks."""

Expand Down Expand Up @@ -238,6 +369,14 @@ def run(
inputs (list[:class:`~sagemaker.core.shapes.ProcessingInput`]): Input files for
the processing job. These must be provided as
:class:`~sagemaker.core.shapes.ProcessingInput` objects (default: None).

.. note::
``ProcessingS3Input.s3_uri`` can accept local file paths in addition
to S3 URIs. Local paths will be automatically uploaded to S3 before
the processing job starts. For clearer intent when using local paths,
consider using the :func:`processing_input_from_local` or
:func:`create_processing_input` convenience factories.

outputs (list[:class:`~sagemaker.core.shapes.ProcessingOutput`]): Outputs for
the processing job. These can be specified as either path strings or
:class:`~sagemaker.core.shapes.ProcessingOutput` objects (default: None).
Expand Down Expand Up @@ -401,6 +540,13 @@ def _normalize_inputs(self, inputs=None, kms_key=None):

Raises:
TypeError: if the inputs are not ``ProcessingInput`` objects.

Note:
``ProcessingS3Input.s3_uri`` can accept local file or directory paths
in addition to S3 URIs. Local paths will be automatically uploaded
to S3 before the processing job starts. For clearer intent when
using local paths, consider using :func:`processing_input_from_local`
or :func:`create_processing_input`.
"""
from sagemaker.core.workflow.utilities import _pipeline_config

Expand All @@ -413,7 +559,23 @@ def _normalize_inputs(self, inputs=None, kms_key=None):
raise TypeError("Your inputs must be provided as ProcessingInput objects.")
# Generate a name for the ProcessingInput if it doesn't have one.
if file_input.input_name is None:
Comment thread
aviruthen marked this conversation as resolved.
file_input.input_name = "input-{}".format(count)
file_input.input_name = f"input-{count}"

# Support ad-hoc 'source' attribute for V2-like ergonomics.
# If a user sets file_input.source (e.g. via monkey-patching or
# a subclass), populate s3_input.s3_uri from it so the existing
# upload logic handles it.
_source = getattr(file_input, "source", None)
if _source is not None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical design concern: getattr(file_input, "source", None) monkey-patching pattern is fragile and anti-pattern.

This relies on users monkey-patching an ad-hoc source attribute onto a Pydantic BaseModel (ProcessingInput is auto-generated from sagemaker-core shapes). Pydantic models by default reject extra attributes, so file_input.source = '/tmp/data' would raise a ValidationError unless the model is configured with model_config = ConfigDict(extra='allow'). This code path is essentially dead unless users bypass Pydantic validation.

Moreover, encouraging monkey-patching on auto-generated shapes is a maintenance hazard — if the shape ever adds a real source field, this code will silently conflict.

Recommendation: Remove this entire _source = getattr(...) block. The two factory functions (processing_input_from_local and create_processing_input) already solve the UX problem cleanly by populating s3_input.s3_uri correctly. There's no need for this fallback path.

if file_input.s3_input is None:
file_input.s3_input = ProcessingS3Input(
s3_uri=_source,
local_path="/opt/ml/processing/input",
s3_data_type="S3Prefix",
s3_input_mode="File",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded default /opt/ml/processing/input — this magic string should be extracted to a module-level constant, e.g.:

DEFAULT_PROCESSING_INPUT_PATH = "/opt/ml/processing/input"

This is also used in the factory functions' docstrings and examples, so a constant would ensure consistency.

)
else:
file_input.s3_input.s3_uri = _source

if file_input.dataset_definition:
normalized_inputs.append(file_input)
Expand Down
Loading
Loading