Skip to content

Commit 2ca552e

Browse files
authored
Merge pull request #111 from MetOffice/aws_utils
Aws utils
2 parents 1b7a263 + c7631ec commit 2ca552e

10 files changed

Lines changed: 205 additions & 7 deletions

File tree

aws-scripts/sample.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

aws-scripts/sample2.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

aws-scripts/sample3.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

aws-scripts/sample4.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.
File renamed without changes.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def _split_s3_uri(s3_uri):
3939
return parsed_uri.netloc, parsed_uri.path[1:]
4040

4141

42-
def _find_matching_s3_keys(in_fileglob):
42+
def find_matching_s3_keys(in_fileglob):
4343

4444
bucket_name, file_and_folder_name = _split_s3_uri(in_fileglob)
4545
folder_name = os.path.split(file_and_folder_name)[0]
@@ -80,7 +80,7 @@ def copy_s3_files(in_fileglob, out_folder):
8080
in_fileglob: s3 uri of flies (wild card can be used)
8181
out_folder: local path where data will be stored
8282
'''
83-
matching_keys = _find_matching_s3_keys(in_fileglob)
83+
matching_keys = find_matching_s3_keys(in_fileglob)
8484
in_bucket_name = _split_s3_uri(in_fileglob)[0]
8585
out_scheme = urlparse(out_folder).scheme
8686
for key in matching_keys:

notebooks/awsutils/main.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from fetch_s3_file import copy_s3_files, find_matching_s3_keys
2+
import iris
3+
import os
4+
5+
6+
7+
8+
def load_data(inpath):
    '''
    Load data with iris, first fetching it from an s3 bucket when needed.

    When ``inpath`` starts with ``s3`` every matching key is copied into
    ``/tmp`` (a file already present there is not fetched again) and the
    local copies are loaded.  Any other path is loaded as given.

    args
    ---
    inpath: path or s3 uri of the file(s) to load

    returns
    ---
    the result of iris.load on the resolved local path
    '''
    if not inpath.startswith('s3'):
        # Nothing to download.  Previously this case fell through to an
        # undefined temp_path and raised a NameError.
        return iris.load(inpath)

    # NOTE(review): if copy_s3_files stages its downloads under /tmp as
    # well, copying into /tmp may collide with the staged file -- confirm.
    temp_path = '/tmp'
    s3dir = get_directory(inpath)
    for key in find_matching_s3_keys(inpath):
        filename = key.split('/')[-1]
        if os.path.exists(os.path.join(temp_path, filename)):
            print(key, ' already exist')
        else:
            # s3dir already contains the folder part of the key, so only the
            # bare file name is appended.  Joining the whole key duplicated
            # the folder and produced a uri that matched nothing.
            copy_s3_files(os.path.join(s3dir, filename), temp_path)

    files = inpath.split('/')[-1]
    return iris.load(os.path.join(temp_path, files))
24+
25+
26+
def get_directory(inpath):
    '''
    Return the s3 uri of the folder that holds ``inpath``.

    ``inpath`` is split on '/'; the first two pieces ('s3:' and the empty
    string that follows it in an 's3://' uri) and the last piece are
    dropped, and the rest is joined back onto 's3://'.
    '''
    folder_parts = inpath.split('/')[2:-1]
    return os.path.join('s3://', *folder_parts)
32+
33+
34+
35+
def main():
    '''Load the sample file from the s3 bucket and print what was loaded.'''
    print(load_data('s3://ias-pyprecis/data/sample_data.nc'))


if __name__ == "__main__":
    main()
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def upload_folder_to_s3(s3_client, s3bucket, input_dir, s3_path):
1919

2020
def main():
2121
s3_client = boto3.client('s3')
22-
upload_folder_to_s3(s3_client, 'ias-pyprecis', 'data', 'data')
22+
upload_folder_to_s3(s3_client, 'ias-pyprecis', '/data/users/fris/s3_uploads/pp', 'data/pp')
2323

2424

2525
if __name__ == "__main__":

notebooks/utils.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
2+
import io
3+
import os
4+
import boto3
5+
from urllib.parse import urlparse
6+
from fnmatch import fnmatch
7+
from shutil import copyfile
8+
import iris
9+
10+
11+
def _fetch_s3_file(s3_uri, save_to):
    '''
    Download a single s3 object to a local file.

    args
    ---
    s3_uri: s3 uri of the object to download
    save_to: local path the object is written to (truncated if it exists)
    '''
    bucket_name, key = _split_s3_uri(s3_uri)

    client = boto3.client("s3")
    obj = client.get_object(
        Bucket=bucket_name,
        Key=key,
    )
    # A buffered binary file is used instead of raw io.FileIO: a raw write
    # may store fewer bytes than it was given, and the returned count was
    # never checked.
    with open(save_to, "wb") as f:
        for chunk in obj["Body"]:
            f.write(chunk)
24+
25+
26+
def _save_s3_file(s3_uri, out_filename, file_to_save="/tmp/tmp"):
    '''
    Upload a local file into a folder of an s3 bucket.

    args
    ---
    s3_uri: s3 uri of the destination folder
    out_filename: name of the object inside that folder
    file_to_save: local path of the file to upload
    '''
    target_bucket, target_folder = _split_s3_uri(s3_uri)
    # The object key is the folder part of the uri plus the file name.
    target_key = os.path.join(target_folder, out_filename)
    s3 = boto3.client("s3")
    s3.upload_file(Filename=file_to_save, Bucket=target_bucket, Key=target_key)
36+
37+
38+
def _split_s3_uri(s3_uri):
    '''
    Split an s3 uri into its bucket name and object key.

    args
    ---
    s3_uri: uri of the form s3://<bucket>/<key>

    returns
    ---
    (bucket, key) tuple; the key has no leading '/'
    '''
    parsed_uri = urlparse(s3_uri)
    bucket = parsed_uri.netloc
    # urlparse treats '?' and '#' as query / fragment delimiters and drops
    # them from .path.  Here '?' is a wild card for fnmatch, so the key is
    # taken from the original text after "//<bucket>/" instead.
    _, found, remainder = s3_uri.partition("//" + bucket)
    if bucket and found and remainder.startswith("/"):
        return bucket, remainder[1:]
    # No bucket, or nothing after it: keep the urlparse-based result.
    return bucket, parsed_uri.path[1:]
41+
42+
43+
def find_matching_s3_keys(in_fileglob):
    '''
    List the keys in an s3 bucket that match a glob pattern.

    args
    ---
    in_fileglob: s3 uri of files (wild card can be used)

    returns
    ---
    list of matching keys, in the order the listing returned them
    '''
    bucket_name, key_pattern = _split_s3_uri(in_fileglob)
    # Only the folder part of the pattern is used as the listing prefix;
    # fnmatch then filters the listed keys against the whole pattern.
    listing_prefix = os.path.split(key_pattern)[0]
    listing = _get_all_files_in_s3_folder(bucket_name, listing_prefix)
    return [
        entry["Key"]
        for entry in listing
        if fnmatch(entry["Key"], key_pattern)
    ]
53+
54+
55+
def _get_all_files_in_s3_folder(bucket_name, folder_name):
    '''
    List every object in a bucket whose key starts with ``folder_name``.

    args
    ---
    bucket_name: name of the s3 bucket
    folder_name: key prefix passed to the listing call

    returns
    ---
    list of the "Contents" entries from all result pages (empty when
    nothing matches)
    '''
    # The paginator follows the continuation tokens that were previously
    # tracked by hand.
    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    all_key_responses = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        # A page with no matching objects has no "Contents" entry.
        all_key_responses.extend(page.get("Contents", []))
    return all_key_responses
74+
75+
76+
def copy_s3_files(in_fileglob, out_folder):
    '''
    This function copies files from an s3 bucket to a local directory or
    to another s3 location.

    args
    ---
    in_fileglob: s3 uri of files (wild card can be used)
    out_folder: local path, or s3 uri, where the data will be stored
    '''
    import tempfile

    matching_keys = find_matching_s3_keys(in_fileglob)
    in_bucket_name = _split_s3_uri(in_fileglob)[0]
    out_is_s3 = urlparse(out_folder).scheme == "s3"
    for key in matching_keys:
        new_filename = os.path.split(key)[1]
        if not new_filename:
            # A key ending in '/' has no file name to write to.
            continue
        in_s3_uri = "s3://{}/{}".format(in_bucket_name, key)
        # Each download is staged in its own temporary directory.  A fixed
        # /tmp/<name> staging path made copyfile raise SameFileError when
        # out_folder was /tmp, and was left behind if a later step failed.
        with tempfile.TemporaryDirectory() as staging_dir:
            staged_file = os.path.join(staging_dir, new_filename)
            _fetch_s3_file(in_s3_uri, staged_file)
            if out_is_s3:
                _save_s3_file(
                    out_folder,
                    new_filename,
                    staged_file,
                )
            else:
                copyfile(
                    staged_file, os.path.join(out_folder, new_filename)
                )
103+
104+
105+
def load_data(inpath):
    '''
    This method copies the data from an s3 bucket and loads it as an iris
    cubelist.  Data is stored in the data/ directory; a file already
    present there is not fetched again.  A path that does not start with
    's3' is loaded as given.

    input: file(s) path on s3 bucket
    output: iris cubelist
    '''
    if not inpath.startswith('s3'):
        # Nothing to download.  Previously this case fell through to an
        # undefined temp_path and raised a NameError.
        return iris.load(inpath)

    temp_path = 'data/'
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(temp_path, exist_ok=True)

    s3dir = _get_directory(inpath)
    for key in find_matching_s3_keys(inpath):
        filename = key.split('/')[-1]
        if os.path.exists(os.path.join(temp_path, filename)):
            print(key, ' already exist')
        else:
            s3_uri = os.path.join(s3dir, filename)
            print(s3_uri)
            copy_s3_files(s3_uri, temp_path)

    files = inpath.split('/')[-1]
    return iris.load(os.path.join(temp_path, files))
131+
132+
133+
def _get_directory(inpath):
    '''
    Return the s3 uri of the folder that holds ``inpath``.

    ``inpath`` is split on '/'; the first two pieces ('s3:' and the empty
    string that follows it in an 's3://' uri) and the last piece are
    dropped, and the rest is joined back onto 's3://'.
    '''
    folder_parts = inpath.split('/')[2:-1]
    return os.path.join('s3://', *folder_parts)
139+
140+
141+
def flush_data(path):
    '''
    It deletes the data from the compute node.

    Input: file(s) path (glob pattern); directories that match the
    pattern are left in place.
    '''
    import glob
    for match in glob.glob(path):
        # os.remove cannot delete a directory and would abort the loop
        # part-way through.  Symlinks are still unlinked like any file.
        if os.path.isdir(match) and not os.path.islink(match):
            continue
        os.remove(match)
151+
152+
153+
def main():
    '''Copy the files matching a fixed s3 glob into a fixed local folder.'''
    copy_s3_files(
        's3://ias-pyprecis/data/cmip5/.nc',
        '/home/h01/zmaalick/myprojs/PyPRECIS/aws-scripts',
    )


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)