1515import dash
1616import dash_bootstrap_components as dbc
1717import dash_leaflet as dl
18- import duckdb
1918import pandas as pd
2019import plotly .graph_objects as go
2120from dash import Input , Output , callback , dcc , html
4039TILE_URL = "https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}"
4140TILE_ATTRIBUTION = "Esri, Maxar, Earthstar Geographics"
4241
43- # ── DuckDB connection ──────────────────────────────────────────────────────────
44-
45- def make_conn ():
46- conn = duckdb .connect ()
47- # No spatial extension — dashboard only reads scalar columns from parquet
48- conn .execute (f"""
49- CREATE SECRET IF NOT EXISTS s3_secret (
50- TYPE s3,
51- KEY_ID '{ KEY_ID } ',
52- SECRET '{ KEY_SECRET } ',
53- REGION '{ REGION } '
54- );
55- """ )
56- return conn
57-
58- try :
59- CONN = make_conn ()
60- print (f"DuckDB ready. bucket={ BUCKET } region={ REGION } key={ KEY_ID [:6 ]} ..." )
61- except Exception as e :
62- print (f"DuckDB init error: { e } " )
63- CONN = None
64-
6542# ── Data helpers ───────────────────────────────────────────────────────────────
6643
6744def s3 ():
@@ -79,63 +56,80 @@ def load_aois():
7956 return {}
8057
8158def read_ts (country , aoi_name ):
82- glob = f"s3://{ BUCKET } /{ country } /{ aoi_name } /ts/*.parquet"
59+ """Read time series parquet files from S3 using pyarrow.
60+ Uses pyarrow directly because the parquet files contain a geopandas
61+ geometry column that DuckDB cannot parse without the spatial extension.
62+ """
63+ import io
64+ import pyarrow .parquet as pq
65+ prefix = f"{ country } /{ aoi_name } /ts/"
8366 try :
84- df = CONN .execute (f"""
85- SELECT time, ndvi, bsi, ndmi, nbr
86- FROM read_parquet('{ glob } ',
87- union_by_name=true,
88- hive_partitioning=false)
89- WHERE aoi_name = '{ aoi_name } '
90- AND time > '2018-01-01'
91- ORDER BY time
92- """ ).df ()
67+ s3_client = boto3 .client ("s3" )
68+ resp = s3_client .list_objects_v2 (Bucket = BUCKET , Prefix = prefix )
69+ if "Contents" not in resp :
70+ print (f"read_ts: no files found at { prefix } " )
71+ return pd .DataFrame ()
72+ dfs = []
73+ for obj in resp ["Contents" ]:
74+ if not obj ["Key" ].endswith (".parquet" ):
75+ continue
76+ buf = io .BytesIO (
77+ s3_client .get_object (Bucket = BUCKET , Key = obj ["Key" ])["Body" ].read ()
78+ )
79+ tbl = pq .read_table (buf , columns = ["time" , "ndvi" , "bsi" , "ndmi" , "nbr" , "aoi_name" ])
80+ dfs .append (tbl .to_pandas ())
81+ if not dfs :
82+ return pd .DataFrame ()
83+ df = pd .concat (dfs , ignore_index = True )
84+ df = (df [df ["aoi_name" ] == aoi_name ]
85+ .query ("time > '2018-01-01'" )
86+ .sort_values ("time" )
87+ .reset_index (drop = True ))
9388 df ["time" ] = pd .to_datetime (df ["time" ])
9489 print (f"read_ts OK: { df .shape } " )
95- return df
90+ return df [[ "time" , "ndvi" , "bsi" , "ndmi" , "nbr" ]]
9691 except Exception as e :
9792 print (f"read_ts error: { e } " )
9893 traceback .print_exc ()
99- # Fallback: use boto3 + pyarrow to read parquet directly
100- try :
101- import boto3 , io , pyarrow .parquet as pq
102- s3 = boto3 .client ("s3" )
103- prefix = f"{ country } /{ aoi_name } /ts/"
104- resp = s3 .list_objects_v2 (Bucket = BUCKET , Prefix = prefix )
105- dfs = []
106- for obj in resp .get ("Contents" , []):
107- if obj ["Key" ].endswith (".parquet" ):
108- buf = io .BytesIO (s3 .get_object (Bucket = BUCKET , Key = obj ["Key" ])["Body" ].read ())
109- tbl = pq .read_table (buf , columns = ["time" ,"ndvi" ,"bsi" ,"ndmi" ,"nbr" ,"aoi_name" ])
110- dfs .append (tbl .to_pandas ())
111- if not dfs :
112- return pd .DataFrame ()
113- df = pd .concat (dfs )
114- df = df [df ["aoi_name" ] == aoi_name ].query ("time > '2018-01-01'" ).sort_values ("time" )
115- df ["time" ] = pd .to_datetime (df ["time" ])
116- print (f"read_ts fallback OK: { df .shape } " )
117- return df [["time" ,"ndvi" ,"bsi" ,"ndmi" ,"nbr" ]]
118- except Exception as e2 :
119- print (f"read_ts fallback error: { e2 } " )
120- return pd .DataFrame ()
94+ return pd .DataFrame ()
12195
12296def read_forecasts (country , aoi_name ):
123- glob = f"s3://{ BUCKET } /{ country } /{ aoi_name } /ml/forecast_{ aoi_name } _*.parquet"
97+ """Read latest forecast parquet from S3 using pyarrow."""
98+ import io
99+ import pyarrow .parquet as pq
100+ prefix = f"{ country } /{ aoi_name } /ml/"
124101 try :
125- df = CONN .execute (f"""
126- SELECT unique_id, ds, XGBRegressor
127- FROM read_parquet('{ glob } ')
128- WHERE aoi_name = '{ aoi_name } '
129- AND forecast_date = (
130- SELECT MAX(forecast_date)
131- FROM read_parquet('{ glob } ')
132- WHERE aoi_name = '{ aoi_name } '
102+ s3_client = boto3 .client ("s3" )
103+ resp = s3_client .list_objects_v2 (Bucket = BUCKET , Prefix = prefix )
104+ if "Contents" not in resp :
105+ print (f"read_forecasts: no files found at { prefix } " )
106+ return pd .DataFrame ()
107+ # find all forecast parquet files
108+ fc_keys = [o ["Key" ] for o in resp ["Contents" ]
109+ if o ["Key" ].endswith (".parquet" )
110+ and f"forecast_{ aoi_name } _" in o ["Key" ]]
111+ if not fc_keys :
112+ return pd .DataFrame ()
113+ dfs = []
114+ for key in fc_keys :
115+ buf = io .BytesIO (
116+ s3_client .get_object (Bucket = BUCKET , Key = key )["Body" ].read ()
133117 )
134- ORDER BY unique_id, ds
135- """ ).df ()
118+ tbl = pq .read_table (buf , columns = ["unique_id" , "ds" , "XGBRegressor" ,
119+ "forecast_date" , "aoi_name" ])
120+ dfs .append (tbl .to_pandas ())
121+ if not dfs :
122+ return pd .DataFrame ()
123+ df = pd .concat (dfs , ignore_index = True )
124+ df = df [df ["aoi_name" ] == aoi_name ]
125+ # keep only the latest forecast date
126+ latest = df ["forecast_date" ].max ()
127+ df = (df [df ["forecast_date" ] == latest ]
128+ .sort_values (["unique_id" , "ds" ])
129+ .reset_index (drop = True ))
136130 df ["ds" ] = pd .to_datetime (df ["ds" ])
137- print (f"read_forecasts OK: { df .shape } " )
138- return df
131+ print (f"read_forecasts OK: { df .shape } (forecast_date= { latest } ) " )
132+ return df [[ "unique_id" , "ds" , "XGBRegressor" ]]
139133 except Exception as e :
140134 print (f"read_forecasts error: { e } " )
141135 traceback .print_exc ()
0 commit comments