diff --git a/airflow/dags/dag_sec_json_pipeline.py b/airflow/dags/dag_sec_json_pipeline.py index f5953a2..0303d4a 100644 --- a/airflow/dags/dag_sec_json_pipeline.py +++ b/airflow/dags/dag_sec_json_pipeline.py @@ -458,7 +458,7 @@ def schema_def_json(**kwargs): year = kwargs['dag_run'].conf.get('year') quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1] return f""" - CREATE OR REPLACE TABLE SEC_JSON_{year}_{quarter} ( + CREATE OR REPLACE TABLE SEC_JSON_{year}_Q{quarter} ( json_data VARIANT ); """ @@ -478,9 +478,68 @@ def copyinto_json(**kwargs): year = kwargs['dag_run'].conf.get('year') quarter = kwargs['dag_run'].conf.get('quarter').split('Q')[1] return f""" - COPY INTO SEC_JSON_{year}_{quarter} + COPY INTO SEC_JSON_{year}_Q{quarter} FROM @sec_json_stage/data/{year}/{quarter}/json/ FILE_FORMAT = (TYPE = 'JSON'); + CREATE OR REPLACE VIEW JSON_BS_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + bs.value:concept::STRING AS concept, + bs.value:info::STRING AS info, + bs.value:label::STRING AS label, + bs.value:unit::STRING AS unit, + bs.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.bs) bs; + + CREATE OR REPLACE VIEW JSON_CF_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + cf.value:concept::STRING AS concept, + cf.value:info::STRING AS info, + cf.value:label::STRING AS label, + cf.value:unit::STRING AS unit, + cf.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.cf) cf; + + CREATE OR REPLACE VIEW 
JSON_IS_{year}_Q{quarter} AS + SELECT + t.json_data:year::NUMBER AS year, + t.json_data:quarter::STRING AS quarter, + t.json_data:country::STRING AS country, + t.json_data:city::STRING AS city, + t.json_data:name::STRING AS company_name, + t.json_data:symbol::STRING AS symbol, + + ic.value:concept::STRING AS concept, + ic.value:info::STRING AS info, + ic.value:label::STRING AS label, + ic.value:unit::STRING AS unit, + ic.value:value::NUMBER AS value + FROM SEC_JSON_{year}_Q{quarter} t, + LATERAL FLATTEN(input => t.json_data:data.ic) ic; + + CREATE OR REPLACE VIEW JSON_FV_{year}_Q{quarter} AS + SELECT *, 'Balance Sheet' AS section_type FROM JSON_BS_{year}_Q{quarter} + UNION ALL + SELECT *, 'Cash Flow' AS section_type FROM JSON_CF_{year}_Q{quarter} + UNION ALL + SELECT *, 'Income Statement' AS section_type FROM JSON_IS_{year}_Q{quarter}; + + """ generate_copy_sql_json = PythonOperator( task_id='generate_copy_sql_json', diff --git a/dockerfile b/dockerfile index 18bc6d1..36eeae2 100644 --- a/dockerfile +++ b/dockerfile @@ -5,17 +5,16 @@ FROM --platform=linux/amd64 python:3.12.8-slim WORKDIR /app # Copy requirements first for layer caching -COPY requirements.txt . +COPY requirement.txt . 
COPY .env /app/.env # Install dependencies -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -r requirement.txt # Copy the application code COPY ./backend /app/backend # COPY ./frontend /app/frontend -COPY ./features /app/features COPY ./services /app/services # Set environment variables diff --git a/frontend/app.py b/frontend/app.py index 1280aeb..a869408 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -6,41 +6,10 @@ load_dotenv() # Airflow API endpoint +# AIRFLOW_API_URL = "https://ebaeb7d6-905a-429f-8719-9ff6a3c16313.c67.us-east-1.airflow.amazonaws.com" AIRFLOW_API_URL = "http://localhost:8080" -<<<<<<< HEAD -st.title("SEC Data - Bridge") - - -# # Input fields for year and quarter -year = st.selectbox("Select Year",("2024","2023","2022","2021","2020","2019","2018","2017")) -quarter = st.selectbox("Select Quarter", ("1","2","3","4")) - -if st.button("Fetch Data"): - # Payload for triggering the DAG - payload = { - "conf": { - "year": year, - "quarter": quarter - } - } - dag_id = "sec_data_to_s3_scraper" - AIRFLOW_API_URL = f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns" - - # Trigger the DAG via Airflow REST API - response = requests.post( - AIRFLOW_API_URL, - json=payload, - auth=(f"{AIRFLOW_USER}", f"{AIRFLOW_PASSCODE}") - ) - - if response.status_code == 200: - st.success("DAG triggered successfully!") - else: - st.error(f"Failed to trigger DAG: {response.text}") -======= -QUERY_API_URL = "http://localhost:8000" ->>>>>>> origin/main +QUERY_API_URL = "https://fastapi-service-7ss2sa6dka-uc.a.run.app" def populate_airflow_page(): # Display the airflow page @@ -118,9 +87,9 @@ def populate_query_page(): st.session_state.flag = True else: st.info(f"No data available for **{source}**, Year: **{year}**, Quarter: **{quarter}**. 
Trigger the Airflow DAG to fetch data.") -            st.write("Query Results:") -            st.dataframe(query_executed) -            st.success(f"Query executed successfully.") +            # st.write("Query Results:") +            # st.dataframe(query_executed) +            # st.success(f"Query executed successfully.") # Show query input only if data is available if st.session_state.flag: # Text area for query input (persistent using session state) diff --git a/requirement.txt b/requirement.txt index 7e17211..56b168b 100644 --- a/requirement.txt +++ b/requirement.txt @@ -3,14 +3,13 @@ numpy pandas fastapi uvicorn -bs4 requests boto3 -datetime pydantic pathlib -logging -apache-airflow +python-dotenv +sqlalchemy +snowflake-sqlalchemy apache-airflow-providers-snowflake -dbt-core -dbt-snowflake \ No newline at end of file +apache-airflow-providers-amazon +# logging is stdlib; the PyPI 'logging' package is Python 2-only — do not install \ No newline at end of file