Skip to content

Commit dd0cdaf

Browse files
authored
Add avro schema passthrough (#211)
* Add avro schema passthrough * Rename AvroSchemaProvider
1 parent 403f38e commit dd0cdaf

18 files changed

Lines changed: 939 additions & 37 deletions

File tree

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public final class AvroConverter {
2525

2626
private static final String KEY_OPTION = "key.fields";
2727
private static final String KEY_PREFIX_OPTION = "key.fields-prefix";
28-
private static final String PRIMITIVE_KEY = "KEY";
2928

3029
private AvroConverter() {
3130
}
@@ -130,7 +129,7 @@ public static Pair<Schema, Schema> avroKeyPayloadSchema(String namespace, String
130129
String keyName = field.getName().substring(keyPrefix.length());
131130

132131
// Key is a primitive
133-
if (keyNames.size() == 1 && keyName.equals(PRIMITIVE_KEY)) {
132+
if (keyNames.size() == 1 && keyName.equals(AvroSchemas.PRIMITIVE_KEY_NAME)) {
134133
primitiveKeySchema = avro(namespace, keySchemaName, field.getType());
135134
} else {
136135
keyBuilder.add(keyName, field.getType());
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.linkedin.hoptimator.avro;
2+
3+
import org.apache.avro.Schema;
4+
5+
6+
/**
7+
* Implemented by Calcite {@link org.apache.calcite.schema.Table}s backed by native Avro metadata.
8+
* Exposes source-of-truth key and value schemas to downstream consumers (connector deployers,
9+
* resolvers) without round-tripping through Calcite's {@code RelDataType} — preserving namespaces,
10+
* nested record names, reused record definitions, default values, and enum/fixed metadata that
11+
* the type system flattens away.
12+
*
13+
* <p>Consumers pick the view that matches their need:
14+
* <ul>
15+
* <li>{@link #valueSchema()} — the record's data payload.
16+
* <li>{@link #keySchema()} — the record's key schema, or {@code null} when the table has no
17+
* distinct key concept. A struct key exposes its fields directly; a primitive key returns
18+
* a primitive {@link Schema}.
19+
* </ul>
20+
*/
21+
public interface AvroSchemaSource {
22+
23+
/**
24+
* Returns the value/payload Avro schema — the data record's schema without any query-layer
25+
* scaffolding like {@code KEY_}-prefixed key fields. Connector payload options should render
26+
* this.
27+
*/
28+
Schema valueSchema();
29+
30+
/**
31+
* Returns the key Avro schema, or {@code null} when this table has no distinct key concept.
32+
* Struct keys expose their fields directly; primitive keys return a primitive {@link Schema}.
33+
* SQL/query-layer consumers may merge this with {@link #valueSchema()} via
34+
* {@link AvroSchemas#mergedAvroSchemaFor(AvroSchemaSource)}.
35+
*/
36+
default Schema keySchema() {
37+
return null;
38+
}
39+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;


/**
 * Utilities for composing Avro schemas. The primary use case is Hoptimator-style stores that
 * back each logical table with a separate key and value Avro schema and need to expose a single
 * flattened record (key fields prefixed, followed by value fields) without round-tripping
 * through Calcite's {@code RelDataType}.
 */
public final class AvroSchemas {

  /**
   * Standard Hoptimator prefix applied to fields contributed by a record-typed key when merging
   * into the value schema for SQL/query consumption.
   */
  public static final String KEY_PREFIX = "KEY_";

  /**
   * Standard Hoptimator field name used when a primitive key is merged into the value schema for
   * SQL/query consumption.
   */
  public static final String PRIMITIVE_KEY_NAME = "KEY";

  private AvroSchemas() {
  }

  /**
   * Produces a fresh {@link Schema.Field} clone. Avro's {@code Schema.setFields} rejects Fields
   * that already belong to another record (it tracks the field's position as a mutable guard), so
   * fields from an input schema cannot be handed to a new record directly. Cloning also lets
   * callers rename fields (e.g., apply a {@code KEY_} prefix).
   *
   * <p>Copies every attribute Avro exposes on a Field: name (from the caller), schema reference
   * (the type Schema is shared by reference — nested records, enums, namespaces, etc. survive
   * intact), doc, default value, sort order, aliases, and custom properties.
   *
   * @param name name for the cloned field (may differ from {@code original.name()})
   * @param original field to clone; not modified
   */
  public static Schema.Field cloneField(String name, Schema.Field original) {
    // A null default-value arg means "no default" — only forward one when the original has one,
    // so we never invent a default for a field that lacked it.
    Object defaultVal = original.hasDefaultValue() ? original.defaultVal() : null;
    Schema.Field clone = new Schema.Field(name, original.schema(), original.doc(), defaultVal,
        original.order());
    original.aliases().forEach(clone::addAlias);
    original.getObjectProps().forEach(clone::addProp);
    return clone;
  }

  /**
   * Produces the merged Avro schema for an {@link AvroSchemaSource}: the value schema with key
   * fields prepended using Hoptimator's standard convention ({@link #KEY_PREFIX} for struct keys,
   * {@link #PRIMITIVE_KEY_NAME} field for primitive keys). When the source has no key
   * ({@link AvroSchemaSource#keySchema()} returns {@code null}), returns the value schema
   * unchanged.
   *
   * <p>Centralizes the Hoptimator merging convention so SQL/query-layer consumers (like
   * {@code HoptimatorConnection.resolve()}) share one implementation.
   *
   * @throws NullPointerException if {@code source} or its value schema is null
   */
  public static Schema mergedAvroSchemaFor(AvroSchemaSource source) {
    Objects.requireNonNull(source, "source");
    Schema key = source.keySchema();
    // Fail fast with a diagnosable message rather than an opaque NPE downstream.
    Schema value = Objects.requireNonNull(source.valueSchema(), "valueSchema");
    if (key == null) {
      return value;
    }
    return mergeKeyIntoValue(key, value, KEY_PREFIX, PRIMITIVE_KEY_NAME);
  }

  /**
   * Merges a key Avro schema and a value record schema into a single record that inherits the
   * value schema's identity (namespace, name, doc, isError flag), aliases, and record-level
   * custom properties. Struct keys contribute one field per key field, each prefixed with
   * {@code keyPrefix}. Primitive keys (or any non-record key) contribute a single field named
   * {@code primitiveKeyName} with the key schema as its type.
   *
   * @param keySchema key Avro schema. If a record, its fields are prefixed and prepended.
   *                  Otherwise, a single primitive key field is prepended.
   * @param valueSchema must be a {@link Schema.Type#RECORD}.
   * @param keyPrefix prefix applied to each key field name when key is a record (e.g.
   *                  {@code "KEY_"}).
   * @param primitiveKeyName field name used when key is primitive (e.g. {@code "KEY"}).
   * @throws NullPointerException if {@code keySchema} or {@code valueSchema} is null
   * @throws IllegalArgumentException if {@code valueSchema} is not a record
   */
  static Schema mergeKeyIntoValue(Schema keySchema, Schema valueSchema,
      String keyPrefix, String primitiveKeyName) {
    Objects.requireNonNull(keySchema, "keySchema");
    Objects.requireNonNull(valueSchema, "valueSchema");
    if (valueSchema.getType() != Schema.Type.RECORD) {
      throw new IllegalArgumentException(
          "Value schema must be a record; got " + valueSchema.getType());
    }
    boolean structKey = keySchema.getType() == Schema.Type.RECORD;
    // Both contributions are known up front; presize to avoid intermediate growth.
    int keyFieldCount = structKey ? keySchema.getFields().size() : 1;
    List<Schema.Field> allFields = new ArrayList<>(keyFieldCount + valueSchema.getFields().size());
    if (structKey) {
      for (Schema.Field kf : keySchema.getFields()) {
        allFields.add(cloneField(keyPrefix + kf.name(), kf));
      }
    } else {
      allFields.add(new Schema.Field(primitiveKeyName, keySchema, "Primitive key field.", null));
    }
    for (Schema.Field vf : valueSchema.getFields()) {
      allFields.add(cloneField(vf.name(), vf));
    }
    Schema merged = Schema.createRecord(
        valueSchema.getName(),
        valueSchema.getDoc(),
        valueSchema.getNamespace(),
        valueSchema.isError());
    merged.setFields(allFields);
    valueSchema.getAliases().forEach(merged::addAlias);
    valueSchema.getObjectProps().forEach(merged::addProp);
    return merged;
  }
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package com.linkedin.hoptimator.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;


class AvroSchemasTest {

  // --- cloneField ---

  @Test
  void cloneFieldProducesUnownedCopy() {
    Schema.Field source = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc", null);
    // Installing the field into a record assigns its position, as setFields does in production.
    Schema owner = Schema.createRecord("P", null, "ns", false);
    owner.setFields(List.of(source));
    assertFalse(source.pos() == -1, "precondition: position assigned by setFields");

    Schema.Field copy = AvroSchemas.cloneField("a", source);

    // The copy carries no owner, so a second record accepts it without Avro objecting.
    Schema target = Schema.createRecord("Q", null, "ns", false);
    target.setFields(List.of(copy));
    assertEquals("a", target.getField("a").name());
  }

  @Test
  void cloneFieldRenamesField() {
    Schema.Field source = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null);
    Schema.Field renamed = AvroSchemas.cloneField("KEY_a", source);
    assertSame(source.schema(), renamed.schema(), "type schema shared by reference");
    assertEquals("KEY_a", renamed.name());
  }

  @Test
  void cloneFieldPreservesOrderAliasesPropsAndDefault() {
    Schema.Field source = new Schema.Field("a", Schema.create(Schema.Type.STRING), "doc",
        "fallback", Schema.Field.Order.DESCENDING);
    source.addAlias("legacyName");
    source.addProp("compliance", "NONE");
    source.addProp("java", Map.of("class", "com.linkedin.common.urn.Urn"));

    Schema.Field copy = AvroSchemas.cloneField("a", source);

    assertEquals(Schema.Field.Order.DESCENDING, copy.order());
    assertEquals("fallback", copy.defaultVal());
    assertEquals("doc", copy.doc());
    assertEquals("NONE", copy.getObjectProp("compliance"));
    assertEquals(Map.of("class", "com.linkedin.common.urn.Urn"), copy.getObjectProp("java"));
    assertTrue(copy.aliases().contains("legacyName"));
  }

  @Test
  void cloneFieldHandlesNoDefaultValue() {
    Schema.Field source = new Schema.Field("a", Schema.create(Schema.Type.STRING), null, null);
    // The trailing null above is the default-value argument; with none supplied (and no
    // nullable union in play), hasDefaultValue() reports false.
    assertFalse(source.hasDefaultValue());

    assertFalse(AvroSchemas.cloneField("a", source).hasDefaultValue(),
        "no-default fields clone without inventing one");
  }

  // --- mergeKeyIntoValue ---

  @Test
  void mergeKeyIntoValueInheritsValueSchemaIdentityAndProps() {
    Schema key = SchemaBuilder.record("Key").fields().requiredString("id").endRecord();
    Schema value = SchemaBuilder.record("User")
        .namespace("com.linkedin.foo")
        .aliases("com.linkedin.foo.UserV1")
        .doc("User doc")
        .prop("owningTeam", "urn:li:internalTeam:feed")
        .fields().requiredString("name").endRecord();

    Schema merged = AvroSchemas.mergeKeyIntoValue(key, value, "KEY_", "KEY");

    assertEquals("urn:li:internalTeam:feed", merged.getObjectProp("owningTeam"));
    assertEquals("User doc", merged.getDoc());
    assertTrue(merged.getAliases().contains("com.linkedin.foo.UserV1"));
  }

  @Test
  void mergeKeyIntoValueThrowsForNonRecordValue() {
    Schema primitive = Schema.create(Schema.Type.STRING);
    assertThrows(IllegalArgumentException.class,
        () -> AvroSchemas.mergeKeyIntoValue(primitive, primitive, "KEY_", "KEY"));
  }

  @Test
  void mergeKeyIntoValueStructKeyGetsPrefix() {
    Schema key = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields()
        .requiredString("id").requiredInt("partition").endRecord();
    Schema value = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
        .requiredString("name").endRecord();

    Schema merged = AvroSchemas.mergeKeyIntoValue(key, value, "KEY_", "KEY");

    List<String> fieldNames =
        merged.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
    assertEquals(List.of("KEY_id", "KEY_partition", "name"), fieldNames);
    assertEquals("User", merged.getName(), "merged inherits value name");
    assertEquals("com.linkedin.v", merged.getNamespace(), "merged inherits value namespace");
  }

  @Test
  void mergeKeyIntoValuePrimitiveKeyBecomesSingleNamedField() {
    Schema value = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
        .requiredString("name").endRecord();

    Schema merged = AvroSchemas.mergeKeyIntoValue(
        Schema.create(Schema.Type.STRING), value, "KEY_", "KEY");

    List<Schema.Field> fields = merged.getFields();
    assertEquals(2, fields.size());
    assertEquals("KEY", fields.get(0).name());
    assertEquals(Schema.Type.STRING, fields.get(0).schema().getType());
    assertEquals("name", fields.get(1).name());
  }

  @Test
  void mergeKeyIntoValuePreservesNestedRecordIdentities() {
    // cloneField shares each field's type Schema by reference, so nested records in the value
    // (or key) keep their source namespaces — no round-trip through a type system.
    Schema address = SchemaBuilder.record("Address").namespace("com.linkedin.addr").fields()
        .requiredString("city").endRecord();
    Schema value = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
        .name("address").type(address).noDefault().endRecord();

    Schema merged = AvroSchemas.mergeKeyIntoValue(
        Schema.create(Schema.Type.STRING), value, "KEY_", "KEY");

    Schema nested = merged.getField("address").schema();
    assertEquals("com.linkedin.addr", nested.getNamespace());
    assertSame(address, nested, "nested record schema shared by reference");
  }

  @Test
  void mergeKeyIntoValueReusedRecordsRenderAsNamedReferences() {
    // When one Schema instance appears twice in the value, Avro's serializer emits the full
    // record definition once and refers to it by fully-qualified name thereafter.
    Schema shared = SchemaBuilder.record("Shared").namespace("com.linkedin.s").fields()
        .requiredString("v").endRecord();
    Schema value = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
        .name("first").type(shared).noDefault()
        .name("second").type(shared).noDefault()
        .endRecord();

    String json = AvroSchemas.mergeKeyIntoValue(
        Schema.create(Schema.Type.STRING), value, "KEY_", "KEY").toString(false);

    int firstDefinition = json.indexOf("\"name\":\"Shared\"");
    assertEquals(-1, json.indexOf("\"name\":\"Shared\"", firstDefinition + 1),
        "reused record serialized once; got " + json);
  }

  @Test
  void mergeKeyIntoValueKeyFieldsKeepFieldProps() {
    // LinkedIn schemas routinely attach custom props to key fields ("java", "validate", etc.);
    // they must survive the key→KEY_ rename.
    Schema key = SchemaBuilder.record("Key").fields()
        .name("id").type().stringType().noDefault()
        .endRecord();
    key.getField("id").addProp("compliance", "NONE");
    Schema value = SchemaBuilder.record("V").namespace("v").fields()
        .requiredString("x").endRecord();

    Schema merged = AvroSchemas.mergeKeyIntoValue(key, value, "KEY_", "KEY");

    assertEquals("NONE", merged.getField("KEY_id").getObjectProp("compliance"));
  }

  @Test
  void mergeKeyIntoValueSchemaRoundTripsThroughAvroParser() {
    // End-to-end sanity: the merged schema survives Avro's own parser, so it contains nothing
    // that trips Avro validation (e.g. duplicate names, unresolvable references).
    Schema key = SchemaBuilder.record("Key").namespace("com.linkedin.k").fields()
        .requiredString("id").endRecord();
    Schema value = SchemaBuilder.record("User").namespace("com.linkedin.v").fields()
        .requiredString("name").endRecord();

    Schema reparsed = new Schema.Parser()
        .parse(AvroSchemas.mergeKeyIntoValue(key, value, "KEY_", "KEY").toString(true));

    assertNotNull(reparsed);
    assertEquals(2, reparsed.getFields().size());
    assertEquals("com.linkedin.v.User", reparsed.getFullName());
  }
}

0 commit comments

Comments
 (0)