1- import glob
2- import fnmatch
31import json
42import os
53import pandas as pd
64import re
75
86from absl import logging
9- from google .cloud import storage
7+ from util .file_util import FileIO
8+ from util .file_util import file_get_matching
109
1110
1211def load_mcf_file (file : str ):
1312 """ Reads an MCF text file and returns mcf nodes."""
14- mcf_file = open (file , 'r' , encoding = 'utf-8' )
15- mcf_contents = mcf_file .read ()
16- mcf_file .close ()
13+ with FileIO (file , 'r' , encoding = 'utf-8' ) as mcf_file :
14+ mcf_contents = mcf_file .read ()
1715 # nodes separated by a blank line
1816 mcf_nodes_text = mcf_contents .split ('\n \n ' )
1917 # lines seprated as property: constraint
@@ -36,7 +34,7 @@ def load_mcf_files(path: str) -> pd.DataFrame:
3634 """ Loads all sharded mcf files in the given directory and
3735 returns a combined MCF node list."""
3836 node_list = []
39- filenames = glob . glob (path )
37+ filenames = file_get_matching (path )
4038 logging .info (f'Loading { len (filenames )} files from path { path } ' )
4139 for filename in filenames :
4240 nodes = load_mcf_file (filename )
@@ -48,49 +46,33 @@ def load_csv_data(path: str, tmp_dir: str) -> pd.DataFrame:
4846 """ Loads all matched files in the given path and
4947 returns a single combined dataframe."""
5048 df_list = []
51- pattern = path
52- if path .startswith ('gs://' ):
53- pattern = get_gcs_data (path , tmp_dir )
54-
55- filenames = glob .glob (pattern )
49+ filenames = file_get_matching (path )
5650 for filename in filenames :
57- df = pd .read_csv (filename )
58- df_list .append (df )
51+ with FileIO (filename , mode = 'r' ) as in_file :
52+ df = pd .read_csv (in_file )
53+ df_list .append (df )
5954 result = pd .concat (df_list , ignore_index = True )
6055 return result
6156
6257
6358def write_csv_data (df : pd .DataFrame , dest : str , file : str , tmp_dir : str ):
6459 """ Writes a dataframe to a CSV file with the given path."""
65- if dest .startswith ('gs://' ):
66- path = os .path .join (tmp_dir , file )
67- else :
68- path = os .path .join (dest , file )
69- with open (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
60+ path = os .path .join (dest , file )
61+ with FileIO (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
7062 df .to_csv (out_file , index = False , mode = 'w' , header = True )
71- if dest .startswith ('gs://' ):
72- upload_output_data (path , dest )
7363
7464
7565def write_json_data (data , dest : str , file : str , tmp_dir : str ):
7666 """ Writes data to a JSON file with the given path."""
77- if dest .startswith ('gs://' ):
78- path = os .path .join (tmp_dir , file )
79- else :
80- path = os .path .join (dest , file )
81- with open (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
67+ path = os .path .join (dest , file )
68+ with FileIO (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
8269 json .dump (data , out_file , indent = 4 )
83- if dest .startswith ('gs://' ):
84- upload_output_data (path , dest )
8570
8671
8772def write_mcf_nodes (nodes : list , dest : str , file : str , tmp_dir : str ):
8873 """ Writes mcf nodes to a file with the given path."""
89- if dest .startswith ('gs://' ):
90- path = os .path .join (tmp_dir , file )
91- else :
92- path = os .path .join (dest , file )
93- with open (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
74+ path = os .path .join (dest , file )
75+ with FileIO (path , mode = 'w' , encoding = 'utf-8' ) as out_file :
9476 for node in nodes :
9577 if 'Node' in node :
9678 out_file .write (f'Node: { node ["Node" ]} \n ' )
@@ -102,40 +84,6 @@ def write_mcf_nodes(nodes: list, dest: str, file: str, tmp_dir: str):
10284 continue
10385 out_file .write (f'{ key } : { value } \n ' )
10486 out_file .write ('\n ' )
105- if dest .startswith ('gs://' ):
106- upload_output_data (path , dest )
107-
108-
109- def upload_output_data (src : str , dest : str ):
110- client = storage .Client ()
111- bucket_name = dest .split ('/' )[2 ]
112- bucket = client .get_bucket (bucket_name )
113- for filepath in glob .iglob (src ):
114- filename = os .path .basename (filepath )
115- logging .info ('Uploading %s to %s' , filename , dest )
116- blobname = dest [len ('gs://' + bucket_name + '/' ):] + '/' + filename
117- blob = bucket .blob (blobname )
118- blob .upload_from_filename (filepath )
119-
120-
121- def get_gcs_data (uri : str , dest_dir : str ) -> str :
122- """ Downloads files from GCS and copies them to local.
123- Args:
124- uri: single file path or wildcard format
125- dest_dir: destination folder
126- Returns:
127- path to the output file/folder
128- """
129- client = storage .Client ()
130- bucket = client .get_bucket (uri .split ('/' )[2 ])
131- file_pat = uri .split (bucket .name , 1 )[1 ][1 :]
132- dirname = os .path .dirname (file_pat )
133- for blob in bucket .list_blobs (prefix = dirname ):
134- if fnmatch .fnmatch (blob .name , file_pat ):
135- dest_file = os .path .join (dest_dir , blob .name )
136- os .makedirs (os .path .dirname (dest_file ), exist_ok = True )
137- blob .download_to_filename (dest_file )
138- return os .path .join (dest_dir , file_pat )
13987
14088
14189def load_data (path : str , tmp_dir : str ) -> list :
@@ -146,9 +94,5 @@ def load_data(path: str, tmp_dir: str) -> list:
14694 Returns:
14795 combined list of mcf nodes
14896 """
149- if path .startswith ('gs://' ):
150- os .makedirs (tmp_dir , exist_ok = True )
151- path = get_gcs_data (path , tmp_dir )
152-
15397 mcf_nodes = load_mcf_files (path )
15498 return mcf_nodes
0 commit comments