Skip to content

Commit 35d5608

Browse files
rayokota and fangnx authored
Fix JSON schema resolver bug (#2188)
* update
* Simplify validation code
* Fix build
* Revert "Fix build"

  This reverts commit 9965eca.
* Fix build
* Fix build
* Fix build

---------

Co-authored-by: Naxin <nfang@confluent.io>
1 parent 0eac76f commit 35d5608

5 files changed

Lines changed: 523 additions & 4 deletions

File tree

requirements/requirements-examples.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ requests
1515
avro>=1.11.1,<2
1616

1717
pyrsistent
18-
jsonschema
18+
jsonschema >= 4.18.0
1919
orjson >= 3.10
2020

2121
googleapis-common-protos

requirements/requirements-json.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,3 @@
11
pyrsistent
2-
jsonschema
2+
jsonschema >= 4.18.0
33
orjson >= 3.10

src/confluent_kafka/schema_registry/common/json_schema.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -201,8 +201,13 @@ def _validate_subschemas(
201201
try:
202202
ref = subschema.get("$ref")
203203
if ref is not None:
204-
subschema = resolver.lookup(ref).contents
205-
validate(instance=message, schema=subschema, registry=registry)
204+
resolved = resolver.lookup(ref)
205+
subschema = resolved.contents
206+
# Pass _resolver (not resolver) to use the new referencing library's
207+
# Resolver with correct context for nested $ref resolution
208+
validate(instance=message, schema=subschema, registry=registry, _resolver=resolved.resolver)
209+
else:
210+
validate(instance=message, schema=subschema, registry=registry)
206211
return subschema
207212
except ValidationError:
208213
pass

tests/schema_registry/_async/test_json_serdes.py

Lines changed: 257 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1192,3 +1192,260 @@ async def deserialize_with_all_versions(client, ser_ctx, obj_bytes, obj, obj2, o
11921192
deser = await AsyncJSONDeserializer(None, schema_registry_client=client, conf=deser_conf)
11931193
newobj = await deser(obj_bytes, ser_ctx)
11941194
assert obj3 == newobj
1195+
1196+
1197+
async def test_json_oneof_with_refs_nested_refs():
    """
    Round-trip a oneOf payload whose $ref target itself contains $refs.

    Regression test for oneOf subschemas that point at a definition which in
    turn references further definitions. Validating such a branch used to
    fail with:
        PointerToNowhere: '/definitions/...' does not exist

    The object is serialized under an ENCRYPT rule for PII-tagged fields, the
    tagged fields are checked to have changed in place, and the deserialized
    result must equal the original object.
    """
    executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

    client = AsyncSchemaRegistryClient.new_client({'url': _BASE_URL})
    rule_conf = {'secret': 'mysecret'}
    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}

    # Definitions referenced from the oneOf; Account points at two more definitions.
    account_def = {
        "type": "object",
        "properties": {
            "accountId": {"type": "string", "confluent:tags": ["PII"]},
            "partyType": {"type": "string"},
            "paymentMethod": {"$ref": "#/$defs/PaymentMethod"},  # nested $ref
            "party": {"$ref": "#/$defs/PartyInfo"},  # second nested $ref
        },
        "required": ["accountId"],
    }
    payment_def = {
        "type": "object",
        "properties": {
            "paymentId": {"type": "string", "confluent:tags": ["PII"]},
            "amount": {"type": "number"},
        },
        "required": ["paymentId", "amount"],
    }
    payment_method_def = {
        "type": "object",
        "properties": {"type": {"type": "string"}, "details": {"type": "string", "confluent:tags": ["PII"]}},
    }
    party_info_def = {
        "type": "object",
        "properties": {"name": {"type": "string"}, "address": {"type": "string"}},
    }
    schema = {
        "type": "object",
        "properties": {
            "transactionType": {"type": "string"},
            "data": {"oneOf": [{"$ref": "#/$defs/Account"}, {"$ref": "#/$defs/Payment"}]},
        },
        "required": ["transactionType", "data"],
        "$defs": {
            "Account": account_def,
            "Payment": payment_def,
            "PaymentMethod": payment_method_def,
            "PartyInfo": party_info_def,
        },
    }
    schema_str = json.dumps(schema)

    kms_params = RuleParams(
        {"encrypt.kek.name": "kek1", "encrypt.kms.type": "local-kms", "encrypt.kms.key.id": "mykey"}
    )
    encrypt_rule = Rule(
        "test-encrypt",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITEREAD,
        "ENCRYPT",
        ["PII"],
        kms_params,
        None,
        None,
        "ERROR,NONE",
        False,
    )
    registered = Schema(schema_str, "JSON", [], None, RuleSet(None, [encrypt_rule]))
    await client.register_schema(_SUBJECT, registered)

    # Payload matching the Account branch, i.e. the one with nested $refs.
    account_id = 'ACC123'
    card_details = '1234-5678-9012-3456'
    payload = {
        "transactionType": "account_update",
        "data": {
            "accountId": account_id,
            "partyType": "Payer",
            "paymentMethod": {"type": "card", "details": card_details},
            "party": {"name": "John Doe", "address": "123 Main St"},
        },
    }

    ser = await AsyncJSONSerializer(schema_str, client, conf=ser_conf, rule_conf=rule_conf)
    dek_client = executor.executor.client
    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    payload_bytes = await ser(payload, ser_ctx)

    # The PII-tagged fields must no longer hold their plaintext values.
    data = payload['data']
    assert data['accountId'] != account_id
    assert data['paymentMethod']['details'] != card_details

    # Put the plaintext back so the payload can be compared with the decoded result.
    data['accountId'] = account_id
    data['paymentMethod']['details'] = card_details

    deser = await AsyncJSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
    # NOTE(review): presumably re-installs the DEK client captured after the
    # serializer was built so decryption sees the same keys -- confirm.
    executor.executor.client = dek_client
    decoded = await deser(payload_bytes, ser_ctx)
    assert payload == decoded
1293+
1294+
1295+
async def test_json_anyof_with_refs():
    """
    Round-trip an object whose property is an anyOf over two $ref branches.

    The payload matches the StringValue branch, whose ``strValue`` field is
    PII-tagged. The test checks that the field changes during serialization
    and that deserialization returns the original object.
    """
    executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

    client = AsyncSchemaRegistryClient.new_client({'url': _BASE_URL})
    rule_conf = {'secret': 'mysecret'}
    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}

    string_value_def = {
        "type": "object",
        "properties": {"strValue": {"type": "string", "confluent:tags": ["PII"]}},
    }
    number_value_def = {"type": "object", "properties": {"numValue": {"type": "number"}}}
    schema = {
        "type": "object",
        "properties": {"value": {"anyOf": [{"$ref": "#/$defs/StringValue"}, {"$ref": "#/$defs/NumberValue"}]}},
        "$defs": {
            "StringValue": string_value_def,
            "NumberValue": number_value_def,
        },
    }
    schema_str = json.dumps(schema)

    kms_params = RuleParams(
        {"encrypt.kek.name": "kek1", "encrypt.kms.type": "local-kms", "encrypt.kms.key.id": "mykey"}
    )
    encrypt_rule = Rule(
        "test-encrypt",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITEREAD,
        "ENCRYPT",
        ["PII"],
        kms_params,
        None,
        None,
        "ERROR,NONE",
        False,
    )
    registered = Schema(schema_str, "JSON", [], None, RuleSet(None, [encrypt_rule]))
    await client.register_schema(_SUBJECT, registered)

    plaintext = 'sensitive-data'
    payload = {"value": {"strValue": plaintext}}

    ser = await AsyncJSONSerializer(schema_str, client, conf=ser_conf, rule_conf=rule_conf)
    dek_client = executor.executor.client
    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    payload_bytes = await ser(payload, ser_ctx)

    # The tagged field must have been rewritten; restore it for the final comparison.
    assert payload['value']['strValue'] != plaintext
    payload['value']['strValue'] = plaintext

    deser = await AsyncJSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
    # NOTE(review): presumably re-installs the DEK client captured after the
    # serializer was built so decryption sees the same keys -- confirm.
    executor.executor.client = dek_client
    decoded = await deser(payload_bytes, ser_ctx)
    assert payload == decoded
1347+
1348+
1349+
async def test_json_allof_with_refs():
    """
    Round-trip an object whose property is an allOf composed of two $refs.

    The ``entity`` property combines BaseEntity (with a PII-tagged ``id``)
    and Timestamped. The test checks that ``id`` changes during
    serialization and that deserialization returns the original object.
    """
    executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

    client = AsyncSchemaRegistryClient.new_client({'url': _BASE_URL})
    rule_conf = {'secret': 'mysecret'}
    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}

    base_entity_def = {"type": "object", "properties": {"id": {"type": "string", "confluent:tags": ["PII"]}}}
    timestamped_def = {"type": "object", "properties": {"createdAt": {"type": "integer"}}}
    schema = {
        "type": "object",
        "properties": {"entity": {"allOf": [{"$ref": "#/$defs/BaseEntity"}, {"$ref": "#/$defs/Timestamped"}]}},
        "$defs": {
            "BaseEntity": base_entity_def,
            "Timestamped": timestamped_def,
        },
    }
    schema_str = json.dumps(schema)

    kms_params = RuleParams(
        {"encrypt.kek.name": "kek1", "encrypt.kms.type": "local-kms", "encrypt.kms.key.id": "mykey"}
    )
    encrypt_rule = Rule(
        "test-encrypt",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITEREAD,
        "ENCRYPT",
        ["PII"],
        kms_params,
        None,
        None,
        "ERROR,NONE",
        False,
    )
    registered = Schema(schema_str, "JSON", [], None, RuleSet(None, [encrypt_rule]))
    await client.register_schema(_SUBJECT, registered)

    entity_id = 'entity-123'
    payload = {"entity": {"id": entity_id, "createdAt": 1234567890}}

    ser = await AsyncJSONSerializer(schema_str, client, conf=ser_conf, rule_conf=rule_conf)
    dek_client = executor.executor.client
    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    payload_bytes = await ser(payload, ser_ctx)

    # The tagged field must have been rewritten; restore it for the final comparison.
    assert payload['entity']['id'] != entity_id
    payload['entity']['id'] = entity_id

    deser = await AsyncJSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
    # NOTE(review): presumably re-installs the DEK client captured after the
    # serializer was built so decryption sees the same keys -- confirm.
    executor.executor.client = dek_client
    decoded = await deser(payload_bytes, ser_ctx)
    assert payload == decoded
1398+
1399+
1400+
async def test_json_deeply_nested_refs():
    """
    Round-trip a payload reached through a three-level chain of $refs.

    The chain is oneOf -> Level1 -> Level2 -> Level3, with the PII-tagged
    ``secretData`` field on Level3. The test checks that the field changes
    during serialization and that deserialization returns the original
    object.
    """
    executor = FieldEncryptionExecutor.register_with_clock(FakeClock())

    client = AsyncSchemaRegistryClient.new_client({'url': _BASE_URL})
    rule_conf = {'secret': 'mysecret'}
    ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}

    # Each level only references the next one; the tagged field is on the last.
    level1_def = {"type": "object", "properties": {"level2": {"$ref": "#/$defs/Level2"}}}
    level2_def = {"type": "object", "properties": {"level3": {"$ref": "#/$defs/Level3"}}}
    level3_def = {"type": "object", "properties": {"secretData": {"type": "string", "confluent:tags": ["PII"]}}}
    schema = {
        "type": "object",
        "properties": {"data": {"oneOf": [{"$ref": "#/$defs/Level1"}]}},
        "$defs": {
            "Level1": level1_def,
            "Level2": level2_def,
            "Level3": level3_def,
        },
    }
    schema_str = json.dumps(schema)

    kms_params = RuleParams(
        {"encrypt.kek.name": "kek1", "encrypt.kms.type": "local-kms", "encrypt.kms.key.id": "mykey"}
    )
    encrypt_rule = Rule(
        "test-encrypt",
        "",
        RuleKind.TRANSFORM,
        RuleMode.WRITEREAD,
        "ENCRYPT",
        ["PII"],
        kms_params,
        None,
        None,
        "ERROR,NONE",
        False,
    )
    registered = Schema(schema_str, "JSON", [], None, RuleSet(None, [encrypt_rule]))
    await client.register_schema(_SUBJECT, registered)

    secret = 'deep-secret'
    payload = {"data": {"level2": {"level3": {"secretData": secret}}}}

    ser = await AsyncJSONSerializer(schema_str, client, conf=ser_conf, rule_conf=rule_conf)
    dek_client = executor.executor.client
    ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
    payload_bytes = await ser(payload, ser_ctx)

    # The tagged field must have been rewritten; restore it for the final comparison.
    assert payload['data']['level2']['level3']['secretData'] != secret
    payload['data']['level2']['level3']['secretData'] = secret

    deser = await AsyncJSONDeserializer(None, schema_registry_client=client, rule_conf=rule_conf)
    # NOTE(review): presumably re-installs the DEK client captured after the
    # serializer was built so decryption sees the same keys -- confirm.
    executor.executor.client = dek_client
    decoded = await deser(payload_bytes, ser_ctx)
    assert payload == decoded

0 commit comments

Comments
 (0)