-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathpython_files_project.py
More file actions
64 lines (49 loc) · 2.09 KB
/
python_files_project.py
File metadata and controls
64 lines (49 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
import zipfile
import boto3
from emr_cli.deployments import SparkParams
from emr_cli.deployments.emr_serverless import DeploymentPackage
from emr_cli.utils import console_log, find_files, mkdir, parse_bucket_uri
class PythonFilesProject(DeploymentPackage):
    """
    A PythonFilesProject is a simple project that includes multiple `.py` files.

    This is a simple project that has no external dependencies and requires no
    additional packaging. The files in the project are simply zipped up.
    """

    def build(self):
        """
        Zip all the files except for the entrypoint file.

        `.py` files are discovered with `find_files` starting at the current
        working directory and written to `<dist_dir>/pyfiles.zip`, each stored
        under its path relative to the current working directory.
        """
        cwd = os.getcwd()
        entry_point = os.path.abspath(self.entry_point_path)

        # Filter instead of `list.remove()`: if the entrypoint is not among the
        # discovered files (e.g. it lives outside the working directory),
        # `remove` would raise an opaque ValueError.
        # NOTE(review): [".venv"] is presumably a list of directories to skip;
        # confirm against `find_files`.
        py_files = [
            path
            for path in find_files(cwd, [".venv"], ".py")
            if os.path.abspath(path) != entry_point
        ]

        mkdir(self.dist_dir)
        with zipfile.ZipFile(f"{self.dist_dir}/pyfiles.zip", "w") as zf:
            for path in py_files:
                # Store paths relative to cwd so the archive mirrors the
                # project's layout.
                zf.write(path, os.path.relpath(path, cwd))

    def deploy(self, s3_code_uri: str, profile: str = None) -> str:
        """
        Copies local code to S3 and returns the path to the uploaded entrypoint

        Both the entrypoint file and `<dist_dir>/pyfiles.zip` are uploaded
        under the bucket/prefix parsed from `s3_code_uri`.

        :param s3_code_uri: S3 URI that the code is uploaded beneath.
        :param profile: Optional AWS profile name used to create the boto3
            session; when falsy, a default session is created.
        :return: The `s3://` URI of the uploaded entrypoint file.
        """
        bucket, prefix = parse_bucket_uri(s3_code_uri)
        filename = os.path.basename(self.entry_point_path)

        if profile:
            aws_session = boto3.session.Session(profile_name=profile)
        else:
            aws_session = boto3.session.Session()
        s3_client = aws_session.client("s3")

        console_log(f"Deploying {filename} and local python modules to {s3_code_uri}")

        s3_client.upload_file(self.entry_point_path, bucket, f"{prefix}/{filename}")
        s3_client.upload_file(
            f"{self.dist_dir}/pyfiles.zip", bucket, f"{prefix}/pyfiles.zip"
        )

        return f"s3://{bucket}/{prefix}/{filename}"

    def spark_submit_parameters(self) -> SparkParams:
        """
        Return the Spark parameters that point `spark.submit.pyFiles` at the
        `pyfiles.zip` archive under `self.s3_uri_base`.

        NOTE(review): `s3_uri_base` is not assigned in this class; presumably
        it is set by the base class - confirm.
        """
        # S3 URIs always use "/" as the separator, so join with posixpath
        # rather than os.path (which would insert "\\" on Windows).
        import posixpath

        zip_path = posixpath.join(self.s3_uri_base, "pyfiles.zip")
        return SparkParams(
            common_params={
                "spark.submit.pyFiles": zip_path,
            },
        )