|
| 1 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"). You |
| 4 | +# may not use this file except in compliance with the License. A copy of |
| 5 | +# the License is located at |
| 6 | +# |
| 7 | +# http://aws.amazon.com/apache2.0/ |
| 8 | +# |
| 9 | +# or in the "license" file accompanying this file. This file is |
| 10 | +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF |
| 11 | +# ANY KIND, either express or implied. See the License for the specific |
| 12 | +# language governing permissions and limitations under the License. |
| 13 | +"""Implementation of ModelBuilder.start_benchmark and start_inference_recommendation.""" |
| 14 | +from __future__ import absolute_import |
| 15 | + |
| 16 | +import time |
| 17 | +import uuid |
| 18 | +from typing import List, Optional, Union |
| 19 | + |
| 20 | +from sagemaker.core.helper.session_helper import Session, get_execution_role |
| 21 | +from sagemaker.core.resources import ( |
| 22 | + AIBenchmarkJob, |
| 23 | + AIRecommendationJob, |
| 24 | + AIWorkloadConfig, |
| 25 | + Endpoint, |
| 26 | +) |
| 27 | +from sagemaker.core.shapes.shapes import ( |
| 28 | + AIBenchmarkEndpoint, |
| 29 | + AIBenchmarkInferenceComponent, |
| 30 | + AIBenchmarkNetworkConfig, |
| 31 | + AIBenchmarkOutputConfig, |
| 32 | + AIBenchmarkTarget, |
| 33 | + AICapacityReservationConfig, |
| 34 | + AIModelSource, |
| 35 | + AIModelSourceS3, |
| 36 | + AIRecommendationComputeSpec, |
| 37 | + AIRecommendationConstraint, |
| 38 | + AIRecommendationInferenceSpecification, |
| 39 | + AIRecommendationOutputConfig, |
| 40 | + AIRecommendationPerformanceTarget, |
| 41 | + AIWorkloadConfigs, |
| 42 | + Tag, |
| 43 | + VpcConfig, |
| 44 | + WorkloadSpec, |
| 45 | +) |
| 46 | +from sagemaker.serve.ai_inference_recommender._constants import MAX_INSTANCE_TYPES |
| 47 | +from sagemaker.serve.ai_inference_recommender.workload import Workload |
| 48 | + |
| 49 | + |
| 50 | +def start_benchmark( |
| 51 | + builder, # ModelBuilder; not annotated to avoid a circular import. |
| 52 | + endpoint: Union[Endpoint, str], |
| 53 | + workload: Union[Workload, str], |
| 54 | + *, |
| 55 | + output_path: Optional[str] = None, |
| 56 | + role: Optional[str] = None, |
| 57 | + inference_components: Optional[List[str]] = None, |
| 58 | + vpc_config: Optional[VpcConfig] = None, |
| 59 | + tags: Optional[List[Tag]] = None, |
| 60 | + name: Optional[str] = None, |
| 61 | + workload_config_name: Optional[str] = None, |
| 62 | + wait: bool = False, |
| 63 | +) -> AIBenchmarkJob: |
| 64 | + """Start an AI benchmark job against a SageMaker endpoint. |
| 65 | +
|
| 66 | + Args: |
| 67 | + endpoint: An ``Endpoint`` resource, or the name/ARN of an existing |
| 68 | + endpoint to benchmark. |
| 69 | + workload: Either a ``Workload`` (auto-creates a workload config) or |
| 70 | + the name/ARN of an existing ``AIWorkloadConfig``. |
| 71 | + output_path: ``s3://`` URI for benchmark output. Defaults to the |
| 72 | + session's default bucket. |
| 73 | + role: IAM execution role ARN. Defaults to the SageMaker execution |
| 74 | + role from the ambient session. |
| 75 | + inference_components: Optional list of inference component names to |
| 76 | + target on the endpoint. |
| 77 | + vpc_config: Optional ``VpcConfig`` for VPC-only endpoints. |
| 78 | + tags: Optional resource tags. |
| 79 | + name: Optional benchmark job name. Auto-generated if omitted. |
| 80 | + workload_config_name: Optional name for the auto-created workload |
| 81 | + config. Auto-generated if omitted. |
| 82 | + wait: If True, block until the job reaches a terminal state. |
| 83 | +
|
| 84 | + Returns: |
| 85 | + The created ``AIBenchmarkJob`` resource. After it reaches a terminal |
| 86 | + state, pass it to ``BenchmarkResult.from_job(job)`` to retrieve the |
| 87 | + parsed metrics. |
| 88 | + """ |
| 89 | + sagemaker_session = Session() |
| 90 | + role_arn = role or get_execution_role(sagemaker_session=sagemaker_session) |
| 91 | + output_location = output_path or _default_output_path(sagemaker_session, "benchmarks") |
| 92 | + |
| 93 | + workload_config_id = _ensure_workload_config(workload, workload_config_name, tags=tags) |
| 94 | + |
| 95 | + endpoint_name = endpoint.endpoint_name if isinstance(endpoint, Endpoint) else endpoint |
| 96 | + components = ( |
| 97 | + [AIBenchmarkInferenceComponent(identifier=ic) for ic in inference_components] |
| 98 | + if inference_components |
| 99 | + else None |
| 100 | + ) |
| 101 | + target = AIBenchmarkTarget( |
| 102 | + endpoint=AIBenchmarkEndpoint( |
| 103 | + identifier=endpoint_name, |
| 104 | + inference_components=components, |
| 105 | + ) |
| 106 | + ) |
| 107 | + network_config = ( |
| 108 | + AIBenchmarkNetworkConfig(vpc_config=vpc_config) if vpc_config else None |
| 109 | + ) |
| 110 | + |
| 111 | + suffix = uuid.uuid4().hex[:8] |
| 112 | + job_name = name or f"sm-bench-{int(time.time())}-{suffix}" |
| 113 | + |
| 114 | + job = AIBenchmarkJob.create( |
| 115 | + ai_benchmark_job_name=job_name, |
| 116 | + benchmark_target=target, |
| 117 | + output_config=AIBenchmarkOutputConfig(s3_output_location=output_location), |
| 118 | + ai_workload_config_identifier=workload_config_id, |
| 119 | + role_arn=role_arn, |
| 120 | + network_config=network_config, |
| 121 | + tags=tags, |
| 122 | + ) |
| 123 | + if wait: |
| 124 | + job.wait() |
| 125 | + return job |
| 126 | + |
| 127 | + |
| 128 | +def start_inference_recommendation( |
| 129 | + builder, # ModelBuilder; not annotated to avoid a circular import. |
| 130 | + workload: Union[Workload, str], |
| 131 | + performance_target: str, |
| 132 | + *, |
| 133 | + output_path: Optional[str] = None, |
| 134 | + role: Optional[str] = None, |
| 135 | + instance_types: Optional[List[str]] = None, |
| 136 | + capacity_reservation_arns: Optional[List[str]] = None, |
| 137 | + optimize_model: bool = True, |
| 138 | + framework: Optional[str] = None, |
| 139 | + model_package_group: Optional[str] = None, |
| 140 | + tags: Optional[List[Tag]] = None, |
| 141 | + name: Optional[str] = None, |
| 142 | + workload_config_name: Optional[str] = None, |
| 143 | + wait: bool = False, |
| 144 | +) -> AIRecommendationJob: |
| 145 | + """Start an AI recommendation job for the model configured on this builder. |
| 146 | +
|
| 147 | + Args: |
| 148 | + workload: Either a ``Workload`` (auto-creates a workload config) or |
| 149 | + the name/ARN of an existing ``AIWorkloadConfig``. |
| 150 | + performance_target: One of ``"throughput"``, ``"ttft-ms"``, or |
| 151 | + ``"cost"``. |
| 152 | + output_path: ``s3://`` URI for recommendation output. Defaults to |
| 153 | + the session's default bucket. |
| 154 | + role: IAM execution role ARN. Defaults to the SageMaker execution |
| 155 | + role from the ambient session. |
| 156 | + instance_types: Up to 3 instance types to consider. |
| 157 | + capacity_reservation_arns: Optional list of ML reservation ARNs. |
| 158 | + optimize_model: If True (default), allow optimization techniques |
| 159 | + like speculative decoding and kernel tuning. |
| 160 | + framework: Inference framework. ``"LMI"`` or ``"VLLM"``. |
| 161 | + model_package_group: Optional model package group identifier in |
| 162 | + which to register the optimized model. |
| 163 | + tags: Optional resource tags. |
| 164 | + name: Optional recommendation job name. Auto-generated if omitted. |
| 165 | + workload_config_name: Optional name for the auto-created workload |
| 166 | + config. Auto-generated if omitted. |
| 167 | + wait: If True, block until the job reaches a terminal state. |
| 168 | +
|
| 169 | + Returns: |
| 170 | + The created ``AIRecommendationJob`` resource. |
| 171 | + """ |
| 172 | + sagemaker_session = Session() |
| 173 | + role_arn = role or get_execution_role(sagemaker_session=sagemaker_session) |
| 174 | + output_location = output_path or _default_output_path( |
| 175 | + sagemaker_session, "recommendations" |
| 176 | + ) |
| 177 | + |
| 178 | + s3_uri = _resolve_model_s3_uri(builder) |
| 179 | + if not s3_uri: |
| 180 | + raise ValueError( |
| 181 | + "ModelBuilder must be configured with an S3 model_path before " |
| 182 | + "calling start_inference_recommendation()." |
| 183 | + ) |
| 184 | + |
| 185 | + if instance_types and len(instance_types) > MAX_INSTANCE_TYPES: |
| 186 | + raise ValueError( |
| 187 | + f"At most {MAX_INSTANCE_TYPES} instance_types are accepted; " |
| 188 | + f"got {len(instance_types)}." |
| 189 | + ) |
| 190 | + |
| 191 | + workload_config_id = _ensure_workload_config(workload, workload_config_name, tags=tags) |
| 192 | + |
| 193 | + suffix = uuid.uuid4().hex[:8] |
| 194 | + job_name = name or f"sm-rec-{int(time.time())}-{suffix}" |
| 195 | + |
| 196 | + compute_spec = None |
| 197 | + if instance_types or capacity_reservation_arns: |
| 198 | + capacity = ( |
| 199 | + AICapacityReservationConfig( |
| 200 | + capacity_reservation_preference="capacity-reservations-only", |
| 201 | + ml_reservation_arns=capacity_reservation_arns, |
| 202 | + ) |
| 203 | + if capacity_reservation_arns |
| 204 | + else None |
| 205 | + ) |
| 206 | + compute_spec = AIRecommendationComputeSpec( |
| 207 | + instance_types=instance_types, |
| 208 | + capacity_reservation_config=capacity, |
| 209 | + ) |
| 210 | + |
| 211 | + inference_spec = ( |
| 212 | + AIRecommendationInferenceSpecification(framework=framework) if framework else None |
| 213 | + ) |
| 214 | + |
| 215 | + job = AIRecommendationJob.create( |
| 216 | + ai_recommendation_job_name=job_name, |
| 217 | + model_source=AIModelSource(s3=AIModelSourceS3(s3_uri=s3_uri)), |
| 218 | + output_config=AIRecommendationOutputConfig( |
| 219 | + s3_output_location=output_location, |
| 220 | + model_package_group_identifier=model_package_group, |
| 221 | + ), |
| 222 | + ai_workload_config_identifier=workload_config_id, |
| 223 | + performance_target=AIRecommendationPerformanceTarget( |
| 224 | + constraints=[AIRecommendationConstraint(metric=performance_target)], |
| 225 | + ), |
| 226 | + role_arn=role_arn, |
| 227 | + inference_specification=inference_spec, |
| 228 | + optimize_model=optimize_model, |
| 229 | + compute_spec=compute_spec, |
| 230 | + tags=tags, |
| 231 | + ) |
| 232 | + if wait: |
| 233 | + job.wait() |
| 234 | + return job |
| 235 | + |
| 236 | + |
| 237 | +def _resolve_model_s3_uri(builder) -> Optional[str]: |
| 238 | + for attr in ("model_path", "s3_upload_path", "s3_model_data_url"): |
| 239 | + candidate = getattr(builder, attr, None) |
| 240 | + if isinstance(candidate, str) and candidate.startswith("s3://"): |
| 241 | + return candidate |
| 242 | + return None |
| 243 | + |
| 244 | + |
| 245 | +def _ensure_workload_config( |
| 246 | + workload: Union[Workload, str], |
| 247 | + name: Optional[str], |
| 248 | + *, |
| 249 | + tags: Optional[List[Tag]] = None, |
| 250 | +) -> str: |
| 251 | + if isinstance(workload, str): |
| 252 | + return workload |
| 253 | + |
| 254 | + config_name = name or f"sm-wl-{int(time.time())}-{uuid.uuid4().hex[:8]}" |
| 255 | + AIWorkloadConfig.create( |
| 256 | + ai_workload_config_name=config_name, |
| 257 | + ai_workload_configs=AIWorkloadConfigs( |
| 258 | + workload_spec=WorkloadSpec(inline=workload.to_inline()), |
| 259 | + ), |
| 260 | + tags=tags, |
| 261 | + ) |
| 262 | + return config_name |
| 263 | + |
| 264 | + |
| 265 | +def _default_output_path(session: Session, prefix: str) -> str: |
| 266 | + bucket = session.default_bucket() |
| 267 | + return f"s3://{bucket}/{prefix}/" |
0 commit comments