22 changes: 16 additions & 6 deletions .env.template
@@ -16,8 +16,10 @@ PYTHON_VERSION=3.12.12
 
 # ==== [Local Development] ====
 IS_LOCAL=True
-PROJECT_LOG_DIR=brus_backend_common/logs
+ENV_CODE=local
+FAPC=False
 TRACE_ENV=unspecified
+PROJECT_LOG_DIR=brus_backend_common/logs
 
 # ==== [AWS] ====
 # AWS_PROFILE needs to be left defaulted to None in python if not intending to use it. No way to set it to None in the .env file
@@ -28,13 +30,21 @@ TRACE_ENV=unspecified
 AWS_REGION=us-gov-west-1
 
 # ==== [Buckets] ====
+DATA_ARCHIVE_BUCKET=
+DATA_EXTRACTS_BUCKET=
 DATA_SOURCES_BUCKET=
-PUBLIC_FILES_BUCKET=
-BROKER_EXTERNAL_S3_BUCKET=
-BROKER_SUBMISSIONS_S3_BUCKET=
-REFERENCE_S3_BUCKET=
-USAS_S3_BUCKET=
+FPDS_DELETE_BUCKET=
 METRICS_BUCKET=
+PUBLIC_FILES_BUCKET=
+PUBLISHED_BUCKET=
+SF133_BUCKET=
+SUB_ZIPS_BUCKET=
+UNPUBLISHED_BUCKET=
+
+# Lakehouse Buckets
+LAKEHOUSE_BROKER_BUCKET=
+LAKEHOUSE_REFERENCE_BUCKET=
+LAKEHOUSE_USAS_BUCKET=
 
 # ==== [Postgres] ====
 DB1_URL=
22 changes: 16 additions & 6 deletions .github/.env.test
@@ -7,8 +7,10 @@ PYTHON_VERSION=3.12.12
 
 # ==== [Local Development] ====
 IS_LOCAL=True
-PROJECT_LOG_DIR=brus_backend_common/logs
+ENV_CODE=local
+FAPC=False
 TRACE_ENV=unspecified
+PROJECT_LOG_DIR=brus_backend_common/logs
 
 # ==== [AWS] ====
 AWS_ACCESS_KEY=minio_user
@@ -17,13 +19,21 @@ AWS_SECRET_KEY=minio_secret
 AWS_REGION=us-gov-west-1
 
 # ==== [Buckets] ====
+DATA_ARCHIVE_BUCKET=data-archive-test
+DATA_EXTRACTS_BUCKET=data-extracts-test
 DATA_SOURCES_BUCKET=data-sources-test
-PUBLIC_FILES_BUCKET=public-files-test
-BROKER_EXTERNAL_S3_BUCKET=broker-external-test
-BROKER_SUBMISSIONS_S3_BUCKET=broker-submissions-test
-REFERENCE_S3_BUCKET=reference-test
-USAS_S3_BUCKET=usas-test
+FPDS_DELETE_BUCKET=fpds-delete-test
 METRICS_BUCKET=metrics-test
+PUBLIC_FILES_BUCKET=public-files-test
+PUBLISHED_BUCKET=published-test
+SF133_BUCKET=sf133-test
+SUB_ZIPS_BUCKET=sub-zips-test
+UNPUBLISHED_BUCKET=unpublished-test
+
+# Lakehouse Buckets
+LAKEHOUSE_BROKER_BUCKET=broker-test
+LAKEHOUSE_REFERENCE_BUCKET=reference-test
+LAKEHOUSE_USAS_BUCKET=usas-test
 
 # ==== [Postgres] ====
 DB1_URL=
48 changes: 34 additions & 14 deletions brus_backend_common/config.py
@@ -41,8 +41,10 @@ class DefaultConfig(BaseSettings):
     Attributes:
         # App
         IS_LOCAL: Whether it's running locally or remotely
-        PROJECT_LOG_DIR: where log files will be kept
+        ENV_CODE: the environment code this is running on
+        FAPC: whether this is running on FAPC or not
         TRACE_ENV: set by deploys to help with tracing
+        PROJECT_LOG_DIR: where log files will be kept
 
         # AWS
         AWS_ACCESS_KEY: The current AWS access key
@@ -53,13 +55,21 @@ class DefaultConfig(BaseSettings):
         AWS_STS_ENDPOINT: (derived) The current AWS STS endpoint
 
         # Buckets
-        DATA_SOURCES_BUCKET: The S3 data sources bucket name
-        PUBLIC_FILES_BUCKET: The S3 public files bucket name
-        BROKER_EXTERNAL_S3_BUCKET: The S3 broker external bucket name
-        BROKER_SUBMISSIONS_S3_BUCKET: The S3 broker submissions bucket name
-        REFERENCE_S3_BUCKET: The S3 reference bucket name
-        USAS_S3_BUCKET: The S3 USAS bucket name
-        METRICS_BUCKET: The S3 metrics bucket name
+        DATA_ARCHIVE_BUCKET: The data archive bucket name
+        DATA_EXTRACTS_BUCKET: The data extracts bucket name
+        DATA_SOURCES_BUCKET: The data sources bucket name
+        FPDS_DELETE_BUCKET: The FPDS delete bucket name
+        METRICS_BUCKET: The metrics bucket name
+        PUBLIC_FILES_BUCKET: The public files bucket name
+        PUBLISHED_BUCKET: The broker published files bucket name
+        SF133_BUCKET: The SF-133 bucket name
+        SUB_ZIPS_BUCKET: The broker submission zips bucket name
+        UNPUBLISHED_BUCKET: The broker unpublished files bucket name
+
+        # Lakehouse Buckets
+        LAKEHOUSE_BROKER_BUCKET: The lakehouse broker bucket name
+        LAKEHOUSE_REFERENCE_BUCKET: The lakehouse reference bucket name
+        LAKEHOUSE_USAS_BUCKET: The lakehouse USAS bucket name
 
         # Postgres
         DB1_URL: Postgres url to your application's database
@@ -101,8 +111,10 @@ class Config:
 
     # App
     IS_LOCAL: bool = True
-    PROJECT_LOG_DIR: str = str(_SRC_ROOT_DIR / "logs")
+    ENV_CODE: str = "local"
+    FAPC: bool = False
     TRACE_ENV: str = ""
+    PROJECT_LOG_DIR: str = str(_SRC_ROOT_DIR / "logs")
 
     # AWS
     AWS_ACCESS_KEY: SecretStr = SecretStr("")
@@ -119,13 +131,21 @@ def AWS_STS_ENDPOINT(self):
         return f"sts.{self.AWS_REGION}.amazonaws.com" if not self.IS_LOCAL else f"{self.MINIO_HOST}:{self.MINIO_PORT}"
 
     # Buckets
+    DATA_ARCHIVE_BUCKET: str = ""
+    DATA_EXTRACTS_BUCKET: str = ""
     DATA_SOURCES_BUCKET: str = ""
-    PUBLIC_FILES_BUCKET: str = ""
-    BROKER_EXTERNAL_S3_BUCKET = ""
-    BROKER_SUBMISSIONS_S3_BUCKET = ""
-    REFERENCE_S3_BUCKET = ""
-    USAS_S3_BUCKET = ""
+    FPDS_DELETE_BUCKET: str = ""
     METRICS_BUCKET: str = ""
+    PUBLIC_FILES_BUCKET: str = ""
+    PUBLISHED_BUCKET: str = ""
+    SF133_BUCKET: str = ""
+    SUB_ZIPS_BUCKET: str = ""
+    UNPUBLISHED_BUCKET: str = ""
+
+    # Lakehouse Buckets
+    LAKEHOUSE_BROKER_BUCKET: str = ""
+    LAKEHOUSE_REFERENCE_BUCKET: str = ""
+    LAKEHOUSE_USAS_BUCKET: str = ""
 
     # Postgres
    DB1_URL: SecretStr = SecretStr("")
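
A quick sanity check of how these settings resolve at runtime. This is a usage sketch, not part of the diff: it assumes CONFIG remains the module-level DefaultConfig instance, and that env vars are set before first import, since pydantic's BaseSettings reads the environment when the object is instantiated.

import os

# Normally supplied by .env / the deploy environment; set here only for the sketch.
os.environ["LAKEHOUSE_REFERENCE_BUCKET"] = "reference-test"

from brus_backend_common.config import CONFIG  # instantiated at import time

print(CONFIG.LAKEHOUSE_REFERENCE_BUCKET)  # "reference-test"
print(CONFIG.AWS_STS_ENDPOINT)  # MinIO host:port while IS_LOCAL=True, else the real STS endpoint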
11 changes: 5 additions & 6 deletions brus_backend_common/models/__init__.py
@@ -1,20 +1,19 @@
 from brus_backend_common.models.lakehouse_model import LakeHouseCurrentMigration, ExternalDataLoadDate
 
-from brus_backend_common.models.reference import DEFCSilver, DEFCGroup, DEFCBronze
+from brus_backend_common.models.reference import DEFCBronze, DEFCGold, DEFCGroup
 from brus_backend_common.config import CONFIG
 
 LAKEHOUSE_BUCKETS = {
-    "broker-submissions": CONFIG.BROKER_SUBMISSIONS_S3_BUCKET,
-    "broker-external": CONFIG.BROKER_EXTERNAL_S3_BUCKET,
-    "reference": CONFIG.REFERENCE_S3_BUCKET,
-    "usas": CONFIG.USAS_S3_BUCKET,
+    "broker": CONFIG.LAKEHOUSE_BROKER_BUCKET,
+    "reference": CONFIG.LAKEHOUSE_REFERENCE_BUCKET,
+    "usas": CONFIG.LAKEHOUSE_USAS_BUCKET,
 }
 LAKEHOUSE_BUCKET_NAMES = list(LAKEHOUSE_BUCKETS.values())
 
 LAKEHOUSE_MODEL_CLASSES = [
     DEFCBronze,
     DEFCGroup,
-    DEFCSilver,
+    DEFCGold,
     LakeHouseCurrentMigration,
     ExternalDataLoadDate,
 ]
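
Downstream code keys buckets by the logical names above. A hedged bootstrap sketch for local development — the boto3 client, the MinIO endpoint wiring, and the assumption that AWS_SECRET_KEY is also a SecretStr field are illustrative, not part of this PR:

import boto3

from brus_backend_common.config import CONFIG
from brus_backend_common.models import LAKEHOUSE_BUCKETS

# Ensure each lakehouse bucket exists in the local MinIO instance.
s3 = boto3.client(
    "s3",
    endpoint_url=f"http://{CONFIG.MINIO_HOST}:{CONFIG.MINIO_PORT}",
    aws_access_key_id=CONFIG.AWS_ACCESS_KEY.get_secret_value(),
    aws_secret_access_key=CONFIG.AWS_SECRET_KEY.get_secret_value(),
)
for logical_name, bucket in LAKEHOUSE_BUCKETS.items():
    try:
        s3.create_bucket(Bucket=bucket)
    except s3.exceptions.BucketAlreadyOwnedByYou:
        pass  # already bootstrapped
    print(f"{logical_name} -> {bucket}")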
2 changes: 1 addition & 1 deletion
@@ -12,4 +12,4 @@
 #
 # from brus_backend_common.models.lakehouse_model import LakeHouseModel, LakeHouseDatabase
 # from brus_backend_common.config import CONFIG
-# CONFIG.BROKER_EXTERNAL_S3_BUCKET
+# CONFIG.LAKEHOUSE_BROKER_BUCKET
15 changes: 0 additions & 15 deletions brus_backend_common/models/broker_submissions.py

This file was deleted.

27 changes: 22 additions & 5 deletions brus_backend_common/models/lakehouse_model.py
@@ -1,3 +1,4 @@
+import ast
 import importlib.util
 import io
 import logging
@@ -11,6 +12,7 @@
 from datetime import datetime
 
 import deltalake
+import numpy as np
 import pyarrow as pa
 import pandas as pd
 import polars as pl
@@ -21,6 +23,7 @@
 from pyspark.sql.functions import monotonically_increasing_id
 from pyspark.sql.utils import AnalysisException
 from pyspark.sql.types import (
+    ArrayType,
     IntegerType,
     StructField,
     StringType,
@@ -371,8 +374,22 @@ def _recreate_blank_file(self):
         self._s3_client.upload_file(blank_csv, self.BUCKET_NAME, self.RELATIVE_CSV_PATH)
 
     def to_pandas_df(self, **kwargs: Any) -> pd.DataFrame | None:
+        converters = {}
+
+        # Convert arrays from csv format to lists
+        for col in self.STRUCTURE:
+            if isinstance(col.dataType, ArrayType):
+                converters[col.name] = ast.literal_eval
+
         # Type Checker struggles with BytesIO and S3 Objects
-        return pd.read_csv(io.BytesIO(self._s3_object), **kwargs) if self.exists() else None  # type: ignore
+        df = pd.read_csv(io.BytesIO(self._s3_object), converters=converters, **kwargs) if self.exists() else None  # type: ignore
+
+        # Convert lists in df to np.arrays
+        for col in self.STRUCTURE:
+            if df is not None and isinstance(col.dataType, ArrayType):
+                df[col.name] = df[col.name].apply(np.array)
+
+        return df
 
     def to_polars_df(self, **kwargs: Any) -> pl.DataFrame | pl.Series | None:
         return pl.read_csv(self.CSV_PATH, **kwargs) if self.exists() else None
@@ -393,8 +410,8 @@ def save(self, df: pd.DataFrame | pl.DataFrame) -> None:
 
 
 class LakeHouseCurrentMigration(CSVModel):
-    BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
-    DATABASE_NAME = LakeHouseDatabase.BRONZE
+    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
+    DATABASE_NAME = LakeHouseDatabase.GOLD
     TABLE_NAME = "migrations"
     DESCRIPTION = "Keeps track of migrations for all Lakehouse Models"
     CSV_NAME = "current_migrations.csv"
@@ -414,8 +431,8 @@ class LakeHouseCurrentMigration(CSVModel):
 
 
 class ExternalDataLoadDate(CSVModel):
-    BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
-    DATABASE_NAME = LakeHouseDatabase.BRONZE
+    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
+    DATABASE_NAME = LakeHouseDatabase.GOLD
     TABLE_NAME = "external_data_load_date"
     DESCRIPTION = "Keeps track of load dates of certain external data Lakehouse models"
     CSV_NAME = "external_load_date.csv"
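
The converters work exists because CSV has no list type: pandas serializes a list cell as its Python repr, so a plain read_csv hands back strings. A standalone sketch of the round trip the new to_pandas_df performs (the frame and column names here are made up; only pandas and numpy are needed):

import ast
import io

import numpy as np
import pandas as pd

df = pd.DataFrame({"defc_id": [1, 2], "codes": [["L", "M"], ["N"]]})

buf = io.StringIO()
df.to_csv(buf, index=False)  # a list cell is written as the string "['L', 'M']"

buf.seek(0)
naive = pd.read_csv(buf)
print(type(naive.loc[0, "codes"]))  # <class 'str'> -- structure lost

buf.seek(0)
parsed = pd.read_csv(buf, converters={"codes": ast.literal_eval})
parsed["codes"] = parsed["codes"].apply(np.array)
print(type(parsed.loc[0, "codes"]))  # <class 'numpy.ndarray'>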
16 changes: 8 additions & 8 deletions brus_backend_common/models/reference.py
@@ -8,11 +8,11 @@
     TimestampType,
 )
 from brus_backend_common.config import CONFIG
-from brus_backend_common.models.lakehouse_model import DeltaModel, CSVModel, LakeHouseDatabase
+from brus_backend_common.models.lakehouse_model import CSVModel, LakeHouseDatabase
 
 
 class DEFCBronze(CSVModel):
-    BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
+    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
     DATABASE_NAME = LakeHouseDatabase.BRONZE
     TABLE_NAME = "defc"
     DESCRIPTION = "Raw DEFC CSV placed in S3"
@@ -30,8 +30,8 @@
 
 
 class DEFCGroup(CSVModel):
-    BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
-    DATABASE_NAME = LakeHouseDatabase.SILVER
+    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
+    DATABASE_NAME = LakeHouseDatabase.GOLD
     TABLE_NAME = "defc_mapping"
     DESCRIPTION = "Internal CSV to dynamically group DEFCs together"
     CSV_NAME = "DEFC_MAPPING.csv"
@@ -47,15 +47,15 @@
 )
 
 
-class DEFCSilver(DeltaModel):
-    BUCKET_NAME = CONFIG.REFERENCE_S3_BUCKET
-    DATABASE_NAME = LakeHouseDatabase.SILVER
+class DEFCGold(CSVModel):
+    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
+    DATABASE_NAME = LakeHouseDatabase.GOLD
     TABLE_NAME = "defc"
     DESCRIPTION = "DEFC data after initial processing"
+    CSV_NAME = "def_codes.csv"
     PK = "defc_id"
     UNIQUE_CONSTRAINTS = ["code"]
-    MIGRATION_HISTORY = []
 
     STRUCTURE = StructType(
         [
             StructField("created_at", TimestampType(), True),
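
With DEFCSilver replaced by DEFCGold, the reference models all follow the same CSVModel pattern. A sketch of what one more model in this style could look like — the class, table, and columns are hypothetical, shown only to illustrate the pattern:

from pyspark.sql.types import StringType, StructField, StructType, TimestampType

from brus_backend_common.config import CONFIG
from brus_backend_common.models.lakehouse_model import CSVModel, LakeHouseDatabase


class AgencyGold(CSVModel):  # hypothetical, not part of the PR
    BUCKET_NAME = CONFIG.LAKEHOUSE_REFERENCE_BUCKET
    DATABASE_NAME = LakeHouseDatabase.GOLD
    TABLE_NAME = "agency"
    DESCRIPTION = "Agency reference data after initial processing"
    CSV_NAME = "agency_codes.csv"
    PK = "agency_id"
    UNIQUE_CONSTRAINTS = ["agency_code"]

    STRUCTURE = StructType(
        [
            StructField("created_at", TimestampType(), True),
            StructField("agency_id", StringType(), True),
            StructField("agency_code", StringType(), True),
        ]
    )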
2 changes: 1 addition & 1 deletion brus_backend_common/models/usas.py
@@ -12,4 +12,4 @@
 #
 # from brus_backend_common.models.lakehouse_model import LakeHouseModel, LakeHouseDatabase
 # from brus_backend_common.config import CONFIG
-# CONFIG.USAS_S3_BUCKET
+# CONFIG.LAKEHOUSE_USAS_BUCKET