3838import java .util .ArrayList ;
3939import java .util .Collection ;
4040import java .util .HashMap ;
41+ import java .util .HashSet ;
4142import java .util .LinkedHashMap ;
4243import java .util .List ;
4344import java .util .Map ;
45+ import java .util .Set ;
4446import org .apache .avro .AvroTypeException ;
4547import org .apache .avro .Conversion ;
4648import org .apache .avro .LogicalType ;
@@ -172,6 +174,51 @@ public void add(Object value) {
172174 }
173175 }
174176
177+ private static void addLogicalTypeConversion (SpecificData model , Schema schema , Set <Schema > seenSchemas )
178+ throws IllegalAccessException {
179+ if (seenSchemas .contains (schema )) {
180+ return ;
181+ }
182+ seenSchemas .add (schema );
183+
184+ switch (schema .getType ()) {
185+ case RECORD :
186+ final Class <?> clazz = model .getClass (schema );
187+ if (clazz != null ) {
188+ try {
189+ final Field conversionsField = clazz .getDeclaredField ("conversions" );
190+ conversionsField .setAccessible (true );
191+ final Conversion <?>[] conversions = (Conversion <?>[]) conversionsField .get (null );
192+ for (Conversion <?> conversion : conversions ) {
193+ if (conversion != null ) {
194+ model .addLogicalTypeConversion (conversion );
195+ }
196+ }
197+
198+ for (Schema .Field field : schema .getFields ()) {
199+ addLogicalTypeConversion (model , field .schema (), seenSchemas );
200+ }
201+ } catch (NoSuchFieldException e ) {
202+ // Avro classes without logical types (denoted by the "conversions" field)
203+ }
204+ }
205+ break ;
206+ case MAP :
207+ addLogicalTypeConversion (model , schema .getValueType (), seenSchemas );
208+ break ;
209+ case ARRAY :
210+ addLogicalTypeConversion (model , schema .getElementType (), seenSchemas );
211+ break ;
212+ case UNION :
213+ for (Schema type : schema .getTypes ()) {
214+ addLogicalTypeConversion (model , type , seenSchemas );
215+ }
216+ break ;
217+ default :
218+ break ;
219+ }
220+ }
221+
175222 /**
176223 * Returns the specific data model for a given SpecificRecord schema by reflecting the underlying
177224 * Avro class's `MODEL$` field, or Null if the class is not on the classpath or reflection fails.
@@ -197,49 +244,29 @@ static SpecificData getModelForSchema(Schema schema) {
197244
198245 model = (SpecificData ) modelField .get (null );
199246 } catch (NoSuchFieldException e ) {
200- LOG .info (String .format (
201- "Generated Avro class %s did not contain a MODEL$ field. Parquet will use default SpecificData model for "
202- + "reading and writing." ,
203- clazz ));
247+ LOG .info (String .format ("Generated Avro class %s did not contain a MODEL$ field. " , clazz )
248+ + "Parquet will use default SpecificData model for reading and writing." );
204249 return null ;
205250 } catch (IllegalAccessException e ) {
206251 LOG .warn (
207- String .format (
208- "Field `MODEL$` in class %s was inaccessible. Parquet will use default SpecificData model for "
209- + "reading and writing." ,
210- clazz ),
252+ String .format ("Field `MODEL$` in class %s was inaccessible. " , clazz )
253+ + "Parquet will use default SpecificData model for reading and writing." ,
211254 e );
212255 return null ;
213256 }
214257
215258 final String avroVersion = getRuntimeAvroVersion ();
216259 // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by default
217260 if (avroVersion != null && (avroVersion .startsWith ("1.8." ) || avroVersion .startsWith ("1.7." ))) {
218- final Field conversionsField ;
219261 try {
220- conversionsField = clazz .getDeclaredField ("conversions" );
221- } catch (NoSuchFieldException e ) {
222- // Avro classes without logical types (denoted by the "conversions" field) can be returned as-is
223- return model ;
224- }
225-
226- final Conversion <?>[] conversions ;
227- try {
228- conversionsField .setAccessible (true );
229- conversions = (Conversion <?>[]) conversionsField .get (null );
262+ addLogicalTypeConversion (model , schema , new HashSet <>());
230263 } catch (IllegalAccessException e ) {
231- LOG .warn (String . format (
232- "Field ` conversions` in class %s was inaccessible. Parquet will use default "
233- + "SpecificData model for reading and writing." ,
234- clazz ) );
264+ LOG .warn (
265+ String . format ( "Logical-type conversions were inaccessible for %s" , clazz )
266+ + "Parquet will use default SpecificData model for reading and writing." ,
267+ e );
235268 return null ;
236269 }
237-
238- for (int i = 0 ; i < conversions .length ; i ++) {
239- if (conversions [i ] != null ) {
240- model .addLogicalTypeConversion (conversions [i ]);
241- }
242- }
243270 }
244271
245272 return model ;
0 commit comments