Skip to content

Commit e945b3d

Browse files
committed
fix: add ASOS startup backfill + ingest audit notes
- daemon.rs: add get_asos_locations(), check_asos_staleness(), pub backfill_asos_station() - main.rs: check ASOS staleness on startup; backfill stale/empty stations (30 days) mirrors existing USGS + CWMS startup backfill pattern - TODO.md: document accurate ingest status and 7 known gaps found in audit
1 parent 68f8d37 commit e945b3d

3 files changed

Lines changed: 88 additions & 5 deletions

File tree

TODO.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
## ✅ Completed
66

77
- Zone-based HTTP API (`/zones`, `/zone/{id}`, `/status`, `/backwater`, `/health`)
8-
- USGS NWIS ingest (real-time IV + 87 years DV historical backfill)
9-
- USACE CWMS ingest with runtime catalog discovery
10-
- IEM/ASOS precipitation ingest with basin assignment
8+
- USGS NWIS ingest (real-time IV every 15min + startup backfill up to 120 days IV)
9+
- USACE CWMS ingest with runtime catalog discovery (pool, tailwater, stage)
10+
- IEM/ASOS precipitation ingest with basin assignment (hourly, all weather fields)
1111
- 7-zone hydrological model (Illinois River reference implementation)
1212
- Flood type classification (top-down, bottom-up, local tributary, compound)
1313
- Backwater detection (Grafton stage + LaGrange pool/tailwater differential)
@@ -21,9 +21,19 @@
2121
- Integration test suites (ASOS, USGS, CWMS, daemon lifecycle, peak flow)
2222
- Dockerfile healthcheck (curl-based)
2323

24+
## Ingest Bugs & Known Gaps
25+
26+
- **ASOS: no startup backfill**`backfill_asos_station()` is defined in `daemon.rs` but never called from `main.rs`; ASOS data only accumulates forward from first run; fix: add ASOS staleness check + backfill call in `main.rs` startup sequence alongside USGS/CWMS
27+
- **USGS: DV history not fetched**`backfill_days: 120` (default) only fetches IV data; to get years of daily history, increase `backfill_days > 120` or add a one-time historical ingest CLI subcommand; the "87 years" history claim requires a manual config change or separate tool
28+
- **CWMS staleness check uses `location_id`**`check_cwms_staleness()` queries all timeseries for a location in aggregate; if pool has fresh data but tailwater is empty, tailwater won't get backfilled on startup
29+
- **`asos_precip_summary` table is never populated** — schema defines 6h/12h/24h/48h pre-computed rolling totals, but no code ever inserts into it; `/zone/{id}` now correctly sums raw `asos_observations` directly instead
30+
- **`IemAsosMinute` (1-min precip) struct unused** — the 1-minute ASOS data structure exists in `ingest/iem.rs` but `fetch_recent_precip` only fetches hourly data; no 1-min poll path exists in the daemon
31+
- **ASOS poll ignores priority intervals** — all ASOS stations (PRIMARY through EXTENDED) are polled every 15min; the `poll_interval_minutes` field from priority is stored in DB but the daemon doesn't respect it
32+
- **USGS only fetches 2 parameters** — hardcoded `["00060", "00065"]` (discharge + stage); stations may also report temperature (00010), conductance (00095), pH (00400) — not currently ingested
33+
2434
---
2535

26-
## Generic Multi-Waterway Architecture
36+
2737

2838
- `waterway.toml` — top-level deployment config: `waterway_id`, display name, bounding box, datum, timezone
2939
- Dynamic zone deserialization — replace hardcoded `ZoneCollection { zone_0…zone_6 }` with `HashMap<String, Zone>`

flomon_service/src/daemon.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,35 @@ impl Daemon {
210210
&self.cwms_locations
211211
}
212212

213+
/// Get reference to loaded ASOS locations
214+
pub fn get_asos_locations(&self) -> &[AsosLocation] {
215+
&self.asos_locations
216+
}
217+
218+
/// Check staleness of ASOS data for a specific station
219+
pub fn check_asos_staleness(&mut self, station_id: &str) -> Result<Option<Duration>, Box<dyn Error>> {
220+
let client = self.client.as_mut()
221+
.ok_or("Daemon not initialized")?;
222+
223+
let rows = client.query(
224+
"SELECT MAX(observation_time) as latest
225+
FROM asos_observations
226+
WHERE station_id = $1",
227+
&[&station_id]
228+
)?;
229+
230+
if rows.is_empty() {
231+
return Ok(None);
232+
}
233+
234+
let latest: Option<DateTime<Utc>> = rows[0].get(0);
235+
236+
match latest {
237+
Some(dt) => Ok(Some(Utc::now() - dt)),
238+
None => Ok(None),
239+
}
240+
}
241+
213242
/// Check staleness of data for a specific station
214243
pub fn check_staleness(&mut self, site_code: &str) -> Result<Option<Duration>, Box<dyn Error>> {
215244
let client = self.client.as_mut()
@@ -683,7 +712,7 @@ impl Daemon {
683712
}
684713

685714
/// Backfill ASOS historical data for a station
686-
fn backfill_asos_station(&mut self, station_id: &str, days: i64) -> Result<usize, Box<dyn Error>> {
715+
pub fn backfill_asos_station(&mut self, station_id: &str, days: i64) -> Result<usize, Box<dyn Error>> {
687716
let http_client = reqwest::blocking::Client::builder()
688717
.timeout(std::time::Duration::from_secs(30))
689718
.build()?;

flomon_service/src/main.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,50 @@ fn main() {
186186
println!();
187187
}
188188

189+
// Check ASOS stations for stale data
190+
println!("📋 Checking ASOS data freshness...");
191+
let asos_locations: Vec<_> = daemon.get_asos_locations().to_vec();
192+
let mut asos_backfill_needed = Vec::new();
193+
194+
for location in &asos_locations {
195+
// Strip leading "K" to match IEM API station codes
196+
let station_id = if location.station_id.starts_with('K') && location.station_id.len() == 4 {
197+
&location.station_id[1..]
198+
} else {
199+
&location.station_id[..]
200+
};
201+
202+
match daemon.check_asos_staleness(station_id) {
203+
Ok(None) => {
204+
println!(" {} - No data found (needs backfill)", location.station_id);
205+
asos_backfill_needed.push(station_id.to_string());
206+
}
207+
Ok(Some(staleness)) => {
208+
let hours = staleness.num_hours();
209+
if hours > 2 {
210+
println!(" {} - Data is {} hours old (stale)", location.station_id, hours);
211+
asos_backfill_needed.push(station_id.to_string());
212+
} else {
213+
println!(" {} - Data is fresh ({} min old)", location.station_id, staleness.num_minutes());
214+
}
215+
}
216+
Err(e) => {
217+
eprintln!(" {} - Error checking staleness: {}", location.station_id, e);
218+
}
219+
}
220+
}
221+
222+
if !asos_backfill_needed.is_empty() {
223+
println!("\n📥 Backfilling {} ASOS stations (last 30 days)...", asos_backfill_needed.len());
224+
for station_id in &asos_backfill_needed {
225+
match daemon.backfill_asos_station(station_id, 30) {
226+
Ok(count) => println!(" ✓ {} - Inserted {} observations", station_id, count),
227+
Err(e) => eprintln!(" ✗ {} - Backfill failed: {}", station_id, e),
228+
}
229+
}
230+
println!();
231+
}
232+
189233
// Start HTTP endpoint if requested (in background thread)
190234
if let Some(port) = endpoint_port {
191235
println!("🚀 Starting HTTP endpoint server...");

0 commit comments

Comments
 (0)