Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
name: Spark Job Unit Tests
on: [push]
env:
AWS_DEFAULT_REGION: us-east-1
AWS_DEFAULT_REGION: ${{ vars.AWS_DEFAULT_REGION }}
AWS_ACCESS_KEY_ID: ${{ vars.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ vars.AWS_SECRET_ACCESS_KEY }}
Comment on lines +5 to +6
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these used for?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are used by the unit tests: they allow the boto3 sessions to be initialized. They only contain dummy values.


jobs:
pytest:
strategy:
Expand Down
27 changes: 27 additions & 0 deletions src/emr_cli/base/EmrBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from boto3.session import Session


class EMRBase:
    """
    Base class that owns the boto3 session used by the EMR helpers, and a
    home for any future common EMR operations.

    Constructor
    ----------
    profile : the name of the profile as stored in the AWS config file.
        When it is ``None`` or empty, the session is created without an
        explicit profile.

    Attributes
    ----------
    aws_session : the boto3 ``Session`` initialized with the profile passed
        in by the extending class
    """

    aws_session: Session

    def __init__(self, profile: str = None):
        # profile_name=None is boto3's own default, so a missing or empty
        # profile behaves exactly like calling Session() with no arguments.
        self.aws_session = Session(profile_name=profile or None)
23 changes: 19 additions & 4 deletions src/emr_cli/deployments/emr_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,33 @@
from botocore.exceptions import ClientError, WaiterError
from emr_cli.deployments.emr_serverless import DeploymentPackage
from emr_cli.utils import console_log, parse_bucket_uri
from emr_cli.base.EmrBase import EMRBase

LOG_WAITER_DELAY_SEC = 30


class EMREC2:
class EMREC2(EMRBase):
def __init__(
    self,
    cluster_id: str,
    deployment_package: DeploymentPackage,
    region: str = None,
    profile: str = None,
) -> None:
    """
    Set up the EMR and S3 clients used to run a job on an EMR on EC2 cluster.

    cluster_id : stored as ``self.cluster_id``
    deployment_package : stored as ``self.dp``
    region : passed to the EMR client as ``region_name`` when non-empty
    profile : forwarded to ``EMRBase.__init__`` to select the AWS profile
    """
    super().__init__(profile)

    # Every client comes from the profile-aware session; a module-level
    # boto3.client(...) here would silently ignore the requested profile.
    aws_session = self.aws_session

    # An empty region is normalized to None, which is the same as omitting
    # region_name. Note that boto3 then uses AWS_DEFAULT_REGION, not
    # AWS_REGION. We may want to add an extra check here for the latter.
    self.client = aws_session.client("emr", region_name=region or None)

    self.cluster_id = cluster_id
    self.dp = deployment_package

    self.s3_client = aws_session.client("s3")

def run_job(
self,
Expand Down
38 changes: 28 additions & 10 deletions src/emr_cli/deployments/emr_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@

from emr_cli.deployments import SparkParams
from emr_cli.utils import console_log, find_files, mkdir
from emr_cli.base.EmrBase import EMRBase


class DeploymentPackage(metaclass=abc.ABCMeta):

aws_session = ""

def __init__(
self, entry_point_path: str = "entrypoint.py", s3_target_uri: str = ""
) -> None:
Expand Down Expand Up @@ -50,17 +54,21 @@ def _zip_local_pyfiles(self):
zf.write(file, relpath)


class Bootstrap:
class Bootstrap(EMRBase):
DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access"
DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess"

def __init__(
    self,
    profile: str,
    code_bucket: str,
    log_bucket: str,
    job_role_name: str,
):
    """
    Set up the S3, IAM and EMR Serverless clients used for bootstrapping.

    profile : forwarded to ``EMRBase.__init__`` to select the AWS profile
    code_bucket : stored as ``self.code_bucket``
    log_bucket : stored as ``self.log_bucket``; falls back to ``code_bucket``
        when empty
    job_role_name : stored as ``self.job_role_name``
    """
    super().__init__(profile)

    self.code_bucket = code_bucket
    self.log_bucket = log_bucket or code_bucket
    self.job_role_name = job_role_name

    # Build every client from the profile-aware session so that all three
    # services use the same credentials.
    aws_session = self.aws_session
    self.s3_client = aws_session.client("s3")
    self.iam_client = aws_session.client("iam")
    self.emrs_client = aws_session.client("emr-serverless")

def create_environment(self):
self._create_s3_buckets()
Expand Down Expand Up @@ -91,7 +99,8 @@ def _create_s3_buckets(self):
Creates both the source and log buckets if they don't already exist.
"""
# S3 rejects an explicit LocationConstraint of "us-east-1" (it is the
# default location), and a missing region cannot be sent as a constraint
# at all, so the bucket configuration is only added for other regions.
region = self.aws_session.region_name
for bucket_name in {self.code_bucket, self.log_bucket}:
    create_args = {"Bucket": bucket_name}
    if region and region != "us-east-1":
        create_args["CreateBucketConfiguration"] = {
            "LocationConstraint": region
        }
    self.s3_client.create_bucket(**create_args)
    console_log(f"Created S3 bucket: s3://{bucket_name}")

def _create_job_role(self):
Expand Down Expand Up @@ -199,23 +208,32 @@ def _create_application(self):
return app_id


class EMRServerless:
class EMRServerless(EMRBase):
def __init__(
    self,
    application_id: str,
    job_role: str,
    deployment_package: DeploymentPackage,
    region: str = None,
    profile: str = None,
) -> None:
    """
    Set up the EMR Serverless client used to run jobs on an application.

    application_id : stored as ``self.application_id``
    job_role : stored as ``self.job_role``
    deployment_package : stored as ``self.dp``
    region : passed to the client as ``region_name`` when non-empty
    profile : forwarded to ``EMRBase.__init__`` to select the AWS profile
    """
    super().__init__(profile)

    self.application_id = application_id
    self.job_role = job_role
    self.dp = deployment_package

    # An empty region (including the former "" default) is normalized to
    # None, which is the same as omitting region_name. Note that boto3 then
    # uses AWS_DEFAULT_REGION, not AWS_REGION. We may want to add an extra
    # check here for the latter.
    self.client = self.aws_session.client(
        "emr-serverless", region_name=region or None
    )

def run_job(
self,
Expand Down
34 changes: 26 additions & 8 deletions src/emr_cli/emr_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def status(project):


@click.command()
@click.option(
"--profile",
help="The AWS profile to use for bootstraping the environment.",
)
@click.option(
"--target",
type=click.Choice(["emr-serverless"]),
Expand All @@ -53,7 +57,7 @@ def status(project):
is_flag=True,
help="Prints the commands necessary to destroy the created environment.",
)
def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy):
def bootstrap(profile, target, code_bucket, logs_bucket, job_role_name, destroy):
"""
Bootstrap an EMR Serverless environment.

Expand All @@ -62,12 +66,12 @@ def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy):
"""
if destroy:
c = ConfigReader.read()
b = Bootstrap(code_bucket, logs_bucket, job_role_name)
b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name)
b.print_destroy_commands(c.get("run", {}).get("application_id", None))
exit(0)

# For EMR Serverless, we need to create an S3 bucket, a job role, and an Application
b = Bootstrap(code_bucket, logs_bucket, job_role_name)
b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name)
config = b.create_environment()

# The resulting config is relevant for the "run" command
Expand Down Expand Up @@ -135,13 +139,17 @@ def package(project, entry_point):
help="Where to copy code artifacts to",
required=True,
)
@click.option(
"--profile",
help="The AWS profile to use to upload the package to S3 bucket",
)
@click.pass_obj
def deploy(project, entry_point, s3_code_uri):
def deploy(project, entry_point, s3_code_uri, profile):
"""
Copy a local project to S3.
"""
p = project(entry_point)
p.deploy(s3_code_uri)
p.deploy(s3_code_uri, profile)


@click.command()
Expand Down Expand Up @@ -178,6 +186,14 @@ def deploy(project, entry_point, s3_code_uri):
default=False,
is_flag=True,
)
@click.option(
"--profile",
help="The AWS profile to use for starting the job execution",
)
@click.option(
"--region",
help="The region of EMR Cluster or Serverless application where the job will be executed",
)
@click.pass_obj
def run(
project,
Expand All @@ -192,6 +208,8 @@ def run(
spark_submit_opts,
build,
show_stdout,
profile,
region
):
"""
Run a project on EMR, optionally build and deploy
Expand All @@ -211,7 +229,7 @@ def run(

if build:
p.build()
p.deploy(s3_code_uri)
p.deploy(s3_code_uri, profile)

# application_id indicates EMR Serverless job
if application_id is not None:
Expand All @@ -223,14 +241,14 @@ def run(

if job_args:
job_args = job_args.split(",")
emrs = EMRServerless(application_id, job_role, p)
emrs = EMRServerless(application_id, job_role, p, profile=profile, region=region)
emrs.run_job(job_name, job_args, spark_submit_opts, wait, show_stdout)

# cluster_id indicates EMR on EC2 job
if cluster_id is not None:
if job_args:
job_args = job_args.split(",")
emr = EMREC2(cluster_id, p)
emr = EMREC2(cluster_id, p, profile=profile)
emr.run_job(job_name, job_args, wait, show_stdout)


Expand Down
13 changes: 11 additions & 2 deletions src/emr_cli/packaging/python_files_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


class PythonFilesProject(DeploymentPackage):

"""
A PythonFilesProject is a simple project that includes multiple `.py` files.

Expand All @@ -29,14 +30,22 @@ def build(self):
relpath = os.path.relpath(file, cwd)
zf.write(file, relpath)

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str = None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
s3_client = boto3.client("s3")
bucket, prefix = parse_bucket_uri(s3_code_uri)
filename = os.path.basename(self.entry_point_path)

aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

console_log(f"Deploying {filename} and local python modules to {s3_code_uri}")

s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}")
Expand Down
15 changes: 12 additions & 3 deletions src/emr_cli/packaging/python_poetry_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


class PythonPoetryProject(DeploymentPackage):

def initialize(self, target_dir: str = os.getcwd()):
"""
Initializes a poetry-based pyspark project in the provided directory.
Expand Down Expand Up @@ -67,20 +68,28 @@ def _dockerfile_path(self) -> str:
)
return os.path.join(templates, "Dockerfile")

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str = None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
s3_client = boto3.client("s3")
bucket, prefix = self._parse_bucket_uri(s3_code_uri)
filename = os.path.basename(self.entry_point_path)

console_log(f"Deploying {filename} and dependencies to {s3_code_uri}")

aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

s3_client.upload_file(
self.entry_point_path, bucket, os.path.join(prefix, filename)
)
s3_client.upload_file(
self.s3_client.upload_file(
os.path.join(self.dist_dir, "pyspark_deps.tar.gz"),
bucket,
os.path.join(prefix, "pyspark_deps.tar.gz"),
Expand Down
14 changes: 12 additions & 2 deletions src/emr_cli/packaging/python_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@


class PythonProject(DeploymentPackage):

def initialize(self, target_dir: str = os.getcwd()):
"""
Initializes a pyspark project in the provided directory.
Expand Down Expand Up @@ -65,17 +66,26 @@ def _run_docker_build(self, output_dir: str):
env=dict(os.environ, DOCKER_BUILDKIT="1"),
)

def deploy(self, s3_code_uri: str) -> str:
def deploy(self, s3_code_uri: str, profile: str = None) -> str:
"""
Copies local code to S3 and returns the path to the uploaded entrypoint
"""
self.s3_uri_base = s3_code_uri
s3_client = boto3.client("s3")

bucket, prefix = parse_bucket_uri(self.s3_uri_base)
filename = os.path.basename(self.entry_point_path)

console_log(f"Deploying {filename} and dependencies to {self.s3_uri_base}")

aws_session = ""

if profile:
aws_session = boto3.session.Session(profile_name=profile)
else:
aws_session = boto3.session.Session()

s3_client = aws_session.client("s3")

s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}")
s3_client.upload_file(
f"{self.dist_dir}/pyspark_deps.tar.gz",
Expand Down
Loading