diff --git a/requirements.txt b/requirements.txt index 859e484..be5aee7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pandas==1.3.5 -pyarrow==8.0.0 \ No newline at end of file +pyarrow==8.0.0 +docker==6.0.1 \ No newline at end of file diff --git a/src/emr_cli/base/EmrBase.py b/src/emr_cli/base/EmrBase.py new file mode 100644 index 0000000..5456d70 --- /dev/null +++ b/src/emr_cli/base/EmrBase.py @@ -0,0 +1,24 @@ +import boto3 + +class EmrBase: + + """ + Base class for defining authentication profile and any future common + EMR operations + + Constructor + ---------- + profile : the name of the profile as store in the aws config file + + Attributes + ---------- + aws_session : the object return by for the session initialized + using the profile passed through the extending class + + """ + + aws_session = "" + + def __init__(self, profile): + + self.aws_session = boto3.session.Session(profile_name=profile) diff --git a/src/emr_cli/deployments/emr_local_dev.py b/src/emr_cli/deployments/emr_local_dev.py new file mode 100644 index 0000000..3985c9f --- /dev/null +++ b/src/emr_cli/deployments/emr_local_dev.py @@ -0,0 +1,33 @@ +import re +import docker + + +class EMRLocalDev (): + + def start_local_dev( + container_name: str, + spark_ui_port: int, + jupyter_port: int + ) : + + client = docker.from_env() + + #TODO + #add volume mount option + + container = client.containers.run(container_name, ports={'4040/tcp': spark_ui_port, '8888/tcp': jupyter_port}, detach=True) + + logs = container.logs(stream=True) + + while True: + log = logs.__next__().decode('utf-8') + + matcher = re.search(r'(?<=token=)[A-Fa-f0-9]{48}', log) + + if matcher: + token = matcher.group(0) + + print(f'http://127.0.0.1:{jupyter_port}/lab?token={token}') + print(f'The notebook server token is: {token}') + + break \ No newline at end of file diff --git a/src/emr_cli/deployments/emr_serverless.py b/src/emr_cli/deployments/emr_serverless.py index 711792f..1d1492f 100644 --- a/src/emr_cli/deployments/emr_serverless.py +++ b/src/emr_cli/deployments/emr_serverless.py @@ -5,6 +5,7 @@ import zipfile from time import sleep from typing import List, Optional +from emr_cli.base.EmrBase import EmrBase import boto3 @@ -50,17 +51,21 @@ def _zip_local_pyfiles(self): zf.write(file, relpath) -class Bootstrap: +class Bootstrap (EmrBase): DEFAULT_S3_POLICY_NAME = "emr-cli-S3Access" DEFAULT_GLUE_POLICY_NAME = "emr-cli-GlueAccess" - def __init__(self, code_bucket: str, log_bucket: str, job_role_name: str): + def __init__(self, profile: str, code_bucket: str, log_bucket: str, job_role_name: str): + super().__init__(profile) + + aws_session = self.aws_session + self.code_bucket = code_bucket self.log_bucket = log_bucket or code_bucket self.job_role_name = job_role_name - self.s3_client = boto3.client("s3") - self.iam_client = boto3.client("iam") - self.emrs_client = boto3.client("emr-serverless") + self.s3_client = aws_session.client("s3") + self.iam_client = aws_session.client("iam") + self.emrs_client = aws_session.client("emr-serverless") def create_environment(self): self._create_s3_buckets() @@ -91,7 +96,8 @@ def _create_s3_buckets(self): Creates both the source and log buckets if they don't already exist. """ for bucket_name in set([self.code_bucket, self.log_bucket]): - self.s3_client.create_bucket(Bucket=bucket_name) + + self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration= {'LocationConstraint': self.aws_session.region_name}) console_log(f"Created S3 bucket: s3://{bucket_name}") def _create_job_role(self): @@ -199,23 +205,30 @@ def _create_application(self): return app_id -class EMRServerless: +class EMRServerless (EmrBase): def __init__( self, application_id: str, job_role: str, deployment_package: DeploymentPackage, region: str = "", + profile: str = "" ) -> None: + + super().__init__(profile) + self.application_id = application_id self.job_role = job_role self.dp = deployment_package + + aws_session = self.aws_session + if region: - self.client = boto3.client("emr-serverless", region_name=region) + self.client = aws_session.client("emr-serverless", region_name=region) else: # Note that boto3 uses AWS_DEFAULT_REGION, not AWS_REGION # We may want to add an extra check here for the latter. - self.client = boto3.client("emr-serverless") + self.client = aws_session.client("emr-serverless") def run_job( self, diff --git a/src/emr_cli/emr_cli.py b/src/emr_cli/emr_cli.py index 4506920..99e09b7 100644 --- a/src/emr_cli/emr_cli.py +++ b/src/emr_cli/emr_cli.py @@ -4,6 +4,7 @@ from emr_cli.deployments.emr_ec2 import EMREC2 from emr_cli.packaging.detector import ProjectDetector from emr_cli.utils import console_log +from .deployments.emr_local_dev import EMRLocalDev from .deployments.emr_serverless import Bootstrap, EMRServerless from .packaging.python_project import PythonProject @@ -30,6 +31,10 @@ def status(project): @click.command() +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", +) @click.option( "--target", type=click.Choice(["emr-serverless"]), @@ -53,7 +58,7 @@ def status(project): is_flag=True, help="Prints the commands necessary to destroy the created environment.", ) -def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): +def bootstrap(profile, target, code_bucket, logs_bucket, job_role_name, destroy): """ Bootstrap an EMR Serverless environment. @@ -62,12 +67,12 @@ def bootstrap(target, code_bucket, logs_bucket, job_role_name, destroy): """ if destroy: c = ConfigReader.read() - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) b.print_destroy_commands(c.get("run", {}).get("application_id", None)) exit(0) # For EMR Serverless, we need to create an S3 bucket, a job role, and an Application - b = Bootstrap(code_bucket, logs_bucket, job_role_name) + b = Bootstrap(profile, code_bucket, logs_bucket, job_role_name) config = b.create_environment() # The resulting config is relevant for the "run" command @@ -145,6 +150,10 @@ def deploy(project, entry_point, s3_code_uri): @click.command() +@click.option( + "--profile", + help="The AWS profile to use for bootstraping the environment.", +) @click.option("--application-id", help="EMR Serverless Application ID") @click.option("--cluster-id", help="EMR on EC2 Cluster ID") @click.option( @@ -234,12 +243,40 @@ def run( emr.run_job(job_name, job_args, wait, show_stdout) +@click.command() +@click.option( + "--container-name", + required=True, + help="The container name built by Docker" +) +@click.option( + "--spark-ui-port", + help="The port to map for spark ui", + default=4141 +) +@click.option( + "--jupyter-port", + help="The port to map for jupyter", + default=8787 +) +def start_local_dev(container_name, spark_ui_port, jupyter_port): + """ + Start a container with Jupyter notebook and Amazon EMR runtime + """ + + EMRLocalDev.start_local_dev(container_name=container_name, + spark_ui_port=spark_ui_port, + jupyter_port=jupyter_port + ) + + cli.add_command(package) cli.add_command(deploy) cli.add_command(run) cli.add_command(init) cli.add_command(bootstrap) cli.add_command(status) +cli.add_command(start_local_dev) if __name__ == "__main__": cli()