Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pandas==1.3.5
pyarrow==8.0.0
pyarrow==8.0.0
docker==6.0.1
24 changes: 24 additions & 0 deletions src/emr_cli/base/EmrBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import boto3

class EmrBase:
    """
    Base class for defining the authentication profile and any future common
    EMR operations.

    Constructor
    ----------
    profile : the name of the profile as stored in the AWS config file.
        ``None`` or an empty string means "no explicit profile", in which
        case boto3 resolves credentials through its default chain.

    Attributes
    ----------
    aws_session : the ``boto3.session.Session`` initialized using the
        profile passed through the extending class
    """

    # Class-level placeholder; every instance replaces it in __init__.
    aws_session = None

    def __init__(self, profile=None):
        # Subclasses default ``profile`` to "" when the caller gave none.
        # boto3 would look "" up as a literal profile name and fail, so a
        # falsy value is normalized to None (use the default profile).
        self.aws_session = boto3.session.Session(profile_name=profile or None)
33 changes: 33 additions & 0 deletions src/emr_cli/deployments/emr_local_dev.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import re
import docker


class EMRLocalDev:
    """Starts a local development container and reports its Jupyter URL."""

    # Matches the 48 hex characters that follow ``token=`` in a log line.
    # Compiled once here instead of on every log line.
    _TOKEN_PATTERN = re.compile(r'(?<=token=)[A-Fa-f0-9]{48}')

    @staticmethod
    def start_local_dev(
        container_name: str,
        spark_ui_port: int,
        jupyter_port: int
    ) -> None:
        """
        Run a detached container and print the Jupyter URL found in its logs.

        Container port 4040/tcp is published on ``spark_ui_port`` and
        8888/tcp on ``jupyter_port``. The container's log stream is then
        read until a line containing ``token=<48 hex chars>`` appears; the
        lab URL and the token are printed and the function returns.

        Parameters
        ----------
        container_name : passed as the first argument to
            ``client.containers.run``
        spark_ui_port : host port mapped to container port 4040
        jupyter_port : host port mapped to container port 8888

        Raises
        ------
        RuntimeError : if the log stream ends before any token is seen
            (previously this surfaced as a bare ``StopIteration``).
        """
        client = docker.from_env()

        # TODO: add volume mount option
        container = client.containers.run(
            container_name,
            ports={'4040/tcp': spark_ui_port, '8888/tcp': jupyter_port},
            detach=True,
        )

        for raw_line in container.logs(stream=True):
            # errors='replace' keeps a chunk that splits a multibyte
            # character from aborting the scan; the token itself is ASCII.
            log = raw_line.decode('utf-8', errors='replace')

            matcher = EMRLocalDev._TOKEN_PATTERN.search(log)
            if not matcher:
                continue

            token = matcher.group(0)
            print(f'http://127.0.0.1:{jupyter_port}/lab?token={token}')
            print(f'The notebook server token is: {token}')
            return

        # The stream is exhausted, so no token will ever arrive.
        raise RuntimeError(
            f"Log stream for {container_name!r} ended before a Jupyter "
            "token was printed"
        )
31 changes: 22 additions & 9 deletions src/emr_cli/deployments/emr_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import zipfile
from time import sleep
from typing import List, Optional
from emr_cli.base.EmrBase import EmrBase

import boto3

Expand Down Expand Up @@ -50,17 +51,21 @@ def _zip_local_pyfiles(self):
zf.write(file, relpath)


class Bootstrap:
class Bootstrap (EmrBase):
DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access"
DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess"

def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str):
    """
    Store the bootstrap settings and build the AWS clients used later.

    Parameters
    ----------
    profile : AWS profile name handed to ``EmrBase.__init__``, which
        creates ``self.aws_session`` from it
    code_bucket : name of the bucket kept in ``self.code_bucket``
    log_bucket : name of the log bucket; when falsy, ``code_bucket`` is
        used for logs as well
    job_role_name : name kept in ``self.job_role_name``
    """
    super().__init__(profile)

    self.code_bucket = code_bucket
    self.log_bucket = log_bucket or code_bucket
    self.job_role_name = job_role_name

    # All clients come from the profile-aware session. Module-level
    # ``boto3.client(...)`` would ignore the selected profile.
    session = self.aws_session
    self.s3_client = session.client("s3")
    self.iam_client = session.client("iam")
    self.emrs_client = session.client("emr-serverless")

def create_environment(self):
self._create_s3_buckets()
Expand Down Expand Up @@ -91,7 +96,8 @@ def _create_s3_buckets(self):
Creates both the source and log buckets if they don't already exist.
"""
for bucket_name in set([self.code_bucket, self.log_bucket]):
self.s3_client.create_bucket(Bucket=bucket_name)

self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration= {'LocationConstraint': self.aws_session.region_name})
console_log(f"Created S3 bucket: s3://{bucket_name}")

def _create_job_role(self):
Expand Down Expand Up @@ -199,23 +205,30 @@ def _create_application(self):
return app_id


class EMRServerless:
class EMRServerless (EmrBase):
def __init__(
    self,
    application_id: str,
    job_role: str,
    deployment_package: DeploymentPackage,
    region: str = "",
    profile: str = "",
) -> None:
    """
    Store the job settings and create the EMR Serverless client.

    Parameters
    ----------
    application_id : kept in ``self.application_id``
    job_role : kept in ``self.job_role``
    deployment_package : kept in ``self.dp``
    region : region for the client; when empty, no region is passed and
        the session's own region resolution applies
    profile : AWS profile name for the session; when empty, no explicit
        profile is requested
    """
    # The "" default must not reach boto3 as a literal profile name, so
    # an empty value is turned into None before the session is created.
    super().__init__(profile or None)

    self.application_id = application_id
    self.job_role = job_role
    self.dp = deployment_package

    # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION
    # We may want to add an extra check here for the latter.
    # ``region_name=None`` is the same as omitting the argument, so one
    # call covers both the explicit-region and default-region cases.
    self.client = self.aws_session.client(
        "emr-serverless", region_name=region or None
    )

def run_job(
self,
Expand Down
43 changes: 40 additions & 3 deletions src/emr_cli/emr_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from emr_cli.deployments.emr_ec2 import EMREC2
from emr_cli.packaging.detector import ProjectDetector
from emr_cli.utils import console_log
from .deployments.emr_local_dev import EMRLocalDev

from .deployments.emr_serverless import Bootstrap, EMRServerless
from .packaging.python_project import PythonProject
Expand All @@ -30,6 +31,10 @@ def status(project):


@click.command()
@click.option(
"--profile",
help="The AWS profile to use for bootstraping the environment.",
)
@click.option(
"--target",
type=click.Choice(["emr-serverless"]),
Expand All @@ -53,7 +58,7 @@ def status(project):
is_flag=True,
help="Prints the commands necessary to destroy the created environment.",
)
def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy):
def bootstrap(profile, target, code_bucket, logs_bucket, job_role_name, destroy):
"""
Bootstrap an EMR Serverless environment.

Expand All @@ -62,12 +67,12 @@ def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy):
"""
if destroy:
c = ConfigReader.read()
b = Bootstrap(code_bucket, logs_bucket, job_role_name)
b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name)
b.print_destroy_commands(c.get("run", {}).get("application_id", None))
exit(0)

# For EMR Serverless, we need to create an S3 bucket, a job role, and an Application
b = Bootstrap(code_bucket, logs_bucket, job_role_name)
b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name)
config = b.create_environment()

# The resulting config is relevant for the "run" command
Expand Down Expand Up @@ -145,6 +150,10 @@ def deploy(project, entry_point, s3_code_uri):


@click.command()
@click.option(
"--profile",
help="The AWS profile to use for bootstraping the environment.",
)
@click.option("--application-id", help="EMR Serverless Application ID")
@click.option("--cluster-id", help="EMR on EC2 Cluster ID")
@click.option(
Expand Down Expand Up @@ -234,12 +243,40 @@ def run(
emr.run_job(job_name, job_args, wait, show_stdout)


@click.command()
@click.option("--container-name", help="The container name built by Docker", required=True)
@click.option("--spark-ui-port", default=4141, help="The port to map for spark ui")
@click.option("--jupyter-port", default=8787, help="The port to map for jupyter")
def start_local_dev(container_name, spark_ui_port, jupyter_port):
    """
    Start a container with Jupyter notebook and Amazon EMR runtime
    """
    # Thin CLI wrapper: the three options are forwarded unchanged, by
    # keyword, to EMRLocalDev.start_local_dev.
    port_mapping = {
        "spark_ui_port": spark_ui_port,
        "jupyter_port": jupyter_port,
    }
    EMRLocalDev.start_local_dev(container_name=container_name, **port_mapping)


cli.add_command(package)
cli.add_command(deploy)
cli.add_command(run)
cli.add_command(init)
cli.add_command(bootstrap)
cli.add_command(status)
cli.add_command(start_local_dev)

if __name__ == "__main__":
cli()