diff --git a/s3_templates/mlops-github-actions/README.md b/s3_templates/mlops-github-actions/README.md index 7c161d33..cd21e4a8 100644 --- a/s3_templates/mlops-github-actions/README.md +++ b/s3_templates/mlops-github-actions/README.md @@ -56,7 +56,7 @@ key=sagemaker value=true In the above example, `aEXAMPLE-8aad-4d5d-8878-dfcab0bc441f` is the unique ID for this connection. We use this ID when we create our SageMaker project later in this example. -### 2. GitHub Personal Access Token +### 2. GitHub Personal Access Token (PAT) Create a GitHub personal access token with access to **Contents** and **Actions** permissions, following the instructions on [Managing your personal access tokens](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens) > Note: You can create either classic or fine-grained access token. However, make sure the token has access to the Contents and Actions (workflows, runs and artifacts) for that repository. @@ -80,6 +80,18 @@ Create a GitHub personal access token with access to **Contents** and **Actions* * ✅workflow(Update GitHub Action workflows) - Required * Click "Generate token" +> ⚠️ **Note:** If your repo is part of an Organization, your PAT must be granted access to that Organization. + +* Ensure your token has access to the organization with the required repository permissions (Actions, Contents, Metadata, Workflows). +![](./images/org-pta-1.png) + +* Go to Organization Settings → Third-party Access → Personal access tokens and approve the token request. 
+![](./images/org-pta-2.png) + +* Confirm your token appears as active before proceeding. +![](./images/org-pta-3.png) + + **then store it in AWS Secrets Manager.** ```bash @@ -297,7 +309,7 @@ Add these secrets to your GitHub repository as follows: To create a manual approval step in our deployment pipelines, we use a [GitHub environment](https://docs.github.com/en/actions/how-tos/deploy/configure-and-manage-deployments/manage-environments). Complete the following steps: 1. Go to your repository **Settings** > **Environments** 2. Create environment named `production` -3. Add required reviewers for deployment approval. These are the people who can approve model deployment to production environment +3. Add required reviewers for deployment approval (only available on public repositories when using the free plan). These are the people who can approve model deployment to production environment ![](./images/reviewer.png) ## Template Deployment @@ -395,7 +407,8 @@ After creating the project: ```yaml env: AWS_REGION: # Your AWS region - SAGEMAKER_PROJECT_NAME: your-project-name # Your project name + SAGEMAKER_PROJECT_NAME: # Your project name + MLFLOW_TRACKING_SERVER_ARN: "" # Optional: ARN of SageMaker managed MLflow Tracking Server ``` 2. **Test the Pipeline:** @@ -408,6 +421,10 @@ After creating the project: The template will then create two automated ModelOps workflows—one for model building and one for model deployment—that work together to provide CI/CD for your ML models. ![](./images/sagemaker_pipeline.png) + +Pipeline experiments are automatically tracked in the SageMaker Managed MLflow App. You can view the experiment, individual step runs, metrics, datasets, and registered models. + ![](./images/MLflow-1.png) + ![](./images/MLflow-2.png) ## Clean up After deployment, you will incur costs for the deployed resources. If you don’t intend to continue using the setup, delete the ModelOps project resources to avoid unnecessary charges. 
@@ -446,4 +463,4 @@ In addition to deleting a project, which will remove and deprovision the SageMak ## License -This template is licensed under the MIT-0 License. See the LICENSE file for details. +This template is licensed under the MIT-0 License. See the LICENSE file for details. \ No newline at end of file diff --git a/s3_templates/mlops-github-actions/iam/GithubActionsMLOpsExecutionPolicy.json b/s3_templates/mlops-github-actions/iam/GithubActionsMLOpsExecutionPolicy.json index d81164e2..e73a6afe 100644 --- a/s3_templates/mlops-github-actions/iam/GithubActionsMLOpsExecutionPolicy.json +++ b/s3_templates/mlops-github-actions/iam/GithubActionsMLOpsExecutionPolicy.json @@ -5,7 +5,8 @@ "Effect": "Allow", "Action": [ "s3:CreateBucket", - "s3:PutObject" + "s3:PutObject", + "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::sagemaker-*" diff --git a/s3_templates/mlops-github-actions/images/MLflow-1.png b/s3_templates/mlops-github-actions/images/MLflow-1.png new file mode 100644 index 00000000..a71fd77a Binary files /dev/null and b/s3_templates/mlops-github-actions/images/MLflow-1.png differ diff --git a/s3_templates/mlops-github-actions/images/MLflow-2.png b/s3_templates/mlops-github-actions/images/MLflow-2.png new file mode 100644 index 00000000..7a07f196 Binary files /dev/null and b/s3_templates/mlops-github-actions/images/MLflow-2.png differ diff --git a/s3_templates/mlops-github-actions/images/org-pta-1.png b/s3_templates/mlops-github-actions/images/org-pta-1.png new file mode 100644 index 00000000..34f17d85 Binary files /dev/null and b/s3_templates/mlops-github-actions/images/org-pta-1.png differ diff --git a/s3_templates/mlops-github-actions/images/org-pta-2.png b/s3_templates/mlops-github-actions/images/org-pta-2.png new file mode 100644 index 00000000..48b56565 Binary files /dev/null and b/s3_templates/mlops-github-actions/images/org-pta-2.png differ diff --git a/s3_templates/mlops-github-actions/images/org-pta-3.png 
b/s3_templates/mlops-github-actions/images/org-pta-3.png new file mode 100644 index 00000000..cd088400 Binary files /dev/null and b/s3_templates/mlops-github-actions/images/org-pta-3.png differ diff --git a/s3_templates/mlops-github-actions/lambda_functions/lambda_function.py b/s3_templates/mlops-github-actions/lambda_functions/lambda_function.py index 25f248c5..1d5dbcce 100644 --- a/s3_templates/mlops-github-actions/lambda_functions/lambda_function.py +++ b/s3_templates/mlops-github-actions/lambda_functions/lambda_function.py @@ -67,7 +67,7 @@ def lambda_handler(event, context): # Getting repository and trigger the deploy GitHub workflow try: - repo = g.get_user().get_repo(github_repo_name) + repo = g.get_repo(github_repo_name) workflow = repo.get_workflow(github_workflow_name) branch = repo.get_branch("main") res = workflow.create_dispatch(branch) diff --git a/s3_templates/mlops-github-actions/seedcode/.github/workflows/build.yml b/s3_templates/mlops-github-actions/seedcode/.github/workflows/build.yml index a5b7a91c..3d37dda4 100644 --- a/s3_templates/mlops-github-actions/seedcode/.github/workflows/build.yml +++ b/s3_templates/mlops-github-actions/seedcode/.github/workflows/build.yml @@ -6,8 +6,9 @@ on: paths: - pipelines/** env: - AWS_REGION: us-west-2 - SAGEMAKER_PROJECT_NAME: custom-build-deploy + AWS_REGION: # AWS Region + SAGEMAKER_PROJECT_NAME: # Your SageMaker AI project name + MLFLOW_TRACKING_SERVER_ARN: "" # Optional: ARN of SageMaker managed MLflow Tracking Server jobs: Build: @@ -49,4 +50,4 @@ jobs: run-pipeline --module-name pipelines.abalone.pipeline \ --role-arn ${SAGEMAKER_PIPELINE_ROLE_ARN} \ --tags "[{\"Key\":\"sagemaker:project-name\", \"Value\":\"${SAGEMAKER_PROJECT_NAME}\"}, {\"Key\":\"sagemaker:project-id\", \"Value\":\"${SAGEMAKER_PROJECT_ID}\"}]" \ - --kwargs 
"{\"region\":\"${AWS_REGION}\",\"sagemaker_project_arn\":\"${SAGEMAKER_PROJECT_ARN}\",\"role\":\"${SAGEMAKER_PIPELINE_ROLE_ARN}\",\"default_bucket\":\"${ARTIFACT_BUCKET}\",\"pipeline_name\":\"${SAGEMAKER_PROJECT_NAME_ID}\",\"model_package_group_name\":\"${SAGEMAKER_PROJECT_NAME_ID}\",\"base_job_prefix\":\"${SAGEMAKER_PROJECT_NAME_ID}\"}" + --kwargs "{\"region\":\"${AWS_REGION}\",\"sagemaker_project_arn\":\"${SAGEMAKER_PROJECT_ARN}\",\"role\":\"${SAGEMAKER_PIPELINE_ROLE_ARN}\",\"default_bucket\":\"${ARTIFACT_BUCKET}\",\"pipeline_name\":\"${SAGEMAKER_PROJECT_NAME_ID}\",\"model_package_group_name\":\"${SAGEMAKER_PROJECT_NAME_ID}\",\"base_job_prefix\":\"${SAGEMAKER_PROJECT_NAME_ID}\",\"mlflow_tracking_arn\":\"${{ env.MLFLOW_TRACKING_SERVER_ARN }}\"}" diff --git a/s3_templates/mlops-github-actions/seedcode/.github/workflows/deploy.yml b/s3_templates/mlops-github-actions/seedcode/.github/workflows/deploy.yml index 5b51b139..16ebcd1b 100644 --- a/s3_templates/mlops-github-actions/seedcode/.github/workflows/deploy.yml +++ b/s3_templates/mlops-github-actions/seedcode/.github/workflows/deploy.yml @@ -3,8 +3,8 @@ name: DeploySageMakerModel on: workflow_dispatch env: - AWS_REGION: us-west-2 - SAGEMAKER_PROJECT_NAME: custom-build-deploy + AWS_REGION: # AWS Region + SAGEMAKER_PROJECT_NAME: # Your SageMaker AI project name EXPORT_TEMPLATE_STAGING_CONFIG: "staging-config-export.json" EXPORT_TEMPLATE_PROD_CONFIG: "prod-config-export.json" diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/evaluate.py b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/evaluate.py index 6ddf06d8..9da761f7 100644 --- a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/evaluate.py +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/evaluate.py @@ -19,6 +19,11 @@ if __name__ == "__main__": logger.debug("Starting evaluation.") + + # MLflow child run for evaluation + from mlflow_helper import setup_mlflow, end_mlflow + mlflow_enabled = 
setup_mlflow("EvaluateAbaloneModel") + model_path = "/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") @@ -50,6 +55,14 @@ }, } + # Log evaluation metrics to MLflow + if mlflow_enabled: + try: + import mlflow + mlflow.log_metrics({"mse": mse, "mse_std": std}) + except Exception as e: + logger.warning("Failed to log evaluation metrics to MLflow: %s", e) + output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) @@ -57,3 +70,5 @@ evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict)) + + end_mlflow(mlflow_enabled) diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/mlflow_helper.py b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/mlflow_helper.py new file mode 100644 index 00000000..8fd77e27 --- /dev/null +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/mlflow_helper.py @@ -0,0 +1,77 @@ +"""Shared MLflow helper for pipeline steps. + +Provides setup/teardown for MLflow runs. Each pipeline step creates its own +run under a shared experiment (the pipeline name). When MLFLOW_TRACKING_ARN +is not set, all functions are no-ops. +""" +import logging +import os + +logger = logging.getLogger(__name__) + + +def _install_mlflow(): + """Install MLflow dependencies at runtime if not already available.""" + try: + import mlflow # noqa: F401 + return True + except ImportError: + pass + try: + import subprocess + import sys + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "mlflow", "sagemaker-mlflow==0.2.0", "-q"] + ) + return True + except Exception as e: + logger.warning("Failed to install MLflow: %s", e) + return False + + +def setup_mlflow(step_name): + """Set up MLflow tracking for a pipeline step. + + Args: + step_name: Name for this run (e.g. "PreprocessAbaloneData") + + Returns: + True if MLflow tracking is active, False otherwise. 
+ """ + tracking_arn = os.environ.get("MLFLOW_TRACKING_ARN", "") + if not tracking_arn: + logger.info("MLFLOW_TRACKING_ARN not set. MLflow tracking disabled.") + return False + + if not _install_mlflow(): + return False + + try: + import mlflow + + mlflow.set_tracking_uri(tracking_arn) + + experiment_name = os.environ.get("MLFLOW_EXPERIMENT_NAME", "Default") + mlflow.set_experiment(experiment_name) + mlflow.start_run(run_name=step_name) + + logger.info("MLflow run started: %s (experiment=%s)", step_name, experiment_name) + return True + except Exception as e: + logger.warning("Failed to set up MLflow: %s. Continuing without tracking.", e) + return False + + +def end_mlflow(mlflow_enabled): + """End the current MLflow run. + + Args: + mlflow_enabled: Return value from setup_mlflow(). + """ + if not mlflow_enabled: + return + try: + import mlflow + mlflow.end_run() + except Exception as e: + logger.warning("Failed to end MLflow run: %s", e) \ No newline at end of file diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/pipeline.py b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/pipeline.py index a9eb6ead..d777fe1f 100644 --- a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/pipeline.py +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/pipeline.py @@ -7,62 +7,75 @@ . -(stop) Implements a get_pipeline(**kwargs) method. 
+ +Uses the SageMaker Python SDK v3 API: +- ModelTrainer (replaces Estimator) +- ScriptProcessor from sagemaker.core.processing +- ModelBuilder from sagemaker.serve +- Pipeline/steps from sagemaker.mlops.workflow """ import os import boto3 -import sagemaker -import sagemaker.session - -from sagemaker.estimator import Estimator -from sagemaker.inputs import TrainingInput -from sagemaker.model_metrics import ( - MetricsSource, - ModelMetrics, -) -from sagemaker.processing import ( + +from sagemaker.train.model_trainer import ModelTrainer +from sagemaker.core.training.configs import SourceCode, Compute, InputData, OutputDataConfig +from sagemaker.core.processing import FrameworkProcessor +from sagemaker.core.shapes import ( ProcessingInput, + ProcessingS3Input, ProcessingOutput, - ScriptProcessor, + ProcessingS3Output, ) -from sagemaker.sklearn.processing import SKLearnProcessor -from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo -from sagemaker.workflow.condition_step import ( - ConditionStep, -) -from sagemaker.workflow.functions import ( - JsonGet, -) -from sagemaker.workflow.parameters import ( +from sagemaker.serve.model_builder import ModelBuilder +from sagemaker.core.workflow.parameters import ( ParameterInteger, ParameterString, ) -from sagemaker.workflow.pipeline import Pipeline -from sagemaker.workflow.properties import PropertyFile -from sagemaker.workflow.steps import ( - ProcessingStep, - TrainingStep, -) -from sagemaker.workflow.model_step import ModelStep -from sagemaker.model import Model -from sagemaker.workflow.pipeline_context import PipelineSession +from sagemaker.mlops.workflow.pipeline import Pipeline +from sagemaker.mlops.workflow.steps import ProcessingStep, TrainingStep, CacheConfig +from sagemaker.mlops.workflow.model_step import ModelStep +from sagemaker.mlops.workflow.condition_step import ConditionStep +from sagemaker.core.workflow.conditions import ConditionLessThanOrEqualTo +from sagemaker.core.workflow.functions import 
JsonGet +from sagemaker.core.workflow.properties import PropertyFile +from sagemaker.core.workflow.pipeline_context import PipelineSession +from sagemaker.core.helper.session_helper import Session, get_execution_role +from sagemaker.core import image_uris +from sagemaker.core.model_metrics import MetricsSource, ModelMetrics BASE_DIR = os.path.dirname(os.path.realpath(__file__)) +# Workaround for SDK v3 bug: get_training_code_hash expects dependencies to be a list, +# but SourceCode.requirements is a str or None. Patch it to handle both cases. +def _patch_training_code_hash(): + from sagemaker.core.workflow import utilities + _original = utilities.get_training_code_hash + + def _patched(entry_point, source_dir, dependencies): + if dependencies is None: + dependencies = [] + elif isinstance(dependencies, str): + dependencies = [dependencies] + return _original(entry_point, source_dir, dependencies) + + utilities.get_training_code_hash = _patched + +_patch_training_code_hash() + + def get_sagemaker_client(region): - """Gets the sagemaker client. + """Gets the sagemaker client. 
- Args: - region: the aws region to start the session - default_bucket: the bucket to use for storing the artifacts + Args: + region: the aws region to start the session - Returns: - `sagemaker.session.Session instance - """ - boto_session = boto3.Session(region_name=region) - sagemaker_client = boto_session.client("sagemaker") - return sagemaker_client + Returns: + sagemaker client + """ + boto_session = boto3.Session(region_name=region) + return boto_session.client("sagemaker") def get_session(region, default_bucket): @@ -73,20 +86,19 @@ def get_session(region, default_bucket): default_bucket: the bucket to use for storing the artifacts Returns: - `sagemaker.session.Session instance + Session instance """ - boto_session = boto3.Session(region_name=region) - sagemaker_client = boto_session.client("sagemaker") runtime_client = boto_session.client("sagemaker-runtime") - return sagemaker.session.Session( + return Session( boto_session=boto_session, sagemaker_client=sagemaker_client, sagemaker_runtime_client=runtime_client, default_bucket=default_bucket, ) + def get_pipeline_session(region, default_bucket): """Gets the pipeline session based on the region. 
@@ -97,21 +109,19 @@ def get_pipeline_session(region, default_bucket): Returns: PipelineSession instance """ - boto_session = boto3.Session(region_name=region) sagemaker_client = boto_session.client("sagemaker") - return PipelineSession( boto_session=boto_session, sagemaker_client=sagemaker_client, default_bucket=default_bucket, ) + def get_pipeline_custom_tags(new_tags, region, sagemaker_project_arn=None): try: sm_client = get_sagemaker_client(region) - response = sm_client.list_tags( - ResourceArn=sagemaker_project_arn) + response = sm_client.list_tags(ResourceArn=sagemaker_project_arn) project_tags = response["Tags"] for project_tag in project_tags: new_tags.append(project_tag) @@ -130,6 +140,7 @@ def get_pipeline( base_job_prefix="Abalone", processing_instance_type="ml.m5.xlarge", training_instance_type="ml.m5.xlarge", + mlflow_tracking_arn="", ): """Gets a SageMaker ML Pipeline instance working with on abalone data. @@ -137,13 +148,15 @@ def get_pipeline( region: AWS region to create and run the pipeline. role: IAM role to create and run steps and pipeline. default_bucket: the bucket to use for storing the artifacts + mlflow_tracking_arn: Optional ARN of SageMaker managed MLflow Tracking Server. + When provided, MLflow autologging is enabled for the training step. 
Returns: an instance of a pipeline """ sagemaker_session = get_session(region, default_bucket) if role is None: - role = sagemaker.session.get_execution_role(sagemaker_session) + role = get_execution_role(sagemaker_session) pipeline_session = get_pipeline_session(region, default_bucket) @@ -157,105 +170,183 @@ def get_pipeline( default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv", ) - # processing step for feature engineering - sklearn_processor = SKLearnProcessor( - framework_version="0.23-1", + cache_config = CacheConfig(enable_caching=True, expire_after="30d") + + # --- MLflow environment variables --- + # When MLflow is enabled, pass tracking config to all pipeline steps. + # Each execution gets a unique experiment name (pipeline name + timestamp) + # so all step runs are grouped per execution in the MLflow UI. + mlflow_env = {} + if mlflow_tracking_arn: + from datetime import datetime, timezone + + execution_ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") + experiment_name = f"{pipeline_name}/{execution_ts}" + mlflow_env = { + "MLFLOW_TRACKING_ARN": mlflow_tracking_arn, + "MLFLOW_EXPERIMENT_NAME": experiment_name, + } + + # --- Processing Step --- + sklearn_image_uri = image_uris.retrieve( + framework="sklearn", + region=region, + version="1.2-1", + py_version="py3", + instance_type=processing_instance_type, + ) + sklearn_processor = FrameworkProcessor( + image_uri=sklearn_image_uri, instance_type=processing_instance_type, instance_count=processing_instance_count, base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess", sagemaker_session=pipeline_session, role=role, + env=mlflow_env if mlflow_env else None, ) - step_args = sklearn_processor.run( + step_process_args = sklearn_processor.run( + code="preprocess.py", + source_dir=BASE_DIR, outputs=[ - ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), - ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), 
- ProcessingOutput(output_name="test", source="/opt/ml/processing/test"), + ProcessingOutput( + output_name="train", + s3_output=ProcessingS3Output( + s3_uri=f"s3://{default_bucket}/{base_job_prefix}/train", + local_path="/opt/ml/processing/train", + s3_upload_mode="EndOfJob", + ), + ), + ProcessingOutput( + output_name="validation", + s3_output=ProcessingS3Output( + s3_uri=f"s3://{default_bucket}/{base_job_prefix}/validation", + local_path="/opt/ml/processing/validation", + s3_upload_mode="EndOfJob", + ), + ), + ProcessingOutput( + output_name="test", + s3_output=ProcessingS3Output( + s3_uri=f"s3://{default_bucket}/{base_job_prefix}/test", + local_path="/opt/ml/processing/test", + s3_upload_mode="EndOfJob", + ), + ), ], - code=os.path.join(BASE_DIR, "preprocess.py"), arguments=["--input-data", input_data], ) step_process = ProcessingStep( name="PreprocessAbaloneData", - step_args=step_args, + step_args=step_process_args, + cache_config=cache_config, ) - # training step for generating model artifacts - model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain" - image_uri = sagemaker.image_uris.retrieve( + # --- Training Step (XGBoost script mode via ModelTrainer) --- + model_path = f"s3://{default_bucket}/{base_job_prefix}/AbaloneTrain" + + xgb_image_uri = image_uris.retrieve( framework="xgboost", region=region, - version="1.0-1", + version="1.7-1", py_version="py3", instance_type=training_instance_type, ) - xgb_train = Estimator( - image_uri=image_uri, - instance_type=training_instance_type, - instance_count=1, - output_path=model_path, + + model_trainer = ModelTrainer( + training_image=xgb_image_uri, + source_code=SourceCode( + source_dir=BASE_DIR, + entry_script="train.py", + ), + compute=Compute( + instance_type=training_instance_type, + instance_count=1, + ), + hyperparameters={ + "objective": "reg:linear", + "num_round": 50, + "max_depth": 5, + "eta": 0.2, + "gamma": 4, + "min_child_weight": 7, + "subsample": 0.7, + }, + 
output_data_config=OutputDataConfig(s3_output_path=model_path), base_job_name=f"{base_job_prefix}/abalone-train", sagemaker_session=pipeline_session, role=role, + environment=mlflow_env if mlflow_env else None, ) - xgb_train.set_hyperparameters( - objective="reg:linear", - num_round=50, - max_depth=5, - eta=0.2, - gamma=4, - min_child_weight=6, - subsample=0.7, - silent=0, - ) - step_args = xgb_train.fit( - inputs={ - "train": TrainingInput( - s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ + train_args = model_trainer.train( + input_data_config=[ + InputData( + channel_name="train", + data_source=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv", ), - "validation": TrainingInput( - s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ + InputData( + channel_name="validation", + data_source=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv", ), - }, + ], ) step_train = TrainingStep( name="TrainAbaloneModel", - step_args=step_args, + step_args=train_args, + cache_config=cache_config, ) - # processing step for evaluation - script_eval = ScriptProcessor( - image_uri=image_uri, + # --- Evaluation Step --- + script_eval = FrameworkProcessor( + image_uri=xgb_image_uri, command=["python3"], instance_type=processing_instance_type, instance_count=1, base_job_name=f"{base_job_prefix}/script-abalone-eval", sagemaker_session=pipeline_session, role=role, + env=mlflow_env if mlflow_env else None, ) - step_args = script_eval.run( + step_eval_args = script_eval.run( + code="evaluate.py", + source_dir=BASE_DIR, inputs=[ ProcessingInput( - source=step_train.properties.ModelArtifacts.S3ModelArtifacts, - destination="/opt/ml/processing/model", + input_name="model", + s3_input=ProcessingS3Input( + s3_uri=step_train.properties.ModelArtifacts.S3ModelArtifacts, + local_path="/opt/ml/processing/model", + s3_data_type="S3Prefix", + 
s3_input_mode="File", + ), ), ProcessingInput( - source=step_process.properties.ProcessingOutputConfig.Outputs[ - "test" - ].S3Output.S3Uri, - destination="/opt/ml/processing/test", + input_name="test", + s3_input=ProcessingS3Input( + s3_uri=step_process.properties.ProcessingOutputConfig.Outputs[ + "test" + ].S3Output.S3Uri, + local_path="/opt/ml/processing/test", + s3_data_type="S3Prefix", + s3_input_mode="File", + ), ), ], outputs=[ - ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), + ProcessingOutput( + output_name="evaluation", + s3_output=ProcessingS3Output( + s3_uri=f"s3://{default_bucket}/{base_job_prefix}/evaluation", + local_path="/opt/ml/processing/evaluation", + s3_upload_mode="EndOfJob", + ), + ), ], - code=os.path.join(BASE_DIR, "evaluate.py"), ) evaluation_report = PropertyFile( name="AbaloneEvaluationReport", @@ -264,45 +355,45 @@ def get_pipeline( ) step_eval = ProcessingStep( name="EvaluateAbaloneModel", - step_args=step_args, + step_args=step_eval_args, property_files=[evaluation_report], + cache_config=cache_config, ) - # register model step that will be conditionally executed + # --- Model Registration Step --- model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), - content_type="application/json" + content_type="application/json", ) ) - model = Model( - image_uri=image_uri, - model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, + model_builder = ModelBuilder( + s3_model_data_url=step_train.properties.ModelArtifacts.S3ModelArtifacts, + image_uri=xgb_image_uri, sagemaker_session=pipeline_session, - role=role, - ) - step_args = model.register( - content_types=["text/csv"], - response_types=["text/csv"], - inference_instances=["ml.t2.medium", "ml.m5.large"], - transform_instances=["ml.m5.large"], - model_package_group_name=model_package_group_name, - 
approval_status=model_approval_status, - model_metrics=model_metrics, + role_arn=role, ) step_register = ModelStep( name="RegisterAbaloneModel", - step_args=step_args, + step_args=model_builder.register( + model_package_group_name=model_package_group_name, + content_types=["text/csv"], + response_types=["text/csv"], + inference_instances=["ml.t2.medium", "ml.m5.large"], + transform_instances=["ml.m5.large"], + approval_status=model_approval_status, + model_metrics=model_metrics, + ), ) - # condition step for evaluating model quality and branching execution + # --- Condition Step --- cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, - json_path="regression_metrics.mse.value" + json_path="regression_metrics.mse.value", ), right=6.0, ) @@ -313,11 +404,10 @@ def get_pipeline( else_steps=[], ) - # pipeline instance + # --- Pipeline --- pipeline = Pipeline( name=pipeline_name, parameters=[ - processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, @@ -326,4 +416,4 @@ def get_pipeline( steps=[step_process, step_train, step_eval, step_cond], sagemaker_session=pipeline_session, ) - return pipeline + return pipeline \ No newline at end of file diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/preprocess.py b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/preprocess.py index b1803b2a..90b98531 100644 --- a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/preprocess.py +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/preprocess.py @@ -59,6 +59,10 @@ def merge_two_dicts(x, y): parser.add_argument("--input-data", type=str, required=True) args = parser.parse_args() + # MLflow child run for preprocessing + from mlflow_helper import setup_mlflow, end_mlflow + mlflow_enabled = setup_mlflow("PreprocessAbaloneData") + base_dir = "/opt/ml/processing" pathlib.Path(f"{base_dir}/data").mkdir(parents=True, 
exist_ok=True) input_data = args.input_data @@ -123,6 +127,21 @@ def merge_two_dicts(x, y): X, [int(0.7 * len(X)), int(0.85 * len(X))] ) + # Log dataset stats to MLflow + if mlflow_enabled: + try: + import mlflow + mlflow.log_params({ + "input_data": args.input_data, + "total_rows": len(X), + "num_features": X.shape[1] - 1, + "train_rows": len(train), + "validation_rows": len(validation), + "test_rows": len(test), + }) + except Exception as e: + logger.warning("Failed to log preprocessing params to MLflow: %s", e) + logger.info("Writing out datasets to %s.", base_dir) pd.DataFrame(train).to_csv( f"{base_dir}/train/train.csv", header=False, index=False @@ -133,3 +152,5 @@ def merge_two_dicts(x, y): pd.DataFrame(test).to_csv( f"{base_dir}/test/test.csv", header=False, index=False ) + + end_mlflow(mlflow_enabled) diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/requirements.txt b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/requirements.txt new file mode 100644 index 00000000..c5b344bb --- /dev/null +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/requirements.txt @@ -0,0 +1,4 @@ +# Optional: mlflow dependencies are only needed when MLFLOW_TRACKING_ARN is set. +# The training script gracefully handles the case where mlflow is not installed. +mlflow +sagemaker-mlflow==0.2.0 diff --git a/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/train.py b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/train.py new file mode 100644 index 00000000..941fc962 --- /dev/null +++ b/s3_templates/mlops-github-actions/seedcode/pipelines/abalone/train.py @@ -0,0 +1,99 @@ +"""XGBoost training script with optional MLflow autologging. + +This script runs in SageMaker's XGBoost framework container (script mode). +When MLFLOW_TRACKING_ARN is set as an environment variable, MLflow autologging +is enabled to track hyperparameters, metrics, and the trained model artifact. 
+""" +import argparse +import json +import logging +import os +import pickle + +import numpy as np +import pandas as pd +import xgboost as xgb + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler()) + + +def _setup_mlflow(): + """Configure MLflow tracking if MLFLOW_TRACKING_ARN is set.""" + from mlflow_helper import setup_mlflow + mlflow_enabled = setup_mlflow("TrainAbaloneModel") + if mlflow_enabled: + try: + import mlflow + mlflow.xgboost.autolog(log_models=True, log_datasets=True) + except Exception as e: + logger.warning("Failed to enable XGBoost autolog: %s", e) + return mlflow_enabled + + +def _end_mlflow(mlflow_enabled): + """End MLflow runs if tracking was enabled.""" + from mlflow_helper import end_mlflow + end_mlflow(mlflow_enabled) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + # XGBoost hyperparameters + parser.add_argument("--max_depth", type=int, default=5) + parser.add_argument("--eta", type=float, default=0.2) + parser.add_argument("--gamma", type=int, default=4) + parser.add_argument("--min_child_weight", type=int, default=6) + parser.add_argument("--subsample", type=float, default=0.7) + parser.add_argument("--objective", type=str, default="reg:linear") + parser.add_argument("--num_round", type=int, default=50) + + # SageMaker specific arguments + parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model")) + parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")) + parser.add_argument("--validation", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION", "/opt/ml/input/data/validation")) + + args = parser.parse_args() + + mlflow_enabled = _setup_mlflow() + + logger.info("Loading training data.") + train_files = [os.path.join(args.train, f) for f in os.listdir(args.train) if f.endswith(".csv")] + train_df = pd.concat([pd.read_csv(f, header=None) for f in train_files]) + 
y_train = train_df.iloc[:, 0].to_numpy() + X_train = train_df.iloc[:, 1:].to_numpy() + dtrain = xgb.DMatrix(X_train, label=y_train) + + logger.info("Loading validation data.") + val_files = [os.path.join(args.validation, f) for f in os.listdir(args.validation) if f.endswith(".csv")] + val_df = pd.concat([pd.read_csv(f, header=None) for f in val_files]) + y_val = val_df.iloc[:, 0].to_numpy() + X_val = val_df.iloc[:, 1:].to_numpy() + dval = xgb.DMatrix(X_val, label=y_val) + + params = { + "max_depth": args.max_depth, + "eta": args.eta, + "gamma": args.gamma, + "min_child_weight": args.min_child_weight, + "subsample": args.subsample, + "objective": args.objective, + } + + logger.info("Training XGBoost model with params: %s", json.dumps(params)) + watchlist = [(dtrain, "train"), (dval, "validation")] + model = xgb.train( + params=params, + dtrain=dtrain, + evals=watchlist, + num_boost_round=args.num_round, + ) + + # Save model artifact + model_path = os.path.join(args.model_dir, "xgboost-model") + pickle.dump(model, open(model_path, "wb")) + logger.info("Model saved to %s", model_path) + + _end_mlflow(mlflow_enabled) \ No newline at end of file diff --git a/s3_templates/mlops-github-actions/seedcode/setup.py b/s3_templates/mlops-github-actions/seedcode/setup.py index 6f701b8b..64264876 100644 --- a/s3_templates/mlops-github-actions/seedcode/setup.py +++ b/s3_templates/mlops-github-actions/seedcode/setup.py @@ -12,7 +12,7 @@ readme = f.read() -required_packages = ["sagemaker==2.93.0"] +required_packages = ["sagemaker>=3.4.0"] extras = { "test": [ "black", @@ -38,6 +38,7 @@ license=about["__license__"], packages=setuptools.find_packages(), include_package_data=True, + package_data={"pipelines.abalone": ["*.txt", "*.py"]}, python_requires=">=3.6", install_requires=required_packages, extras_require=extras, diff --git a/s3_templates/mlops-github-actions/template.yaml b/s3_templates/mlops-github-actions/template.yaml index 6e772598..14b2267e 100644 --- 
a/s3_templates/mlops-github-actions/template.yaml +++ b/s3_templates/mlops-github-actions/template.yaml @@ -128,7 +128,7 @@ Resources: - arm64 Environment: Variables: - DeployRepoName: !Sub ${CodeRepositoryName} + DeployRepoName: !Sub ${GitHubRepositoryOwnerName}/${CodeRepositoryName} GitHubWorkflowNameForDeployment: !Sub ${GitHubWorkflowNameForDeployment} GitHubTokenSecretName: !Sub ${GitHubTokenSecretName} Region: !Ref AWS::Region @@ -204,4 +204,4 @@ Outputs: GitHubWorkflowTriggerLambda: Description: Lambda function that triggers GitHub workflows - Value: !Ref GitHubWorkflowTriggerLambda + Value: !Ref GitHubWorkflowTriggerLambda \ No newline at end of file