Skip to content

Commit a0496cf

Browse files
authored
DGS-24052 Handle anyOf/allOf in JSON transforms (#2237)
* DGS-24052 Handle anyOf/allOf in JSON transforms * Update changelog * Fix style * Fix mypy
1 parent f5f03dc commit a0496cf

6 files changed

Lines changed: 329 additions & 28 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Fixes
66

77
- Fix URL joining in Python client (#2228)
8+
- Handle anyOf/allOf in JSON transforms (#2237)
89

910

1011
## v2.14.0 - 2026-04-01

src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def clear(self):
152152

153153
class _AssociationStore(object):
154154

155-
def __init__(self):
155+
def __init__(self) -> None:
156156
self.lock = Lock()
157157
# Key: resource_id -> List[Association]
158158
self.associations_by_resource_id: Dict[str, List[Association]] = defaultdict(list)

src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def clear(self):
152152

153153
class _AssociationStore(object):
154154

155-
def __init__(self):
155+
def __init__(self) -> None:
156156
self.lock = Lock()
157157
# Key: resource_id -> List[Association]
158158
self.associations_by_resource_id: Dict[str, List[Association]] = defaultdict(list)

src/confluent_kafka/schema_registry/common/json_schema.py

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,38 @@ def transform(
8888
finally:
8989
schema["type"] = original_type # restore original type
9090
all_of = schema.get("allOf")
91-
if all_of is not None:
92-
subschema = _validate_subschemas(all_of, message, ref_registry, ref_resolver)
93-
if subschema is not None:
94-
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
9591
any_of = schema.get("anyOf")
96-
if any_of is not None:
97-
subschema = _validate_subschemas(any_of, message, ref_registry, ref_resolver)
98-
if subschema is not None:
99-
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
10092
one_of = schema.get("oneOf")
101-
if one_of is not None:
102-
subschema = _validate_subschemas(one_of, message, ref_registry, ref_resolver)
103-
if subschema is not None:
104-
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
93+
if all_of is not None or any_of is not None or one_of is not None:
94+
if all_of is not None:
95+
for subschema in all_of:
96+
message = transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
97+
elif one_of is not None:
98+
for subschema in one_of:
99+
resolved = _validate_subschema(subschema, message, ref_registry, ref_resolver)
100+
if resolved is not None:
101+
message = transform(ctx, resolved, ref_registry, ref_resolver, path, message, field_transform)
102+
break
103+
elif any_of is not None:
104+
for subschema in any_of:
105+
resolved = _validate_subschema(subschema, message, ref_registry, ref_resolver)
106+
if resolved is not None:
107+
message = transform(ctx, resolved, ref_registry, ref_resolver, path, message, field_transform)
108+
# Also visit sibling properties/items at this level
109+
# (siblings to allOf/anyOf/oneOf).
110+
props = schema.get("properties")
111+
if props is not None and isinstance(message, dict):
112+
for prop_name, prop_schema in props.items():
113+
if isinstance(prop_schema, dict):
114+
_transform_field(
115+
ctx, path, prop_name, message, prop_schema, ref_registry, ref_resolver, field_transform
116+
)
117+
items = schema.get("items")
118+
if items is not None and isinstance(message, list):
119+
message = [
120+
transform(ctx, items, ref_registry, ref_resolver, path, item, field_transform) for item in message
121+
]
122+
return message
105123
items = schema.get("items")
106124
if items is not None:
107125
if isinstance(message, list):
@@ -197,23 +215,39 @@ def _validate_subschemas(
197215
The validated schema if the message is valid against the subschemas, otherwise None.
198216
"""
199217
for subschema in subschemas:
200-
if isinstance(subschema, dict):
201-
try:
202-
ref = subschema.get("$ref")
203-
if ref is not None:
204-
resolved = resolver.lookup(ref)
205-
subschema = resolved.contents
206-
# Pass _resolver (not resolver) to use the new referencing library's
207-
# Resolver with correct context for nested $ref resolution
208-
validate(instance=message, schema=subschema, registry=registry, _resolver=resolved.resolver)
209-
else:
210-
validate(instance=message, schema=subschema, registry=registry)
211-
return subschema
212-
except ValidationError:
213-
pass
218+
resolved = _validate_subschema(subschema, message, registry, resolver)
219+
if resolved is not None:
220+
return resolved
214221
return None
215222

216223

224+
def _validate_subschema(
225+
subschema: JsonSchema,
226+
message: JsonMessage,
227+
registry: Registry,
228+
resolver: Resolver,
229+
) -> Optional[JsonSchema]:
230+
"""
231+
Validate the message against a single subschema.
232+
Returns the resolved subschema (with $ref followed) if valid, otherwise None.
233+
"""
234+
if not isinstance(subschema, dict):
235+
return None
236+
try:
237+
ref = subschema.get("$ref")
238+
if ref is not None:
239+
resolved = resolver.lookup(ref)
240+
subschema = resolved.contents
241+
# Pass _resolver (not resolver) to use the new referencing library's
242+
# Resolver with correct context for nested $ref resolution
243+
validate(instance=message, schema=subschema, registry=registry, _resolver=resolved.resolver)
244+
else:
245+
validate(instance=message, schema=subschema, registry=registry)
246+
return subschema
247+
except ValidationError:
248+
return None
249+
250+
217251
def get_type(schema: JsonSchema) -> FieldType:
218252
if isinstance(schema, bool):
219253
return FieldType.COMBINED

tests/schema_registry/_async/test_json_serdes.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,139 @@ async def test_json_cel_field_transform_with_def():
582582
assert obj2 == newobj
583583

584584

585+
async def test_json_cel_field_transform_all_of():
586+
conf = {'url': _BASE_URL}
587+
client = AsyncSchemaRegistryClient.new_client(conf)
588+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
589+
schema = {
590+
"type": "object",
591+
"properties": {
592+
"pins": {
593+
"type": "object",
594+
"allOf": [
595+
{"properties": {"pin": {"confluent:tags": ["PII"], "type": ["string", "null"]}}},
596+
{"properties": {"npin": {"confluent:tags": ["PII"], "type": ["string", "null"]}}},
597+
],
598+
}
599+
},
600+
}
601+
602+
rule = Rule(
603+
"test-cel",
604+
"",
605+
RuleKind.TRANSFORM,
606+
RuleMode.WRITE,
607+
"CEL_FIELD",
608+
["PII"],
609+
None,
610+
"value + '-suffix'",
611+
None,
612+
None,
613+
False,
614+
)
615+
await client.register_schema(_SUBJECT, Schema(json.dumps(schema), "JSON", [], None, RuleSet(None, [rule])))
616+
617+
obj = {'pins': {'pin': 'P123456789', 'npin': 'NP00012345678'}}
618+
ser = await AsyncJSONSerializer(json.dumps(schema), client, conf=ser_conf)
619+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
620+
obj_bytes = await ser(obj, ser_ctx)
621+
622+
obj2 = {'pins': {'pin': 'P123456789-suffix', 'npin': 'NP00012345678-suffix'}}
623+
deser = await AsyncJSONDeserializer(None, schema_registry_client=client)
624+
newobj = await deser(obj_bytes, ser_ctx)
625+
assert obj2 == newobj
626+
627+
628+
async def test_json_cel_field_transform_nested_any_of():
629+
conf = {'url': _BASE_URL}
630+
client = AsyncSchemaRegistryClient.new_client(conf)
631+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
632+
schema = {
633+
"type": "object",
634+
"properties": {
635+
"pins": {
636+
"type": "object",
637+
"anyOf": [
638+
{"properties": {"pin": {"confluent:tags": ["PII"], "type": ["string", "null"]}}},
639+
{"properties": {"npin": {"confluent:tags": ["PII"], "type": ["string", "null"]}}},
640+
],
641+
}
642+
},
643+
}
644+
645+
rule = Rule(
646+
"test-cel",
647+
"",
648+
RuleKind.TRANSFORM,
649+
RuleMode.WRITE,
650+
"CEL_FIELD",
651+
["PII"],
652+
None,
653+
"value + '-suffix'",
654+
None,
655+
None,
656+
False,
657+
)
658+
await client.register_schema(_SUBJECT, Schema(json.dumps(schema), "JSON", [], None, RuleSet(None, [rule])))
659+
660+
obj = {'pins': {'pin': 'P123456789', 'npin': 'NP00012345678'}}
661+
ser = await AsyncJSONSerializer(json.dumps(schema), client, conf=ser_conf)
662+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
663+
obj_bytes = await ser(obj, ser_ctx)
664+
665+
obj2 = {'pins': {'pin': 'P123456789-suffix', 'npin': 'NP00012345678-suffix'}}
666+
deser = await AsyncJSONDeserializer(None, schema_registry_client=client)
667+
newobj = await deser(obj_bytes, ser_ctx)
668+
assert obj2 == newobj
669+
670+
671+
async def test_json_cel_field_transform_sibling_any_of():
672+
conf = {'url': _BASE_URL}
673+
client = AsyncSchemaRegistryClient.new_client(conf)
674+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
675+
schema = {
676+
"type": "object",
677+
"properties": {
678+
"pins": {
679+
"type": "object",
680+
"anyOf": [
681+
{"required": ["pin"]},
682+
{"required": ["npin"]},
683+
],
684+
"properties": {
685+
"pin": {"confluent:tags": ["PII"], "type": ["string", "null"]},
686+
"npin": {"confluent:tags": ["PII"], "type": ["string", "null"]},
687+
},
688+
}
689+
},
690+
}
691+
692+
rule = Rule(
693+
"test-cel",
694+
"",
695+
RuleKind.TRANSFORM,
696+
RuleMode.WRITE,
697+
"CEL_FIELD",
698+
["PII"],
699+
None,
700+
"value + '-suffix'",
701+
None,
702+
None,
703+
False,
704+
)
705+
await client.register_schema(_SUBJECT, Schema(json.dumps(schema), "JSON", [], None, RuleSet(None, [rule])))
706+
707+
obj = {'pins': {'pin': 'P123456789', 'npin': 'NP00012345678'}}
708+
ser = await AsyncJSONSerializer(json.dumps(schema), client, conf=ser_conf)
709+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
710+
obj_bytes = await ser(obj, ser_ctx)
711+
712+
obj2 = {'pins': {'pin': 'P123456789-suffix', 'npin': 'NP00012345678-suffix'}}
713+
deser = await AsyncJSONDeserializer(None, schema_registry_client=client)
714+
newobj = await deser(obj_bytes, ser_ctx)
715+
assert obj2 == newobj
716+
717+
585718
async def test_json_cel_field_transform_complex():
586719
conf = {'url': _BASE_URL}
587720
client = AsyncSchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)