diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index 45f98fa..2ffdf4a 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -1,7 +1,10 @@ name: Spark Job Unit Tests on: [push] env: - AWS_DEFAULT_REGION: us-east-1 + AWS_DEFAULT_REGION: ${{ vars.AWS_DEFAULT_REGION }} + AWS_ACCESS_KEY_ID: ${{ vars.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ vars.AWS_SECRET_ACCESS_KEY }} + jobs: pytest: strategy: diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py new file mode 100644 index 0000000..5cc0e01 --- /dev/null +++ b/src/emr_cli/base/EmrBase.py @@ -0,0 +1,27 @@ +from boto3.session import Session + + +class EMRBase: + + """ + Base class for defining authentication profile and any future common + EMR operations + + Constructor + ---------- + profile : the name of the profile as store in the aws config file + + Attributes + ---------- + aws_session : the object return by for the session initialized + using the profile passed through the extending class + + """ + + aws_session: Session + + def __init__(self, profile): + if profile: + self.aws_session = Session(profile_name=profile) + else: + self.aws_session = Session() diff --git a/src/emr_cli/deployments/emr_ec2.py b/src/emr_cli/deployments/emr_ec2.py index 6cd8c9f..fbc9a4f 100644 --- a/src/emr_cli/deployments/emr_ec2.py +++ b/src/emr_cli/deployments/emr_ec2.py @@ -7,18 +7,33 @@ from botocore.exceptions import ClientError, WaiterError from emr_cli.deployments.emr_serverless import DeploymentPackage from emr_cli.utils import console_log, parse_bucket_uri +from emr_cli.base.EmrBase import EMRBase LOG_WAITER_DELAY_SEC = 30 -class EMREC2: +class EMREC2(EMRBase): def __init__( - self, cluster_id: str, deployment_package: DeploymentPackage, region: str = "" + self, cluster_id: str, deployment_package: DeploymentPackage, region: str = None, profile: str = None ) -> None: + + super().__init__(profile) + + aws_session = self.aws_session + + self.client = "" + + if region: + self.client = aws_session.client("emr", region_name=region) + else: + # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION + # We may want to add an extra check here for the latter. + self.client = aws_session.client("emr") + self.cluster_id = cluster_id self.dp = deployment_package - self.client = boto3.client("emr") - self.s3_client = boto3.client("s3") + + self.s3_client = aws_session.client("s3") def run_job( self, diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 711792f..5704e99 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -10,9 +10,13 @@ from emr_cli.deployments import SparkParams from emr_cli.utils import console_log, find_files, mkdir +from emr_cli.base.EmrBase import EMRBase class DeploymentPackage(metaclass=abc.ABCMeta): + + aws_session = "" + def __init__( self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "" ) -> None: @@ -50,17 +54,21 @@ def _zip_local_pyfiles(self): zf.write(file, relpath) -class Bootstrap: +class Bootstrap(EMRBase): DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" - def __init__(self, code_bucket: str, log_bucket: str, job_role_name: str): + def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str): + super().__init__(profile) + + aws_session = self.aws_session + self.code_bucket = code_bucket self.log_bucket = log_bucket or code_bucket self.job_role_name = job_role_name - self.s3_client = boto3.client("s3") - self.iam_client = boto3.client("iam") - self.emrs_client = boto3.client("emr-serverless") + self.s3_client = aws_session.client("s3") + self.iam_client = aws_session.client("iam") + self.emrs_client = aws_session.client("emr-serverless") def create_environment(self): self._create_s3_buckets() @@ -91,7 +99,8 @@ def _create_s3_buckets(self): Creates both the source and log buckets if they don't already exist. """ for bucket_name in set([self.code_bucket, self.log_bucket]): - self.s3_client.create_bucket(Bucket=bucket_name) + + self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration= {'LocationConstraint': self.aws_session.region_name}) console_log(f"Created S3 bucket: s3://{bucket_name}") def _create_job_role(self): @@ -199,23 +208,32 @@ def _create_application(self): return app_id -class EMRServerless: +class EMRServerless(EMRBase): def __init__( self, application_id: str, job_role: str, deployment_package: DeploymentPackage, - region: str = "", + region: str = None, + profile: str = None ) -> None: + + super().__init__(profile) + self.application_id = application_id self.job_role = job_role self.dp = deployment_package + + aws_session = self.aws_session + + self.client = "" + if region: - self.client = boto3.client("emr-serverless", region_name=region) + self.client = aws_session.client("emr-serverless", region_name=region) else: # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION # We may want to add an extra check here for the latter. - self.client = boto3.client("emr-serverless") + self.client = aws_session.client("emr-serverless") def run_job( self, diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 4506920..83170e6 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -30,6 +30,10 @@ def status(project): @click.command() +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", +) @click.option( "--target", type=click.Choice(["emr-serverless"]), @@ -53,7 +57,7 @@ def status(project): is_flag=True, help="Prints the commands necessary to destroy the created environment.", ) -def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): +def bootstrap(profile, target, code_bucket, logs_bucket, job_role_name, destroy): """ Bootstrap an EMR Serverless environment. @@ -62,12 +66,12 @@ def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): """ if destroy: c = ConfigReader.read() - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) b.print_destroy_commands(c.get("run", {}).get("application_id", None)) exit(0) # For EMR Serverless, we need to create an S3 bucket, a job role, and an Application - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) config = b.create_environment() # The resulting config is relevant for the "run" command @@ -135,13 +139,17 @@ def package(project, entry_point): help="Where to copy code artifacts to", required=True, ) +@click.option( + "--profile", + help="The AWS profile to use to upload the package to S3 bucket", +) @click.pass_obj -def deploy(project, entry_point, s3_code_uri): +def deploy(project, entry_point, s3_code_uri, profile): """ Copy a local project to S3. """ p = project(entry_point) - p.deploy(s3_code_uri) + p.deploy(s3_code_uri, profile) @click.command() @@ -178,6 +186,14 @@ def deploy(project, entry_point, s3_code_uri): default=False, is_flag=True, ) +@click.option( + "--profile", + help="The AWS profile to use for starting the job execution", +) +@click.option( + "--region", + help="The region of EMR Cluster or Serverless application where the job will be executed", +) @click.pass_obj def run( project, @@ -192,6 +208,8 @@ def run( spark_submit_opts, build, show_stdout, + profile, + region ): """ Run a project on EMR, optionally build and deploy @@ -211,7 +229,7 @@ def run( if build: p.build() - p.deploy(s3_code_uri) + p.deploy(s3_code_uri, profile) # application_id indicates EMR Serverless job if application_id is not None: @@ -223,14 +241,14 @@ def run( if job_args: job_args = job_args.split(",") - emrs = EMRServerless(application_id, job_role, p) + emrs = EMRServerless(application_id, job_role, p, profile=profile, region=region) emrs.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout) # cluster_id indicates EMR on EC2 job if cluster_id is not None: if job_args: job_args = job_args.split(",") - emr = EMREC2(cluster_id, p) + emr = EMREC2(cluster_id, p, profile=profile) emr.run_job(job_name, job_args, wait, show_stdout) diff --git a/src/emr_cli/packaging/python_files_project.py b/src/emr_cli/packaging/python_files_project.py index 4e3714d..50a8c91 100644 --- a/src/emr_cli/packaging/python_files_project.py +++ b/src/emr_cli/packaging/python_files_project.py @@ -9,6 +9,7 @@ class PythonFilesProject(DeploymentPackage): + """ A PythonFilesProject is a simple project that includes multiple `.py` files. @@ -29,14 +30,22 @@ def build(self): relpath = os.path.relpath(file, cwd) zf.write(file, relpath) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) + + aws_session = "" + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + console_log(f"Deploying {filename} and local python modules to {s3_code_uri}") s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") diff --git a/src/emr_cli/packaging/python_poetry_project.py b/src/emr_cli/packaging/python_poetry_project.py index dbfedce..a3d64cd 100644 --- a/src/emr_cli/packaging/python_poetry_project.py +++ b/src/emr_cli/packaging/python_poetry_project.py @@ -13,6 +13,7 @@ class PythonPoetryProject(DeploymentPackage): + def initialize(self, target_dir: str = os.getcwd()): """ Initializes a poetry-based pyspark project in the provided directory. @@ -67,20 +68,28 @@ def _dockerfile_path(self) -> str: ) return os.path.join(templates, "Dockerfile") - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") bucket, prefix = self._parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} and dependencies to {s3_code_uri}") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + s3_client.upload_file( self.entry_point_path, bucket, os.path.join(prefix, filename) ) - s3_client.upload_file( + self.s3_client.upload_file( os.path.join(self.dist_dir, "pyspark_deps.tar.gz"), bucket, os.path.join(prefix, "pyspark_deps.tar.gz"), diff --git a/src/emr_cli/packaging/python_project.py b/src/emr_cli/packaging/python_project.py index fdc1a02..4879d90 100644 --- a/src/emr_cli/packaging/python_project.py +++ b/src/emr_cli/packaging/python_project.py @@ -12,6 +12,7 @@ class PythonProject(DeploymentPackage): + def initialize(self, target_dir: str = os.getcwd()): """ Initializes a pyspark project in the provided directory. @@ -65,17 +66,26 @@ def _run_docker_build(self, output_dir: str): env=dict(os.environ, DOCKER_BUILDKIT="1"), ) - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ self.s3_uri_base = s3_code_uri - s3_client = boto3.client("s3") + bucket, prefix = parse_bucket_uri(self.s3_uri_base) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} and dependencies to {self.s3_uri_base}") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") s3_client.upload_file( f"{self.dist_dir}/pyspark_deps.tar.gz", diff --git a/src/emr_cli/packaging/simple_project.py b/src/emr_cli/packaging/simple_project.py index 198b3ed..3d06c15 100644 --- a/src/emr_cli/packaging/simple_project.py +++ b/src/emr_cli/packaging/simple_project.py @@ -7,6 +7,12 @@ class SimpleProject(DeploymentPackage): + + def __init__(self, profile: str = None): + super().__init__(profile) + + self.s3_client = self.aws_session.client("s3") + """ A simple project only has a single entry point file. This can be a pyspark file or packaged jar file. @@ -15,16 +21,25 @@ class SimpleProject(DeploymentPackage): def build(self): pass - def deploy(self, s3_code_uri: str) -> str: + def deploy(self, s3_code_uri: str, profile: str = None) -> str: """ Copies local code to S3 and returns the path to the uploaded entrypoint """ - s3_client = boto3.client("s3") + bucket, prefix = parse_bucket_uri(s3_code_uri) filename = os.path.basename(self.entry_point_path) console_log(f"Deploying {filename} to {s3_code_uri}") + aws_session = "" + + if profile: + aws_session = boto3.session.Session(profile_name=profile) + else: + aws_session = boto3.session.Session() + + s3_client = aws_session.client("s3") + s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}") return f"s3://{bucket}/{prefix}/{filename}" diff --git a/tests/packaging/test_python_files_project.py b/tests/packaging/test_python_files_project.py index 6ec47fa..6357c59 100644 --- a/tests/packaging/test_python_files_project.py +++ b/tests/packaging/test_python_files_project.py @@ -6,6 +6,7 @@ class TestPythonFilesProject: def test_build(self, fs): + fs.create_file("main.py") fs.create_file("lib/file1.py") fs.create_file("lib/file2.py")