Skip to content

Commit a19b5c1

Browse files
committed
[DEV-14868] DEFCGold to CSVModel
1 parent 5132cc7 commit a19b5c1

4 files changed

Lines changed: 122 additions & 128 deletions

File tree

brus_backend_common/models/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from brus_backend_common.models.lakehouse_model import LakeHouseCurrentMigration, ExternalDataLoadDate
22

3-
from brus_backend_common.models.reference import DEFCSilver, DEFCGroup, DEFCBronze
3+
from brus_backend_common.models.reference import DEFCBronze, DEFCGold, DEFCGroup
44
from brus_backend_common.config import CONFIG
55

66
LAKEHOUSE_BUCKETS = {
@@ -13,7 +13,7 @@
1313
LAKEHOUSE_MODEL_CLASSES = [
1414
DEFCBronze,
1515
DEFCGroup,
16-
DEFCSilver,
16+
DEFCGold,
1717
LakeHouseCurrentMigration,
1818
ExternalDataLoadDate,
1919
]

brus_backend_common/models/reference.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
TimestampType,
99
)
1010
from brus_backend_common.config import CONFIG
11-
from brus_backend_common.models.lakehouse_model import DeltaModel, CSVModel, LakeHouseDatabase
11+
from brus_backend_common.models.lakehouse_model import CSVModel, LakeHouseDatabase
1212

1313

1414
class DEFCBronze(CSVModel):
@@ -31,7 +31,7 @@ class DEFCBronze(CSVModel):
3131

3232
class DEFCGroup(CSVModel):
3333
BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
34-
DATABASE_NAME = LakeHouseDatabase.SILVER
34+
DATABASE_NAME = LakeHouseDatabase.GOLD
3535
TABLE_NAME = "defc_mapping"
3636
DESCRIPTION = "Internal CSV to dynamically group DEFCs together"
3737
CSV_NAME = "DEFC_MAPPING.csv"
@@ -47,15 +47,15 @@ class DEFCGroup(CSVModel):
4747
)
4848

4949

50-
class DEFCSilver(DeltaModel):
50+
class DEFCGold(CSVModel):
5151
BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
52-
DATABASE_NAME = LakeHouseDatabase.SILVER
52+
DATABASE_NAME = LakeHouseDatabase.GOLD
5353
TABLE_NAME = "defc"
5454
DESCRIPTION = "DEFC data after initial processing"
55+
CSV_NAME = "def_codes.csv"
5556
PK = "defc_id"
5657
UNIQUE_CONSTRAINTS = ["code"]
5758
MIGRATION_HISTORY = []
58-
5959
STRUCTURE = StructType(
6060
[
6161
StructField("created_at", TimestampType(), True),

brus_backend_common/scripts/loaders/load_defc.py renamed to brus_backend_common/scripts/loaders/defc_gold.py

Lines changed: 99 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@
88
import re
99
from datetime import datetime
1010

11-
from brus_backend_common.models import DEFCBronze, DEFCSilver, DEFCGroup
11+
from brus_backend_common.models import DEFCBronze, DEFCGroup, DEFCGold
1212
from brus_backend_common.models.lakehouse_model import update_external_data_load_date
1313
from brus_backend_common.helpers.aws import _get_boto3
14-
from brus_backend_common.helpers.spark import SparkScriptSession
1514
from brus_backend_common.helpers.pandas import check_dataframe_diff
1615
from brus_backend_common.helpers.scripts import (
1716
clean_data,
@@ -203,107 +202,106 @@ def main(local_file: str | None = None, force_reload: bool = False, metrics_json
203202

204203
s3 = _get_boto3("client", "s3")
205204

206-
with SparkScriptSession() as spark:
207-
raw_model = DEFCBronze()
208-
if not raw_model.exists():
209-
raise ValueError(f"{raw_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
210-
211-
group_model = DEFCGroup()
212-
if not group_model.exists():
213-
raise ValueError(f"{group_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
214-
215-
int_model = DEFCSilver(spark=spark)
216-
if not int_model.exists():
217-
raise ValueError(f"{int_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
218-
219-
start_time = datetime.now()
220-
metrics_json["start_time"] = str(start_time)
221-
222-
logger.info("Parsing DEFC data")
223-
try:
224-
if not local_file:
225-
raw_data = raw_model.to_pandas_df(dtype=str, na_filter=False)
226-
else:
227-
raw_data = pd.read_csv(local_file, dtype=str, na_filter=False)
228-
except pd.errors.EmptyDataError:
229-
metrics_json["blank_file"] = True
230-
metrics_json["exit_code"] = 4 # exit code chosen arbitrarily, to indicate distinct failure states
231-
return metrics_json
232-
headers = set([header.upper() for header in list(raw_data)])
233-
234-
if not VALID_HEADERS.issubset(headers):
235-
logger.error("Missing required headers. Required headers include: %s" % str(VALID_HEADERS))
236-
metrics_json["exit_code"] = 4
237-
return metrics_json
238-
metrics_json["records_received"] = len(raw_data)
239-
# Creating a dataframe of the export csv first and then copying columns to match the database
240-
raw_data = raw_data.rename(columns={"DEFC_CODE": "DEFC", "DEFC_TITLE": "Public Law"})
241-
242-
group_model_df = group_model.to_pandas_df()
243-
group_mapping = group_model_df.groupby("group")["code"].agg(list).to_dict()
244-
245-
raw_data = apply_defc_derivations(raw_data, group_mapping)
246-
247-
raw_data = add_defc_outliers(raw_data, group_mapping)
248-
249-
# Clear any lingering np.nan's
250-
raw_data = raw_data.replace({np.nan: None})
251-
252-
logger.info("Checking for differences in DEFC data")
253-
defc_mapping = {
254-
"defc": "code",
255-
"public_law": "public_laws",
256-
"public_law_short_title": "public_law_short_titles",
257-
"group_name": "group",
258-
"urls": "urls",
259-
"is_valid": "is_valid",
260-
"earliest_public_law_enactment_date": "earliest_pl_action_date",
261-
}
262-
data = clean_data(raw_data, defc_mapping, {})
263-
diff_found = check_dataframe_diff(data, int_model.to_pandas_df(), ["defc_id"], ["code"], date_format="%Y-%m-%d")
264-
if force_reload or diff_found:
265-
266-
# The only diff should be whenever a new code is added. Noting it here
267-
if diff_found:
268-
incoming_defcs = list(data["code"])
269-
curr_defcs = list(int_model.to_pandas_df()["code"])
270-
diff_defcs = list(set(incoming_defcs) - set(curr_defcs))
271-
metrics_json["new_defc"] = diff_defcs
272-
logger.info(f"Difference found: {diff_defcs}")
273-
274-
logger.info("Overwriting new DEFC data to Broker")
275-
int_model.save(data)
276-
277-
update_external_data_load_date(int_model, start_time, datetime.now())
278-
logger.info("{} records inserted to DEFC".format(len(data)))
279-
280-
# convert the arrays to pipe-delimited strings
281-
defc_delim = "|"
282-
array_cols = ["Public Law", "Public Law Short Title", "URLs"]
283-
for array_col in array_cols:
284-
raw_data[array_col] = raw_data[array_col].apply(lambda value: defc_delim.join(value))
285-
286-
header_order = [
287-
"DEFC",
288-
"Public Law",
289-
"Public Law Short Title",
290-
"Group Name",
291-
"URLs",
292-
"Is Valid",
293-
"Earliest Public Law Enactment Date",
294-
]
295-
raw_data = raw_data[header_order]
296-
export_name = "def_codes.csv"
297-
logger.info("Exporting loaded DEFC file to {}".format(export_name))
298-
raw_data.to_csv(export_name, index=0)
299-
300-
s3.upload_file(export_name, CONFIG.PUBLIC_FILES_BUCKET, export_name)
301-
302-
os.remove(export_name)
205+
raw_model = DEFCBronze()
206+
if not raw_model.exists():
207+
raise ValueError(f"{raw_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
208+
209+
group_model = DEFCGroup()
210+
if not group_model.exists():
211+
raise ValueError(f"{group_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
212+
213+
gold_model = DEFCGold()
214+
if not gold_model.exists():
215+
raise ValueError(f"{gold_model.TABLE_REF} doesn't exist. Use create_migrate_delta_table beforehand.")
216+
217+
start_time = datetime.now()
218+
metrics_json["start_time"] = str(start_time)
219+
220+
logger.info("Parsing DEFC data")
221+
try:
222+
if not local_file:
223+
raw_data = raw_model.to_pandas_df(dtype=str, na_filter=False)
303224
else:
304-
logger.info("No differences found, skipping defc table reload.")
225+
raw_data = pd.read_csv(local_file, dtype=str, na_filter=False)
226+
except pd.errors.EmptyDataError:
227+
metrics_json["blank_file"] = True
228+
metrics_json["exit_code"] = 4 # exit code chosen arbitrarily, to indicate distinct failure states
229+
return metrics_json
230+
headers = set([header.upper() for header in list(raw_data)])
231+
232+
if not VALID_HEADERS.issubset(headers):
233+
logger.error("Missing required headers. Required headers include: %s" % str(VALID_HEADERS))
234+
metrics_json["exit_code"] = 4
235+
return metrics_json
236+
metrics_json["records_received"] = len(raw_data)
237+
# Creating a dataframe of the export csv first and then copying columns to match the database
238+
raw_data = raw_data.rename(columns={"DEFC_CODE": "DEFC", "DEFC_TITLE": "Public Law"})
239+
240+
group_model_df = group_model.to_pandas_df()
241+
group_mapping = group_model_df.groupby("group")["code"].agg(list).to_dict()
242+
243+
raw_data = apply_defc_derivations(raw_data, group_mapping)
244+
245+
raw_data = add_defc_outliers(raw_data, group_mapping)
246+
247+
# Clear any lingering np.nan's
248+
raw_data = raw_data.replace({np.nan: None})
249+
250+
logger.info("Checking for differences in DEFC data")
251+
defc_mapping = {
252+
"defc": "code",
253+
"public_law": "public_laws",
254+
"public_law_short_title": "public_law_short_titles",
255+
"group_name": "group",
256+
"urls": "urls",
257+
"is_valid": "is_valid",
258+
"earliest_public_law_enactment_date": "earliest_pl_action_date",
259+
}
260+
data = clean_data(raw_data, defc_mapping, {})
261+
diff_found = check_dataframe_diff(data, gold_model.to_pandas_df(), ["defc_id"], ["code"], date_format="%Y-%m-%d")
262+
if force_reload or diff_found:
263+
264+
# The only diff should be whenever a new code is added. Noting it here
265+
if diff_found:
266+
incoming_defcs = list(data["code"])
267+
curr_defcs = list(gold_model.to_pandas_df()["code"])
268+
diff_defcs = list(set(incoming_defcs) - set(curr_defcs))
269+
metrics_json["new_defc"] = diff_defcs
270+
logger.info(f"Difference found: {diff_defcs}")
271+
272+
logger.info("Overwriting new DEFC data to Broker")
273+
gold_model.save(data)
274+
275+
update_external_data_load_date(gold_model, start_time, datetime.now())
276+
logger.info("{} records inserted to DEFC".format(len(data)))
277+
278+
# convert the arrays to pipe-delimited strings
279+
defc_delim = "|"
280+
array_cols = ["Public Law", "Public Law Short Title", "URLs"]
281+
for array_col in array_cols:
282+
raw_data[array_col] = raw_data[array_col].apply(lambda value: defc_delim.join(value))
283+
284+
header_order = [
285+
"DEFC",
286+
"Public Law",
287+
"Public Law Short Title",
288+
"Group Name",
289+
"URLs",
290+
"Is Valid",
291+
"Earliest Public Law Enactment Date",
292+
]
293+
raw_data = raw_data[header_order]
294+
export_name = "def_codes.csv"
295+
logger.info("Exporting loaded DEFC file to {}".format(export_name))
296+
raw_data.to_csv(export_name, index=0)
297+
298+
s3.upload_file(export_name, CONFIG.PUBLIC_FILES_BUCKET, export_name)
299+
300+
os.remove(export_name)
301+
else:
302+
logger.info("No differences found, skipping defc table reload.")
305303

306-
total_defc_count = int_model.count()
304+
total_defc_count = gold_model.count()
307305

308306
metrics_json["total_defc_count"] = total_defc_count
309307

brus_backend_common/tests/integration/test_load_defc.py renamed to brus_backend_common/tests/integration/loaders/test_defc_gold.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
import os
22
import pytest
3-
from typing import Generator, List
4-
5-
from pyspark.sql import SparkSession
3+
from typing import List
64

75
from brus_backend_common.config import _SRC_ROOT_DIR
86
from brus_backend_common.helpers.aws import _get_boto3
9-
from brus_backend_common.scripts.loaders import load_defc
7+
from brus_backend_common.scripts.loaders import defc_gold
108
from brus_backend_common.models import LAKEHOUSE_MODELS
119

1210

@@ -26,7 +24,7 @@ def raw_defc_file():
2624
@pytest.fixture(scope="function")
2725
def raw_defc_mapping_file():
2826
# Mimic placing the raw DEFC mapping file in the expected location (directly or copied from another bucket)
29-
defc_mapping_model = LAKEHOUSE_MODELS["silver.defc_mapping"]()
27+
defc_mapping_model = LAKEHOUSE_MODELS["gold.defc_mapping"]()
3028
s3_client = _get_boto3("client", "s3")
3129
csv_file_path = os.path.join(_SRC_ROOT_DIR, "tests", "integration", "data", "defc_groups.csv")
3230
s3_client.upload_file(csv_file_path, defc_mapping_model.BUCKET_NAME, defc_mapping_model.RELATIVE_CSV_PATH)
@@ -39,42 +37,40 @@ def raw_defc_mapping_file():
3937
def test_load_defc(
4038
raw_defc_file: str,
4139
raw_defc_mapping_file: str,
42-
spark: SparkSession,
4340
setup_teardown_buckets: List[str],
44-
hive_unittest_metastore_db: Generator[str | None, None, None],
4541
external_data_load_dates: str,
4642
):
47-
# RAW DEFC
48-
raw_defc_model = LAKEHOUSE_MODELS["bronze.defc"]()
43+
# Bronze DEFC
44+
defc_bronze_model = LAKEHOUSE_MODELS["bronze.defc"]()
4945

50-
assert raw_defc_model.exists()
51-
df = raw_defc_model.to_pandas_df()
46+
assert defc_bronze_model.exists()
47+
df = defc_bronze_model.to_pandas_df()
5248
assert df is not None and not df.empty
5349
assert df.loc[df["DEFC_CODE"] == "S", "DEFC_TITLE"].values[0] == "Disaster PL 116-260"
5450

5551
# DEFC Mapping
56-
defc_mapping_model = LAKEHOUSE_MODELS["silver.defc_mapping"]()
52+
defc_mapping_model = LAKEHOUSE_MODELS["gold.defc_mapping"]()
5753

5854
assert defc_mapping_model.exists()
5955
df = defc_mapping_model.to_pandas_df()
6056
assert df is not None and not df.empty
6157
assert df.loc[df["code"] == "L", "group"].values[0] == "covid_19"
6258

63-
# INT DEFC
64-
int_defc_model = LAKEHOUSE_MODELS["silver.defc"](spark=spark)
65-
int_defc_model.initialize()
59+
# Gold DEFC
60+
defc_gold_model = LAKEHOUSE_MODELS["gold.defc"]()
61+
defc_gold_model.initialize(recreate=True)
6662

67-
load_defc.main()
63+
defc_gold.main()
6864

69-
assert int_defc_model.exists()
70-
df = int_defc_model.to_pandas_df()
65+
assert defc_gold_model.exists()
66+
df = defc_gold_model.to_pandas_df()
7167
assert df is not None and not df.empty
7268
assert df.loc[df["code"] == "L", "public_laws"].values[0] == "Emergency P.L. 116-123"
7369

7470
# Confirming the external load date was updated
75-
edld_model = LAKEHOUSE_MODELS["bronze.external_data_load_date"]()
71+
edld_model = LAKEHOUSE_MODELS["gold.external_data_load_date"]()
7672

7773
assert edld_model.exists()
7874
df = edld_model.to_pandas_df()
7975
assert df is not None and not df.empty
80-
assert not df.loc[df["name"] == "silver.defc"].empty
76+
assert not df.loc[df["name"] == "gold.defc"].empty

0 comments on commit a19b5c1

Comments (0)