3030import java .util .function .Function ;
3131import lombok .extern .slf4j .Slf4j ;
3232import org .apache .avro .Schema ;
33+ import org .apache .avro .generic .GenericData ;
3334import org .apache .pulsar .client .api .schema .GenericObject ;
3435import org .apache .pulsar .client .api .schema .GenericRecord ;
3536import org .apache .pulsar .client .api .schema .KeyValueSchema ;
@@ -55,6 +56,61 @@ public List<ColumnId> getColumnsForUpsert() {
5556 throw new IllegalStateException ("UPSERT not supported" );
5657 }
5758
59+ /**
60+ * Handles array value binding for database-specific array types.
61+ * <p>
62+ * This method is called when an array value needs to be bound to a PreparedStatement
63+ * parameter. Implementations should convert the array value to the appropriate
64+ * database-specific array type and bind it to the statement using the appropriate
65+ * JDBC method (typically {@code PreparedStatement.setArray()}).
66+ * </p>
67+ * <p>
68+ * The method is invoked automatically by {@link #setColumnValue(PreparedStatement, int, Object, String)}
69+ * when it detects an array type (specifically {@code org.apache.avro.generic.GenericData$Array}).
70+ * </p>
71+ * <p>
72+ * <strong>Implementation Guidelines:</strong>
73+ * <ul>
74+ * <li>Handle null arrays by calling {@code statement.setNull(index, java.sql.Types.ARRAY)}</li>
75+ * <li>Convert array elements to the appropriate database-specific types</li>
76+ * <li>Use the targetSqlType parameter to determine the correct array element type</li>
77+ * <li>Provide descriptive error messages for type mismatches and unsupported types</li>
78+ * <li>Wrap JDBC exceptions with contextual information</li>
79+ * </ul>
80+ * </p>
81+ * <p>
82+ * <strong>Example Usage:</strong>
83+ * <pre>{@code
84+ * // For PostgreSQL implementation:
85+ * if (arrayValue == null) {
86+ * statement.setNull(index, java.sql.Types.ARRAY);
87+ * return;
88+ * }
89+ *
90+ * Object[] elements = convertToObjectArray(arrayValue);
91+ * String postgresType = mapToPostgresType(targetSqlType);
92+ * Array pgArray = connection.createArrayOf(postgresType, elements);
93+ * statement.setArray(index, pgArray);
94+ * }</pre>
95+ * </p>
96+ *
97+ * @param statement the PreparedStatement to bind the array value to
98+ * @param index the parameter index (1-based) in the PreparedStatement
99+ * @param arrayValue the array value to be bound, typically a {@code GenericData.Array} or {@code Object[]}
100+ * @param targetSqlType the target SQL type name for the array column (e.g., "integer", "text", "_int4")
101+ * @throws Exception if array conversion or binding fails, including:
102+ * <ul>
103+ * <li>{@code IllegalArgumentException} for unsupported array types or type mismatches</li>
104+ * <li>{@code SQLException} for JDBC array creation or binding failures</li>
105+ * <li>{@code UnsupportedOperationException} for databases that don't support arrays</li>
106+ * </ul>
107+ * @see #setColumnValue(PreparedStatement, int, Object, String)
108+ * @see java.sql.PreparedStatement#setArray(int, java.sql.Array)
109+ * @see java.sql.Connection#createArrayOf(String, Object[])
110+ */
111+ protected abstract void handleArrayValue (PreparedStatement statement , int index , Object arrayValue ,
112+ String targetSqlType ) throws Exception ;
113+
58114 @ Override
59115 public void bindValue (PreparedStatement statement , Mutation mutation ) throws Exception {
60116 final List <ColumnId > columns = new ArrayList <>();
@@ -78,13 +134,14 @@ public void bindValue(PreparedStatement statement, Mutation mutation) throws Exc
78134 for (ColumnId columnId : columns ) {
79135 String colName = columnId .getName ();
80136 int colType = columnId .getType ();
137+ String typeName = columnId .getTypeName ();
81138 if (log .isDebugEnabled ()) {
82139 log .debug ("getting value for column: {} type: {}" , colName , colType );
83140 }
84141 try {
85142 Object obj = mutation .getValues ().apply (colName );
86143 if (obj != null ) {
87- setColumnValue (statement , index ++, obj );
144+ setColumnValue (statement , index ++, obj , typeName );
88145 } else {
89146 if (log .isDebugEnabled ()) {
90147 log .debug ("Column {} is null" , colName );
@@ -174,10 +231,73 @@ private static void setColumnNull(PreparedStatement statement, int index, int ty
174231
175232 }
176233
177- protected void setColumnValue (PreparedStatement statement , int index , Object value ) throws Exception {
234+ /**
235+ * Sets a column value in a PreparedStatement, handling various data types including arrays.
236+ * <p>
237+ * This method automatically detects the type of the value and calls the appropriate
238+ * PreparedStatement setter method. It supports primitive types, strings, and arrays.
239+ * Array support is implemented through the {@link #handleArrayValue(PreparedStatement, int, Object, String)}
240+ * method, which delegates to database-specific implementations.
241+ * </p>
242+ * <p>
243+ * <strong>Supported Types:</strong>
244+ * <ul>
245+ * <li>Primitive types: Integer, Long, Double, Float, Boolean, Short</li>
246+ * <li>String types: String</li>
247+ * <li>Binary types: ByteString</li>
248+ * <li>JSON types: GenericJsonRecord</li>
249+ * <li>Array types: GenericData.Array (requires targetSqlType parameter)</li>
250+ * </ul>
251+ * </p>
252+ * <p>
253+ * <strong>Array Handling:</strong>
254+ * When an array is detected (GenericData.Array), this method calls the abstract
255+ * {@link #handleArrayValue(PreparedStatement, int, Object, String)} method, which
256+ * must be implemented by database-specific subclasses. The targetSqlType parameter
257+ * is essential for proper array type conversion.
258+ * </p>
259+ * <p>
260+ * <strong>Example Usage:</strong>
261+ * <pre>{@code
262+ * // Setting a primitive value
263+ * setColumnValue(statement, 1, 42, "integer");
264+ *
265+ * // Setting an array value (requires database-specific implementation)
266+ * GenericData.Array<Integer> intArray = ...;
267+ * setColumnValue(statement, 2, intArray, "integer");
268+ * }</pre>
269+ * </p>
270+ *
271+ * @param statement the PreparedStatement to bind the value to
272+ * @param index the parameter index (1-based) in the PreparedStatement
273+ * @param value the value to be bound (null values are handled automatically)
274+ * @param targetSqlType the target SQL type name for the column, required for array types
275+ * @throws Exception if value binding fails, including:
276+ * <ul>
277+ * <li>Unsupported value types</li>
278+ * <li>Array conversion failures (delegated to handleArrayValue)</li>
279+ * <li>JDBC binding errors</li>
280+ * </ul>
281+ * @see #handleArrayValue(PreparedStatement, int, Object, String)
282+ * @see #setColumnValue(PreparedStatement, int, Object)
283+ */
284+ protected void setColumnValue (PreparedStatement statement , int index , Object value ,
285+ String targetSqlType ) throws Exception {
178286
179287 log .debug ("Setting column value, statement: {}, index: {}, value: {}" , statement , index , value );
180288
289+ // Handle null values first
290+ if (value == null ) {
291+ setColumnNull (statement , index , java .sql .Types .NULL );
292+ return ;
293+ }
294+
295+ // Check for array types first, before other type checks
296+ if (value instanceof GenericData .Array || value instanceof Object []) {
297+ handleArrayValue (statement , index , value , targetSqlType );
298+ return ;
299+ }
300+
181301 if (value instanceof Integer ) {
182302 statement .setInt (index , (Integer ) value );
183303 } else if (value instanceof Long ) {
@@ -201,6 +321,24 @@ protected void setColumnValue(PreparedStatement statement, int index, Object val
201321 }
202322 }
203323
324+ /**
325+ * Backward compatibility method for setColumnValue without targetSqlType parameter.
326+ * This method is provided for compatibility with existing code that may call setColumnValue
327+ * without the targetSqlType parameter. Arrays will not be supported when using this method.
328+ *
329+ * @param statement the PreparedStatement to bind the value to
330+ * @param index the parameter index (1-based) in the PreparedStatement
331+ * @param value the value to be bound
332+ * @throws Exception if value binding fails or if an array is encountered (arrays require targetSqlType)
333+ */
334+ protected void setColumnValue (PreparedStatement statement , int index , Object value ) throws Exception {
335+ if (value instanceof GenericData .Array ) {
336+ throw new Exception ("Array values require targetSqlType parameter. "
337+ + "Use setColumnValue(statement, index, value, targetSqlType) instead." );
338+ }
339+ setColumnValue (statement , index , value , null );
340+ }
341+
204342 private static Object getValueFromJsonNode (final JsonNode fn ) {
205343 if (fn == null || fn .isNull ()) {
206344 return null ;
@@ -226,8 +364,8 @@ private static Object getValueFromJsonNode(final JsonNode fn) {
226364 }
227365
228366 private static void fillKeyValueSchemaData (org .apache .pulsar .client .api .Schema <GenericObject > schema ,
229- GenericObject record ,
230- Map <String , Object > data ) {
367+ GenericObject record ,
368+ Map <String , Object > data ) {
231369 if (record == null ) {
232370 return ;
233371 }
@@ -256,6 +394,51 @@ private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<G
256394 }
257395 }
258396
397+ /**
398+ * Converts an Avro field value to a Java object suitable for JDBC binding.
399+ * <p>
400+ * This method handles the conversion of various Avro schema types to their corresponding
401+ * Java representations. It supports primitive types, strings, unions, and arrays.
402+ * Array conversion is performed recursively, processing each array element according
403+ * to the array's element schema.
404+ * </p>
405+ * <p>
406+ * <strong>Supported Avro Types:</strong>
407+ * <ul>
408+ * <li>Primitive types: NULL, INT, LONG, DOUBLE, FLOAT, BOOLEAN</li>
409+ * <li>String types: STRING, ENUM</li>
410+ * <li>Union types: Automatically selects the non-null type from the union</li>
411+ * <li>Array types: Recursively converts array elements to Object[]</li>
412+ * </ul>
413+ * </p>
414+ * <p>
415+ * <strong>Array Conversion:</strong>
416+ * Arrays are converted by recursively processing each element according to the
417+ * array's element schema. The result is an Object[] that can be further processed
418+ * by database-specific array handling methods.
419+ * </p>
420+ * <p>
421+ * <strong>Example Usage:</strong>
422+ * <pre>{@code
423+ * // Convert a simple integer field
424+ * Schema intSchema = Schema.create(Schema.Type.INT);
425+ * Object result = convertAvroField(42, intSchema); // Returns Integer(42)
426+ *
427+ * // Convert an array field
428+ * Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));
429+ * GenericData.Array<String> avroArray = ...;
430+ * Object result = convertAvroField(avroArray, arraySchema); // Returns String[]
431+ * }</pre>
432+ * </p>
433+ *
434+ * @param avroValue the Avro value to convert (may be null)
435+ * @param schema the Avro schema describing the value's type
436+ * @return the converted Java object, or null if avroValue is null
437+ * @throws IllegalArgumentException if the avroValue doesn't match the expected schema type
438+ * @throws UnsupportedOperationException if the schema type is not supported
439+ * @see org.apache.avro.Schema.Type
440+ * @see org.apache.avro.generic.GenericData.Array
441+ */
259442 @ VisibleForTesting
260443 static Object convertAvroField (Object avroValue , Schema schema ) {
261444 if (avroValue == null ) {
@@ -281,6 +464,21 @@ static Object convertAvroField(Object avroValue, Schema schema) {
281464 }
282465 throw new IllegalArgumentException ("Found UNION schema but it doesn't contain any type" );
283466 case ARRAY :
467+ // Handle array conversion by recursively processing array elements
468+ if (avroValue instanceof GenericData .Array ) {
469+ GenericData .Array <?> avroArray = (GenericData .Array <?>) avroValue ;
470+ Schema elementSchema = schema .getElementType ();
471+ Object [] convertedArray = new Object [avroArray .size ()];
472+
473+ for (int i = 0 ; i < avroArray .size (); i ++) {
474+ convertedArray [i ] = convertAvroField (avroArray .get (i ), elementSchema );
475+ }
476+
477+ return convertedArray ;
478+ } else {
479+ throw new IllegalArgumentException ("Expected GenericData.Array for ARRAY schema type, got: "
480+ + avroValue .getClass ().getName ());
481+ }
284482 case BYTES :
285483 case FIXED :
286484 case RECORD :
0 commit comments