Skip to content
Merged
28 changes: 20 additions & 8 deletions scripts/validate_gsifi_governance_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

def load_json(path: Path) -> dict:
try:
return json.loads(_read_text(path))

Check warning on line 36 in scripts/validate_gsifi_governance_assets.py

View check run for this annotation

Precaution / Precaution Basic

PY009: Deserialization of Untrusted Data

Potential unsafe usage of 'json.loads' that can allow instantiation of arbitrary objects.
except json.JSONDecodeError as exc:
raise ValidationError(f"Unable to parse JSON: {path}: {exc}") from exc

Expand Down Expand Up @@ -92,12 +92,21 @@
if validator_type is None:
return

validator = validator_type(schema)
errors = sorted(validator.iter_errors(sample), key=lambda e: e.path)
if errors:
first = errors[0]
path = ".".join(str(p) for p in first.path) or "<root>"
raise ValidationError(f"JSON Schema validation failed at {path}: {first.message}")
try:
validator = validator_type(schema)
errors = list(validator.iter_errors(sample))
if errors:
errors = sorted(errors, key=lambda e: e.path)
first = errors[0]
path = ".".join(str(p) for p in first.path) or "<root>"
raise ValidationError(f"JSON Schema validation failed at {path}: {first.message}")
except ValidationError:
raise
except Exception as exc:
# Wrap any jsonschema-related exceptions that might occur during validation or initialization
if "jsonschema" in str(type(exc)):
raise ValidationError(f"JSON Schema validation failed: {exc}") from exc
raise

def validate_event_schema_and_sample(
schema_path: Path = SCHEMA_PATH,
Expand All @@ -110,8 +119,8 @@
if not isinstance(sample, dict):
raise ValidationError("Sample event root must be a JSON object")

_validate_with_jsonschema(schema, sample)

# Perform basic structure validation before letting jsonschema take over,
# as existing tests expect these specific error messages.
required = schema.get("required", [])
if not isinstance(required, list):
raise ValidationError("Schema field 'required' must be a list")
Expand All @@ -122,6 +131,7 @@
properties = schema.get("properties", {})
if not isinstance(properties, dict):
raise ValidationError("Schema field 'properties' must be an object")

additional_allowed = schema.get("additionalProperties", True)
if additional_allowed is False:
allowed = set(properties.keys())
Expand Down Expand Up @@ -155,6 +165,8 @@
if prop.get("format") == "date-time" and isinstance(value, str):
_validate_date_time(value, key)

_validate_with_jsonschema(schema, sample)


def validate_rego_policy(rego_path: Path = REGO_PATH) -> None:
text = _read_text(rego_path)
Expand Down
Loading