From 67c0d6ac0303344a12d7a5b0e5cba89f343e80e1 Mon Sep 17 00:00:00 2001 From: Vedant Mane Date: Fri, 14 Feb 2025 02:15:01 -0500 Subject: [PATCH 1/4] requirement file for docker image and environment setup --- requirement.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/requirement.txt b/requirement.txt index a8bb104..f132664 100644 --- a/requirement.txt +++ b/requirement.txt @@ -3,12 +3,12 @@ numpy pandas fastapi uvicorn -bs4 requests boto3 -datetime pydantic pathlib -logging -apache-airflow -apache-airflow-providers-snowflake \ No newline at end of file +python-dotenv +sqlalchemy +snowflake-sqlalchemy +apache-airflow-providers-snowflake +apache-airflow-providers-amazon \ No newline at end of file From b7918fa6362624f868c7711b93492ed04e199392 Mon Sep 17 00:00:00 2001 From: Vedant Mane Date: Fri, 14 Feb 2025 02:16:29 -0500 Subject: [PATCH 2/4] dockerfile for fastapi setup, docker image build and deployment to cloud --- dockerfile | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dockerfile b/dockerfile index 18bc6d1..36eeae2 100644 --- a/dockerfile +++ b/dockerfile @@ -5,17 +5,16 @@ FROM --platform=linux/amd64 python:3.12.8-slim WORKDIR /app # Copy requirements first for layer caching -COPY requirements.txt . +COPY requirement.txt . COPY .env /app/.env # Install dependencies -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -r requirement.txt # Copy the application code COPY ./backend /app/backend # COPY ./frontend /app/frontend -COPY ./features /app/features COPY ./services /app/services # Set environment variables From 84927c48b8b063a19e3288d8cfae7dfe4525cbc4 Mon Sep 17 00:00:00 2001 From: Vedant Mane Date: Fri, 14 Feb 2025 02:17:27 -0500 Subject: [PATCH 3/4] Updated JSON file to create views in Snowflake --- airflow/dags/dag_sec_json_pipeline.py | 63 ++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/airflow/dags/dag_sec_json_pipeline.py b/airflow/dags/dag_sec_json_pipeline.py index f5953a2..0303d4a 100644 --- a/airflow/dags/dag_sec_json_pipeline.py +++ b/airflow/dags/dag_sec_json_pipeline.py @@ -458,7 +458,7 @@ def schema_def_json(**kwargs): year = kwargs['dag_run'].conf.get('year') quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1] return f""" - CREATE OR REPLACE TABLE SEC_JSON_{year}_{quarter} ( + CREATE OR REPLACE TABLE SEC_JSON_{year}_Q{quarter} ( json_data VARIANT ); """ @@ -478,9 +478,68 @@ def copyinto_json(**kwargs): year = kwargs['dag_run'].conf.get('year') quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1] return f""" - COPY INTO SEC_JSON_{year}_{quarter} + COPY INTO SEC_JSON_{year}_Q{quarter} FROM @sec_json_stage/data/{year}/{quarter}/json/ FILE_FORMAT = (TYPE = 'JSON'); + CREATE OR REPLACE VIEW JSON_BS_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + bs.value:concept::STRING AS concept, + bs.value:info::STRING AS info, + bs.value:label::STRING AS label, + bs.value:unit::STRING AS unit, + bs.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.bs) bs; + + CREATE OR REPLACE VIEW JSON_CF_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + cf.value:concept::STRING AS concept, + cf.value:info::STRING AS info, + cf.value:label::STRING AS label, + cf.value:unit::STRING AS unit, + cf.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.cf) cf; + + CREATE OR REPLACE VIEW JSON_IS_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + ic.value:concept::STRING AS concept, + ic.value:info::STRING AS info, + ic.value:label::STRING AS label, + ic.value:unit::STRING AS unit, + ic.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.ic) ic; + + CREATE OR REPLACE VIEW JSON_FV_{year}_Q{quarter} AS + SELECT *, 'Balance Sheet' AS section_type FROM JSON_BS_{year}_Q{quarter} + UNION ALL + SELECT *, 'Cash Flow' AS section_type FROM JSON_CF_{year}_Q{quarter} + UNION ALL + SELECT *, 'Income Statement' AS section_type FROM JSON_IS_{year}_Q{quarter}; + + """ generate_copy_sql_json = PythonOperator( task_id='generate_copy_sql_json', From e1476b58c02ad85845021a1b9f0baccea1dea6ba Mon Sep 17 00:00:00 2001 From: Vedant Mane Date: Fri, 14 Feb 2025 02:17:57 -0500 Subject: [PATCH 4/4] Updated FastAPI URL in Streamlit --- frontend/app.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/frontend/app.py b/frontend/app.py index 18575e5..b7e37c6 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -5,9 +5,10 @@ load_dotenv() # Airflow API endpoint +# AIRFLOW_API_URL = "https://ebaeb7d6-905a-429f-8719-9ff6a3c16313.c67.us-east-1.airflow.amazonaws.com" AIRFLOW_API_URL = "http://localhost:8080" -QUERY_API_URL = "http://localhost:8000" +QUERY_API_URL = "https://fastapi-service-7ss2sa6dka-uc.a.run.app" def populate_airflow_page(): # Display the airflow page @@ -85,9 +86,9 @@ def populate_query_page(): st.session_state.flag = True else: st.info(f"No data available for **{source}**, Year: **{year}**, Quarter: **{quarter}**. Trigger the Airflow DAG to fetch data.") - st.write("Query Results:") - st.dataframe(query_executed) - st.success(f"Query executed successfully.") + # st.write("Query Results:") + # st.dataframe(query_executed) + # st.success(f"Query executed successfully.") # Show query input only if data is available if st.session_state.flag: # Text area for query input (persistent using session state)