diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py new file mode 100644 index 0000000..0375bca --- /dev/null +++ b/src/emr_cli/base/EmrBase.py @@ -0,0 +1,27 @@ +from boto3.session import Session + + +class EMRBase: + + """ + Base class for defining authentication profile and any future common + EMR operations + + Constructor + ---------- + profile : the name of the profile as stored in the aws config file + + Attributes + ---------- + aws_session : the boto3 Session object initialized + using the profile passed through the extending class + + """ + + aws_session: Session + + def __init__(self, profile: str = None): + if profile: + self.aws_session = Session(profile_name=profile) + else: + self.aws_session = Session() diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 163afb3..9456bb4 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -9,28 +9,35 @@ from botocore.exceptions import ClientError, WaiterError from emr_cli.deployments.emr_serverless import DeploymentPackage from emr_cli.utils import console_log, parse_bucket_uri, print_s3_gz +from emr_cli.base.EmrBase import EMRBase LOG_WAITER_DELAY_SEC = 30 -class Bootstrap: +class Bootstrap(EMRBase): DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" + aws_session = "" + def __init__( self, + profile: str, code_bucket: str, log_bucket: str, instance_role_name: str, job_role_name: str, ): + + super().__init__(profile) + + self.code_bucket = code_bucket self.log_bucket = log_bucket or code_bucket self.instance_role_name = instance_role_name self.job_role_name = job_role_name - self.s3_client = boto3.client("s3") - self.iam_client = boto3.client("iam") - self.emr_client = boto3.client("emr") + self.s3_client = self.aws_session.client("s3") + self.iam_client = self.aws_session.client("iam") + self.emr_client = self.aws_session.client("emr") def create_environment(self):
self._create_s3_buckets() @@ -88,7 +95,7 @@ def _create_s3_buckets(self): for bucket_name in set([self.code_bucket, self.log_bucket]): self.s3_client.create_bucket( Bucket=bucket_name, - CreateBucketConfiguration={"LocationConstraint": self.s3_client.meta.region_name}, + CreateBucketConfiguration={"LocationConstraint": self.aws_session.region_name}, ) console_log(f"Created S3 bucket: s3://{bucket_name}") self.s3_client.put_bucket_policy(Bucket=bucket_name, Policy=self._default_s3_bucket_policy(bucket_name)) @@ -315,19 +322,22 @@ def _create_cluster(self, security_config_name: str, instance_profile_name: str) return cluster_id -class EMREC2: +class EMREC2(EMRBase): def __init__( self, cluster_id: str, deployment_package: DeploymentPackage, job_role: Optional[str] = None, - region: str = "", + profile: str = "", ) -> None: + + super().__init__(profile) + + self.cluster_id = cluster_id self.dp = deployment_package self.job_role = job_role - self.client = boto3.client("emr") - self.s3_client = boto3.client("s3") + self.client = self.aws_session.client("emr") + self.s3_client = self.aws_session.client("s3") def run_job( self, diff --git a/src/emr_cli/deployments/emr_eks.py b/src/emr_cli/deployments/emr_eks.py index b05f5ad..741b159 100644 --- a/src/emr_cli/deployments/emr_eks.py +++ b/src/emr_cli/deployments/emr_eks.py @@ -8,24 +8,22 @@ import boto3 from emr_cli.deployments.emr_serverless import DeploymentPackage from emr_cli.utils import console_log, print_s3_gz +from emr_cli.base.EmrBase import EMRBase - -class EMREKS: +class EMREKS(EMRBase): def __init__( - self, virtual_cluster_id: str, job_role: str, deployment_package: DeploymentPackage, region: str = "" + self, virtual_cluster_id: str, job_role: str, deployment_package: DeploymentPackage, profile: str = None ) -> None: + + super().__init__(profile) + self.virtual_cluster_id = virtual_cluster_id self.job_role = job_role self.dp = deployment_package - self.s3_client = boto3.client("s3") - if region: - self.client = 
boto3.client("emr-containers", region_name=region) - self.emr_client = boto3.client("emr", region_name=region) - else: - # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION - # We may want to add an extra check here for the latter. - self.client = boto3.client("emr-containers") - self.emr_client = boto3.client("emr") + self.s3_client = self.aws_session.client("s3") + + self.client = self.aws_session.client("emr-containers") + self.emr_client = self.aws_session.client("emr") def fetch_latest_release_label(self): response = self.emr_client.list_release_labels( diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 66c522d..5ddb031 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -10,9 +10,12 @@ import boto3 from emr_cli.deployments import SparkParams from emr_cli.utils import console_log, find_files, mkdir, print_s3_gz - +from emr_cli.base.EmrBase import EMRBase class DeploymentPackage(metaclass=abc.ABCMeta): + + aws_session = "" + def __init__(self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "") -> None: self.entry_point_path = entry_point_path self.dist_dir = "dist" @@ -48,18 +51,23 @@ def _zip_local_pyfiles(self): zf.write(file, relpath) -class Bootstrap: +class Bootstrap(EMRBase): # Maybe add some UUIDs to these? 
DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" - def __init__(self, code_bucket: str, log_bucket: str, job_role_name: str): + def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str): + + super().__init__(profile) + + aws_session = self.aws_session + + self.code_bucket = code_bucket self.log_bucket = log_bucket or code_bucket self.job_role_name = job_role_name - self.s3_client = boto3.client("s3") - self.iam_client = boto3.client("iam") - self.emrs_client = boto3.client("emr-serverless") + self.s3_client = aws_session.client("s3") + self.iam_client = aws_session.client("iam") + self.emrs_client = aws_session.client("emr-serverless") def create_environment(self): self._create_s3_buckets() @@ -94,7 +102,7 @@ def _create_s3_buckets(self): self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ - "LocationConstraint": self.s3_client.meta.region_name # type: ignore + "LocationConstraint": self.aws_session.region_name # type: ignore }, ) @@ -218,24 +226,24 @@ def _create_application(self): return app_id -class EMRServerless: +class EMRServerless(EMRBase): def __init__( self, application_id: str, job_role: str, deployment_package: DeploymentPackage, - region: str = "", + profile: str = "" ) -> None: + + super().__init__(profile) + + self.application_id = application_id self.job_role = job_role self.dp = deployment_package - self.s3_client = boto3.client("s3") - if region: - self.client = boto3.client("emr-serverless", region_name=region) - else: - # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION - # We may want to add an extra check here for the latter. 
- self.client = boto3.client("emr-serverless") + + self.s3_client = self.aws_session.client("s3") + + self.client = self.aws_session.client("emr-serverless") def run_job( self, diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 1e7e15f..66921ea 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -70,7 +70,11 @@ def status(project): is_flag=True, help="Prints the commands necessary to destroy the created environment.", ) -def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_name, destroy): +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", +) +def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_name, destroy, profile): """ Bootstrap an EMR Serverless environment. @@ -82,9 +86,9 @@ def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_ raise click.BadArgumentUsage("EMR on EC2 clusters require --instance-profile-name to be set.") if target == "emr-serverless": - b = BootstrapEMRServerless(code_bucket, logs_bucket, job_role_name) + b = BootstrapEMRServerless(profile, code_bucket, logs_bucket, job_role_name) else: - b = BootstrapEMRonEC2(code_bucket, logs_bucket, instance_profile_name, job_role_name) + b = BootstrapEMRonEC2(profile, code_bucket, logs_bucket, instance_profile_name, job_role_name) resource_id = "application_id" if target == "emr-serverless" else "cluster_id" if destroy: @@ -161,13 +165,17 @@ def package(project, entry_point): help="Where to copy code artifacts to", required=True, ) +@click.option( + "--profile", + help="The AWS profile to use to upload the package to S3 bucket", +) @click.pass_obj -def deploy(project, entry_point, s3_code_uri): +def deploy(project, entry_point, s3_code_uri, profile): """ Copy a local project to S3. 
""" p = project(entry_point) - p.deploy(s3_code_uri) + p.deploy(s3_code_uri, profile) @click.command() @@ -220,6 +228,10 @@ def deploy(project, entry_point, s3_code_uri): default=720, # set to AWS default value (12 hours in minutes) type=int ) +@click.option( + "--profile", + help="The AWS profile to use for starting the job execution", +) @click.pass_obj @click.pass_context def run( @@ -241,6 +253,7 @@ def run( save_config, emr_eks_release_label, emr_serverless_timeout, + profile ): """ Run a project on EMR, optionally build and deploy @@ -280,7 +293,7 @@ def run( if build: p.build() - p.deploy(s3_code_uri) + p.deploy(s3_code_uri, profile) if any([application_id, virtual_cluster_id]): # We require entry-point and job-role @@ -296,21 +309,21 @@ def run( if application_id is not None: if job_args: job_args = job_args.split(",") - emrs = EMRServerless(application_id, job_role, p) + emrs = EMRServerless(application_id, job_role, p, profile=profile) emrs.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout, s3_logs_uri, emr_serverless_timeout) # cluster_id indicates EMR on EC2 job if cluster_id is not None: if job_args: job_args = job_args.split(",") - emr = EMREC2(cluster_id, p, job_role) + emr = EMREC2(cluster_id, p, job_role, profile=profile) emr.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout) # virtual_cluster_id is EMR on EKS if virtual_cluster_id is not None: if job_args: job_args = job_args.split(",") - emreks = EMREKS(virtual_cluster_id, job_role, p) + emreks = EMREKS(virtual_cluster_id, job_role, p, profile=profile) emreks.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout, s3_logs_uri, emr_eks_release_label) diff --git a/src/emr_cli/packaging/python_files_project.py b/src/emr_cli/packaging/python_files_project.py index aea4a2e..b8eec87 100644 --- a/src/emr_cli/packaging/python_files_project.py +++ b/src/emr_cli/packaging/python_files_project.py @@ -34,11 +34,19 @@ def build(self): relpath = os.path.relpath(file, 
cwd) zf.write(file, relpath) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str=None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) diff --git a/src/emr_cli/packaging/python_poetry_project.py b/src/emr_cli/packaging/python_poetry_project.py index 2f02276..680b8f7 100644 --- a/src/emr_cli/packaging/python_poetry_project.py +++ b/src/emr_cli/packaging/python_poetry_project.py @@ -72,11 +72,19 @@ def _dockerfile_path(self) -> str: ) return os.path.join(templates, "Dockerfile") - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str=None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + bucket, prefix = self._parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) diff --git a/src/emr_cli/packaging/python_project.py b/src/emr_cli/packaging/python_project.py index 360fe3f..38a7f29 100644 --- a/src/emr_cli/packaging/python_project.py +++ b/src/emr_cli/packaging/python_project.py @@ -71,12 +71,21 @@ def _run_docker_build(self, output_dir: str): env=dict(os.environ, DOCKER_BUILDKIT="1"), ) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ self.s3_uri_base = s3_code_uri - s3_client = boto3.client("s3") + + aws_session = "" + 
+ if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + bucket, prefix = parse_bucket_uri(self.s3_uri_base) filename = os.path.basename(self.entry_point_path) diff --git a/src/emr_cli/packaging/simple_project.py b/src/emr_cli/packaging/simple_project.py index e066be7..297f9fc 100644 --- a/src/emr_cli/packaging/simple_project.py +++ b/src/emr_cli/packaging/simple_project.py @@ -15,11 +15,20 @@ class SimpleProject(DeploymentPackage): def build(self): pass - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") + + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path)