1+ # =============================================================================
2+ # Copyright (c) 2026 Botts Innovative Research Inc.
3+ # Author: Ian Patterson
4+ # Contact Email: ian@botts-inc.com
5+ # =============================================================================
6+ """
7+ Verify that OSHConnect's datamodels can faithfully represent the datastream schema
8+ that an OSH server publishes for the FakeWeatherDriver, in both observation
9+ formats served:
10+
11+ - application/om+json (CS API Part 2 §16.1.4 shape: obsFormat + resultSchema)
12+ - application/swe+json (CS API Part 2 §16.2.3 shape: obsFormat + recordSchema
13+ [+ encoding])
14+
15+ Strategy: round-trip the server-supplied schema JSON through the matching
16+ pydantic model (parse -> re-serialize) and assert structural equivalence. If
17+ our datamodels can losslessly express what the Node has, then a schema
18+ *generated* from those same datamodels will match the Node.
19+
20+ Each parametrized case prefers a live node at localhost:8282 (FakeWeatherDriver
21+ running). If the node is unreachable or no weather system is registered, it
22+ falls back to the saved fixture at tests/fixtures/fake_weather_schema_<fmt>.json.
23+ If neither is available, the case is skipped.
24+ """
25+ from __future__ import annotations
26+
27+ import json
28+ from pathlib import Path
29+ from typing import NamedTuple
30+
31+ import pytest
32+ import requests
33+
34+ from src .oshconnect .schema_datamodels import (
35+ JSONDatastreamRecordSchema ,
36+ SWEDatastreamRecordSchema ,
37+ )
38+
39+ NODE_URL = "http://localhost:8282/sensorhub/api"
40+ NODE_AUTH = ("admin" , "admin" )
41+ LIVE_TIMEOUT = 2.0
42+ FIXTURES_DIR = Path (__file__ ).parent / "fixtures"
43+
44+
45+ class FormatCase (NamedTuple ):
46+ obs_format : str
47+ model : type
48+ fixture_path : Path
49+
50+
51+ CASES = [
52+ FormatCase (
53+ obs_format = "application/om+json" ,
54+ model = JSONDatastreamRecordSchema ,
55+ fixture_path = FIXTURES_DIR / "fake_weather_schema_omjson.json" ,
56+ ),
57+ FormatCase (
58+ obs_format = "application/swe+json" ,
59+ model = SWEDatastreamRecordSchema ,
60+ fixture_path = FIXTURES_DIR / "fake_weather_schema_swejson.json" ,
61+ ),
62+ ]
63+
64+
65+ def _find_weather_system (systems : list [dict ]) -> dict | None :
66+ """Pick a system whose name/description/uid mentions 'weather'."""
67+ for sys_ in systems :
68+ props = sys_ .get ("properties" , {}) or {}
69+ haystack = " " .join (
70+ str (x ) for x in (
71+ sys_ .get ("id" , "" ),
72+ props .get ("name" , "" ),
73+ props .get ("description" , "" ),
74+ props .get ("uid" , "" ),
75+ )
76+ ).lower ()
77+ if "weather" in haystack :
78+ return sys_
79+ return None
80+
81+
82+ def _try_live_schema (obs_format : str ) -> tuple [str , dict ] | None :
83+ """Probe the node at localhost:8282 for a FakeWeather datastream and return
84+ (source_label, schema_json) for the requested obs_format. Returns None on
85+ any failure."""
86+ try :
87+ sys_resp = requests .get (f"{ NODE_URL } /systems?f=json" , auth = NODE_AUTH , timeout = LIVE_TIMEOUT )
88+ except (requests .ConnectionError , requests .Timeout ):
89+ return None
90+ if not sys_resp .ok :
91+ return None
92+
93+ weather = _find_weather_system (sys_resp .json ().get ("items" , []))
94+ if not weather :
95+ return None
96+
97+ sys_id = weather .get ("id" )
98+ if not sys_id :
99+ return None
100+
101+ ds_resp = requests .get (
102+ f"{ NODE_URL } /systems/{ sys_id } /datastreams?f=json" ,
103+ auth = NODE_AUTH , timeout = LIVE_TIMEOUT ,
104+ )
105+ if not ds_resp .ok :
106+ return None
107+ datastreams = ds_resp .json ().get ("items" , [])
108+ if not datastreams :
109+ return None
110+
111+ ds_id = datastreams [0 ].get ("id" )
112+ schema_resp = requests .get (
113+ f"{ NODE_URL } /datastreams/{ ds_id } /schema" ,
114+ params = {"obsFormat" : obs_format },
115+ auth = NODE_AUTH , timeout = LIVE_TIMEOUT ,
116+ )
117+ if not schema_resp .ok :
118+ return None
119+
120+ return (
121+ f"live node 8282 ({ obs_format } , system={ sys_id } , datastream={ ds_id } )" ,
122+ schema_resp .json (),
123+ )
124+
125+
126+ def _try_fixture_schema (path : Path ) -> tuple [str , dict ] | None :
127+ """Load the saved fixture if it exists and is non-empty."""
128+ if not path .exists ():
129+ return None
130+ text = path .read_text ().strip ()
131+ if not text or text == "{}" :
132+ return None
133+ data = json .loads (text )
134+ if not data :
135+ return None
136+ return f"fixture { path .name } " , data
137+
138+
139+ @pytest .mark .parametrize (
140+ "case" ,
141+ CASES ,
142+ ids = lambda c : c .obs_format ,
143+ )
144+ def test_fake_weather_schema_round_trips_through_datamodels (case : FormatCase ):
145+ source = _try_live_schema (case .obs_format ) or _try_fixture_schema (case .fixture_path )
146+ if source is None :
147+ pytest .skip (
148+ f"No live FakeWeather node at { NODE_URL } for { case .obs_format } and no "
149+ f"usable fixture at { case .fixture_path } . To enable: start the "
150+ f"FakeWeatherDriver on the node, or paste a schema JSON into the fixture."
151+ )
152+ label , server_schema = source
153+
154+ parsed = case .model .model_validate (server_schema )
155+ round_tripped = parsed .model_dump (
156+ mode = 'json' , by_alias = True , exclude_none = True , exclude_unset = True ,
157+ )
158+
159+ assert server_schema == round_tripped , (
160+ f"Schema round-trip mismatch (source: { label } , model: { case .model .__name__ } ).\n "
161+ f"server:\n { json .dumps (server_schema , indent = 2 , sort_keys = True )} \n \n "
162+ f"datamodel re-serialization:\n { json .dumps (round_tripped , indent = 2 , sort_keys = True )} "
163+ )
0 commit comments