Skip to content

Commit 5132cc7

Browse files
committed
[DEV-14668] CSVModel handling arrays, Gold models
1 parent 823ce0e commit 5132cc7

3 files changed

Lines changed: 21 additions & 4 deletions

File tree

brus_backend_common/models/lakehouse_model.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import ast
12
import importlib.util
23
import io
34
import logging
@@ -11,6 +12,7 @@
1112
from datetime import datetime
1213

1314
import deltalake
15+
import numpy as np
1416
import pyarrow as pa
1517
import pandas as pd
1618
import polars as pl
@@ -21,6 +23,7 @@
2123
from pyspark.sql.functions import monotonically_increasing_id
2224
from pyspark.sql.utils import AnalysisException
2325
from pyspark.sql.types import (
26+
ArrayType,
2427
IntegerType,
2528
StructField,
2629
StringType,
@@ -371,8 +374,22 @@ def _recreate_blank_file(self):
371374
self._s3_client.upload_file(blank_csv, self.BUCKET_NAME, self.RELATIVE_CSV_PATH)
372375

373376
def to_pandas_df(self, **kwargs: Any) -> pd.DataFrame | None:
377+
converters = {}
378+
379+
# Convert arrays from csv format to lists
380+
for col in self.STRUCTURE:
381+
if isinstance(col.dataType, ArrayType):
382+
converters[col.name] = ast.literal_eval
383+
374384
# Type Checker struggles with BytesIO and S3 Objects
375-
return pd.read_csv(io.BytesIO(self._s3_object), **kwargs) if self.exists() else None # type: ignore
385+
df = pd.read_csv(io.BytesIO(self._s3_object), converters=converters, **kwargs) if self.exists() else None # type: ignore
386+
387+
# Convert lists in df to np.arrays
388+
for col in self.STRUCTURE:
389+
if isinstance(col.dataType, ArrayType):
390+
df[col.name] = df[col.name].apply(np.array)
391+
392+
return df
376393

377394
def to_polars_df(self, **kwargs: Any) -> pl.DataFrame | pl.Series | None:
    """Read the CSV at ``CSV_PATH`` with polars.

    Args:
        **kwargs: Extra keyword arguments forwarded to ``pl.read_csv``.

    Returns:
        The result of ``pl.read_csv``, or ``None`` when ``self.exists()`` is false.
    """
    if not self.exists():
        return None
    return pl.read_csv(self.CSV_PATH, **kwargs)
@@ -394,7 +411,7 @@ def save(self, df: pd.DataFrame | pl.DataFrame) -> None:
394411

395412
class LakeHouseCurrentMigration(CSVModel):
396413
BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
397-
DATABASE_NAME = LakeHouseDatabase.BRONZE
414+
DATABASE_NAME = LakeHouseDatabase.GOLD
398415
TABLE_NAME = "migrations"
399416
DESCRIPTION = "Keeps track of migrations for all Lakehouse Models"
400417
CSV_NAME = "current_migrations.csv"
@@ -415,7 +432,7 @@ class LakeHouseCurrentMigration(CSVModel):
415432

416433
class ExternalDataLoadDate(CSVModel):
417434
BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
418-
DATABASE_NAME = LakeHouseDatabase.BRONZE
435+
DATABASE_NAME = LakeHouseDatabase.GOLD
419436
TABLE_NAME = "external_data_load_date"
420437
DESCRIPTION = "Keeps track of load dates of certain external data Lakehouse models"
421438
CSV_NAME = "external_load_date.csv"

brus_backend_common/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def temp_file_path():
8989
# Should just be a simple csv creation, upload, and deletion per test
9090
@pytest.fixture(scope="function")
9191
def external_data_load_dates():
92-
raw_eld_model = LAKEHOUSE_MODELS["bronze.external_data_load_date"]()
92+
raw_eld_model = LAKEHOUSE_MODELS["gold.external_data_load_date"]()
9393
s3_client = _get_boto3("client", "s3")
9494

9595
raw_eld_model.initialize(recreate=True)

brus_backend_common/tests/integration/loaders/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)