Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions avro-data/src/main/java/io/confluent/connect/avro/AvroData.java
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,10 @@ avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
//This handles the inverting of a union which is held as a struct, where each field is
// one of the union types.
if (isUnionSchema(schema)) {
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
Object object = ignoreDefaultForNullables
? struct.getWithoutDefault(field.name()) : struct.get(field);
if (object != null) {
Expand All @@ -625,7 +628,10 @@ avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
schema, avroSchema, scrubInvalidNames);
GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
String fieldName = scrubName(field.name(), scrubInvalidNames);
org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(fieldName);
org.apache.avro.Schema fieldAvroSchema = theField.schema();
Expand Down Expand Up @@ -1648,7 +1654,10 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon
valueRecord.getSchema(), true, null, null, toConnectContext);
}
int index = 0;
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
Schema fieldSchema = field.schema();
if (isInstanceOfAvroSchemaTypeForSimpleSchema(fieldSchema, value, index)
|| (valueRecordSchema != null && schemaEquals(valueRecordSchema, fieldSchema))) {
Expand All @@ -1666,7 +1675,10 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon
// Default values from Avro are returned as Map
Map<CharSequence, Object> original = (Map<CharSequence, Object>) value;
Struct result = new Struct(schema);
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
String fieldName = scrubName(field.name());
Object convertedFieldValue = toConnectData(field.schema(),
original.getOrDefault(fieldName, field.schema().defaultValue()),
Expand All @@ -1677,7 +1689,10 @@ private Object toConnectData(Schema schema, Object value, ToConnectContext toCon
} else {
IndexedRecord original = (IndexedRecord) value;
Struct result = new Struct(schema);
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
String fieldName = scrubName(field.name());
int avroFieldIndex = original.getSchema().getField(fieldName).pos();
Object convertedFieldValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,10 @@ public JsonNode fromConnectData(Schema schema, Object logicalValue) {
//This handles the inverting of a union which is held as a struct, where each field is
// one of the union types.
if (isUnionSchema(schema)) {
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
Object object = config.ignoreDefaultForNullables()
? struct.getWithoutDefault(field.name()) : struct.get(field);
if (object != null) {
Expand All @@ -555,7 +558,10 @@ public JsonNode fromConnectData(Schema schema, Object logicalValue) {
return fromConnectData(schema, null);
} else {
ObjectNode obj = JSON_NODE_FACTORY.objectNode();
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
Object fieldValue = config.ignoreDefaultForNullables()
? struct.getWithoutDefault(field.name()) : struct.get(field);
JsonNode jsonNode = fromConnectData(field.schema(), fieldValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,15 +460,15 @@ private Object fromConnectData(
if (listValue.isEmpty()) {
return null;
}
List<Object> newListValue = new ArrayList<>();
List<Object> newListValue = new ArrayList<>(listValue.size());
for (Object o : listValue) {
newListValue.add(fromConnectData(ctx, schema.valueSchema(), scope, o, protobufSchema));
}
return newListValue;
case MAP:
final Map<?, ?> mapValue = (Map<?, ?>) value;
String scopedMapName = ((Descriptor) ctx).getFullName();
List<Message> newMapValue = new ArrayList<>();
List<Message> newMapValue = new ArrayList<>(mapValue.size());
for (Map.Entry<?, ?> mapEntry : mapValue.entrySet()) {
DynamicMessage.Builder mapBuilder = protobufSchema.newMessageBuilder(scopedMapName);
if (mapBuilder == null) {
Expand Down Expand Up @@ -508,7 +508,10 @@ private Object fromConnectData(
//This handles the inverting of a union which is held as a struct, where each field is
// one of the union types.
if (isUnionSchema(schema)) {
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
Object object = ignoreDefaultForNullables
? struct.getWithoutDefault(field.name()) : struct.get(field);
if (object != null) {
Expand All @@ -527,7 +530,10 @@ private Object fromConnectData(
if (messageBuilder == null) {
throw new DataException("Invalid message name: " + scopedStructName);
}
for (Field field : schema.fields()) {
List<Field> fields = schema.fields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
Field field = fields.get(i);
String fieldName = scrubName(field.name());
Object fieldCtx = getFieldType(ctx, fieldName);
Object connectFieldVal = ignoreDefaultForNullables
Expand Down Expand Up @@ -1224,7 +1230,10 @@ protected Object toConnectData(Schema schema, Object value) {
final Struct struct = new Struct(schema.schema());
final Descriptor descriptor = message.getDescriptorForType();

for (OneofDescriptor oneOfDescriptor : descriptor.getRealOneofs()) {
List<OneofDescriptor> oneOfDescriptors = descriptor.getRealOneofs();
int numRealOneOfs = oneOfDescriptors.size();
for (int i = 0; i < numRealOneOfs; i++) {
OneofDescriptor oneOfDescriptor = oneOfDescriptors.get(i);
if (message.hasOneof(oneOfDescriptor)) {
FieldDescriptor fieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor);
Object obj = message.getField(fieldDescriptor);
Expand All @@ -1238,7 +1247,10 @@ protected Object toConnectData(Schema schema, Object value) {
}
}

for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
List<FieldDescriptor> fields = descriptor.getFields();
int numFields = fields.size();
for (int i = 0; i < numFields; i++) {
FieldDescriptor fieldDescriptor = fields.get(i);
OneofDescriptor oneOfDescriptor = fieldDescriptor.getRealContainingOneof();
if (oneOfDescriptor != null) {
// Already added field as oneof
Expand Down
Loading