1+ from __future__ import annotations
2+
3+ import json
4+ from pathlib import Path
5+ from typing import Any , Dict , Iterable , List , Tuple
6+ from urllib .parse import urljoin , urlparse
7+ from pathlib import PurePosixPath
8+
9+ from referencing import Registry , Resource
10+ from referencing .exceptions import Unresolvable
11+
def iter_refs(node: Any, path: str = "#") -> Iterable[Tuple[str, str]]:
    """
    Yield ($ref_value, json_pointer_path_in_schema) for every $ref in a schema tree.

    The tree is walked depth-first. For each dict, its own "$ref" (if it is a
    string) is reported before any of its children are visited.

    Args:
        node: A parsed-JSON value. Dicts and lists are traversed; any other
            value yields nothing.
        path: JSON Pointer of ``node`` inside the document, written with a
            leading "#". Callers normally leave this at its default.

    Yields:
        ``(ref, pointer)`` tuples, where ``ref`` is the string stored under a
        "$ref" key and ``pointer`` is the location of that key. Object keys
        are escaped per RFC 6901 ("~" -> "~0", "/" -> "~1"); no
        percent-encoding is applied.
    """
    if isinstance(node, dict):
        ref = node.get("$ref")
        # Non-string "$ref" values (e.g. a property that happens to be named
        # "$ref") are not references; they are only descended into below.
        if isinstance(ref, str):
            yield ref, path + "/$ref"
        for key, value in node.items():
            # Escape "~" before "/" so the "~1" produced for "/" is not
            # re-escaped into "~01".
            token = str(key).replace("~", "~0").replace("/", "~1")
            yield from iter_refs(value, f"{path}/{token}")
    elif isinstance(node, list):
        for index, value in enumerate(node):
            yield from iter_refs(value, f"{path}/{index}")
25+
26+
def build_registry_from_dir(schema_dir: str | Path) -> Tuple[Registry, Dict[str, Dict[str, Any]]]:
    """
    Load all *.json schemas under schema_dir into a referencing.Registry.

    Each resource is keyed by:
      - its $id, if present (and non-empty), else
      - a file:// URI for its absolute path.

    Files are visited in sorted path order so that the progress output and
    the file named in a duplicate-ID error are reproducible across runs.

    Args:
        schema_dir: Directory that is searched recursively for ``*.json`` files.

    Returns:
        A ``(registry, by_uri)`` tuple: the populated registry, and a dict
        mapping each registered URI to the parsed schema document.

    Raises:
        ValueError: If a file's top-level JSON value is not an object, or if
            two files resolve to the same URI.
        json.JSONDecodeError: If a file does not contain valid JSON.

    NOTE(review): ``Resource.from_contents`` is called without a default
    specification, so each schema presumably has to declare ``$schema`` —
    confirm against the referencing documentation.
    """
    schema_dir = Path(schema_dir)
    registry = Registry()
    by_uri: Dict[str, Dict[str, Any]] = {}

    for path in sorted(schema_dir.rglob("*.json")):
        with path.open("r", encoding="utf-8") as f:
            schema = json.load(f)

        print(f"Loading {path.name}")
        if not isinstance(schema, dict):
            # Fail with the offending file named instead of an opaque
            # AttributeError from ``schema.get`` below.
            raise ValueError(
                f"Expected a JSON object at the top level of {path}, "
                f"got {type(schema).__name__}"
            )

        uri = schema.get("$id")
        if not uri:
            uri = path.resolve().as_uri()  # file:///.../schema.json

        # Reject duplicates before touching the registry so a clash never
        # results in a half-updated registry object being built.
        if uri in by_uri:
            raise ValueError(f"Duplicate schema ID {uri} found in {path.name}")

        # Registry is immutable: with_resource returns a new registry.
        registry = registry.with_resource(uri, Resource.from_contents(schema))
        by_uri[uri] = schema

    return registry, by_uri
56+
57+
def find_missing_refs_in_dir(schema_dir: str | Path) -> List[Dict[str, str]]:
    """
    Collect every $ref under schema_dir that the registry cannot resolve.

    All schemas found by ``build_registry_from_dir`` are loaded first; each
    schema's $refs are then looked up with a resolver based at that schema's
    own URI.

    Returns:
        One dict per unresolved reference with the keys ``"schema"`` (URI of
        the containing schema), ``"ref"`` (the $ref value as written),
        ``"where"`` (location of the $ref inside the schema) and
        ``"resolved_target"`` (the $ref joined onto the schema's URI).
        The list is empty when everything resolves.
    """
    registry, schemas = build_registry_from_dir(schema_dir)
    unresolved: List[Dict[str, str]] = []

    for schema_uri in schemas:
        document = schemas[schema_uri]
        scoped_resolver = registry.resolver(base_uri=schema_uri)

        for ref_value, pointer in iter_refs(document):
            # Relative references are made absolute against the URI the
            # containing schema was registered under.
            absolute = urljoin(schema_uri, ref_value)
            try:
                scoped_resolver.lookup(absolute)
            except Unresolvable:
                unresolved.append(
                    dict(
                        schema=schema_uri,
                        ref=ref_value,
                        where=pointer,
                        resolved_target=absolute,
                    )
                )

    return unresolved
84+
85+
if __name__ == "__main__":
    # Script mode: check every *.json schema in (or below) the directory that
    # contains this file, and exit with status 2 if any $ref is unresolved.
    problems = find_missing_refs_in_dir(Path(__file__).resolve().parent)
    if problems:
        print("\n Missing/unresolvable $refs:")
        for p in problems:
            # Computed outside the f-string: reusing the enclosing quote
            # character inside a replacement field is a SyntaxError on
            # Python versions before 3.12.
            schema_name = p["schema"].split("/")[-1]
            print(f"- In {schema_name}:\n at {p['where']}: {p['ref']} (→ {p['resolved_target']})\n")
        raise SystemExit(2)
    else:
        print("All $refs resolved.")
0 commit comments