77
88import java .util .ArrayList ;
99import java .util .HashMap ;
10- import java .util .HashSet ;
1110import java .util .List ;
1211import java .util .Map ;
13- import java .util .Set ;
1412import java .util .stream .Collectors ;
1513import org .apache .calcite .rel .RelNode ;
1614import org .apache .calcite .rel .type .RelDataType ;
1715import org .apache .calcite .rel .type .RelDataTypeField ;
1816import org .apache .calcite .rex .RexNode ;
19- import org .apache .calcite .sql .validate .SqlValidatorUtil ;
2017
2118/**
22- * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
23- * the same strategy as append command - renames conflicting fields to avoid type conflicts .
19+ * Utility class for unifying schemas across multiple RelNodes. Throws an exception when type
20+ * conflicts are detected .
2421 */
2522public class SchemaUnifier {
2623
2724 /**
28- * Builds a unified schema for multiple nodes with type conflict resolution .
25+ * Builds a unified schema for multiple nodes. Throws an exception if type conflicts are detected .
2926 *
3027 * @param nodes List of RelNodes to unify schemas for
3128 * @param context Calcite plan context
3229 * @return List of projected RelNodes with unified schema
30+ * @throws IllegalArgumentException if type conflicts are detected
3331 */
3432 public static List <RelNode > buildUnifiedSchemaWithConflictResolution (
3533 List <RelNode > nodes , CalcitePlanContext context ) {
@@ -41,7 +39,7 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
4139 return nodes ;
4240 }
4341
44- // Step 1: Build the unified schema by processing all nodes
42+ // Step 1: Build the unified schema by processing all nodes (throws on conflict)
4543 List <SchemaField > unifiedSchema = buildUnifiedSchema (nodes );
4644
4745 // Step 2: Create projections for each node to align with unified schema
@@ -55,47 +53,37 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
5553 projectedNodes .add (projectedNode );
5654 }
5755
58- // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
59- List <String > uniqueNames =
60- SqlValidatorUtil .uniquify (fieldNames , SqlValidatorUtil .EXPR_SUGGESTER , true );
61-
62- // Step 4: Re-project with unique names if needed
63- if (!uniqueNames .equals (fieldNames )) {
64- List <RelNode > renamedNodes = new ArrayList <>();
65- for (RelNode node : projectedNodes ) {
66- RelNode renamedNode =
67- context .relBuilder .push (node ).project (context .relBuilder .fields (), uniqueNames ).build ();
68- renamedNodes .add (renamedNode );
69- }
70- return renamedNodes ;
71- }
72-
7356 return projectedNodes ;
7457 }
7558
7659 /**
77- * Builds a unified schema by merging fields from all nodes. Fields with the same name but
78- * different types are added as separate entries (which will be renamed during uniquification) .
60+ * Builds a unified schema by merging fields from all nodes. Throws an exception if fields with
61+ * the same name have different types .
7962 *
8063 * @param nodes List of RelNodes to merge schemas from
81- * @return List of SchemaField representing the unified schema (may contain duplicate names)
64+ * @return List of SchemaField representing the unified schema
65+ * @throws IllegalArgumentException if type conflicts are detected
8266 */
8367 private static List <SchemaField > buildUnifiedSchema (List <RelNode > nodes ) {
8468 List <SchemaField > schema = new ArrayList <>();
85- Map <String , Set < RelDataType > > seenFields = new HashMap <>();
69+ Map <String , RelDataType > seenFields = new HashMap <>();
8670
8771 for (RelNode node : nodes ) {
8872 for (RelDataTypeField field : node .getRowType ().getFieldList ()) {
8973 String fieldName = field .getName ();
9074 RelDataType fieldType = field .getType ();
9175
92- // Track which (name, type) combinations we've seen
93- Set <RelDataType > typesForName = seenFields .computeIfAbsent (fieldName , k -> new HashSet <>());
94-
95- if (!typesForName .contains (fieldType )) {
96- // New field or same name with different type - add to schema
76+ RelDataType existingType = seenFields .get (fieldName );
77+ if (existingType == null ) {
78+ // New field - add to schema
9779 schema .add (new SchemaField (fieldName , fieldType ));
98- typesForName .add (fieldType );
80+ seenFields .put (fieldName , fieldType );
81+ } else if (!areTypesCompatible (existingType , fieldType )) {
82+ // Same field name but different type - throw exception
83+ throw new IllegalArgumentException (
84+ String .format (
85+ "Unable to process column '%s' due to incompatible types: '%s' and '%s'" ,
86+ fieldName , existingType .getSqlTypeName (), fieldType .getSqlTypeName ()));
9987 }
10088 // Field name already recorded with a compatible type - nothing to add
10189 }
@@ -104,6 +92,10 @@ private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
10492 return schema ;
10593 }
10694
/**
 * Reports whether two types are treated as compatible for schema unification.
 *
 * <p>Only the value returned by {@code getSqlTypeName()} is compared; no other attribute of the
 * two types is inspected here. If {@code type1} reports a {@code null} SQL type name the result is
 * {@code false}, even when {@code type2} also reports {@code null}.
 *
 * @param type1 first type to compare; its SQL type name must be non-null for a match
 * @param type2 second type to compare
 * @return {@code true} if both types report the same non-null SQL type name
 */
95+ private static boolean areTypesCompatible (RelDataType type1 , RelDataType type2 ) {
96+ return type1 .getSqlTypeName () != null && type1 .getSqlTypeName ().equals (type2 .getSqlTypeName ());
97+ }
98+
10799 /**
108100 * Builds a projection for a node to align with the unified schema. For each field in the unified
109101 * schema: - If the node has a matching field with a compatible type, use it - Otherwise, project NULL
@@ -125,8 +117,8 @@ private static List<RexNode> buildProjectionForNode(
125117 RelDataType expectedType = schemaField .getType ();
126118 RelDataTypeField nodeField = nodeFieldMap .get (fieldName );
127119
128- if (nodeField != null && nodeField .getType (). equals ( expectedType )) {
129- // Field exists with matching type - use it
120+ if (nodeField != null && areTypesCompatible ( nodeField .getType (), expectedType )) {
121+ // Field exists with compatible type - use it
130122 projection .add (context .rexBuilder .makeInputRef (node , nodeField .getIndex ()));
131123 } else {
132124 // Field missing or type mismatch - project NULL
0 commit comments