Skip to content

Commit b42a569

Browse files
committed
Address reviewer comments
1 parent 54112d0 commit b42a569

14 files changed

Lines changed: 247 additions & 274 deletions

tools/import_differ/README.md

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ This utility generates a diff of two versions of a dataset for import analysis.
66

77
***Prerequisites***
88
- Python/Pandas is installed for native runner mode.
9+
- Java JRE/JDK is installed for direct runner mode.
910
- gcloud ADC is configured for cloud runner mode.
1011

1112
```bash
12-
python import_differ.py \
13+
python3 import_differ.py \
1314
--current_data=<path> \
1415
--previous_data=<path> \
1516
--output_location=<path> \
1617
--file_format=<mcf/tfrecord> \
17-
--runner_mode=<local/cloud> \
18+
--runner_mode=<native/direct/cloud> \
1819
--project_id=<id> \
1920
--job_name=<name>
2021
```
@@ -31,18 +32,31 @@ python import_differ.py \
3132

3233
***Output***
3334

34-
Summary output generated is of the form below showing counts of differences for each variable.
35-
36-
| variableMeasured | ADDED | DELETED | MODIFIED |
37-
| :--- | :--- | :--- | :--- |
38-
| dcid:var1 | 1 | 0 | 0 |
39-
| dcid:var2 | 0 | 2 | 1 |
40-
| dcid:var3 | 0 | 0 | 1 |
41-
| dcid:var4 | 0 | 2 | 0 |
42-
43-
Detailed diff output is written to files for further analysis. Sample result files can be found under folder 'test/results'.
44-
- obs\_diff\_summary.csv: diff summary for observation analysis
45-
- obs\_diff\_samples.csv: sample diff for observation analysis
46-
- obs\_diff\_log.csv: diff log for observations
47-
- schema\_diff\_summary.csv: diff summary for schema analysis
48-
- schema\_diff\_log.csv: diff log for schema nodes
35+
The utility generates a summary of the differences and detailed MCF files.
36+
37+
**Summary Output**
38+
A summary is printed to the logs and also written to `differ_summary.json` in the output directory:
39+
```json
40+
{
41+
"current_version": "path/to/current",
42+
"previous_version": "path/to/previous",
43+
"current_obs_count": 1000,
44+
"previous_obs_count": 950,
45+
"current_schema_count": 100,
46+
"previous_schema_count": 95,
47+
"added_obs_count": 50,
48+
"deleted_obs_count": 0,
49+
"modified_obs_count": 10,
50+
"added_schema_count": 5,
51+
"deleted_schema_count": 0,
52+
"modified_schema_count": 0,
53+
"obs_diff_count": 60,
54+
"schema_diff_count": 5
55+
}
56+
```
57+
58+
**Detailed Diff Files**
59+
Detailed diff output is written to MCF files in the output directory:
60+
- nodes-added.mcf: MCF nodes added in the current version
61+
- nodes-deleted.mcf: MCF nodes present in the previous version but missing from the current version
62+
- nodes-modified.mcf: MCF nodes modified in the current version
Lines changed: 15 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
import glob
2-
import fnmatch
31
import json
42
import os
53
import pandas as pd
64
import re
75

86
from absl import logging
9-
from google.cloud import storage
7+
from util.file_util import FileIO
8+
from util.file_util import file_get_matching
109

1110

1211
def load_mcf_file(file: str):
1312
""" Reads an MCF text file and returns mcf nodes."""
14-
mcf_file = open(file, 'r', encoding='utf-8')
15-
mcf_contents = mcf_file.read()
16-
mcf_file.close()
13+
with FileIO(file, 'r', encoding='utf-8') as mcf_file:
14+
mcf_contents = mcf_file.read()
1715
# nodes separated by a blank line
1816
mcf_nodes_text = mcf_contents.split('\n\n')
1917
# lines separated as property: constraint
@@ -36,7 +34,7 @@ def load_mcf_files(path: str) -> pd.DataFrame:
3634
""" Loads all sharded mcf files in the given directory and
3735
returns a combined MCF node list."""
3836
node_list = []
39-
filenames = glob.glob(path)
37+
filenames = file_get_matching(path)
4038
logging.info(f'Loading {len(filenames)} files from path {path}')
4139
for filename in filenames:
4240
nodes = load_mcf_file(filename)
@@ -48,49 +46,33 @@ def load_csv_data(path: str, tmp_dir: str) -> pd.DataFrame:
4846
""" Loads all matched files in the given path and
4947
returns a single combined dataframe."""
5048
df_list = []
51-
pattern = path
52-
if path.startswith('gs://'):
53-
pattern = get_gcs_data(path, tmp_dir)
54-
55-
filenames = glob.glob(pattern)
49+
filenames = file_get_matching(path)
5650
for filename in filenames:
57-
df = pd.read_csv(filename)
58-
df_list.append(df)
51+
with FileIO(filename, mode='r') as in_file:
52+
df = pd.read_csv(in_file)
53+
df_list.append(df)
5954
result = pd.concat(df_list, ignore_index=True)
6055
return result
6156

6257

6358
def write_csv_data(df: pd.DataFrame, dest: str, file: str, tmp_dir: str):
6459
""" Writes a dataframe to a CSV file with the given path."""
65-
if dest.startswith('gs://'):
66-
path = os.path.join(tmp_dir, file)
67-
else:
68-
path = os.path.join(dest, file)
69-
with open(path, mode='w', encoding='utf-8') as out_file:
60+
path = os.path.join(dest, file)
61+
with FileIO(path, mode='w', encoding='utf-8') as out_file:
7062
df.to_csv(out_file, index=False, mode='w', header=True)
71-
if dest.startswith('gs://'):
72-
upload_output_data(path, dest)
7363

7464

7565
def write_json_data(data, dest: str, file: str, tmp_dir: str):
7666
""" Writes data to a JSON file with the given path."""
77-
if dest.startswith('gs://'):
78-
path = os.path.join(tmp_dir, file)
79-
else:
80-
path = os.path.join(dest, file)
81-
with open(path, mode='w', encoding='utf-8') as out_file:
67+
path = os.path.join(dest, file)
68+
with FileIO(path, mode='w', encoding='utf-8') as out_file:
8269
json.dump(data, out_file, indent=4)
83-
if dest.startswith('gs://'):
84-
upload_output_data(path, dest)
8570

8671

8772
def write_mcf_nodes(nodes: list, dest: str, file: str, tmp_dir: str):
8873
""" Writes mcf nodes to a file with the given path."""
89-
if dest.startswith('gs://'):
90-
path = os.path.join(tmp_dir, file)
91-
else:
92-
path = os.path.join(dest, file)
93-
with open(path, mode='w', encoding='utf-8') as out_file:
74+
path = os.path.join(dest, file)
75+
with FileIO(path, mode='w', encoding='utf-8') as out_file:
9476
for node in nodes:
9577
if 'Node' in node:
9678
out_file.write(f'Node: {node["Node"]}\n')
@@ -102,40 +84,6 @@ def write_mcf_nodes(nodes: list, dest: str, file: str, tmp_dir: str):
10284
continue
10385
out_file.write(f'{key}: {value}\n')
10486
out_file.write('\n')
105-
if dest.startswith('gs://'):
106-
upload_output_data(path, dest)
107-
108-
109-
def upload_output_data(src: str, dest: str):
110-
client = storage.Client()
111-
bucket_name = dest.split('/')[2]
112-
bucket = client.get_bucket(bucket_name)
113-
for filepath in glob.iglob(src):
114-
filename = os.path.basename(filepath)
115-
logging.info('Uploading %s to %s', filename, dest)
116-
blobname = dest[len('gs://' + bucket_name + '/'):] + '/' + filename
117-
blob = bucket.blob(blobname)
118-
blob.upload_from_filename(filepath)
119-
120-
121-
def get_gcs_data(uri: str, dest_dir: str) -> str:
122-
""" Downloads files from GCS and copies them to local.
123-
Args:
124-
uri: single file path or wildcard format
125-
dest_dir: destination folder
126-
Returns:
127-
path to the output file/folder
128-
"""
129-
client = storage.Client()
130-
bucket = client.get_bucket(uri.split('/')[2])
131-
file_pat = uri.split(bucket.name, 1)[1][1:]
132-
dirname = os.path.dirname(file_pat)
133-
for blob in bucket.list_blobs(prefix=dirname):
134-
if fnmatch.fnmatch(blob.name, file_pat):
135-
dest_file = os.path.join(dest_dir, blob.name)
136-
os.makedirs(os.path.dirname(dest_file), exist_ok=True)
137-
blob.download_to_filename(dest_file)
138-
return os.path.join(dest_dir, file_pat)
13987

14088

14189
def load_data(path: str, tmp_dir: str) -> list:
@@ -146,9 +94,5 @@ def load_data(path: str, tmp_dir: str) -> list:
14694
Returns:
14795
combined list of mcf nodes
14896
"""
149-
if path.startswith('gs://'):
150-
os.makedirs(tmp_dir, exist_ok=True)
151-
path = get_gcs_data(path, tmp_dir)
152-
15397
mcf_nodes = load_mcf_files(path)
15498
return mcf_nodes

tools/import_differ/import_differ.py

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737

3838
_DATAFLOW_TEMPLATE_URL = 'gs://datcom-templates/templates/flex/differ.json'
3939

40+
_GROUPBY_KEYS = [
41+
'variableMeasured', 'observationAbout', 'observationDate',
42+
'observationPeriod', 'measurementMethod', 'unit', 'scalingFactor'
43+
]
44+
4045
Diff = Enum('Diff', [
4146
('ADDED', 1),
4247
('DELETED', 2),
@@ -72,6 +77,14 @@
7277
flags.DEFINE_string('project_id', '', 'GCP project id for the dataflow job.')
7378

7479

80+
def val_str(value) -> str:
81+
if isinstance(value, list):
82+
return ",".join([val_str(v) for v in value])
83+
if value and isinstance(value, str) and " " in value and value[0].isalpha():
84+
return '"' + value + '"'
85+
return str(value)
86+
87+
7588
class ImportDiffer:
7689
"""
7790
Utility to generate a diff of two versions of a dataset for import analysis.
@@ -80,6 +93,11 @@ class ImportDiffer:
8093
$ python3 import_differ.py --current_data=<path> --previous_data=<path> --output_location=<path> \
8194
--file_format=<mcf/tfrecord> --runner_mode=<native/direct/cloud> --project_id=<id> --job_name=<name>
8295
96+
Runner Modes:
97+
- native: Runs the differ using native Python (Pandas) locally.
98+
- direct: Runs the differ using the Apache Beam DirectRunner (Java jar) locally.
99+
- cloud: Runs the differ as a Dataflow job in GCP.
100+
83101
Summary output generated is of the form below showing
84102
counts of differences for each variable.
85103
@@ -90,7 +108,9 @@ class ImportDiffer:
90108
3 dcid:var4 0 2 0
91109
92110
Detailed diff output is written to files for further analysis.
93-
- import-diff.mcf: combined MCF diff for observations and schema
111+
- nodes-added.mcf: MCF nodes added in the current version
112+
- nodes-deleted.mcf: MCF nodes present in the previous version but missing from the current version
113+
- nodes-modified.mcf: MCF nodes modified in the current version
94114
- differ_summary.json: consolidated diff statistics
95115
96116
"""
@@ -139,7 +159,7 @@ def generate_diff(self, previous_df: pd.DataFrame,
139159
elif previous_df.empty and current_df.empty:
140160
column_list = [
141161
Column.key_combined.name, Column.value_combined.name + '_x',
142-
Column.value_combined.name + '_y' + Column.diff_type.name
162+
Column.value_combined.name + '_y', Column.diff_type.name
143163
]
144164
return pd.DataFrame(columns=column_list)
145165
result = pd.merge(previous_df,
@@ -160,7 +180,7 @@ def generate_diff(self, previous_df: pd.DataFrame,
160180
if result.empty:
161181
column_list = [
162182
Column.key_combined.name, Column.value_combined.name + '_x',
163-
Column.value_combined.name + '_y' + Column.diff_type.name
183+
Column.value_combined.name + '_y', Column.diff_type.name
164184
]
165185
return pd.DataFrame(columns=column_list)
166186

@@ -179,13 +199,8 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame):
179199
if 'StatVarObservation' in node.get(Column.typeOf.name):
180200
values_to_combine = []
181201
keys_to_combine = []
182-
groupby_keys = [
183-
'variableMeasured', 'observationAbout', 'observationDate',
184-
'observationPeriod', 'measurementMethod', 'unit',
185-
'scalingFactor'
186-
]
187202
value_keys = [Column.value.name]
188-
for key in groupby_keys:
203+
for key in _GROUPBY_KEYS:
189204
keys_to_combine.append(str(node.get(key, "")))
190205
for key in value_keys:
191206
values_to_combine.append(str(node.get(key, "")))
@@ -211,7 +226,8 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame):
211226
node.pop('Node', None)
212227
value_keys = sorted(node.keys())
213228
for key in value_keys:
214-
values_to_combine.append(key + ":" + str(node.get(key, "")))
229+
values_to_combine.append(key + ":" +
230+
val_str(node.get(key, "")))
215231
key_combined = ";".join(keys_to_combine)
216232
value_combined = ";".join(values_to_combine)
217233
schema_list.append({
@@ -224,16 +240,19 @@ def split_data(self, mcf_nodes: list) -> (pd.DataFrame, pd.DataFrame):
224240
obs_df = pd.DataFrame(obs_list)
225241
return obs_df, schema_df
226242

227-
def convert_diff_to_mcf_nodes(self, diff_df: pd.DataFrame,
228-
is_obs: bool) -> list:
243+
def convert_diff_to_mcf_nodes(self,
244+
diff_df: pd.DataFrame,
245+
is_obs: bool,
246+
diff_type: str = None) -> list:
229247
"""
230248
Converts the diff dataframe back to MCF format nodes.
231249
"""
232250
all_nodes = []
233-
for diff_type in [
234-
Diff.ADDED.name, Diff.DELETED.name, Diff.MODIFIED.name
235-
]:
236-
df_type = diff_df[diff_df[Column.diff_type.name] == diff_type]
251+
diff_types = [diff_type] if diff_type else [
252+
Diff.ADDED.name, Diff.DELETED.name, Diff.MODIFIED.name
253+
]
254+
for d_type in diff_types:
255+
df_type = diff_df[diff_df[Column.diff_type.name] == d_type]
237256
if df_type.empty:
238257
continue
239258

@@ -242,7 +261,7 @@ def convert_diff_to_mcf_nodes(self, diff_df: pd.DataFrame,
242261
key_combined = str(row[Column.key_combined.name])
243262

244263
# Determine which column to use for values and node IDs
245-
suffix = '_x' if diff_type == Diff.DELETED.name else '_y'
264+
suffix = '_x' if d_type == Diff.DELETED.name else '_y'
246265

247266
# Helper to get value from row, handles cases with or without suffix
248267
def get_val(base_name):
@@ -262,13 +281,8 @@ def get_val(base_name):
262281
node['dcid'] = dcid_id
263282

264283
# Reconstruct observation node
265-
groupby_keys = [
266-
'variableMeasured', 'observationAbout',
267-
'observationDate', 'observationPeriod',
268-
'measurementMethod', 'unit', 'scalingFactor'
269-
]
270284
keys = key_combined.split(';')
271-
for i, key in enumerate(groupby_keys):
285+
for i, key in enumerate(_GROUPBY_KEYS):
272286
if i < len(keys) and keys[i] and keys[i] != "nan":
273287
node[key] = keys[i]
274288

@@ -290,7 +304,6 @@ def get_val(base_name):
290304
k, v = kv.split(':', 1)
291305
node[k] = v
292306

293-
node['diffType'] = diff_type
294307
all_nodes.append(node)
295308
return all_nodes
296309

@@ -422,12 +435,19 @@ def run_differ(self):
422435
current_df_schema)
423436

424437
logging.info('Writing diff to MCF files...')
425-
obs_nodes = self.convert_diff_to_mcf_nodes(obs_diff, True)
426-
schema_nodes = self.convert_diff_to_mcf_nodes(schema_diff, False)
427-
all_nodes = obs_nodes + schema_nodes
428-
if all_nodes:
429-
differ_utils.write_mcf_nodes(all_nodes, self.output_path,
430-
'import_diff.mcf', tmp_path)
438+
for d_type, filename in [
439+
(Diff.ADDED.name, 'nodes-added.mcf'),
440+
(Diff.DELETED.name, 'nodes-deleted.mcf'),
441+
(Diff.MODIFIED.name, 'nodes-modified.mcf'),
442+
]:
443+
obs_nodes = self.convert_diff_to_mcf_nodes(
444+
obs_diff, True, d_type)
445+
schema_nodes = self.convert_diff_to_mcf_nodes(
446+
schema_diff, False, d_type)
447+
type_nodes = obs_nodes + schema_nodes
448+
if type_nodes:
449+
differ_utils.write_mcf_nodes(type_nodes, self.output_path,
450+
filename, tmp_path)
431451

432452
obs_stats = obs_diff[Column.diff_type.name].value_counts().to_dict()
433453
schema_stats = schema_diff[

0 commit comments

Comments
 (0)