Skip to content

Commit 25973e0

Browse files
authored
Ensure use of cachetools is thread-safe (#2178)
* Ensure use of cachetools is thread-safe
* Minor cleanup
* Fix formatting
1 parent 16de77d commit 25973e0

5 files changed

Lines changed: 74 additions & 38 deletions

File tree

src/confluent_kafka/schema_registry/_async/json_schema.py

Lines changed: 21 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
17+
import asyncio as _locks
1718
import io
1819
import logging
1920
from typing import Any, Callable, Coroutine, Optional, Tuple, Union, cast
@@ -214,6 +215,7 @@ class AsyncJSONSerializer(AsyncBaseSerializer):
214215
'_to_dict',
215216
'_parsed_schemas',
216217
'_validators',
218+
'_validators_lock',
217219
'_validate',
218220
'_json_encode',
219221
]
@@ -255,6 +257,7 @@ async def __init_impl(
255257
self._known_subjects: set[str] = set()
256258
self._parsed_schemas = ParsedSchemaCache()
257259
self._validators: LRUCache[Schema, Validator] = LRUCache(1000)
260+
self._validators_lock = _locks.Lock()
258261

259262
if to_dict is not None and not callable(to_dict):
260263
raise ValueError(
@@ -404,7 +407,7 @@ def field_transformer(rule_ctx, field_transform, msg):
404407

405408
if self._validate and schema is not None and parsed_schema is not None and ref_registry is not None:
406409
try:
407-
validator = self._get_validator(schema, parsed_schema, ref_registry)
410+
validator = await self._get_validator(schema, parsed_schema, ref_registry)
408411
validator.validate(value)
409412
except ValidationError as ve:
410413
raise SerializationError(ve.message)
@@ -441,16 +444,18 @@ async def _get_parsed_schema(self, schema: Optional[Schema]) -> Tuple[Optional[J
441444
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
442445
return parsed_schema, ref_registry
443446

444-
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
445-
validator = self._validators.get(schema, None)
446-
if validator is not None:
447-
return validator
447+
async def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
448+
async with self._validators_lock:
449+
validator = self._validators.get(schema, None)
450+
if validator is not None:
451+
return validator
448452

449453
cls = validator_for(parsed_schema)
450454
cls.check_schema(parsed_schema)
451455
validator = cls(parsed_schema, registry=registry)
452456

453-
self._validators[schema] = validator
457+
async with self._validators_lock:
458+
self._validators[schema] = validator
454459
return validator
455460

456461

@@ -518,6 +523,7 @@ class AsyncJSONDeserializer(AsyncBaseDeserializer):
518523
'_schema',
519524
'_parsed_schemas',
520525
'_validators',
526+
'_validators_lock',
521527
'_validate',
522528
'_json_decode',
523529
]
@@ -562,6 +568,7 @@ async def __init_impl(
562568
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
563569
self._parsed_schemas = ParsedSchemaCache()
564570
self._validators: LRUCache[Schema, Validator] = LRUCache(1000)
571+
self._validators_lock = _locks.Lock()
565572
self._json_decode = json_decode or orjson.loads
566573
self._use_schema_id = None
567574

@@ -708,7 +715,7 @@ def field_transformer(rule_ctx, field_transform, message):
708715
if self._validate:
709716
if reader_schema_raw is not None and reader_schema is not None and reader_ref_registry is not None:
710717
try:
711-
validator = self._get_validator(reader_schema_raw, reader_schema, reader_ref_registry)
718+
validator = await self._get_validator(reader_schema_raw, reader_schema, reader_ref_registry)
712719
validator.validate(obj_dict)
713720
except ValidationError as ve:
714721
raise SerializationError(ve.message)
@@ -745,14 +752,16 @@ async def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema]
745752
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
746753
return parsed_schema, ref_registry
747754

748-
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
749-
validator = self._validators.get(schema, None)
750-
if validator is not None:
751-
return validator
755+
async def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
756+
async with self._validators_lock:
757+
validator = self._validators.get(schema, None)
758+
if validator is not None:
759+
return validator
752760

753761
cls = validator_for(parsed_schema)
754762
cls.check_schema(parsed_schema)
755763
validator = cls(parsed_schema, registry=registry)
756764

757-
self._validators[schema] = validator
765+
async with self._validators_lock:
766+
self._validators[schema] = validator
758767
return validator

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 18 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
# limitations under the License.
1717
#
1818
import asyncio
19+
import asyncio as _locks
1920
import json
2021
import logging
2122
import os
@@ -642,6 +643,7 @@ def __init__(self, conf: dict):
642643
self._conf = conf
643644
self._rest_client = _AsyncRestClient(conf)
644645
self._cache = _SchemaCache()
646+
self._latest_lock = _locks.Lock()
645647
cache_capacity = self._rest_client.cache_capacity
646648
cache_ttl = self._rest_client.cache_latest_ttl_sec
647649
self._latest_version_cache: Cache[Any, Any]
@@ -1045,7 +1047,8 @@ async def get_latest_version(self, subject_name: str, fmt: Optional[str] = None)
10451047
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
10461048
""" # noqa: E501
10471049

1048-
registered_schema = self._latest_version_cache.get(subject_name, None)
1050+
async with self._latest_lock:
1051+
registered_schema = self._latest_version_cache.get(subject_name, None)
10491052
if registered_schema is not None:
10501053
return registered_schema
10511054

@@ -1056,7 +1059,8 @@ async def get_latest_version(self, subject_name: str, fmt: Optional[str] = None)
10561059

10571060
registered_schema = RegisteredSchema.from_dict(response)
10581061

1059-
self._latest_version_cache[subject_name] = registered_schema
1062+
async with self._latest_lock:
1063+
self._latest_version_cache[subject_name] = registered_schema
10601064

10611065
return registered_schema
10621066

@@ -1080,7 +1084,8 @@ async def get_latest_with_metadata(
10801084
""" # noqa: E501
10811085

10821086
cache_key = (subject_name, frozenset(metadata.items()), deleted)
1083-
registered_schema = self._latest_with_metadata_cache.get(cache_key, None)
1087+
async with self._latest_lock:
1088+
registered_schema = self._latest_with_metadata_cache.get(cache_key, None)
10841089
if registered_schema is not None:
10851090
return registered_schema
10861091

@@ -1096,7 +1101,8 @@ async def get_latest_with_metadata(
10961101

10971102
registered_schema = RegisteredSchema.from_dict(response)
10981103

1099-
self._latest_with_metadata_cache[cache_key] = registered_schema
1104+
async with self._latest_lock:
1105+
self._latest_with_metadata_cache[cache_key] = registered_schema
11001106

11011107
return registered_schema
11021108

@@ -1548,13 +1554,15 @@ async def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]:
15481554
result = await self._rest_client.get('contexts', query={'offset': offset, 'limit': limit})
15491555
return result
15501556

1551-
def clear_latest_caches(self):
1552-
self._latest_version_cache.clear()
1553-
self._latest_with_metadata_cache.clear()
1557+
async def clear_latest_caches(self):
1558+
async with self._latest_lock:
1559+
self._latest_version_cache.clear()
1560+
self._latest_with_metadata_cache.clear()
15541561

1555-
def clear_caches(self):
1556-
self._latest_version_cache.clear()
1557-
self._latest_with_metadata_cache.clear()
1562+
async def clear_caches(self):
1563+
async with self._latest_lock:
1564+
self._latest_version_cache.clear()
1565+
self._latest_with_metadata_cache.clear()
15581566
self._cache.clear()
15591567

15601568
@staticmethod

src/confluent_kafka/schema_registry/_sync/json_schema.py

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
# limitations under the License.
1717
import io
1818
import logging
19+
import threading as _locks
1920
from typing import Any, Callable, Optional, Tuple, Union, cast
2021

2122
import orjson
@@ -212,6 +213,7 @@ class JSONSerializer(BaseSerializer):
212213
'_to_dict',
213214
'_parsed_schemas',
214215
'_validators',
216+
'_validators_lock',
215217
'_validate',
216218
'_json_encode',
217219
]
@@ -253,6 +255,7 @@ def __init_impl(
253255
self._known_subjects: set[str] = set()
254256
self._parsed_schemas = ParsedSchemaCache()
255257
self._validators: LRUCache[Schema, Validator] = LRUCache(1000)
258+
self._validators_lock = _locks.Lock()
256259

257260
if to_dict is not None and not callable(to_dict):
258261
raise ValueError(
@@ -440,15 +443,17 @@ def _get_parsed_schema(self, schema: Optional[Schema]) -> Tuple[Optional[JsonSch
440443
return parsed_schema, ref_registry
441444

442445
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
443-
validator = self._validators.get(schema, None)
444-
if validator is not None:
445-
return validator
446+
with self._validators_lock:
447+
validator = self._validators.get(schema, None)
448+
if validator is not None:
449+
return validator
446450

447451
cls = validator_for(parsed_schema)
448452
cls.check_schema(parsed_schema)
449453
validator = cls(parsed_schema, registry=registry)
450454

451-
self._validators[schema] = validator
455+
with self._validators_lock:
456+
self._validators[schema] = validator
452457
return validator
453458

454459

@@ -515,6 +520,7 @@ class JSONDeserializer(BaseDeserializer):
515520
'_schema',
516521
'_parsed_schemas',
517522
'_validators',
523+
'_validators_lock',
518524
'_validate',
519525
'_json_decode',
520526
]
@@ -559,6 +565,7 @@ def __init_impl(
559565
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
560566
self._parsed_schemas = ParsedSchemaCache()
561567
self._validators: LRUCache[Schema, Validator] = LRUCache(1000)
568+
self._validators_lock = _locks.Lock()
562569
self._json_decode = json_decode or orjson.loads
563570
self._use_schema_id = None
564571

@@ -741,13 +748,15 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti
741748
return parsed_schema, ref_registry
742749

743750
def _get_validator(self, schema: Schema, parsed_schema: JsonSchema, registry: Registry) -> Validator:
744-
validator = self._validators.get(schema, None)
745-
if validator is not None:
746-
return validator
751+
with self._validators_lock:
752+
validator = self._validators.get(schema, None)
753+
if validator is not None:
754+
return validator
747755

748756
cls = validator_for(parsed_schema)
749757
cls.check_schema(parsed_schema)
750758
validator = cls(parsed_schema, registry=registry)
751759

752-
self._validators[schema] = validator
760+
with self._validators_lock:
761+
self._validators[schema] = validator
753762
return validator

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import logging
2121
import os
2222
import ssl
23+
import threading as _locks
2324
import time
2425
import urllib
2526
from typing import Any, Callable, Dict, List, Literal, Optional, Union
@@ -640,6 +641,7 @@ def __init__(self, conf: dict):
640641
self._conf = conf
641642
self._rest_client = _RestClient(conf)
642643
self._cache = _SchemaCache()
644+
self._latest_lock = _locks.Lock()
643645
cache_capacity = self._rest_client.cache_capacity
644646
cache_ttl = self._rest_client.cache_latest_ttl_sec
645647
self._latest_version_cache: Cache[Any, Any]
@@ -1041,7 +1043,8 @@ def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'R
10411043
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
10421044
""" # noqa: E501
10431045

1044-
registered_schema = self._latest_version_cache.get(subject_name, None)
1046+
with self._latest_lock:
1047+
registered_schema = self._latest_version_cache.get(subject_name, None)
10451048
if registered_schema is not None:
10461049
return registered_schema
10471050

@@ -1050,7 +1053,8 @@ def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'R
10501053

10511054
registered_schema = RegisteredSchema.from_dict(response)
10521055

1053-
self._latest_version_cache[subject_name] = registered_schema
1056+
with self._latest_lock:
1057+
self._latest_version_cache[subject_name] = registered_schema
10541058

10551059
return registered_schema
10561060

@@ -1074,7 +1078,8 @@ def get_latest_with_metadata(
10741078
""" # noqa: E501
10751079

10761080
cache_key = (subject_name, frozenset(metadata.items()), deleted)
1077-
registered_schema = self._latest_with_metadata_cache.get(cache_key, None)
1081+
with self._latest_lock:
1082+
registered_schema = self._latest_with_metadata_cache.get(cache_key, None)
10781083
if registered_schema is not None:
10791084
return registered_schema
10801085

@@ -1090,7 +1095,8 @@ def get_latest_with_metadata(
10901095

10911096
registered_schema = RegisteredSchema.from_dict(response)
10921097

1093-
self._latest_with_metadata_cache[cache_key] = registered_schema
1098+
with self._latest_lock:
1099+
self._latest_with_metadata_cache[cache_key] = registered_schema
10941100

10951101
return registered_schema
10961102

@@ -1537,12 +1543,14 @@ def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]:
15371543
return result
15381544

15391545
def clear_latest_caches(self):
1540-
self._latest_version_cache.clear()
1541-
self._latest_with_metadata_cache.clear()
1546+
with self._latest_lock:
1547+
self._latest_version_cache.clear()
1548+
self._latest_with_metadata_cache.clear()
15421549

15431550
def clear_caches(self):
1544-
self._latest_version_cache.clear()
1545-
self._latest_with_metadata_cache.clear()
1551+
with self._latest_lock:
1552+
self._latest_version_cache.clear()
1553+
self._latest_with_metadata_cache.clear()
15461554
self._cache.clear()
15471555

15481556
@staticmethod

tools/unasync.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,9 @@
3333
SUBS = [
3434
('from confluent_kafka.schema_registry.common import asyncinit', ''),
3535
('@asyncinit', ''),
36+
('import asyncio as _locks', 'import threading as _locks'),
3637
('import asyncio', ''),
38+
('async with self.', 'with self.'),
3739
('asyncio.sleep', 'time.sleep'),
3840
('._async.', '._sync.'),
3941
('Async([A-Z][A-Za-z0-9_]*)', r'\2'),

0 commit comments

Comments
 (0)