-
Notifications
You must be signed in to change notification settings - Fork 0
fix: Support local source for sagemaker.core.shapes.ProcessingInput (5672) #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,137 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
def processing_input_from_local(
    input_name: str,
    local_path: str,
    destination: str,
    s3_data_type: str = "S3Prefix",
    s3_input_mode: str = "File",
) -> ProcessingInput:
    """Create a ``ProcessingInput`` from a local file or directory path.

    This is a convenience factory that makes it clear users can pass local
    paths as processing job inputs. The local path is stored in
    ``ProcessingS3Input.s3_uri`` and will be automatically uploaded to S3
    when the processor's ``run()`` method is called.

    Args:
        input_name: The name for this processing input.
        local_path: A local file or directory path to use as input, given
            as a string or an ``os.PathLike`` object such as
            ``pathlib.Path``. This will be uploaded to S3 automatically
            before the processing job starts.
        destination: The container path where the input data will be
            made available (e.g. ``/opt/ml/processing/input/data``).
        s3_data_type: The S3 data type. Valid values: ``'S3Prefix'``,
            ``'ManifestFile'`` (default: ``'S3Prefix'``).
        s3_input_mode: The input mode. Valid values: ``'File'``,
            ``'Pipe'`` (default: ``'File'``).

    Returns:
        ProcessingInput: A ``ProcessingInput`` object configured with the
        local path. The path will be uploaded to S3 during
        ``Processor.run()``.

    Raises:
        ValueError: If ``input_name``, ``local_path``, or ``destination``
            is empty.

    Example:
        >>> inp = processing_input_from_local(
        ...     input_name="my-data",
        ...     local_path="/home/user/data/",
        ...     destination="/opt/ml/processing/input/data",
        ... )
        >>> processor.run(inputs=[inp])
    """
    # Imported locally so this block does not depend on the module's
    # top-of-file imports.
    import os

    # The path is stored in a string field, so path-like objects are
    # converted to their string form up front.
    if isinstance(local_path, os.PathLike):
        local_path = os.fspath(local_path)

    if not input_name:
        raise ValueError(
            f"input_name must be a non-empty string, got: {input_name!r}"
        )
    if not local_path:
        raise ValueError(
            f"local_path must be a non-empty string, got: {local_path!r}"
        )
    # Same check and message as create_processing_input, so both factories
    # reject a missing container path before building the shape.
    if not destination:
        raise ValueError(
            f"destination must be a non-empty string, got: {destination!r}"
        )

    return ProcessingInput(
        input_name=input_name,
        s3_input=ProcessingS3Input(
            s3_uri=local_path,
            local_path=destination,
            s3_data_type=s3_data_type,
            s3_input_mode=s3_input_mode,
        ),
    )
|
|
||
|
|
||
def create_processing_input(
    source: str,
    destination: str,
    input_name: str,
    s3_data_type: str = "S3Prefix",
    s3_input_mode: str = "File",
) -> ProcessingInput:
    """Create a ``ProcessingInput`` from a local path or S3 URI.

    This factory provides V2-like ergonomics where users pass a ``source``
    parameter that can be either a local file/directory path or an S3 URI.
    If ``source`` is a local path (does not start with ``s3://``), it will
    be automatically uploaded to S3 when the processor runs.

    Args:
        source: A local file/directory path or S3 URI. Local paths may be
            given as a string or an ``os.PathLike`` object such as
            ``pathlib.Path``, and will be uploaded to S3 automatically
            before the processing job starts.
        destination: The container path where the input data will be
            made available (e.g. ``/opt/ml/processing/input/data``).
        input_name: The name for this processing input.
        s3_data_type: The S3 data type. Valid values: ``'S3Prefix'``,
            ``'ManifestFile'`` (default: ``'S3Prefix'``).
        s3_input_mode: The input mode. Valid values: ``'File'``,
            ``'Pipe'`` (default: ``'File'``).

    Returns:
        ProcessingInput: A ``ProcessingInput`` object. If ``source`` is a
        local path, it is stored in ``ProcessingS3Input.s3_uri`` and
        will be uploaded to S3 during ``Processor.run()``.

    Raises:
        ValueError: If ``source``, ``destination``, or ``input_name`` is empty.

    Example:
        >>> # Using a local path
        >>> inp = create_processing_input(
        ...     source="/home/user/data/",
        ...     destination="/opt/ml/processing/input/data",
        ...     input_name="my-data",
        ... )
        >>> # Using an S3 URI
        >>> inp = create_processing_input(
        ...     source="s3://my-bucket/data/",
        ...     destination="/opt/ml/processing/input/data",
        ...     input_name="my-data",
        ... )
        >>> processor.run(inputs=[inp])
    """
    # Imported locally so this block does not depend on the module's
    # top-of-file imports.
    import os

    # The source is stored in a string field, so path-like objects are
    # converted to their string form; plain strings pass through untouched.
    if isinstance(source, os.PathLike):
        source = os.fspath(source)

    # Validate in signature order so the first missing argument is reported.
    if not source:
        raise ValueError(f"source must be a non-empty string, got: {source!r}")
    if not destination:
        raise ValueError(
            f"destination must be a non-empty string, got: {destination!r}"
        )
    if not input_name:
        raise ValueError(
            f"input_name must be a non-empty string, got: {input_name!r}"
        )

    return ProcessingInput(
        input_name=input_name,
        s3_input=ProcessingS3Input(
            s3_uri=source,
            local_path=destination,
            s3_data_type=s3_data_type,
            s3_input_mode=s3_input_mode,
        ),
    )
|
|
||
|
|
||
| class Processor(object): | ||
| """Handles Amazon SageMaker Processing tasks.""" | ||
|
|
||
|
|
@@ -238,6 +369,14 @@ def run( | |
| inputs (list[:class:`~sagemaker.core.shapes.ProcessingInput`]): Input files for | ||
| the processing job. These must be provided as | ||
| :class:`~sagemaker.core.shapes.ProcessingInput` objects (default: None). | ||
|
|
||
| .. note:: | ||
| ``ProcessingS3Input.s3_uri`` can accept local file paths in addition | ||
| to S3 URIs. Local paths will be automatically uploaded to S3 before | ||
| the processing job starts. For clearer intent when using local paths, | ||
| consider using the :func:`processing_input_from_local` or | ||
| :func:`create_processing_input` convenience factories. | ||
|
|
||
| outputs (list[:class:`~sagemaker.core.shapes.ProcessingOutput`]): Outputs for | ||
| the processing job. These can be specified as either path strings or | ||
| :class:`~sagemaker.core.shapes.ProcessingOutput` objects (default: None). | ||
|
|
@@ -401,6 +540,13 @@ def _normalize_inputs(self, inputs=None, kms_key=None): | |
|
|
||
| Raises: | ||
| TypeError: if the inputs are not ``ProcessingInput`` objects. | ||
|
|
||
| Note: | ||
| ``ProcessingS3Input.s3_uri`` can accept local file or directory paths | ||
| in addition to S3 URIs. Local paths will be automatically uploaded | ||
| to S3 before the processing job starts. For clearer intent when | ||
| using local paths, consider using :func:`processing_input_from_local` | ||
| or :func:`create_processing_input`. | ||
| """ | ||
| from sagemaker.core.workflow.utilities import _pipeline_config | ||
|
|
||
|
|
@@ -413,7 +559,23 @@ def _normalize_inputs(self, inputs=None, kms_key=None): | |
| raise TypeError("Your inputs must be provided as ProcessingInput objects.") | ||
| # Generate a name for the ProcessingInput if it doesn't have one. | ||
| if file_input.input_name is None: | ||
|
aviruthen marked this conversation as resolved.
|
||
| file_input.input_name = "input-{}".format(count) | ||
| file_input.input_name = f"input-{count}" | ||
|
|
||
| # Support ad-hoc 'source' attribute for V2-like ergonomics. | ||
| # If a user sets file_input.source (e.g. via monkey-patching or | ||
| # a subclass), populate s3_input.s3_uri from it so the existing | ||
| # upload logic handles it. | ||
| _source = getattr(file_input, "source", None) | ||
| if _source is not None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Critical design concern: This relies on users monkey-patching an ad-hoc `source` attribute onto `ProcessingInput` objects. Moreover, encouraging monkey-patching on auto-generated shapes is a maintenance hazard — if the shape ever adds a real `source` field, this code would conflict with it. Recommendation: Remove this entire `source`-handling block. |
||
| if file_input.s3_input is None: | ||
| file_input.s3_input = ProcessingS3Input( | ||
| s3_uri=_source, | ||
| local_path="/opt/ml/processing/input", | ||
| s3_data_type="S3Prefix", | ||
| s3_input_mode="File", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hardcoded default `local_path`. Consider extracting it into a module-level constant: `DEFAULT_PROCESSING_INPUT_PATH = "/opt/ml/processing/input"`. This is also used in the factory functions' docstrings and examples, so a constant would ensure consistency. |
||
| ) | ||
| else: | ||
| file_input.s3_input.s3_uri = _source | ||
|
|
||
| if file_input.dataset_definition: | ||
| normalized_inputs.append(file_input) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.