-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidate.py
More file actions
415 lines (344 loc) · 14.8 KB
/
validate.py
File metadata and controls
415 lines (344 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
"""
ODS v2.x Record Validator
Performs two-pass validation:
Pass 1 — core schema (schema/ods_record_v2.json)
Pass 2 — profile schema (schema/profiles/<namespace>-<version>.json), if profile field is present
Store-level invariants (parent_id existence, FINAL uniqueness, OUTCOME profile consistency)
require the --store flag pointing to a directory of existing records.
Reserved profile namespace detection uses schema/profiles/registry.json.
v2.1.0 additions:
- CHECKPOINT record type validation (checkpoint block fields, sequence_number)
- sequence_number field validation for Merkle-eligible records
"""
import json
import sys
import os
import argparse
from pathlib import Path
try:
from jsonschema import Draft7Validator
except ImportError:
print("Error: jsonschema is required. Run: pip install jsonschema")
sys.exit(2)
# Filesystem layout, derived from this script's own location: the repository
# root is taken to be the directory one level above the folder holding this file.
REPO_ROOT = Path(__file__).parent.parent
_SCHEMA_DIR = REPO_ROOT.joinpath("schema")
# Core (pass 1) schema file.
CORE_SCHEMA_PATH = _SCHEMA_DIR.joinpath("ods_record_v2.json")
# Directory searched for profile (pass 2) schemas and the profile registry.
PROFILES_DIR = _SCHEMA_DIR.joinpath("profiles")
REGISTRY_PATH = PROFILES_DIR.joinpath("registry.json")
def load_json(path: Path) -> dict:
    """Read the file at *path* and return its parsed JSON content.

    The file is decoded explicitly as UTF-8 so that parsing does not depend
    on the platform's default locale encoding.

    Note: the return annotation is ``dict`` because callers pass record and
    schema files, but the function returns whatever top-level JSON value the
    file holds.

    Raises:
        OSError: the file cannot be opened or read.
        UnicodeDecodeError: the file is not valid UTF-8.
        json.JSONDecodeError: the content is not valid JSON.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def validate_schema(record: dict, schema: dict) -> list[str]:
    """Return the message of every Draft 7 violation of *schema* by *record*.

    Violations are ordered by their string representation so the output is
    stable between runs. An empty list means the record conforms.
    """
    violations = sorted(Draft7Validator(schema).iter_errors(record), key=str)
    return [violation.message for violation in violations]
def load_registry() -> dict:
    """Return the profile registry loaded from ``REGISTRY_PATH``.

    A missing registry file is treated as "no registry" and yields ``{}``
    silently. A registry that exists but cannot be read, cannot be parsed, or
    is not a JSON object also yields ``{}``, but a warning is written to
    stderr first: with an empty registry no namespace is ever reported as
    reserved, so the operator should know the check is not in effect.
    """
    if not REGISTRY_PATH.exists():
        return {}
    try:
        registry = load_json(REGISTRY_PATH)
    except (json.JSONDecodeError, UnicodeDecodeError, OSError) as exc:
        print(
            f"Warning: could not load profile registry at {REGISTRY_PATH}: {exc}. "
            "Reserved profile namespace detection is disabled.",
            file=sys.stderr,
        )
        return {}
    if not isinstance(registry, dict):
        # Downstream code calls .get() on the registry; anything else is unusable.
        print(
            f"Warning: profile registry at {REGISTRY_PATH} is not a JSON object. "
            "Reserved profile namespace detection is disabled.",
            file=sys.stderr,
        )
        return {}
    return registry
def resolve_profile_schema_path(profile_field: str) -> Path:
    """Map a profile identifier to the schema file expected for it.

    ``'ODS-Finance/v1'`` maps to ``schema/profiles/ods-finance-v1.json``: the
    namespace is lower-cased, the version part is used as written. A value
    that does not consist of exactly two ``/``-separated parts maps to the
    placeholder path ``PROFILES_DIR / "INVALID"``.
    """
    pieces = profile_field.split("/")
    if len(pieces) == 2:
        namespace, major_version = pieces
        return PROFILES_DIR / f"{namespace.lower()}-{major_version}.json"
    return PROFILES_DIR / "INVALID"
def check_reserved_namespace(profile_field: str, registry: dict) -> str | None:
"""Return an error string if the profile namespace is reserved, else None."""
namespace = profile_field.split("/")[0]
profiles = registry.get("profiles", {})
info = profiles.get(namespace, {})
if info.get("status") == "reserved":
return (
f"Profile namespace '{namespace}' has status 'reserved' in registry. "
"Conformance claims against reserved profiles are PROHIBITED (OQ3). "
"See PROFILES.md."
)
return None
def validate_profile(record: dict, args) -> tuple[list[str], list[str]]:
    """
    Run the second validation pass: check *record* against its profile schema.

    Returns ``(errors, warnings)``. Errors cause a non-zero exit in the caller;
    warnings are informational.

    Outcomes, in order:
      * no ``profile`` field: nothing is validated; an OUTCOME validated
        without ``args.store`` gets a warning explaining why.
      * the namespace is reserved in the registry: one error.
      * the profile schema file does not exist: a warning when
        ``args.skip_missing_profile`` is set, otherwise an error.
      * the profile schema cannot be loaded: one error.
      * otherwise: every schema violation is returned as an error.
    """
    errors: list[str] = []
    warnings: list[str] = []

    profile_field = record.get("profile")
    if not profile_field:
        is_outcome = record.get("record_type") == "OUTCOME"
        if is_outcome and not args.store:
            warnings.append(
                "OUTCOME profile field is absent and --store was not provided. "
                "Profile-specific fields in this OUTCOME cannot be validated without the parent DECISION."
            )
        return errors, warnings

    reserved_error = check_reserved_namespace(profile_field, load_registry())
    if reserved_error:
        errors.append(reserved_error)
        return errors, warnings

    schema_path = resolve_profile_schema_path(profile_field)
    if not schema_path.exists():
        # skip_missing_profile is read defensively: args may come from a caller
        # that never defined the flag.
        if getattr(args, "skip_missing_profile", False):
            warnings.append(
                f"Profile schema for '{profile_field}' not found at {schema_path}. "
                "--skip-missing-profile is set; profile validation skipped."
            )
        else:
            errors.append(
                f"Profile schema for '{profile_field}' not found. "
                f"Searched: {schema_path}. "
                "A missing profile schema is a configuration failure — the profile schema must be "
                "available for validation. Use --skip-missing-profile to bypass (not recommended for production)."
            )
        return errors, warnings

    try:
        profile_schema = load_json(schema_path)
    except (json.JSONDecodeError, OSError) as exc:
        errors.append(f"Failed to load profile schema at {schema_path}: {exc}")
        return errors, warnings

    errors.extend(validate_schema(record, profile_schema))
    return errors, warnings
def load_store(store_path: Path) -> dict[str, dict]:
    """Load all records from a store directory, keyed by record_id.

    Every ``*.json`` file directly inside *store_path* is read as UTF-8 JSON.
    Files are processed in sorted order so that, if two files carry the same
    record_id, the result is deterministic (the later file name wins).

    Loading is best-effort: a file that cannot be read or parsed, or whose
    top-level value is not a JSON object, is skipped with a warning on stderr
    rather than aborting the run. Objects without a non-empty string
    ``record_id`` are skipped silently.

    Exits the process with status 2 if *store_path* is not a directory.
    """
    if not store_path.is_dir():
        print(f"Error: store path is not a directory: {store_path}", file=sys.stderr)
        sys.exit(2)

    store: dict[str, dict] = {}
    for candidate in sorted(store_path.glob("*.json")):
        try:
            loaded = json.loads(candidate.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError, OSError) as exc:
            print(f"Warning: skipping unreadable store file {candidate}: {exc}", file=sys.stderr)
            continue
        if not isinstance(loaded, dict):
            # Arrays/scalars have no .get(); they cannot be records.
            print(f"Warning: skipping store file {candidate}: top-level JSON value is not an object", file=sys.stderr)
            continue
        record_id = loaded.get("record_id")
        if isinstance(record_id, str) and record_id:
            store[record_id] = loaded
    return store
def validate_checkpoint(record: dict) -> tuple[list[str], list[str]]:
    """
    Validate CHECKPOINT-specific constraints beyond JSON Schema.

    Does NOT enforce sequence_number presence — that is handled by
    validate_stored_representation() when --stored mode is active.

    Checks performed (only for records whose record_type is CHECKPOINT):
      * warning when checkpoint.covers_through_sequence_number differs from
        checkpoint.tree_size;
      * error when the record's own sequence_number is not strictly greater
        than checkpoint.covers_through_sequence_number.

    Each comparison is made only when both operands are real integers.
    Booleans are excluded even though ``bool`` is an ``int`` subclass, and a
    missing or non-object ``checkpoint`` block is treated as empty.

    Returns (errors, warnings).
    """
    errors: list[str] = []
    warnings: list[str] = []
    if record.get("record_type") != "CHECKPOINT":
        return errors, warnings

    def is_int(value) -> bool:
        # JSON true/false decode to bool, which isinstance(..., int) accepts.
        return isinstance(value, int) and not isinstance(value, bool)

    cp = record.get("checkpoint")
    if not isinstance(cp, dict):
        cp = {}

    # covers_through_sequence_number equals tree_size for gapless logs;
    # a difference is reported as a warning, not an error.
    tree_size = cp.get("tree_size")
    covers = cp.get("covers_through_sequence_number")
    if is_int(tree_size) and is_int(covers) and covers != tree_size:
        warnings.append(
            f"checkpoint.covers_through_sequence_number ({covers}) != checkpoint.tree_size ({tree_size}). "
            "These are equal when the Merkle log is gapless from sequence_number 1. "
            "A mismatch is permitted but should be reviewed."
        )

    # The CHECKPOINT's own sequence_number must be > covers_through_sequence_number (when present)
    own_seq = record.get("sequence_number")
    if is_int(own_seq) and is_int(covers) and own_seq <= covers:
        errors.append(
            f"CHECKPOINT sequence_number ({own_seq}) must be strictly greater than "
            f"checkpoint.covers_through_sequence_number ({covers}). "
            "A CHECKPOINT is written after the tree it describes."
        )
    return errors, warnings
def validate_sequence_number(record: dict) -> list[str]:
    """
    Validate sequence_number value for any record type, if present.

    Does NOT enforce presence — submitted records legitimately omit it.
    Presence enforcement is handled by validate_stored_representation() (--stored mode).

    A present value must be an integer >= 1. Booleans are rejected explicitly:
    ``bool`` is a subclass of ``int`` in Python, so JSON ``true`` would
    otherwise be accepted as the sequence number 1.

    Returns a list with zero or one error message.
    """
    seq = record.get("sequence_number")
    if seq is None:
        return []
    if isinstance(seq, bool) or not isinstance(seq, int) or seq < 1:
        return [f"'sequence_number' must be a positive integer >= 1, got: {seq!r}"]
    return []
def validate_stored_representation(record: dict) -> list[str]:
    """
    Enforce sequence_number presence for stored record representations (--stored mode).

    All v2.1.0+ stored records must carry the store-assigned sequence_number.
    Called only when --stored flag is active. See SPECIFICATION.md §3.3.

    An explicit JSON ``null`` counts as absent. validate_sequence_number()
    skips ``None`` values, so a key-presence test alone would let a stored
    record with ``"sequence_number": null`` pass both checks.

    Returns a list with zero or one error message.
    """
    if record.get("sequence_number") is not None:
        return []
    record_type = record.get("record_type", "RECORD")
    return [
        f"'sequence_number' is absent (--stored mode). Stored {record_type} records "
        "must have a store-assigned sequence_number. See SPECIFICATION.md §3.3."
    ]
def validate_store_invariants(record: dict, store: dict[str, dict]) -> list[str]:
    """
    Validate store-level invariants that JSON Schema cannot enforce.

    Only OUTCOME records are checked; any other record type returns no errors.
      1. parent_id must be present and reference an existing record_id in the store.
      2. Only one FINAL OUTCOME per parent_id.

    For check 2, a store entry carrying the same record_id as *record* is
    ignored: it is the record itself (for example when re-validating a record
    that has already been written), not a competing FINAL.

    Returns a list of error messages (empty when all invariants hold).
    """
    errors: list[str] = []
    if record.get("record_type") != "OUTCOME":
        return errors

    parent_id = record.get("parent_id")
    if not parent_id:
        errors.append("OUTCOME record is missing parent_id.")
        return errors

    if parent_id not in store:
        errors.append(
            f"parent_id '{parent_id}' does not reference any known record_id in the store. "
            "Write must be rejected."
        )

    if record.get("outcome_status") == "FINAL":
        own_id = record.get("record_id")
        for key, other in store.items():
            other_id = other.get("record_id", key)
            if own_id is not None and other_id == own_id:
                continue  # the record under validation, already present in the store
            if (
                other.get("record_type") == "OUTCOME"
                and other.get("parent_id") == parent_id
                and other.get("outcome_status") == "FINAL"
            ):
                errors.append(
                    f"A FINAL OUTCOME already exists for parent_id '{parent_id}' "
                    f"(record_id: {other_id}). "
                    "Only one FINAL is permitted per decision chain. Write must be rejected."
                )
                break  # one conflicting FINAL is enough to reject
    return errors
def validate_outcome_profile_consistency(record: dict, store: dict[str, dict]) -> tuple[list[str], list[str]]:
    """
    C4: For OUTCOME records, validate profile consistency against the parent record.

    Nothing is checked unless *record* is an OUTCOME whose parent_id is present
    in *store*. The parent's record_type is not inspected here.

    Results by case:
      * both declare a profile and they differ      -> error
      * OUTCOME has none, parent has one            -> warning (inherited)
      * OUTCOME has one, parent has none            -> warning (profiles are
        expected to be shared across the graph, so the asymmetry is surfaced
        for review without failing validation)
      * profiles equal, or neither declares one     -> nothing

    Returns (errors, warnings).
    """
    errors: list[str] = []
    warnings: list[str] = []
    if record.get("record_type") != "OUTCOME":
        return errors, warnings

    parent_id = record.get("parent_id")
    if not parent_id or parent_id not in store:
        # Missing/unknown parents are reported by the store-invariant checks.
        return errors, warnings

    parent_profile = store[parent_id].get("profile")
    outcome_profile = record.get("profile")

    if outcome_profile is not None and parent_profile is not None:
        if outcome_profile != parent_profile:
            errors.append(
                f"OUTCOME profile '{outcome_profile}' does not match parent DECISION profile "
                f"'{parent_profile}'. All records in a decision graph must share the same profile. "
                "Write must be rejected."
            )
    elif outcome_profile is None and parent_profile is not None:
        warnings.append(
            f"OUTCOME has no profile field; profile '{parent_profile}' inherited from parent DECISION "
            f"'{parent_id}'. Profile-specific fields on this OUTCOME validated against '{parent_profile}'."
        )
    elif outcome_profile is not None and parent_profile is None:
        warnings.append(
            f"OUTCOME declares profile '{outcome_profile}' but parent record '{parent_id}' "
            "has no profile field. All records in a decision graph are expected to share "
            "the same profile; this should be reviewed."
        )
    return errors, warnings
def main():
    """Command-line entry point: validate one ODS record file, then exit.

    Steps, in order:
      1. Parse arguments and load the record and the core schema.
      2. Pass 1 — core schema. Any violation is printed and ends the run.
      3. Pass 2 — profile schema (skipped for CHECKPOINT records).
      4. sequence_number and CHECKPOINT-specific checks.
      5. With --stored: require sequence_number to be present.
      6. With --store: store-level invariants and OUTCOME profile consistency.

    Exit status:
      0 — record is valid (warnings, if any, are printed first).
      1 — record is invalid; only the errors are printed.
      2 — the run could not be performed (record file missing or unreadable,
          invalid JSON, core schema missing or unloadable).
    """
    parser = argparse.ArgumentParser(
        description=(
            "Validate an ODS v2.x record against the core schema and optional profile schema. "
            "Pass 1: core (ods_record_v2.json). Pass 2: profile schema if profile field is present. "
            "Use --store for store-level invariant checks."
        )
    )
    parser.add_argument("file", help="Path to the JSON record to validate")
    parser.add_argument(
        "--store",
        metavar="DIR",
        help=(
            "Path to a directory of existing records for store-level invariant checking "
            "(parent_id existence, FINAL uniqueness, OUTCOME profile consistency). "
            "Required for complete OUTCOME validation."
        ),
    )
    parser.add_argument(
        "--skip-missing-profile",
        action="store_true",
        default=False,
        help=(
            "Skip profile schema validation when the profile schema file is not found locally. "
            "Emits a warning instead of an error. NOT recommended for production use."
        ),
    )
    parser.add_argument(
        "--stored",
        action="store_true",
        default=False,
        help=(
            "Validate a stored record representation (post-write, sequence_number assigned by store). "
            "In stored mode, sequence_number is REQUIRED for all record types. "
            "Without this flag, sequence_number is optional (submitted representation, pre-store-assignment). "
            "See SPECIFICATION.md §3.3."
        ),
    )
    args = parser.parse_args()

    record_path = Path(args.file)
    if not record_path.exists():
        print(f"Error: file not found: {record_path}", file=sys.stderr)
        sys.exit(2)
    try:
        record = load_json(record_path)
    except json.JSONDecodeError as e:
        print(f"Error: invalid JSON in {record_path}: {e}", file=sys.stderr)
        sys.exit(2)
    except (OSError, UnicodeDecodeError) as e:
        # e.g. a directory, a permission problem, or a non-text file
        print(f"Error: cannot read {record_path}: {e}", file=sys.stderr)
        sys.exit(2)

    if not CORE_SCHEMA_PATH.exists():
        print(f"Error: core schema not found at {CORE_SCHEMA_PATH}", file=sys.stderr)
        sys.exit(2)
    try:
        core_schema = load_json(CORE_SCHEMA_PATH)
    except (json.JSONDecodeError, UnicodeDecodeError, OSError) as e:
        print(f"Error: failed to load core schema at {CORE_SCHEMA_PATH}: {e}", file=sys.stderr)
        sys.exit(2)

    # Pass 1 — core schema
    core_errors = validate_schema(record, core_schema)
    if core_errors:
        print("✗ ODS INVALID — core schema errors:")
        for e in core_errors:
            print(f" · {e}")
        sys.exit(1)

    # Every later check reads fields with record.get(); guard against a
    # top-level array/scalar in case the core schema did not already reject it.
    if not isinstance(record, dict):
        print("✗ ODS INVALID — validation errors:")
        print(f" · Top-level JSON value must be an object, got {type(record).__name__}")
        sys.exit(1)

    # Pass 2 — profile schema (skipped for CHECKPOINT records; they carry no domain profile)
    if record.get("record_type") == "CHECKPOINT":
        profile_errors, profile_warnings = [], []
    else:
        profile_errors, profile_warnings = validate_profile(record, args)
    all_errors = list(profile_errors)
    all_warnings = list(profile_warnings)

    # v2.1.0 — sequence_number and CHECKPOINT-specific validation
    all_errors.extend(validate_sequence_number(record))
    checkpoint_errors, checkpoint_warnings = validate_checkpoint(record)
    all_errors.extend(checkpoint_errors)
    all_warnings.extend(checkpoint_warnings)

    # Stored-mode: enforce sequence_number presence for all record types
    if args.stored:
        all_errors.extend(validate_stored_representation(record))

    # Store-level invariants
    if args.store:
        store = load_store(Path(args.store))
        store_errors = validate_store_invariants(record, store)
        all_errors.extend(store_errors)
        consistency_errors, consistency_warnings = validate_outcome_profile_consistency(record, store)
        all_errors.extend(consistency_errors)
        all_warnings.extend(consistency_warnings)
    elif record.get("record_type") == "OUTCOME" and not profile_errors:
        all_warnings.append(
            "Store-level invariants not checked "
            "(use --store DIR to validate parent_id existence, FINAL uniqueness, and profile consistency)."
        )

    # Errors take precedence: when any exist, warnings are not printed.
    if all_errors:
        print("✗ ODS INVALID — validation errors:")
        for e in all_errors:
            print(f" · {e}")
        sys.exit(1)

    for w in all_warnings:
        print(f"⚠ {w}")

    record_type = record.get("record_type", "RECORD")
    schema_version = record.get("_schema_version", "2.x")
    profile_field = record.get("profile")
    profile_suffix = f" [{profile_field}]" if profile_field else ""
    print(f"✓ ODS VALID: {record_type} record compliant with core schema v{schema_version}{profile_suffix}")
    sys.exit(0)


if __name__ == "__main__":
    main()