Skip to content

Commit eb7e96e

Browse files
authored
fix(JSONValidator): make JSONValidator thread-safe for parallel spec validation (#95)
* fix: make JSONValidator thread-safe for parallel spec validation Use a per-thread RefResolver and Draft7Validator so concurrent validate() calls from DataflowSpecBuilder and SpecMapper do not corrupt jsonschema ref resolution (e.g. KeyError on definitions/views). Reuse the resolver on each thread to keep referenced schema files cached after the first load on that worker.
1 parent bb91ba6 commit eb7e96e

2 files changed

Lines changed: 21 additions & 7 deletions

File tree

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v0.15.4
1+
v0.15.5

src/utility.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from functools import reduce
55
import logging
66
import os
7+
import threading
78
from typing import Callable, Dict, List
89

910
import json
@@ -80,12 +81,18 @@ class JSONValidator:
8081
Attributes:
8182
schema (dict): The JSON schema loaded from a file.
8283
base_uri (str): The base URI for resolving schema references.
83-
resolver (RefResolver): The JSON schema resolver.
84-
validator (Draft7Validator): The JSON schema validator.
8584
8685
Methods:
8786
validate(json_data: Dict) -> List:
8887
Validates the provided JSON data against the loaded schema and returns a list of validation errors.
88+
89+
Thread safety:
90+
A ``Draft7Validator`` and ``RefResolver`` are created once per OS thread (via
91+
``threading.local``) and reused on that thread. Different threads never share a
92+
resolver, so ``RefResolver`` internal state is not mutated concurrently. The root
93+
schema dict is loaded once in ``__init__``; referenced schema files (e.g.
94+
``definitions_main.json``) are read from disk at most once per thread and then
95+
cached on that resolver's ``_remote_cache``.
8996
"""
9097

9198
def __init__(self, schema_path: str):
@@ -95,14 +102,21 @@ def __init__(self, schema_path: str):
95102
except Exception as e:
96103
raise ValueError(f"JSON Schema not found: {schema_path}") from e
97104

98-
# Resolve references
99105
self.base_uri = "file://" + os.path.abspath(os.path.dirname(schema_path)) + "/"
100-
self.resolver = js.RefResolver(base_uri=self.base_uri, referrer=self.schema)
101-
self.validator = js.Draft7Validator(self.schema, resolver=self.resolver)
106+
self._thread_local = threading.local()
107+
108+
def _thread_validator(self) -> js.Draft7Validator:
109+
"""Return a validator for the current thread, creating it on first use."""
110+
validator = getattr(self._thread_local, "validator", None)
111+
if validator is None:
112+
resolver = js.RefResolver(base_uri=self.base_uri, referrer=self.schema)
113+
validator = js.Draft7Validator(self.schema, resolver=resolver)
114+
self._thread_local.validator = validator
115+
return validator
102116

103117
def validate(self, json_data: Dict) -> List:
104118
"""Validate the provided JSON data against the loaded schema and returns a list of validation errors."""
105-
return list(self.validator.iter_errors(json_data))
119+
return list(self._thread_validator().iter_errors(json_data))
106120

107121

108122
def add_struct_field(struct: StructType, column: Dict):

0 commit comments

Comments
 (0)