-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathpython_files_project.py
More file actions
75 lines (62 loc) · 2.24 KB
/
Copy pathpython_files_project.py
File metadata and controls
75 lines (62 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import zipfile
import boto3
from emr_cli.deployments import SparkParams
from emr_cli.deployments.emr_serverless import DeploymentPackage
from emr_cli.utils import (
PrettyUploader,
console_log,
find_files,
mkdir,
parse_bucket_uri,
)
class PythonFilesProject(DeploymentPackage):
    """
    A PythonFilesProject is a simple project that includes multiple `.py` files.

    The project has no external dependencies and requires no additional
    packaging: every local module except the entrypoint is zipped into
    `pyfiles.zip`, which is uploaded next to the entrypoint and passed to Spark
    through `spark.submit.pyFiles`.
    """

    # Name of the archive that holds the project's non-entrypoint modules.
    _ZIP_NAME = "pyfiles.zip"

    @staticmethod
    def _s3_join(*segments: str) -> str:
        """
        Join S3 key/URI segments with "/" regardless of the host OS.

        `os.path.join` uses the platform separator (a backslash on Windows),
        which is never valid for S3 keys. Empty segments are skipped and
        trailing slashes on non-final segments are collapsed so the result
        never contains an accidental "//".
        """
        trimmed = [s.rstrip("/") for s in segments[:-1]] + list(segments[-1:])
        return "/".join(s for s in trimmed if s)

    def build(self):
        """
        Zip all the files except for the entrypoint file.

        The archive is written to `<dist_dir>/pyfiles.zip`; member names are
        stored relative to the current working directory.
        """
        cwd = os.getcwd()
        entry_point = os.path.abspath(self.entry_point_path)

        # Filter instead of `list.remove` so that an entrypoint which was not
        # discovered (e.g. it lives outside the cwd or in an excluded
        # directory) does not abort the build with an opaque ValueError.
        py_files = [
            path
            for path in find_files(cwd, [".venv"], ".py")
            if path != entry_point
        ]

        # NOTE(review): presumably creates dist_dir if missing - confirm
        # against emr_cli.utils.mkdir.
        mkdir(self.dist_dir)
        with zipfile.ZipFile(os.path.join(self.dist_dir, self._ZIP_NAME), "w") as zf:
            for path in py_files:
                zf.write(path, os.path.relpath(path, cwd))

    def deploy(self, s3_code_uri: str, profile: str = None) -> str:
        """
        Copies local code to S3 and returns the path to the uploaded entrypoint

        :param s3_code_uri: S3 URI (bucket plus optional prefix) to upload to.
        :param profile: Optional AWS profile name; when falsy, the default
            boto3 session is used.
        :return: The `s3://` URI of the uploaded entrypoint file.
        """
        if profile:
            aws_session = boto3.session.Session(profile_name=profile)
        else:
            aws_session = boto3.session.Session()
        s3_client = aws_session.client("s3")

        bucket, prefix = parse_bucket_uri(s3_code_uri)
        filename = os.path.basename(self.entry_point_path)

        # Build each object key once and reuse it for both the upload and the
        # returned URI, so the two can never disagree (they did when the
        # prefix was empty or ended with a slash).
        entry_point_key = self._s3_join(prefix, filename)
        zip_key = self._s3_join(prefix, self._ZIP_NAME)

        console_log(f"Deploying {filename} and local python modules to {s3_code_uri}")

        # Mapping of local file path -> destination S3 key.
        uploader = PrettyUploader(
            s3_client,
            bucket,
            {
                self.entry_point_path: entry_point_key,
                os.path.join(self.dist_dir, self._ZIP_NAME): zip_key,
            },
        )
        uploader.run()

        return f"s3://{bucket}/{entry_point_key}"

    def spark_submit_parameters(self) -> SparkParams:
        """
        Return the Spark parameters that point `spark.submit.pyFiles` at the
        `pyfiles.zip` archive located under `self.s3_uri_base`.
        """
        zip_path = self._s3_join(self.s3_uri_base, self._ZIP_NAME)
        return SparkParams(
            common_params={
                "spark.submit.pyFiles": zip_path,
            },
        )