From b3018620ef8bc52fdc2735718eae85cfd6097c58 Mon Sep 17 00:00:00 2001 From: mouhib Date: Mon, 6 Mar 2023 10:30:30 +0000 Subject: [PATCH 01/18] Add aws profile arg to bootstrap command --- src/emr_cli/base/EmrBase.py | 24 +++++++++++++++++++ src/emr_cli/deployments/emr_serverless.py | 28 ++++++++++++++++------- src/emr_cli/emr_cli.py | 11 ++++++--- 3 files changed, 52 insertions(+), 11 deletions(-) create mode 100644 src/emr_cli/base/EmrBase.py diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py new file mode 100644 index 0000000..5456d70 --- /dev/null +++ b/src/emr_cli/base/EmrBase.py @@ -0,0 +1,24 @@ +import boto3 + +class EmrBase: + + """ + Base class for defining authentication profile and any future common + EMR operations + + Constructor + ---------- + profile : the name of the profile as store in the aws config file + + Attributes + ---------- + aws_session : the object return by for the session initialized + using the profile passed through the extending class + + """ + + aws_session = "" + + def __init__(self, profile): + + self.aws_session = boto3.session.Session(profile_name=profile) diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 711792f..cf2dc74 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -5,6 +5,7 @@ import zipfile from time import sleep from typing import List, Optional +from emr_cli.base.EmrBase import EmrBase import boto3 @@ -50,17 +51,21 @@ def _zip_local_pyfiles(self): zf.write(file, relpath) -class Bootstrap: +class Bootstrap (EmrBase): DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" - def __init__(self, code_bucket: str, log_bucket: str, job_role_name: str): + def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str): + super().__init__(profile) + + aws_session = self.aws_session + self.code_bucket = code_bucket self.log_bucket = log_bucket or code_bucket self.job_role_name = job_role_name - self.s3_client = boto3.client("s3") - self.iam_client = boto3.client("iam") - self.emrs_client = boto3.client("emr-serverless") + self.s3_client = aws_session.client("s3") + self.iam_client = aws_session.client("iam") + self.emrs_client = aws_session.client("emr-serverless") def create_environment(self): self._create_s3_buckets() @@ -199,23 +204,30 @@ def _create_application(self): return app_id -class EMRServerless: +class EMRServerless (EmrBase): def __init__( self, application_id: str, job_role: str, deployment_package: DeploymentPackage, region: str = "", + profile: str = "" ) -> None: + + super().__init__(profile) + self.application_id = application_id self.job_role = job_role self.dp = deployment_package + + aws_session = self.aws_session + if region: - self.client = boto3.client("emr-serverless", region_name=region) + self.client = aws_session.client("emr-serverless", region_name=region) else: # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION # We may want to add an extra check here for the latter. - self.client = boto3.client("emr-serverless") + self.client = aws_session.client("emr-serverless") def run_job( self, diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 4506920..b2f36fc 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -30,6 +30,11 @@ def status(project): @click.command() +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", + required=True, +) @click.option( "--target", type=click.Choice(["emr-serverless"]), @@ -53,7 +58,7 @@ def status(project): is_flag=True, help="Prints the commands necessary to destroy the created environment.", ) -def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): +def bootstrap(profile, target, code_bucket, logs_bucket, job_role_name, destroy): """ Bootstrap an EMR Serverless environment. @@ -62,12 +67,12 @@ def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): """ if destroy: c = ConfigReader.read() - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) b.print_destroy_commands(c.get("run", {}).get("application_id", None)) exit(0) # For EMR Serverless, we need to create an S3 bucket, a job role, and an Application - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) config = b.create_environment() # The resulting config is relevant for the "run" command From c208f1e54c9d34d9aabae9bc893bb7961610b738 Mon Sep 17 00:00:00 2001 From: mouhib Date: Mon, 6 Mar 2023 19:01:18 +0000 Subject: [PATCH 02/18] remove the profile requirement in args --- src/emr_cli/emr_cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index b2f36fc..f574c9c 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -33,7 +33,6 @@ def status(project): @click.option( "--profile", help="The AWS profile to use for bootstraping the environment.", - required=True, ) @click.option( "--target", From f5cc91954d629590012bb943e2801778ecfcc350 Mon Sep 17 00:00:00 2001 From: mouhib Date: Mon, 6 Mar 2023 19:02:54 +0000 Subject: [PATCH 03/18] add profile to run command --- src/emr_cli/emr_cli.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index f574c9c..2c681e4 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -149,6 +149,10 @@ def deploy(project, entry_point, s3_code_uri): @click.command() +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", +) @click.option("--application-id", help="EMR Serverless Application ID") @click.option("--cluster-id", help="EMR on EC2 Cluster ID") @click.option( From b17a1e6d20974764f23a553bcd02a645e9e461f2 Mon Sep 17 00:00:00 2001 From: mouhib Date: Tue, 7 Mar 2023 16:46:17 +0000 Subject: [PATCH 04/18] Fix the s3 to use regional endpoint --- src/emr_cli/deployments/emr_serverless.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index cf2dc74..1d1492f 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -96,7 +96,8 @@ def _create_s3_buckets(self): Creates both the source and log buckets if they don't already exist. """ for bucket_name in set([self.code_bucket, self.log_bucket]): - self.s3_client.create_bucket(Bucket=bucket_name) + + self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration= {'LocationConstraint': self.aws_session.region_name}) console_log(f"Created S3 bucket: s3://{bucket_name}") def _create_job_role(self): From bf80959c04033f9945a32a30b77477b484ac40a2 Mon Sep 17 00:00:00 2001 From: mouhib Date: Tue, 18 Apr 2023 15:26:11 +0100 Subject: [PATCH 05/18] Add support for auth method in EMR-EC2 --- src/emr_cli/deployments/emr_ec2.py | 21 +++++++++++++++++---- src/emr_cli/deployments/emr_serverless.py | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 6cd8c9f..56a579e 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -7,18 +7,31 @@ from botocore.exceptions import ClientError, WaiterError from emr_cli.deployments.emr_serverless import DeploymentPackage from emr_cli.utils import console_log, parse_bucket_uri +from emr_cli.base.EmrBase import EmrBase LOG_WAITER_DELAY_SEC = 30 -class EMREC2: +class EMREC2 (EmrBase): def __init__( - self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "" + self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "", profile: str = "" ) -> None: + + super().__init__(profile) + + aws_session = self.aws_session + + if region: + self.client = aws_session.client("emr", region_name=region) + else: + # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION + # We may want to add an extra check here for the latter. + self.client = aws_session.client("emr") + self.cluster_id = cluster_id self.dp = deployment_package - self.client = boto3.client("emr") - self.s3_client = boto3.client("s3") + + self.s3_client = aws_session.client("s3") def run_job( self, diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 1d1492f..ca420cd 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -5,12 +5,12 @@ import zipfile from time import sleep from typing import List, Optional -from emr_cli.base.EmrBase import EmrBase import boto3 from emr_cli.deployments import SparkParams from emr_cli.utils import console_log, find_files, mkdir +from emr_cli.base.EmrBase import EmrBase class DeploymentPackage(metaclass=abc.ABCMeta): From af6a7be2060bbc84d0a6f46d14e823944c84b3b6 Mon Sep 17 00:00:00 2001 From: mouhib Date: Tue, 18 Apr 2023 16:12:52 +0100 Subject: [PATCH 06/18] Fix boto session init when no profil is provider --- src/emr_cli/base/EmrBase.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py index 5456d70..e89603d 100644 --- a/src/emr_cli/base/EmrBase.py +++ b/src/emr_cli/base/EmrBase.py @@ -20,5 +20,8 @@ class EmrBase: aws_session = "" def __init__(self, profile): - - self.aws_session = boto3.session.Session(profile_name=profile) + + if profile: + self.aws_session = boto3.session.Session(profile_name=profile) + else: + self.aws_session = boto3.session.Session() From f24c41d8ba5d6364ca2eb0b8b32e464c0e4c6e8b Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 15:35:43 +0100 Subject: [PATCH 07/18] Add profile to `deploy` command --- src/emr_cli/deployments/emr_serverless.py | 10 +++++++++- src/emr_cli/emr_cli.py | 8 ++++++-- src/emr_cli/packaging/python_files_project.py | 11 ++++++++--- src/emr_cli/packaging/python_poetry_project.py | 12 +++++++++--- src/emr_cli/packaging/python_project.py | 12 +++++++++--- src/emr_cli/packaging/simple_project.py | 10 ++++++++-- 6 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index ca420cd..47abe45 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -14,14 +14,22 @@ class DeploymentPackage(metaclass=abc.ABCMeta): + + aws_session = "" + def __init__( - self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "" + self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "", profile: str = None ) -> None: self.entry_point_path = entry_point_path self.dist_dir = "dist" # We might not populate this until we actually deploy self.s3_uri_base = s3_target_uri + + if profile: + self.aws_session = boto3.session.Session(profile_name=profile) + else: + self.aws_session = boto3.session.Session() def spark_submit_parameters(self) -> SparkParams: """ diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 2c681e4..1d0af78 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -139,12 +139,16 @@ def package(project, entry_point): help="Where to copy code artifacts to", required=True, ) +@click.option( + "--profile", + help="The AWS profile to use to upload the package to S3 bucket", +) @click.pass_obj -def deploy(project, entry_point, s3_code_uri): +def deploy(project, entry_point, s3_code_uri, profile): """ Copy a local project to S3. """ - p = project(entry_point) + p = project(entry_point, profile) p.deploy(s3_code_uri) diff --git a/src/emr_cli/packaging/python_files_project.py b/src/emr_cli/packaging/python_files_project.py index 4e3714d..f23f316 100644 --- a/src/emr_cli/packaging/python_files_project.py +++ b/src/emr_cli/packaging/python_files_project.py @@ -9,6 +9,12 @@ class PythonFilesProject(DeploymentPackage): + + def __init__(self, profile: str = None): + super().__init__(profile) + + self.s3_client = self.aws_session.client("s3") + """ A PythonFilesProject is a simple project that includes multiple `.py` files. @@ -33,14 +39,13 @@ def deploy(self, s3_code_uri: str) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} and local python modules to {s3_code_uri}") - s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") - s3_client.upload_file( + self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + self.s3_client.upload_file( f"{self.dist_dir}/pyfiles.zip", bucket, f"{prefix}/pyfiles.zip" ) diff --git a/src/emr_cli/packaging/python_poetry_project.py b/src/emr_cli/packaging/python_poetry_project.py index dbfedce..39ced19 100644 --- a/src/emr_cli/packaging/python_poetry_project.py +++ b/src/emr_cli/packaging/python_poetry_project.py @@ -13,6 +13,13 @@ class PythonPoetryProject(DeploymentPackage): + + + def __init__(self, profile: str = None): + super().__init__(profile) + + self.s3_client = self.aws_session.client("s3") + def initialize(self, target_dir: str = os.getcwd()): """ Initializes a poetry-based pyspark project in the provided directory. @@ -71,16 +78,15 @@ def deploy(self, s3_code_uri: str) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") bucket, prefix = self._parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} and dependencies to {s3_code_uri}") - s3_client.upload_file( + self.s3_client.upload_file( self.entry_point_path, bucket, os.path.join(prefix, filename) ) - s3_client.upload_file( + self.s3_client.upload_file( os.path.join(self.dist_dir, "pyspark_deps.tar.gz"), bucket, os.path.join(prefix, "pyspark_deps.tar.gz"), diff --git a/src/emr_cli/packaging/python_project.py b/src/emr_cli/packaging/python_project.py index fdc1a02..37216b0 100644 --- a/src/emr_cli/packaging/python_project.py +++ b/src/emr_cli/packaging/python_project.py @@ -12,6 +12,12 @@ class PythonProject(DeploymentPackage): + + def __init__(self, profile: str = None): + super().__init__(profile) + + self.s3_client = self.aws_session.client("s3") + def initialize(self, target_dir: str = os.getcwd()): """ Initializes a pyspark project in the provided directory. @@ -70,14 +76,14 @@ def deploy(self, s3_code_uri: str) -> str: Copies local code to S3 and returns the path to the uploaded entrypoint """ self.s3_uri_base = s3_code_uri - s3_client = boto3.client("s3") + bucket, prefix = parse_bucket_uri(self.s3_uri_base) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} and dependencies to {self.s3_uri_base}") - s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") - s3_client.upload_file( + self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + self.s3_client.upload_file( f"{self.dist_dir}/pyspark_deps.tar.gz", bucket, f"{prefix}/pyspark_deps.tar.gz", diff --git a/src/emr_cli/packaging/simple_project.py b/src/emr_cli/packaging/simple_project.py index 198b3ed..acb4ff0 100644 --- a/src/emr_cli/packaging/simple_project.py +++ b/src/emr_cli/packaging/simple_project.py @@ -7,6 +7,12 @@ class SimpleProject(DeploymentPackage): + + def __init__(self, profile: str = None): + super().__init__(profile) + + self.s3_client = self.aws_session.client("s3") + """ A simple project only has a single entry point file. This can be a pyspark file or packaged jar file. @@ -19,12 +25,12 @@ def deploy(self, s3_code_uri: str) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") + bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} to {s3_code_uri}") - s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") return f"s3://{bucket}/{prefix}/{filename}" From ed1c4ac45b76f37a0dc038f416583329205cc8cf Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 18:38:55 +0100 Subject: [PATCH 08/18] Fix issue with profile definition in EMREC2 constructor --- src/emr_cli/deployments/emr_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 56a579e..4aba7e3 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -14,7 +14,7 @@ class EMREC2 (EmrBase): def __init__( - self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "", profile: str = "" + self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "", profile: str = None ) -> None: super().__init__(profile) From ab3c39b5383e93d33a169c5c70615a04052d2f7b Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 18:53:48 +0100 Subject: [PATCH 09/18] modify env variable in github action --- .github/workflows/unit-tests.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index 45f98fa..c3d4f23 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -1,7 +1,5 @@ name: Spark Job Unit Tests -on: [push] -env: - AWS_DEFAULT_REGION: us-east-1 +on: [push] jobs: pytest: strategy: From 92fce6ffab2450771a1d9a7cb7b7df85e93fe3d0 Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 19:02:17 +0100 Subject: [PATCH 10/18] fix the dumy env variables --- .github/workflows/unit-tests.yaml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index c3d4f23..41be520 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -1,5 +1,10 @@ name: Spark Job Unit Tests -on: [push] +on: [push] +env: + AWS_DEFAULT_REGION: ${{ env.AWS_DEFAULT_REGION }} + AWS_ACCESS_KEY_ID: ${{ env.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ env.AWS_SECRET_ACCESS_KEY }} + jobs: pytest: strategy: @@ -47,6 +52,12 @@ jobs: name: Install project run: | poetry install --no-interaction + - + name: create dumy credentials + uses: "finnp/create-file-action@master" + env: + FILE_NAME: "~/.aws/config" + FILE_BASE64: "W2RlZmF1bHRdDQpyZWdpb24gPSBldS13ZXN0LTE=" - name: Run tests run: | From 43db9b81a1814908bfa8ec209a8456a4c1358bc3 Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 19:08:05 +0100 Subject: [PATCH 11/18] fix the dumy github action --- .github/workflows/unit-tests.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index 41be520..c06c1bd 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -1,9 +1,9 @@ name: Spark Job Unit Tests on: [push] env: - AWS_DEFAULT_REGION: ${{ env.AWS_DEFAULT_REGION }} - AWS_ACCESS_KEY_ID: ${{ env.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ env.AWS_SECRET_ACCESS_KEY }} + AWS_DEFAULT_REGION: ${{ vars.AWS_DEFAULT_REGION }} + AWS_ACCESS_KEY_ID: ${{ vars.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ vars.AWS_SECRET_ACCESS_KEY }} jobs: pytest: From eeb6ccfb06ac4f478e52d5f311757067aab5a148 Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 19:19:24 +0100 Subject: [PATCH 12/18] clean up --- tests/packaging/test_python_files_project.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/packaging/test_python_files_project.py b/tests/packaging/test_python_files_project.py index 6ec47fa..6357c59 100644 --- a/tests/packaging/test_python_files_project.py +++ b/tests/packaging/test_python_files_project.py @@ -6,6 +6,7 @@ class TestPythonFilesProject: def test_build(self, fs): + fs.create_file("main.py") fs.create_file("lib/file1.py") fs.create_file("lib/file2.py") From 4e0eab34d720e7a5f703006167cbb1ed5328ec96 Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 19:24:25 +0100 Subject: [PATCH 13/18] remove dummy config file --- .github/workflows/unit-tests.yaml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index c06c1bd..2ffdf4a 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -52,12 +52,6 @@ jobs: name: Install project run: | poetry install --no-interaction - - - name: create dumy credentials - uses: "finnp/create-file-action@master" - env: - FILE_NAME: "~/.aws/config" - FILE_BASE64: "W2RlZmF1bHRdDQpyZWdpb24gPSBldS13ZXN0LTE=" - name: Run tests run: | From ccd64d8f0bdf4983eb1f05982af301deffafafcb Mon Sep 17 00:00:00 2001 From: mouhib Date: Wed, 19 Apr 2023 22:15:21 +0100 Subject: [PATCH 14/18] fix the init of session with profile for deploy command --- src/emr_cli/deployments/emr_serverless.py | 7 +------ src/emr_cli/emr_cli.py | 4 ++-- src/emr_cli/packaging/python_files_project.py | 20 +++++++++++-------- .../packaging/python_poetry_project.py | 19 ++++++++++-------- src/emr_cli/packaging/python_project.py | 20 +++++++++++-------- src/emr_cli/packaging/simple_project.py | 13 ++++++++++-- 6 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 47abe45..5ee5a7e 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -18,18 +18,13 @@ class DeploymentPackage(metaclass=abc.ABCMeta): aws_session = "" def __init__( - self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "", profile: str = None + self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "" ) -> None: self.entry_point_path = entry_point_path self.dist_dir = "dist" # We might not populate this until we actually deploy self.s3_uri_base = s3_target_uri - - if profile: - self.aws_session = boto3.session.Session(profile_name=profile) - else: - self.aws_session = boto3.session.Session() def spark_submit_parameters(self) -> SparkParams: """ diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 1d0af78..9793b92 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -148,8 +148,8 @@ def deploy(project, entry_point, s3_code_uri, profile): """ Copy a local project to S3. """ - p = project(entry_point, profile) - p.deploy(s3_code_uri) + p = project(entry_point) + p.deploy(s3_code_uri, profile) @click.command() diff --git a/src/emr_cli/packaging/python_files_project.py b/src/emr_cli/packaging/python_files_project.py index f23f316..50a8c91 100644 --- a/src/emr_cli/packaging/python_files_project.py +++ b/src/emr_cli/packaging/python_files_project.py @@ -10,11 +10,6 @@ class PythonFilesProject(DeploymentPackage): - def __init__(self, profile: str = None): - super().__init__(profile) - - self.s3_client = self.aws_session.client("s3") - """ A PythonFilesProject is a simple project that includes multiple `.py` files. @@ -35,17 +30,26 @@ def build(self): relpath = os.path.relpath(file, cwd) zf.write(file, relpath) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) + + aws_session = "" + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + console_log(f"Deploying {filename} and local python modules to {s3_code_uri}") - self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") - self.s3_client.upload_file( + s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + s3_client.upload_file( f"{self.dist_dir}/pyfiles.zip", bucket, f"{prefix}/pyfiles.zip" ) diff --git a/src/emr_cli/packaging/python_poetry_project.py b/src/emr_cli/packaging/python_poetry_project.py index 39ced19..a3d64cd 100644 --- a/src/emr_cli/packaging/python_poetry_project.py +++ b/src/emr_cli/packaging/python_poetry_project.py @@ -14,12 +14,6 @@ class PythonPoetryProject(DeploymentPackage): - - def __init__(self, profile: str = None): - super().__init__(profile) - - self.s3_client = self.aws_session.client("s3") - def initialize(self, target_dir: str = os.getcwd()): """ Initializes a poetry-based pyspark project in the provided directory. @@ -74,7 +68,7 @@ def _dockerfile_path(self) -> str: ) return os.path.join(templates, "Dockerfile") - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ @@ -83,7 +77,16 @@ def deploy(self, s3_code_uri: str) -> str: console_log(f"Deploying {filename} and dependencies to {s3_code_uri}") - self.s3_client.upload_file( + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + + s3_client.upload_file( self.entry_point_path, bucket, os.path.join(prefix, filename) ) self.s3_client.upload_file( diff --git a/src/emr_cli/packaging/python_project.py b/src/emr_cli/packaging/python_project.py index 37216b0..4879d90 100644 --- a/src/emr_cli/packaging/python_project.py +++ b/src/emr_cli/packaging/python_project.py @@ -13,11 +13,6 @@ class PythonProject(DeploymentPackage): - def __init__(self, profile: str = None): - super().__init__(profile) - - self.s3_client = self.aws_session.client("s3") - def initialize(self, target_dir: str = os.getcwd()): """ Initializes a pyspark project in the provided directory. @@ -71,7 +66,7 @@ def _run_docker_build(self, output_dir: str): env=dict(os.environ, DOCKER_BUILDKIT="1"), ) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ @@ -82,8 +77,17 @@ def deploy(self, s3_code_uri: str) -> str: console_log(f"Deploying {filename} and dependencies to {self.s3_uri_base}") - self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") - self.s3_client.upload_file( + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + + s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + s3_client.upload_file( f"{self.dist_dir}/pyspark_deps.tar.gz", bucket, f"{prefix}/pyspark_deps.tar.gz", diff --git a/src/emr_cli/packaging/simple_project.py b/src/emr_cli/packaging/simple_project.py index acb4ff0..3d06c15 100644 --- a/src/emr_cli/packaging/simple_project.py +++ b/src/emr_cli/packaging/simple_project.py @@ -21,7 +21,7 @@ def __init__(self, profile: str = None): def build(self): pass - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ @@ -31,6 +31,15 @@ def deploy(self, s3_code_uri: str) -> str: console_log(f"Deploying {filename} to {s3_code_uri}") - self.s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + + s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") return f"s3://{bucket}/{prefix}/{filename}" From df595c786e4ce6215877f63064f71e330c71e770 Mon Sep 17 00:00:00 2001 From: mouhib Date: Mon, 24 Apr 2023 22:12:41 +0100 Subject: [PATCH 15/18] fix the profile parameter --- src/emr_cli/deployments/emr_ec2.py | 4 +++- src/emr_cli/deployments/emr_serverless.py | 6 ++++-- src/emr_cli/emr_cli.py | 20 +++++++++++++------- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 4aba7e3..71db1a0 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -14,13 +14,15 @@ class EMREC2 (EmrBase): def __init__( - self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "", profile: str = None + self, cluster_id: str, deployment_package: DeploymentPackage, region: str = None, profile: str = None ) -> None: super().__init__(profile) aws_session = self.aws_session + self.client = "" + if region: self.client = aws_session.client("emr", region_name=region) else: diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 5ee5a7e..c1e40bc 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -214,8 +214,8 @@ def __init__( application_id: str, job_role: str, deployment_package: DeploymentPackage, - region: str = "", - profile: str = "" + region: str = None, + profile: str = None ) -> None: super().__init__(profile) @@ -226,6 +226,8 @@ def __init__( aws_session = self.aws_session + self.client = "" + if region: self.client = aws_session.client("emr-serverless", region_name=region) else: diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 9793b92..83170e6 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -153,10 +153,6 @@ def deploy(project, entry_point, s3_code_uri, profile): @click.command() -@click.option( - "--profile", - help="The AWS profile to use for bootstraping the environment.", -) @click.option("--application-id", help="EMR Serverless Application ID") @click.option("--cluster-id", help="EMR on EC2 Cluster ID") @click.option( @@ -190,6 +186,14 @@ def deploy(project, entry_point, s3_code_uri, profile): default=False, is_flag=True, ) +@click.option( + "--profile", + help="The AWS profile to use for starting the job execution", +) +@click.option( + "--region", + help="The region of EMR Cluster or Serverless application where the job will be executed", +) @click.pass_obj def run( project, @@ -204,6 +208,8 @@ def run( spark_submit_opts, build, show_stdout, + profile, + region ): """ Run a project on EMR, optionally build and deploy @@ -223,7 +229,7 @@ def run( if build: p.build() - p.deploy(s3_code_uri) + p.deploy(s3_code_uri, profile) # application_id indicates EMR Serverless job if application_id is not None: @@ -235,14 +241,14 @@ def run( if job_args: job_args = job_args.split(",") - emrs = EMRServerless(application_id, job_role, p) + emrs = EMRServerless(application_id, job_role, p, profile=profile, region=region) emrs.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout) # cluster_id indicates EMR on EC2 job if cluster_id is not None: if job_args: job_args = job_args.split(",") - emr = EMREC2(cluster_id, p) + emr = EMREC2(cluster_id, p, profile=profile) emr.run_job(job_name, job_args, wait, show_stdout) From 14983e2f99cdbd171f1f6d7a74e2672ba897a396 Mon Sep 17 00:00:00 2001 From: lmouhib Date: Mon, 24 Apr 2023 22:14:48 +0100 Subject: [PATCH 16/18] Update src/emr_cli/base/EmrBase.py conventions fix Co-authored-by: Damon P. Cortesi --- src/emr_cli/base/EmrBase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py index e89603d..3d4ff72 100644 --- a/src/emr_cli/base/EmrBase.py +++ b/src/emr_cli/base/EmrBase.py @@ -1,6 +1,6 @@ import boto3 -class EmrBase: +class EMRBase: """ Base class for defining authentication profile and any future common From a8c83ca235a97f3eebd54050cfdaf870a17688ab Mon Sep 17 00:00:00 2001 From: lmouhib Date: Mon, 24 Apr 2023 22:15:17 +0100 Subject: [PATCH 17/18] Update src/emr_cli/base/EmrBase.py Lint Co-authored-by: Damon P. Cortesi --- src/emr_cli/base/EmrBase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py index 3d4ff72..bd82711 100644 --- a/src/emr_cli/base/EmrBase.py +++ b/src/emr_cli/base/EmrBase.py @@ -17,7 +17,7 @@ class EMRBase: """ - aws_session = "" + aws_session: Session def __init__(self, profile): From fe7c90cca68a7115206aa5536d0063aae5383294 Mon Sep 17 00:00:00 2001 From: "Damon P. Cortesi" Date: Mon, 24 Apr 2023 16:31:35 -0700 Subject: [PATCH 18/18] Add Session and update EMRBase name --- src/emr_cli/base/EmrBase.py | 12 ++++++------ src/emr_cli/deployments/emr_ec2.py | 4 ++-- src/emr_cli/deployments/emr_serverless.py | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py index bd82711..5cc0e01 100644 --- a/src/emr_cli/base/EmrBase.py +++ b/src/emr_cli/base/EmrBase.py @@ -1,4 +1,5 @@ -import boto3 +from boto3.session import Session + class EMRBase: @@ -12,16 +13,15 @@ class EMRBase: Attributes ---------- - aws_session : the object return by for the session initialized + aws_session : the object return by for the session initialized using the profile passed through the extending class - + """ aws_session: Session def __init__(self, profile): - if profile: - self.aws_session = boto3.session.Session(profile_name=profile) + self.aws_session = Session(profile_name=profile) else: - self.aws_session = boto3.session.Session() + self.aws_session = Session() diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 71db1a0..fbc9a4f 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -7,12 +7,12 @@ from botocore.exceptions import ClientError, WaiterError from emr_cli.deployments.emr_serverless import DeploymentPackage from emr_cli.utils import console_log, parse_bucket_uri -from emr_cli.base.EmrBase import EmrBase +from emr_cli.base.EmrBase import EMRBase LOG_WAITER_DELAY_SEC = 30 -class EMREC2 (EmrBase): +class EMREC2(EMRBase): def __init__( self, cluster_id: str, deployment_package: DeploymentPackage, region: str = None, profile: str = None ) -> None: diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index c1e40bc..5704e99 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -10,7 +10,7 @@ from emr_cli.deployments import SparkParams from emr_cli.utils import console_log, find_files, mkdir -from emr_cli.base.EmrBase import EmrBase +from emr_cli.base.EmrBase import EMRBase class DeploymentPackage(metaclass=abc.ABCMeta): @@ -54,7 +54,7 @@ def _zip_local_pyfiles(self): zf.write(file, relpath) -class Bootstrap (EmrBase): +class Bootstrap(EMRBase): DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" @@ -208,7 +208,7 @@ def _create_application(self): return app_id -class EMRServerless (EmrBase): +class EMRServerless(EMRBase): def __init__( self, application_id: str,