Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/emr_cli/base/EmrBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from boto3.session import Session


class EMRBase:
    """
    Base class that holds the authentication profile handling and is the
    place for any future common EMR operations.

    Constructor
    ----------
    profile : the name of the profile as stored in the AWS config file;
        when it is omitted or empty the session is created without an
        explicit profile

    Attributes
    ----------
    aws_session : the boto3 ``Session`` object initialized using the
        profile passed through the extending class
    """

    aws_session: Session

    def __init__(self, profile: str = None):
        # Forward ``profile_name`` only when a non-empty profile was given,
        # otherwise build the session with no arguments at all.
        session_kwargs = {"profile_name": profile} if profile else {}
        self.aws_session = Session(**session_kwargs)
28 changes: 19 additions & 9 deletions src/emr_cli/deployments/emr_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,35 @@
from botocore.exceptions import ClientError, WaiterError
from emr_cli.deployments.emr_serverless import DeploymentPackage
from emr_cli.utils import console_log, parse_bucket_uri, print_s3_gz
from emr_cli.base.EmrBase import EMRBase

LOG_WAITER_DELAY_SEC = 30


class Bootstrap:
class Bootstrap(EMRBase):
DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access"
DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess"

aws_session = ""

def __init__(
    self,
    profile: str,
    code_bucket: str,
    log_bucket: str,
    instance_role_name: str,
    job_role_name: str,
):
    """
    Prepare the AWS session and the S3, IAM and EMR clients for this
    bootstrap helper.

    Parameters
    ----------
    profile : AWS profile name forwarded to ``EMRBase.__init__``
    code_bucket : bucket name stored on the instance
    log_bucket : bucket name stored on the instance; falls back to
        ``code_bucket`` when falsy
    instance_role_name : stored on the instance as given
    job_role_name : stored on the instance as given
    """
    # ``super`` has to be *called* to obtain the bound proxy. The bare
    # ``super.__init__(profile)`` raises TypeError and never sets
    # ``self.aws_session``.
    super().__init__(profile)

    self.code_bucket = code_bucket
    self.log_bucket = log_bucket or code_bucket
    self.instance_role_name = instance_role_name
    self.job_role_name = job_role_name

    # Every client comes from the profile-aware session; clients built with
    # ``boto3.client`` would use the default session and ignore the profile.
    self.s3_client = self.aws_session.client("s3")
    self.iam_client = self.aws_session.client("iam")
    self.emr_client = self.aws_session.client("emr")

def create_environment(self):
self._create_s3_buckets()
Expand Down Expand Up @@ -88,7 +95,7 @@ def _create_s3_buckets(self):
for bucket_name in set([self.code_bucket, self.log_bucket]):
self.s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": self.s3_client.meta.region_name},
CreateBucketConfiguration={"LocationConstraint": self.aws_session.region_name},
)
console_log(f"Created S3 bucket: s3://{bucket_name}")
self.s3_client.put_bucket_policy(Bucket=bucket_name, Policy=self._default_s3_bucket_policy(bucket_name))
Expand Down Expand Up @@ -315,19 +322,22 @@ def _create_cluster(self, security_config_name: str, instance_profile_name: str)
return cluster_id


class EMREC2:
class EMREC2(EMRBase):
def __init__(
    self,
    cluster_id: str,
    deployment_package: DeploymentPackage,
    job_role: Optional[str] = None,
    region: str = "",
    profile: str = "",
) -> None:
    """
    Store the job settings and create the EMR and S3 clients.

    Parameters
    ----------
    cluster_id : stored on the instance as given
    deployment_package : stored on the instance as ``dp``
    job_role : optional role, stored on the instance as given
    region : optional region for the EMR client; when empty the region
        of the session is used
    profile : AWS profile name forwarded to ``EMRBase.__init__``
    """
    super().__init__(profile)

    self.cluster_id = cluster_id
    self.dp = deployment_package
    self.job_role = job_role

    # ``region`` used to be accepted but silently ignored. An empty value
    # is turned into None so the session's own region still applies.
    self.client = self.aws_session.client("emr", region_name=region or None)
    self.s3_client = self.aws_session.client("s3")

def run_job(
self,
Expand Down
20 changes: 9 additions & 11 deletions src/emr_cli/deployments/emr_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@
import boto3
from emr_cli.deployments.emr_serverless import DeploymentPackage
from emr_cli.utils import console_log, print_s3_gz
from emr_cli.base.EmrBase import EMRBase


class EMREKS:
class EMREKS(EMRBase):
def __init__(
    self,
    virtual_cluster_id: str,
    job_role: str,
    deployment_package: DeploymentPackage,
    region: str = "",
    profile: str = None,
) -> None:
    """
    Store the job settings and create the S3, EMR on EKS and EMR clients.

    Parameters
    ----------
    virtual_cluster_id : stored on the instance as given
    job_role : stored on the instance as given
    deployment_package : stored on the instance as ``dp``
    region : optional region for the ``emr-containers`` and ``emr``
        clients; when empty the region of the session is used
    profile : AWS profile name forwarded to ``EMRBase.__init__``
    """
    super().__init__(profile)

    self.virtual_cluster_id = virtual_cluster_id
    self.job_role = job_role
    self.dp = deployment_package

    # ``region`` is kept (ahead of ``profile``) so existing callers that
    # pass it keep working. An empty value becomes None, which leaves the
    # region choice to the session.
    region_name = region or None

    # All clients come from the profile-aware session so the selected
    # profile applies to S3 as well as to the EMR services.
    self.s3_client = self.aws_session.client("s3")
    self.client = self.aws_session.client("emr-containers", region_name=region_name)
    self.emr_client = self.aws_session.client("emr", region_name=region_name)

def fetch_latest_release_label(self):
response = self.emr_client.list_release_labels(
Expand Down
40 changes: 24 additions & 16 deletions src/emr_cli/deployments/emr_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import boto3
from emr_cli.deployments import SparkParams
from emr_cli.utils import console_log, find_files, mkdir, print_s3_gz

from emr_cli.base.EmrBase import EMRBase

class DeploymentPackage(metaclass=abc.ABCMeta):

aws_session = ""

def __init__(self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = "") -> None:
self.entry_point_path = entry_point_path
self.dist_dir = "dist"
Expand Down Expand Up @@ -48,18 +51,23 @@ def _zip_local_pyfiles(self):
zf.write(file, relpath)


class Bootstrap:
class Bootstrap(EMRBase):
# Maybe add some UUIDs to these?
DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access"
DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess"

def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str):
    """
    Prepare the AWS session and the S3, IAM and EMR Serverless clients for
    this bootstrap helper.

    Parameters
    ----------
    profile : AWS profile name forwarded to ``EMRBase.__init__``
    code_bucket : bucket name stored on the instance
    log_bucket : bucket name stored on the instance; falls back to
        ``code_bucket`` when falsy
    job_role_name : stored on the instance as given
    """
    # ``super`` has to be *called* to obtain the bound proxy. The bare
    # ``super.__init__(profile)`` raises TypeError and never sets
    # ``self.aws_session``.
    super().__init__(profile)

    self.code_bucket = code_bucket
    self.log_bucket = log_bucket or code_bucket
    self.job_role_name = job_role_name

    # Every client comes from the profile-aware session; clients built with
    # ``boto3.client`` would use the default session and ignore the profile.
    self.s3_client = self.aws_session.client("s3")
    self.iam_client = self.aws_session.client("iam")
    self.emrs_client = self.aws_session.client("emr-serverless")

def create_environment(self):
self._create_s3_buckets()
Expand Down Expand Up @@ -94,7 +102,7 @@ def _create_s3_buckets(self):
self.s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
"LocationConstraint": self.s3_client.meta.region_name # type: ignore
"LocationConstraint": self.aws_session.region_name # type: ignore
},
)

Expand Down Expand Up @@ -218,24 +226,24 @@ def _create_application(self):
return app_id


class EMRServerless:
class EMRServerless(EMRBase):
def __init__(
    self,
    application_id: str,
    job_role: str,
    deployment_package: DeploymentPackage,
    region: str = "",
    profile: str = "",
) -> None:
    """
    Store the job settings and create the S3 and EMR Serverless clients.

    Parameters
    ----------
    application_id : stored on the instance as given
    job_role : stored on the instance as given
    deployment_package : stored on the instance as ``dp``
    region : optional region for the ``emr-serverless`` client; when
        empty the region of the session is used
    profile : AWS profile name forwarded to ``EMRBase.__init__``
    """
    super().__init__(profile)

    self.application_id = application_id
    self.job_role = job_role
    self.dp = deployment_package

    self.s3_client = self.aws_session.client("s3")

    # Keep honouring an explicit ``region`` now that the client is built
    # from the session. An empty value becomes None so the session's
    # region applies.
    self.client = self.aws_session.client("emr-serverless", region_name=region or None)

def run_job(
self,
Expand Down
31 changes: 22 additions & 9 deletions src/emr_cli/emr_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ def status(project):
is_flag=True,
help="Prints the commands necessary to destroy the created environment.",
)
def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_name, destroy):
@click.option(
"--profile",
help="The AWS profile to use for bootstraping the environment.",
)
def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_name, destroy, profile):
"""
Bootstrap an EMR Serverless environment.

Expand All @@ -82,9 +86,9 @@ def bootstrap(target, code_bucket, logs_bucket, instance_profile_name, job_role_
raise click.BadArgumentUsage("EMR on EC2 clusters require --instance-profile-name to be set.")

if target == "emr-serverless":
b = BootstrapEMRServerless(code_bucket, logs_bucket, job_role_name)
b = BootstrapEMRServerless(profile, code_bucket, logs_bucket, job_role_name)
else:
b = BootstrapEMRonEC2(code_bucket, logs_bucket, instance_profile_name, job_role_name)
b = BootstrapEMRonEC2(profile, code_bucket, logs_bucket, instance_profile_name, job_role_name)

resource_id = "application_id" if target == "emr-serverless" else "cluster_id"
if destroy:
Expand Down Expand Up @@ -161,13 +165,17 @@ def package(project, entry_point):
help="Where to copy code artifacts to",
required=True,
)
@click.option(
"--profile",
help="The AWS profile to use to upload the package to S3 bucket",
)
@click.pass_obj
def deploy(project, entry_point, s3_code_uri):
def deploy(project, entry_point, s3_code_uri, profile):
"""
Copy a local project to S3.
"""
p = project(entry_point)
p.deploy(s3_code_uri)
p.deploy(s3_code_uri, profile)


@click.command()
Expand Down Expand Up @@ -220,6 +228,10 @@ def deploy(project, entry_point, s3_code_uri):
default=720, # set to AWS default value (12 hours in minutes)
type=int
)
@click.option(
"--profile",
help="The AWS profile to use for starting the job execution",
)
@click.pass_obj
@click.pass_context
def run(
Expand All @@ -241,6 +253,7 @@ def run(
save_config,
emr_eks_release_label,
emr_serverless_timeout,
profile
):
"""
Run a project on EMR, optionally build and deploy
Expand Down Expand Up @@ -280,7 +293,7 @@ def run(

if build:
p.build()
p.deploy(s3_code_uri)
p.deploy(s3_code_uri, profile)

if any([application_id, virtual_cluster_id]):
# We require entry-point and job-role
Expand All @@ -296,21 +309,21 @@ def run(
if application_id is not None:
if job_args:
job_args = job_args.split(",")
emrs = EMRServerless(application_id, job_role, p)
emrs = EMRServerless(application_id, job_role, p, profile=profile)
emrs.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout, s3_logs_uri, emr_serverless_timeout)

# cluster_id indicates EMR on EC2 job
if cluster_id is not None:
if job_args:
job_args = job_args.split(",")
emr = EMREC2(cluster_id, p, job_role)
emr = EMREC2(cluster_id, p, job_role, profile=profile)
emr.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout)

# virtual_cluster_id is EMR on EKS
if virtual_cluster_id is not None:
if job_args:
job_args = job_args.split(",")
emreks = EMREKS(virtual_cluster_id, job_role, p)
emreks = EMREKS(virtual_cluster_id, job_role, p, profile=profile)
emreks.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout, s3_logs_uri, emr_eks_release_label)


Expand Down
12 changes: 10 additions & 2 deletions src/emr_cli/packaging/python_files_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,19 @@ def build(self):
relpath = os.path.relpath(file, cwd)
zf.write(file, relpath)

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str=None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
s3_client = boto3.client("s3")
aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

bucket, prefix = parse_bucket_uri(s3_code_uri)
filename = os.path.basename(self.entry_point_path)

Expand Down
12 changes: 10 additions & 2 deletions src/emr_cli/packaging/python_poetry_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ def _dockerfile_path(self) -> str:
)
return os.path.join(templates, "Dockerfile")

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str=None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
s3_client = boto3.client("s3")
aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

bucket, prefix = self._parse_bucket_uri(s3_code_uri)
filename = os.path.basename(self.entry_point_path)

Expand Down
13 changes: 11 additions & 2 deletions src/emr_cli/packaging/python_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,21 @@ def _run_docker_build(self, output_dir: str):
env=dict(os.environ, DOCKER_BUILDKIT="1"),
)

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str = None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
self.s3_uri_base = s3_code_uri
s3_client = boto3.client("s3")

aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

bucket, prefix = parse_bucket_uri(self.s3_uri_base)
filename = os.path.basename(self.entry_point_path)

Expand Down
Loading