|
27 | 27 |
|
28 | 28 | package org.opensearch.sql.calcite.utils; |
29 | 29 |
|
| 30 | +import static java.util.Objects.requireNonNull; |
| 31 | + |
30 | 32 | import com.google.common.collect.ImmutableList; |
| 33 | +import java.lang.reflect.Type; |
31 | 34 | import java.sql.Connection; |
32 | 35 | import java.sql.PreparedStatement; |
33 | 36 | import java.sql.SQLException; |
34 | 37 | import java.time.Instant; |
35 | 38 | import java.util.Properties; |
36 | 39 | import java.util.function.Consumer; |
| 40 | +import org.apache.calcite.adapter.enumerable.EnumerableConvention; |
| 41 | +import org.apache.calcite.adapter.enumerable.EnumerableRel; |
37 | 42 | import org.apache.calcite.adapter.java.JavaTypeFactory; |
38 | 43 | import org.apache.calcite.avatica.AvaticaConnection; |
39 | 44 | import org.apache.calcite.avatica.AvaticaFactory; |
| 45 | +import org.apache.calcite.avatica.Meta; |
40 | 46 | import org.apache.calcite.avatica.UnregisteredDriver; |
41 | 47 | import org.apache.calcite.config.CalciteConnectionProperty; |
| 48 | +import org.apache.calcite.interpreter.BindableConvention; |
42 | 49 | import org.apache.calcite.interpreter.Bindables; |
43 | 50 | import org.apache.calcite.jdbc.CalciteFactory; |
44 | 51 | import org.apache.calcite.jdbc.CalciteJdbc41Factory; |
45 | 52 | import org.apache.calcite.jdbc.CalcitePrepare; |
46 | 53 | import org.apache.calcite.jdbc.CalciteSchema; |
47 | 54 | import org.apache.calcite.jdbc.Driver; |
| 55 | +import org.apache.calcite.linq4j.function.Function0; |
48 | 56 | import org.apache.calcite.plan.Context; |
49 | 57 | import org.apache.calcite.plan.Contexts; |
| 58 | +import org.apache.calcite.plan.Convention; |
50 | 59 | import org.apache.calcite.plan.RelOptCluster; |
51 | 60 | import org.apache.calcite.plan.RelOptPlanner; |
52 | 61 | import org.apache.calcite.plan.RelOptSchema; |
|
55 | 64 | import org.apache.calcite.prepare.CalcitePrepareImpl; |
56 | 65 | import org.apache.calcite.rel.RelHomogeneousShuttle; |
57 | 66 | import org.apache.calcite.rel.RelNode; |
| 67 | +import org.apache.calcite.rel.RelRoot; |
58 | 68 | import org.apache.calcite.rel.RelShuttle; |
59 | 69 | import org.apache.calcite.rel.core.TableScan; |
60 | 70 | import org.apache.calcite.rel.logical.LogicalTableScan; |
| 71 | +import org.apache.calcite.rel.type.RelDataType; |
| 72 | +import org.apache.calcite.rel.type.RelDataTypeFactory; |
61 | 73 | import org.apache.calcite.rel.type.RelDataTypeSystem; |
62 | 74 | import org.apache.calcite.rex.RexBuilder; |
63 | 75 | import org.apache.calcite.rex.RexNode; |
| 76 | +import org.apache.calcite.runtime.Bindable; |
64 | 77 | import org.apache.calcite.runtime.Hook; |
65 | 78 | import org.apache.calcite.schema.SchemaPlus; |
66 | 79 | import org.apache.calcite.server.CalciteServerStatement; |
67 | 80 | import org.apache.calcite.sql.SqlAggFunction; |
68 | 81 | import org.apache.calcite.sql.SqlKind; |
69 | 82 | import org.apache.calcite.sql.parser.SqlParserPos; |
| 83 | +import org.apache.calcite.sql2rel.SqlRexConvertletTable; |
70 | 84 | import org.apache.calcite.tools.FrameworkConfig; |
71 | 85 | import org.apache.calcite.tools.Frameworks; |
72 | 86 | import org.apache.calcite.tools.RelBuilder; |
73 | 87 | import org.apache.calcite.tools.RelRunner; |
74 | 88 | import org.apache.calcite.util.Holder; |
75 | 89 | import org.apache.calcite.util.Util; |
76 | 90 | import org.opensearch.sql.calcite.CalcitePlanContext; |
| 91 | +import org.opensearch.sql.calcite.plan.Scannable; |
77 | 92 | import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction; |
78 | 93 |
|
79 | 94 | /** |
80 | 95 | * Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory |
81 | | - * 3. RelBuilder 4. RelRunner TODO delete it in future if possible. |
| 96 | + * 3. RelBuilder 4. RelRunner 5. CalcitePreparingStmt. TODO delete it in future if possible. |
82 | 97 | */ |
83 | 98 | public class CalciteToolsHelper { |
84 | 99 |
|
@@ -153,6 +168,11 @@ public Connection connect( |
153 | 168 | this.handler.onConnectionInit(connection); |
154 | 169 | return connection; |
155 | 170 | } |
| 171 | + |
| 172 | + @Override |
| 173 | + protected Function0<CalcitePrepare> createPrepareFactory() { |
| 174 | + return OpenSearchPrepareImpl::new; |
| 175 | + } |
156 | 176 | } |
157 | 177 |
|
158 | 178 | /** do nothing, just extend for a public construct for new */ |
@@ -214,6 +234,104 @@ public <R> R perform( |
214 | 234 | final RelOptCluster cluster = createCluster(planner, rexBuilder); |
215 | 235 | return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement); |
216 | 236 | } |
| 237 | + |
| 238 | + /** |
| 239 | + * Customize CalcitePreparingStmt. Override {@link CalcitePrepareImpl#getPreparingStmt} and |
| 240 | + * return {@link OpenSearchCalcitePreparingStmt} |
| 241 | + */ |
| 242 | + @Override |
| 243 | + protected CalcitePrepareImpl.CalcitePreparingStmt getPreparingStmt( |
| 244 | + CalcitePrepare.Context context, |
| 245 | + Type elementType, |
| 246 | + CalciteCatalogReader catalogReader, |
| 247 | + RelOptPlanner planner) { |
| 248 | + final JavaTypeFactory typeFactory = context.getTypeFactory(); |
| 249 | + final EnumerableRel.Prefer prefer; |
| 250 | + if (elementType == Object[].class) { |
| 251 | + prefer = EnumerableRel.Prefer.ARRAY; |
| 252 | + } else { |
| 253 | + prefer = EnumerableRel.Prefer.CUSTOM; |
| 254 | + } |
| 255 | + final Convention resultConvention = |
| 256 | + enableBindable ? BindableConvention.INSTANCE : EnumerableConvention.INSTANCE; |
| 257 | + return new OpenSearchCalcitePreparingStmt( |
| 258 | + this, |
| 259 | + context, |
| 260 | + catalogReader, |
| 261 | + typeFactory, |
| 262 | + context.getRootSchema(), |
| 263 | + prefer, |
| 264 | + createCluster(planner, new RexBuilder(typeFactory)), |
| 265 | + resultConvention, |
| 266 | + createConvertletTable()); |
| 267 | + } |
| 268 | + } |
| 269 | + |
| 270 | + /** |
| 271 | + * Similar to {@link CalcitePrepareImpl.CalcitePreparingStmt}. Customize the logic to convert an |
| 272 | + * EnumerableTableScan to BindableTableScan. |
| 273 | + */ |
| 274 | + public static class OpenSearchCalcitePreparingStmt |
| 275 | + extends CalcitePrepareImpl.CalcitePreparingStmt { |
| 276 | + |
| 277 | + public OpenSearchCalcitePreparingStmt( |
| 278 | + CalcitePrepareImpl prepare, |
| 279 | + CalcitePrepare.Context context, |
| 280 | + CatalogReader catalogReader, |
| 281 | + RelDataTypeFactory typeFactory, |
| 282 | + CalciteSchema schema, |
| 283 | + EnumerableRel.Prefer prefer, |
| 284 | + RelOptCluster cluster, |
| 285 | + Convention resultConvention, |
| 286 | + SqlRexConvertletTable convertletTable) { |
| 287 | + super( |
| 288 | + prepare, |
| 289 | + context, |
| 290 | + catalogReader, |
| 291 | + typeFactory, |
| 292 | + schema, |
| 293 | + prefer, |
| 294 | + cluster, |
| 295 | + resultConvention, |
| 296 | + convertletTable); |
| 297 | + } |
| 298 | + |
| 299 | + @Override |
| 300 | + protected PreparedResult implement(RelRoot root) { |
| 301 | + Hook.PLAN_BEFORE_IMPLEMENTATION.run(root); |
| 302 | + RelDataType resultType = root.rel.getRowType(); |
| 303 | + boolean isDml = root.kind.belongsTo(SqlKind.DML); |
| 304 | + if (root.rel instanceof Scannable scannable) { |
| 305 | + final Bindable bindable = dataContext -> scannable.scan(); |
| 306 | + |
| 307 | + return new PreparedResultImpl( |
| 308 | + resultType, |
| 309 | + requireNonNull(parameterRowType, "parameterRowType"), |
| 310 | + requireNonNull(fieldOrigins, "fieldOrigins"), |
| 311 | + root.collation.getFieldCollations().isEmpty() |
| 312 | + ? ImmutableList.of() |
| 313 | + : ImmutableList.of(root.collation), |
| 314 | + root.rel, |
| 315 | + mapTableModOp(isDml, root.kind), |
| 316 | + isDml) { |
| 317 | + @Override |
| 318 | + public String getCode() { |
| 319 | + throw new UnsupportedOperationException(); |
| 320 | + } |
| 321 | + |
| 322 | + @Override |
| 323 | + public Bindable getBindable(Meta.CursorFactory cursorFactory) { |
| 324 | + return bindable; |
| 325 | + } |
| 326 | + |
| 327 | + @Override |
| 328 | + public Type getElementType() { |
| 329 | + return resultType.getFieldList().size() == 1 ? Object.class : Object[].class; |
| 330 | + } |
| 331 | + }; |
| 332 | + } |
| 333 | + return super.implement(root); |
| 334 | + } |
217 | 335 | } |
218 | 336 |
|
219 | 337 | public static class OpenSearchRelRunners { |
|
0 commit comments