Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions sagemaker-core/src/sagemaker/core/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from sagemaker.core.local.local_session import LocalSession
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.shapes import ProcessingInput, ProcessingOutput, ProcessingS3Input
from sagemaker.core.shapes.shapes import _DEFAULT_S3_DATA_TYPE, _DEFAULT_S3_INPUT_MODE
from sagemaker.core.resources import ProcessingJob
from sagemaker.core.workflow.pipeline_context import PipelineSession
from sagemaker.core.common_utils import (
Expand Down Expand Up @@ -418,12 +419,24 @@ def _normalize_inputs(self, inputs=None, kms_key=None):
if file_input.dataset_definition:
normalized_inputs.append(file_input)
continue
if file_input.s3_input and is_pipeline_variable(file_input.s3_input.s3_uri):
if file_input.s3_input is None:
raise ValueError(
f"ProcessingInput '{file_input.input_name}' has no "
"s3_input or dataset_definition. Provide 'source', "
"'s3_input', or 'dataset_definition'."
)
if is_pipeline_variable(file_input.s3_input.s3_uri):
normalized_inputs.append(file_input)
continue
# If the s3_uri is not an s3_uri, create one.
parse_result = urlparse(file_input.s3_input.s3_uri)
if parse_result.scheme != "s3":
local_path = file_input.s3_input.s3_uri
logger.info(
"Uploading local input '%s' (%s) to S3...",
file_input.input_name,
local_path,
)
if _pipeline_config:
desired_s3_uri = s3.s3_path_join(
"s3://",
Expand All @@ -444,7 +457,7 @@ def _normalize_inputs(self, inputs=None, kms_key=None):
file_input.input_name,
)
s3_uri = s3.S3Uploader.upload(
local_path=file_input.s3_input.s3_uri,
local_path=local_path,
desired_s3_uri=desired_s3_uri,
sagemaker_session=self.sagemaker_session,
kms_key=kms_key,
Expand Down
51 changes: 49 additions & 2 deletions sagemaker-core/src/sagemaker/core/shapes/shapes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6354,7 +6354,7 @@ class ProcessingS3Input(Base):

Attributes
----------------------
s3_uri: The URI of the Amazon S3 prefix Amazon SageMaker downloads data required to run a processing job.
s3_uri: The URI of the Amazon S3 prefix Amazon SageMaker downloads data required to run a processing job. Also accepts local file or directory paths, which will be automatically uploaded to S3 during job normalization.
local_path: The local path in your container where you want Amazon SageMaker to write input data to. LocalPath is an absolute path to the input data and must begin with /opt/ml/processing/. LocalPath is a required parameter when AppManaged is False (default).
s3_data_type: Whether you use an S3Prefix or a ManifestFile for the data type. If you choose S3Prefix, S3Uri identifies a key name prefix. Amazon SageMaker uses all objects with the specified key name prefix for the processing job. If you choose ManifestFile, S3Uri identifies an object that is a manifest file containing a list of object keys that you want Amazon SageMaker to use for the processing job.
s3_input_mode: Whether to use File or Pipe input mode. In File mode, Amazon SageMaker copies the data from the input source onto the local ML storage volume before starting your processing container. This is the most commonly used input mode. In Pipe mode, Amazon SageMaker streams input data from the source directly to your processing container into named pipes without using the ML storage volume.
Expand Down Expand Up @@ -6474,6 +6474,11 @@ class DatasetDefinition(Base):
snowflake_dataset_definition: Optional[SnowflakeDatasetDefinition] = Unassigned()


# Default constants for ProcessingS3Input creation from source parameter.
# Used when a ProcessingInput is constructed with the 'source' convenience
# parameter instead of a fully-specified s3_input (see ProcessingInput below).
_DEFAULT_S3_DATA_TYPE = "S3Prefix"
_DEFAULT_S3_INPUT_MODE = "File"


class ProcessingInput(Base):
"""
ProcessingInput
Expand All @@ -6485,12 +6490,54 @@ class ProcessingInput(Base):
app_managed: When True, input operations such as data download are managed natively by the processing job application. When False (default), input operations are managed by Amazon SageMaker.
s3_input: Configuration for downloading input data from Amazon S3 into the processing container.
dataset_definition: Configuration for a Dataset Definition input.
source: Convenience parameter that accepts a local file/directory path or S3 URI.
When provided (and s3_input is not), a ProcessingS3Input is automatically created.
Local paths will be uploaded to S3 during job normalization.
Cannot be specified together with s3_input.
"""

model_config = ConfigDict(
protected_namespaces=(),
validate_assignment=True,
extra="forbid",
json_schema_extra={"exclude": {"source"}},
)

input_name: StrPipeVar
app_managed: Optional[bool] = Unassigned()
s3_input: Optional[ProcessingS3Input] = Unassigned()
dataset_definition: Optional[DatasetDefinition] = Unassigned()
source: Optional[StrPipeVar] = Field(default=None, exclude=True)

@classmethod
def _validate_source_and_s3_input(cls, values):
    """Resolve the 'source' convenience parameter against 's3_input'.

    Raises if both are supplied (with s3_input actually assigned); when only
    'source' is supplied, synthesizes a default ProcessingS3Input from it.
    Returns the (possibly updated) values mapping.
    """
    source = values.get("source")
    existing = values.get("s3_input")
    # The Unassigned() sentinel counts the same as "not provided".
    s3_input_missing = existing is None or isinstance(existing, type(Unassigned()))

    if source is not None:
        if not s3_input_missing:
            raise ValueError(
                "Cannot specify both 'source' and 's3_input'. "
                "Use 'source' for convenience (local paths or S3 URIs) "
                "or 's3_input' for full control, but not both."
            )
        # Build an S3 input from the convenience value using module defaults.
        values["s3_input"] = ProcessingS3Input(
            s3_uri=source,
            s3_data_type=_DEFAULT_S3_DATA_TYPE,
            s3_input_mode=_DEFAULT_S3_INPUT_MODE,
        )

    return values

def __init__(self, **kwargs):
    """Construct the input, expanding the 'source' shortcut into 's3_input' first."""
    normalized = ProcessingInput._validate_source_and_s3_input(kwargs)
    super().__init__(**normalized)


class EndpointInput(Base):
Expand Down Expand Up @@ -8577,7 +8624,7 @@ class InferenceComponentComputeResourceRequirements(Base):
max_memory_required_in_mb: The maximum MB of memory to allocate to run a model that you assign to an inference component.
"""

min_memory_required_in_mb: int
min_memory_required_in_mb: Optional[int] = Unassigned()
Comment thread
aviruthen marked this conversation as resolved.
number_of_cpu_cores_required: Optional[float] = Unassigned()
number_of_accelerator_devices_required: Optional[float] = Unassigned()
max_memory_required_in_mb: Optional[int] = Unassigned()
Expand Down
Loading
Loading