diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index a21e7eb2..676d1c00 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,7 +28,7 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 2.2.1, 2.1.2 ] + flink: [ 2.2.1 ] jdk: [ '11', '17, 21' ] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java index 83a95f9e..5250b4be 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -24,20 +42,20 @@ * from a logical description. */ public class ElasticsearchDynamicSource implements LookupTableSource, SupportsProjectionPushDown { - private final DecodingFormat> format; - private final ElasticsearchConfiguration config; - private final int lookupMaxRetryTimes; - private final LookupCache lookupCache; - private final String docType; - private final String summaryString; - private final ElasticsearchApiCallBridge apiCallBridge; - private DataType physicalRowDataType; + protected final DecodingFormat> format; + protected final ElasticsearchConfiguration config; + protected final int maxRetryTimes; + protected final LookupCache lookupCache; + protected final String docType; + protected final String summaryString; + protected final ElasticsearchApiCallBridge apiCallBridge; + protected DataType physicalRowDataType; public ElasticsearchDynamicSource( DecodingFormat> format, ElasticsearchConfiguration config, DataType physicalRowDataType, - int lookupMaxRetryTimes, + int maxRetryTimes, String summaryString, ElasticsearchApiCallBridge apiCallBridge, @Nullable LookupCache lookupCache, @@ -45,7 +63,7 @@ public ElasticsearchDynamicSource( this.format = format; this.config = config; this.physicalRowDataType = physicalRowDataType; - this.lookupMaxRetryTimes = lookupMaxRetryTimes; + this.maxRetryTimes = maxRetryTimes; this.summaryString = summaryString; this.apiCallBridge = apiCallBridge; this.lookupCache = lookupCache; @@ -68,7 +86,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { ElasticsearchRowDataLookupFunction lookupFunction = new ElasticsearchRowDataLookupFunction<>( this.format.createRuntimeDecoder(context, physicalRowDataType), - lookupMaxRetryTimes, + maxRetryTimes, config.getIndex(), docType, DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), @@ -84,7 +102,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { } } - private NetworkClientConfig buildNetworkClientConfig() { + protected NetworkClientConfig buildNetworkClientConfig() { NetworkClientConfig.Builder builder = new NetworkClientConfig.Builder(); if (config.getUsername().isPresent() && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) { @@ -123,7 +141,7 @@ public DynamicTableSource copy() { format, config, physicalRowDataType, - lookupMaxRetryTimes, + maxRetryTimes, summaryString, apiCallBridge, lookupCache, diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java index f2233807..8a728936 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java @@ -165,7 +165,7 @@ ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helpe } @Nullable - private LookupCache getLookupCache(ReadableConfig tableOptions) { + protected LookupCache getLookupCache(ReadableConfig tableOptions) { LookupCache cache = null; if (tableOptions .get(LookupOptions.CACHE_TYPE) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java new file mode 100644 index 00000000..14783077 --- /dev/null +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/AbstractElasticsearchVectorSearchFunction.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table.search; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.VectorSearchFunction; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base {@link VectorSearchFunction} implementation for Elasticsearch. Shared retry loop, result + * decoding and null-source filtering live here; version-specific subclasses only need to provide + * the client initialization and the search call. + */ +public abstract class AbstractElasticsearchVectorSearchFunction extends VectorSearchFunction { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractElasticsearchVectorSearchFunction.class); + private static final long serialVersionUID = 1L; + + protected final DeserializationSchema deserializationSchema; + protected final String index; + protected final String searchColumn; + protected final String[] producedNames; + protected final int maxRetryTimes; + + protected AbstractElasticsearchVectorSearchFunction( + DeserializationSchema deserializationSchema, + int maxRetryTimes, + String index, + String searchColumn, + String[] producedNames) { + this.deserializationSchema = + checkNotNull(deserializationSchema, "No DeserializationSchema supplied."); + this.producedNames = checkNotNull(producedNames, "No fieldNames supplied."); + this.maxRetryTimes = maxRetryTimes; + this.index = index; + this.searchColumn = searchColumn; + } + + @Override + public void open(FunctionContext context) throws Exception { + doOpen(context); + deserializationSchema.open(null); + } + + @Override + public void close() throws Exception { + try { + doClose(); + } finally { + super.close(); + } + } + + @Override + public Collection vectorSearch(int topK, RowData features) throws IOException { + for (int retry = 0; retry <= maxRetryTimes; retry++) { + try { + SearchResult[] results = doSearch(topK, features); + if (results.length > 0) { + ArrayList rows = new ArrayList<>(results.length); + for (SearchResult result : results) { + if (result.source == null) { + continue; + } + RowData row = parseSearchResult(result.source); + if (row == null) { + continue; + } + GenericRowData scoreData = new GenericRowData(1); + scoreData.setField(0, result.score); + rows.add(new JoinedRowData(row, scoreData)); + } + rows.trimToSize(); + return rows; + } + } catch (IOException e) { + LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e); + if (retry >= maxRetryTimes) { + throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e); + } + try { + Thread.sleep(1000L * retry); + } catch (InterruptedException e1) { + LOG.warn( + "Interrupted while waiting to retry failed elasticsearch search, aborting"); + throw new FlinkRuntimeException(e1); + } + } + } + return Collections.emptyList(); + } + + /** Version-specific initialization (e.g., creating the underlying Elasticsearch client). */ + protected abstract void doOpen(FunctionContext context) throws Exception; + + /** Version-specific resource release (e.g., closing the underlying Elasticsearch client). */ + protected abstract void doClose() throws Exception; + + /** Execute a single vector search call and return raw results, excluding nothing. */ + protected abstract SearchResult[] doSearch(int topK, RowData features) throws IOException; + + private RowData parseSearchResult(String result) { + try { + return deserializationSchema.deserialize(result.getBytes()); + } catch (IOException e) { + LOG.error("Deserialize search hit failed: " + e.getMessage()); + return null; + } + } + + /** One hit from Elasticsearch — raw JSON source plus score. */ + protected static class SearchResult { + final String source; + final Double score; + + public SearchResult(String source, Double score) { + this.source = source; + this.score = score; + } + } +} diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java new file mode 100644 index 00000000..5e42f7c6 --- /dev/null +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/SearchMetric.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table.search; + +/** Metric for vector search. */ +public enum SearchMetric { + COSINE_SIMILARITY("cosineSimilarity"), + L1NORM("l1norm"), + L2NORM("l2norm"), + HAMMING("hamming"), + DOT_PRODUCT("dotProduct"); + + private final String name; + + SearchMetric(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java new file mode 100644 index 00000000..54fbbafb --- /dev/null +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/search/VectorSearchUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table.search; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.source.VectorSearchTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +/** Shared helpers for the Elasticsearch vector search table sources. */ +public class VectorSearchUtils { + + private VectorSearchUtils() {} + + /** + * Validates the search columns declared on the given context and returns the resolved physical + * column name. Elasticsearch only supports a single, non-nested float-array column. + */ + public static String resolveSearchColumn( + DataType physicalRowDataType, + VectorSearchTableSource.VectorSearchContext vectorSearchContext) { + int[][] searchColumns = vectorSearchContext.getSearchColumns(); + + if (searchColumns.length != 1) { + throw new IllegalArgumentException( + String.format( + "Elasticsearch only supports one search columns now, but input search columns size is %d.", + searchColumns.length)); + } + int[] searchColumn = searchColumns[0]; + if (searchColumn.length != 1) { + throw new IllegalArgumentException( + "Elasticsearch doesn't support to search data using nested columns."); + } + int searchColumnIndex = searchColumn[0]; + + if (searchColumnIndex < 0 + || searchColumnIndex >= physicalRowDataType.getChildren().size()) { + throw new ValidationException( + String.format( + "The specified search column with index %d doesn't exist in schema.", + searchColumnIndex)); + } + + DataType searchColumnType = physicalRowDataType.getChildren().get(searchColumnIndex); + if (!searchColumnType.getLogicalType().is(LogicalTypeRoot.ARRAY) + || !((ArrayType) searchColumnType.getLogicalType()) + .getElementType() + .is(LogicalTypeRoot.FLOAT)) { + throw new UnsupportedOperationException( + String.format( + "Elasticsearch only supports search data using float vector now, but input search column type is %s.", + searchColumnType)); + } + + return ((RowType) physicalRowDataType.getLogicalType()) + .getFieldNames() + .get(searchColumnIndex); + } +} diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java index f7126b78..2149bcbe 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/ElasticsearchUtil.java @@ -26,9 +26,11 @@ import org.slf4j.Logger; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; +import java.time.Duration; import java.util.Optional; /** Collection of utility methods for Elasticsearch tests. */ @@ -62,10 +64,16 @@ public static ElasticsearchContainer createElasticsearchContainer( logLevel = "OFF"; } - return new ElasticsearchContainer(DockerImageName.parse(dockerImageVersion)) - .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") - .withEnv("logger.org.elasticsearch", logLevel) - .withLogConsumer(new Slf4jLogConsumer(log)); + ElasticsearchContainer container = + new ElasticsearchContainer(DockerImageName.parse(dockerImageVersion)) + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", logLevel) + .withLogConsumer(new Slf4jLogConsumer(log)); + + container.setWaitStrategy( + Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(1))); + + return container; } /** A mock {@link DynamicTableSink.Context} for Elasticsearch tests. */ diff --git a/flink-connector-elasticsearch7/pom.xml b/flink-connector-elasticsearch7/pom.xml index f8cbbf43..b0b8014d 100644 --- a/flink-connector-elasticsearch7/pom.xml +++ b/flink-connector-elasticsearch7/pom.xml @@ -165,6 +165,14 @@ under the License. test + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + org.apache.flink flink-table-runtime diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java new file mode 100644 index 00000000..8b373c7a --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7Configuration.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.elasticsearch.table.search.SearchMetric; + +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC; + +/** Elasticsearch 7 specific configuration. */ +public class Elasticsearch7Configuration extends ElasticsearchConfiguration { + Elasticsearch7Configuration(ReadableConfig config) { + super(config); + } + + public int getMaxRetries() { + return config.get(MAX_RETRIES); + } + + public SearchMetric getVectorSearchMetric() { + return config.get(VECTOR_SEARCH_METRIC); + } +} diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java new file mode 100644 index 00000000..56fbaf21 --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7ConnectorOptions.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.elasticsearch.table.search.SearchMetric; + +/** + * Options specific for the Elasticsearch 7 connector. Public so that the {@link + * org.apache.flink.table.api.TableDescriptor} can access it. + */ +@PublicEvolving +public class Elasticsearch7ConnectorOptions extends ElasticsearchConnectorOptions { + private Elasticsearch7ConnectorOptions() {} + + public static final ConfigOption MAX_RETRIES = + ConfigOptions.key("max-retries") + .intType() + .defaultValue(3) + .withFallbackKeys("lookup.max-retries") + .withDescription( + "The maximum allowed retries if a lookup/search operation fails."); + + public static final ConfigOption VECTOR_SEARCH_METRIC = + ConfigOptions.key("vector-search.metric") + .enumType(SearchMetric.class) + .defaultValue(SearchMetric.COSINE_SIMILARITY) + .withDescription( + "The metric of vector search, by default is cosineSimilarity."); +} diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java new file mode 100644 index 00000000..1f182810 --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicSource.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.connector.elasticsearch.NetworkClientConfig; +import org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction; +import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.VectorSearchTableSource; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import org.elasticsearch.client.RestHighLevelClient; + +import javax.annotation.Nullable; + +/** + * A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch7DynamicSource} + * from a logical description. + */ +public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource + implements VectorSearchTableSource { + + public Elasticsearch7DynamicSource( + DecodingFormat> format, + ElasticsearchConfiguration config, + DataType physicalRowDataType, + int maxRetryTimes, + String summaryString, + ElasticsearchApiCallBridge apiCallBridge, + @Nullable LookupCache lookupCache, + @Nullable String docType) { + super( + format, + config, + physicalRowDataType, + maxRetryTimes, + summaryString, + apiCallBridge, + lookupCache, + docType); + } + + @SuppressWarnings("unchecked") + @Override + public DynamicTableSource copy() { + return new Elasticsearch7DynamicSource( + format, + config, + physicalRowDataType, + maxRetryTimes, + summaryString, + (ElasticsearchApiCallBridge) apiCallBridge, + lookupCache, + docType); + } + + @SuppressWarnings("unchecked") + @Override + public VectorSearchRuntimeProvider getSearchRuntimeProvider( + VectorSearchContext vectorSearchContext) { + + NetworkClientConfig networkClientConfig = buildNetworkClientConfig(); + + ElasticsearchRowDataVectorSearchFunction vectorSearchFunction = + new ElasticsearchRowDataVectorSearchFunction( + this.format.createRuntimeDecoder(vectorSearchContext, physicalRowDataType), + this.maxRetryTimes, + ((Elasticsearch7Configuration) config).getVectorSearchMetric(), + config.getIndex(), + VectorSearchUtils.resolveSearchColumn( + physicalRowDataType, vectorSearchContext), + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + config.getHosts(), + networkClientConfig, + (ElasticsearchApiCallBridge) apiCallBridge); + + return VectorSearchFunctionProvider.of(vectorSearchFunction); + } +} diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index 2f6d8849..755175fb 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -19,10 +19,49 @@ package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.elasticsearch.Elasticsearch7ApiCallBridge; import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge; import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.MAX_RETRIES; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch7ConnectorOptions.VECTOR_SEARCH_METRIC; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS; +import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; +import static org.elasticsearch.common.Strings.capitalize; /** A {@link DynamicTableSinkFactory} for discovering {@link ElasticsearchDynamicSink}. */ @Internal @@ -34,7 +73,67 @@ public Elasticsearch7DynamicTableFactory() { } @Override - ElasticsearchApiCallBridge getElasticsearchApiCallBridge() { + ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) { + return new Elasticsearch7Configuration(helper.getOptions()); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig options = helper.getOptions(); + final DecodingFormat> format = + helper.discoverDecodingFormat( + DeserializationFormatFactory.class, + org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions + .FORMAT_OPTION); + + Elasticsearch7Configuration config = (Elasticsearch7Configuration) getConfiguration(helper); + helper.validate(); + validateConfiguration(config); + + return new Elasticsearch7DynamicSource( + format, + config, + context.getPhysicalRowDataType(), + config.getMaxRetries(), + capitalize(FACTORY_IDENTIFIER), + getElasticsearchApiCallBridge(), + getLookupCache(options), + getDocumentType(config)); + } + + @Override + ElasticsearchApiCallBridge getElasticsearchApiCallBridge() { return new Elasticsearch7ApiCallBridge(); } + + @Override + public Set> optionalOptions() { + return Stream.of( + KEY_DELIMITER_OPTION, + BULK_FLUSH_MAX_SIZE_OPTION, + BULK_FLUSH_MAX_ACTIONS_OPTION, + BULK_FLUSH_INTERVAL_OPTION, + BULK_FLUSH_BACKOFF_TYPE_OPTION, + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, + BULK_FLUSH_BACKOFF_DELAY_OPTION, + CONNECTION_PATH_PREFIX_OPTION, + CONNECTION_REQUEST_TIMEOUT, + CONNECTION_TIMEOUT, + SOCKET_TIMEOUT, + FORMAT_OPTION, + DELIVERY_GUARANTEE_OPTION, + PASSWORD_OPTION, + USERNAME_OPTION, + SINK_PARALLELISM, + CACHE_TYPE, + PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, + PARTIAL_CACHE_EXPIRE_AFTER_WRITE, + PARTIAL_CACHE_MAX_ROWS, + PARTIAL_CACHE_CACHE_MISSING_KEY, + MAX_RETRIES, + VECTOR_SEARCH_METRIC) + .collect(Collectors.toSet()); + } } diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java new file mode 100644 index 00000000..66f19cb9 --- /dev/null +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table.search; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.connector.elasticsearch.NetworkClientConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.VectorSearchFunction; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The {@link VectorSearchFunction} implementation for Elasticsearch 7. */ +public class ElasticsearchRowDataVectorSearchFunction + extends AbstractElasticsearchVectorSearchFunction { + private static final long serialVersionUID = 1L; + private static final String QUERY_VECTOR = "query_vector"; + + private final ElasticsearchApiCallBridge callBridge; + private final NetworkClientConfig networkClientConfig; + private final List hosts; + private final String scriptScore; + + private transient RestHighLevelClient client; + private transient SearchRequest searchRequest; + private transient SearchSourceBuilder searchSourceBuilder; + + public ElasticsearchRowDataVectorSearchFunction( + DeserializationSchema deserializationSchema, + int maxRetryTimes, + SearchMetric searchMetric, + String index, + String searchColumn, + String[] producedNames, + List hosts, + NetworkClientConfig networkClientConfig, + ElasticsearchApiCallBridge callBridge) { + super(deserializationSchema, maxRetryTimes, index, searchColumn, producedNames); + this.networkClientConfig = + checkNotNull(networkClientConfig, "No networkClientConfig supplied."); + this.hosts = checkNotNull(hosts, "No hosts supplied."); + this.callBridge = checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied."); + this.scriptScore = + String.format( + "%s(params.%s, '%s') + 1.0", + searchMetric.toString(), QUERY_VECTOR, searchColumn); + } + + @Override + protected void doOpen(FunctionContext context) { + this.client = callBridge.createClient(networkClientConfig, hosts); + + // Reuse searchRequest / searchSourceBuilder across invocations to avoid rebuilding them + // per record. + this.searchRequest = new SearchRequest(index); + this.searchSourceBuilder = new SearchSourceBuilder(); + this.searchSourceBuilder.fetchSource(producedNames, null); + } + + @Override + protected void doClose() throws IOException { + if (client != null) { + client.close(); + client = null; + } + } + + @Override + protected SearchResult[] doSearch(int topK, RowData features) throws IOException { + // Elasticsearch 7.x doesn't support ANN, we use script score to achieve exact matching. + Map params = + Collections.singletonMap(QUERY_VECTOR, features.getArray(0).toFloatArray()); + + Script script = new Script(ScriptType.INLINE, "painless", scriptScore, params); + ScriptScoreQueryBuilder scriptScoreQuery = + new ScriptScoreQueryBuilder(new MatchAllQueryBuilder(), script); + + searchSourceBuilder.query(scriptScoreQuery).size(topK); + searchRequest.source(searchSourceBuilder); + + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + SearchHit[] searchHits = searchResponse.getHits().getHits(); + + return Stream.of(searchHits) + .filter(hit -> hit.getSourceAsString() != null) + .map(hit -> new SearchResult(hit.getSourceAsString(), (double) hit.getScore())) + .toArray(SearchResult[]::new); + } +} diff --git a/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java new file mode 100644 index 00000000..8f6c2494 --- /dev/null +++ b/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch7VectorSearchITCase.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.connector.elasticsearch.ElasticsearchUtil; +import org.apache.flink.connector.elasticsearch.test.DockerImageVersions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.Expressions.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** {@code VECTOR_SEARCH } ITCase for Elasticsearch. */ +@Testcontainers +public class Elasticsearch7VectorSearchITCase { + private static final Logger LOG = + LoggerFactory.getLogger(Elasticsearch7VectorSearchITCase.class); + + private static final int PARALLELISM = 2; + + @Container + private static final ElasticsearchContainer ES_CONTAINER = + ElasticsearchUtil.createElasticsearchContainer( + DockerImageVersions.ELASTICSEARCH_7, LOG); + + String getElasticsearchHttpHostAddress() { + return ES_CONTAINER.getHttpHostAddress(); + } + + private RestHighLevelClient getClient() { + return new RestHighLevelClient( + RestClient.builder(HttpHost.create(getElasticsearchHttpHostAddress()))); + } + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + private final List inputData = + Arrays.asList( + Row.of(1L, "Spark", new Float[] {5f, 12f, 13f}), + Row.of(2L, "Flink", new Float[] {-5f, -12f, -13f})); + + private TableEnvironment tEnv; + + @BeforeEach + void beforeEach() { + tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + } + + @Test + public void testSearchFullTypeVectorTable() throws Exception { + String index = "table_with_all_supported_types"; + createFullTypesIndex(index); + tEnv.executeSql( + "CREATE TABLE esTable (" + + " id BIGINT,\n" + + " f1 STRING,\n" + + " f2 BOOLEAN,\n" + + " f3 TINYINT,\n" + + " f4 SMALLINT,\n" + + " f5 INTEGER,\n" + + " f6 DATE,\n" + + " f7 TIMESTAMP,\n" + + " f8 FLOAT,\n" + + " f9 DOUBLE,\n" + + " f10 ARRAY,\n" + + " f11 ARRAY,\n" + + " f12 ARRAY,\n" + + " f13 ARRAY,\n" + + " PRIMARY KEY (id) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), index) + + String.format( + "'%s'='%s'\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + ES_CONTAINER.getHttpHostAddress()) + + ")"); + + tEnv.fromValues( + row( + 1, + "ABCDE", + true, + (byte) 127, + (short) 257, + 65535, + LocalDate.ofEpochDay(12345), + LocalDateTime.parse("2012-12-12T12:12:12"), + 11.11f, + 12.22d, + new Float[] {11.11f, 11.12f}, + new Double[] {12.22d, 12.22d}, + new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE}, + new long[] {Long.MIN_VALUE, Long.MAX_VALUE})) + .executeInsert("esTable") + .await(); + + // Wait for es construct index. + Thread.sleep(2000); + + List rows = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "WITH t(id, vector) AS (SELECT * FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY))))\n" + + "SELECT * FROM t, LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n") + .collect()) + .stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(rows) + .isEqualTo( + Collections.singletonList( + "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257, 65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22, 12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807], 1.767361044883728]")); + } + + @ParameterizedTest + @ValueSource(strings = {"cosineSimilarity", "l1norm", "l2norm", "hamming", "dotProduct"}) + void testSearchUsingFloatArray(String metric) throws Exception { + String index = "table_with_multiple_data_with_" + metric.toLowerCase(); + createSimpleIndex(index); + tEnv.executeSql( + "CREATE TABLE es_table(" + + " id BIGINT," + + " label STRING," + + " vector ARRAY" + + ")\n WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-7") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), index) + + String.format( + "'%s'='%s'\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + ES_CONTAINER.getHttpHostAddress()) + + ")"); + + tEnv.fromValues( + row(1L, "Batch", new Float[] {5f, 12f, 13f}), + row(2L, "Streaming", new Float[] {-5f, -12f, -13f}), + row(3L, "Big Data", new Float[] {1f, 1f, 0f})) + .executeInsert("es_table") + .await(); + + // Wait for es construct index. + Thread.sleep(2000); + + tEnv.executeSql( + String.format( + "CREATE TABLE src(\n" + + " id BIGINT PRIMARY KEY NOT ENFORCED,\n" + + " content STRING,\n" + + " index ARRAY\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '%s'\n" + + ");\n", + TestValuesTableFactory.registerData(inputData))); + assertThat( + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT content, label FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector), src.index, 2))") + .collect()) + .stream() + .map(Row::toString) + .collect(Collectors.toList())) + .isEqualTo( + Arrays.asList( + "+I[Spark, Batch]", + "+I[Spark, Big Data]", + "+I[Flink, Streaming]", + "+I[Flink, Big Data]")); + } + + private void createFullTypesIndex(String index) throws IOException { + XContentBuilder mappingBuilder = XContentFactory.jsonBuilder(); + mappingBuilder.startObject(); + mappingBuilder.startObject("properties"); + + // id: long + mappingBuilder.startObject("id"); + mappingBuilder.field("type", "long"); + mappingBuilder.endObject(); + + // f1: string + mappingBuilder.startObject("f1"); + mappingBuilder.field("type", "text"); + mappingBuilder.endObject(); + + // f2: boolean + mappingBuilder.startObject("f2"); + mappingBuilder.field("type", "boolean"); + mappingBuilder.endObject(); + + // f3: tinyint + mappingBuilder.startObject("f3"); + mappingBuilder.field("type", "byte"); + mappingBuilder.endObject(); + + // f4: long + mappingBuilder.startObject("f4"); + mappingBuilder.field("type", "short"); + mappingBuilder.endObject(); + + // f5: long + mappingBuilder.startObject("f5"); + mappingBuilder.field("type", "integer"); + mappingBuilder.endObject(); + + // f6: date + mappingBuilder.startObject("f6"); + mappingBuilder.field("type", "date"); + mappingBuilder.endObject(); + + // f7: timestamp + mappingBuilder.startObject("f7"); + mappingBuilder.field("type", "text"); + mappingBuilder.endObject(); + + // f8: float + mappingBuilder.startObject("f8"); + mappingBuilder.field("type", "float"); + mappingBuilder.endObject(); + + // f9: double + mappingBuilder.startObject("f9"); + mappingBuilder.field("type", "double"); + mappingBuilder.endObject(); + + // f10: Array + mappingBuilder.startObject("f10"); + mappingBuilder.field("type", "dense_vector"); + mappingBuilder.field("dims", 2); + mappingBuilder.endObject(); + + // f11: Array + mappingBuilder.startObject("f11"); + mappingBuilder.field("type", "dense_vector"); + mappingBuilder.field("dims", 2); + mappingBuilder.endObject(); + + // f12: Array + mappingBuilder.startObject("f12"); + mappingBuilder.field("type", "dense_vector"); + mappingBuilder.field("dims", 2); + mappingBuilder.endObject(); + + // f13: Array + mappingBuilder.startObject("f13"); + mappingBuilder.field("type", "dense_vector"); + mappingBuilder.field("dims", 2); + mappingBuilder.endObject(); + + mappingBuilder.endObject(); // end properties + mappingBuilder.endObject(); // end root + + CreateIndexRequest request = new CreateIndexRequest(index); + request.mapping(mappingBuilder); + + this.getClient().indices().create(request, RequestOptions.DEFAULT); + } + + private void createSimpleIndex(String index) throws IOException { + XContentBuilder mappingBuilder = XContentFactory.jsonBuilder(); + mappingBuilder.startObject(); + mappingBuilder.startObject("properties"); + + // id: long + mappingBuilder.startObject("id"); + mappingBuilder.field("type", "long"); + mappingBuilder.endObject(); + + // f1: string + mappingBuilder.startObject("label"); + mappingBuilder.field("type", "text"); + mappingBuilder.endObject(); + + // f2: float vector + mappingBuilder.startObject("vector"); + mappingBuilder.field("type", "dense_vector"); + mappingBuilder.field("dims", 3); + mappingBuilder.endObject(); + + mappingBuilder.endObject(); // end properties + mappingBuilder.endObject(); // end root + + CreateIndexRequest request = new CreateIndexRequest(index); + request.mapping(mappingBuilder); + + this.getClient().indices().create(request, RequestOptions.DEFAULT); + } +} diff --git a/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties new file mode 100644 index 00000000..07514cc8 --- /dev/null +++ b/flink-connector-elasticsearch7/src/test/resources/testcontainers.properties @@ -0,0 +1,17 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +ryuk.container.image = testcontainers/ryuk:0.6.0 diff --git a/flink-connector-elasticsearch8/pom.xml b/flink-connector-elasticsearch8/pom.xml index e86af073..9e8962fb 100644 --- a/flink-connector-elasticsearch8/pom.xml +++ b/flink-connector-elasticsearch8/pom.xml @@ -88,6 +88,23 @@ under the License. ${jackson.version} + + org.apache.flink + flink-connector-elasticsearch-base + ${project.version} + + + + org.elasticsearch + elasticsearch + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + + + + co.elastic.clients @@ -123,45 +140,46 @@ under the License. test - + org.apache.flink - flink-json + flink-table-planner-loader ${flink.version} test - org.apache.flink - flink-architecture-tests-test + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar test + org.apache.flink - flink-architecture-tests-production + flink-json + ${flink.version} test + org.apache.flink - flink-connector-base - ${flink.version} - test-jar + flink-architecture-tests-test test org.apache.flink - flink-table-planner-loader - ${flink.version} + flink-architecture-tests-production test org.apache.flink - flink-table-planner_2.12 + flink-connector-base ${flink.version} test-jar test diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java index 93ecd783..34447bfc 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java @@ -24,6 +24,7 @@ import org.apache.flink.util.function.SerializableSupplier; import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.fasterxml.jackson.databind.ObjectMapper; @@ -108,6 +109,11 @@ public ElasticsearchAsyncClient createEsClient() { new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper(mapper))); } + public ElasticsearchClient createEsSyncClient() { + return new ElasticsearchClient( + new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper())); + } + private RestClient getRestClient() { RestClientBuilder restClientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0])) diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java index 9e3e8bfb..351fa55b 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8Configuration.java @@ -46,6 +46,8 @@ import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT; @@ -132,6 +134,16 @@ public Optional getParallelism() { return config.getOptional(SINK_PARALLELISM); } + // --- Lookup / vector search accessors -------------------------------------------------- + + public int getMaxRetries() { + return config.get(MAX_RETRIES); + } + + public int getNumCandidates() { + return config.get(NUM_CANDIDATES); + } + /** * Parse Hosts String to list. * diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java index 1defeba7..74a450b7 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8ConnectorOptions.java @@ -149,4 +149,21 @@ public class Elasticsearch8ConnectorOptions { .enumType(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); + + // --- Lookup / vector search options ---------------------------------------------------- + + public static final ConfigOption MAX_RETRIES = + ConfigOptions.key("max-retries") + .intType() + .defaultValue(3) + .withFallbackKeys("lookup.max-retries") + .withDescription( + "The maximum allowed retries if a lookup/search operation fails."); + + public static final ConfigOption NUM_CANDIDATES = + ConfigOptions.key("vector-search.num-candidates") + .intType() + .defaultValue(100) + .withDescription( + "The number of candidate neighbors considered for each shard during the vector search."); } diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java new file mode 100644 index 00000000..23c1eb61 --- /dev/null +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSource.java @@ -0,0 +1,145 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.elasticsearch.sink.NetworkConfig; +import org.apache.flink.connector.elasticsearch.table.search.ElasticsearchRowDataVectorSearchFunction; +import org.apache.flink.connector.elasticsearch.table.search.VectorSearchUtils; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.VectorSearchTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.function.SerializableSupplier; + +import co.elastic.clients.transport.TransportUtils; +import org.apache.http.HttpHost; + +import javax.net.ssl.SSLContext; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch8DynamicSource} + * from a logical description. + */ +public class Elasticsearch8DynamicSource + implements VectorSearchTableSource, SupportsProjectionPushDown { + + protected final DecodingFormat> format; + protected final Elasticsearch8Configuration config; + private final String summaryString; + protected DataType physicalRowDataType; + + public Elasticsearch8DynamicSource( + DecodingFormat> format, + Elasticsearch8Configuration config, + DataType physicalRowDataType, + String summaryString) { + this.format = format; + this.config = config; + this.physicalRowDataType = physicalRowDataType; + this.summaryString = summaryString; + } + + @Override + public VectorSearchRuntimeProvider getSearchRuntimeProvider( + VectorSearchContext vectorSearchContext) { + + ElasticsearchRowDataVectorSearchFunction vectorSearchFunction = + new ElasticsearchRowDataVectorSearchFunction( + format.createRuntimeDecoder(vectorSearchContext, physicalRowDataType), + config.getMaxRetries(), + config.getNumCandidates(), + config.getIndex(), + VectorSearchUtils.resolveSearchColumn( + physicalRowDataType, vectorSearchContext), + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + buildNetworkConfig()); + + return VectorSearchFunctionProvider.of(vectorSearchFunction); + } + + private NetworkConfig buildNetworkConfig() { + List hosts = config.getHosts(); + checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); + + String username = + config.getUsername() + .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v)) + .orElse(null); + String password = + config.getPassword() + .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v)) + .orElse(null); + String pathPrefix = + config.getPathPrefix() + .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v)) + .orElse(null); + + SerializableSupplier sslContextSupplier = + config.getCertificateFingerprint() + .filter(v -> !StringUtils.isNullOrWhitespaceOnly(v)) + .>map( + fp -> () -> TransportUtils.sslContextFromCaFingerprint(fp)) + .orElse(null); + + return new NetworkConfig( + hosts, + username, + password, + null, + pathPrefix, + config.getConnectionRequestTimeout().map(d -> (int) d.toMillis()).orElse(null), + config.getConnectionTimeout().map(d -> (int) d.toMillis()).orElse(null), + config.getSocketTimeout().map(d -> (int) d.toMillis()).orElse(null), + sslContextSupplier, + null); + } + + @Override + public DynamicTableSource copy() { + return new Elasticsearch8DynamicSource(format, config, physicalRowDataType, summaryString); + } + + @Override + public String asSummaryString() { + return summaryString; + } + + @Override + public boolean supportsNestedProjection() { + return false; + } + + @Override + public void applyProjection(int[][] projectedFields, DataType type) { + this.physicalRowDataType = Projection.of(projectedFields).project(type); + } +} diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java similarity index 84% rename from flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java rename to flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java index 3008a143..bf21cbce 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticSearch8AsyncDynamicTableFactory.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicTableFactory.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,20 +7,19 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; @@ -31,9 +29,13 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; @@ -63,24 +65,49 @@ import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.HOSTS_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.KEY_DELIMITER_OPTION; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.MAX_RETRIES; +import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.NUM_CANDIDATES; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.PASSWORD_OPTION; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch8ConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; -import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY; import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS; import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; -/** Factory for creating {@link ElasticSearch8AsyncDynamicSink} . */ +/** + * A factory for discovering both {@link Elasticsearch8DynamicSource} (lookup / vector search) and + * {@link ElasticSearch8AsyncDynamicSink} under the same {@code elasticsearch-8} identifier. + */ @Internal -public class ElasticSearch8AsyncDynamicTableFactory extends AsyncDynamicTableSinkFactory { - +public class Elasticsearch8DynamicTableFactory extends AsyncDynamicTableSinkFactory + implements DynamicTableSourceFactory { private static final String IDENTIFIER = "elasticsearch-8"; + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + + final DecodingFormat> format = + helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT_OPTION); + + Elasticsearch8Configuration config = getConfiguration(helper); + helper.validate(); + validateConfiguration(config); + + return new Elasticsearch8DynamicSource( + format, config, context.getPhysicalRowDataType(), capitalize(IDENTIFIER)); + } + @Override public DynamicTableSink createDynamicTableSink(Context context) { List primaryKeyLogicalTypesWithIndex = @@ -95,10 +122,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { helper.validate(); validateConfiguration(config); - ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder builder = - new ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder(); - - return builder.setConfig(config) + return new ElasticSearch8AsyncDynamicSink.ElasticSearch8AsyncDynamicSinkBuilder() + .setConfig(config) .setFormat(format) .setPrimaryKeyLogicalTypesWithIndex(primaryKeyLogicalTypesWithIndex) .setPhysicalRowDataType(context.getPhysicalRowDataType()) @@ -107,40 +132,6 @@ public DynamicTableSink createDynamicTableSink(Context context) { .build(); } - ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) { - final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE); - - return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) - ? ZoneId.systemDefault() - : ZoneId.of(zone); - } - - List getPrimaryKeyLogicalTypesWithIndex(Context context) { - DataType physicalRowDataType = context.getPhysicalRowDataType(); - int[] primaryKeyIndexes = context.getPrimaryKeyIndexes(); - if (primaryKeyIndexes.length != 0) { - DataType pkDataType = Projection.of(primaryKeyIndexes).project(physicalRowDataType); - - ElasticsearchValidationUtils.validatePrimaryKey(pkDataType); - } - - ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema(); - return Arrays.stream(primaryKeyIndexes) - .mapToObj( - index -> { - Optional column = resolvedSchema.getColumn(index); - if (!column.isPresent()) { - throw new IllegalStateException( - String.format( - "No primary key column found with index '%s'.", - index)); - } - LogicalType logicalType = column.get().getDataType().getLogicalType(); - return new LogicalTypeWithIndex(index, logicalType); - }) - .collect(Collectors.toList()); - } - Elasticsearch8Configuration getConfiguration(FactoryUtil.TableFactoryHelper helper) { return new Elasticsearch8Configuration(helper.getOptions()); } @@ -187,9 +178,37 @@ static void validate(boolean condition, Supplier message) { } } - @Override - public String factoryIdentifier() { - return IDENTIFIER; + ZoneId getLocalTimeZoneId(ReadableConfig readableConfig) { + final String zone = readableConfig.get(TableConfigOptions.LOCAL_TIME_ZONE); + + return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + } + + List getPrimaryKeyLogicalTypesWithIndex(Context context) { + DataType physicalRowDataType = context.getPhysicalRowDataType(); + int[] primaryKeyIndexes = context.getPrimaryKeyIndexes(); + if (primaryKeyIndexes.length != 0) { + DataType pkDataType = Projection.of(primaryKeyIndexes).project(physicalRowDataType); + ElasticsearchValidationUtils.validatePrimaryKey(pkDataType); + } + + ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema(); + return Arrays.stream(primaryKeyIndexes) + .mapToObj( + index -> { + Optional column = resolvedSchema.getColumn(index); + if (!column.isPresent()) { + throw new IllegalStateException( + String.format( + "No primary key column found with index '%s'.", + index)); + } + LogicalType logicalType = column.get().getDataType().getLogicalType(); + return new LogicalTypeWithIndex(index, logicalType); + }) + .collect(Collectors.toList()); } @Override @@ -201,16 +220,15 @@ public Set> requiredOptions() { public Set> optionalOptions() { return Stream.of( KEY_DELIMITER_OPTION, - BULK_FLUSH_MAX_SIZE_OPTION, BULK_FLUSH_MAX_ACTIONS_OPTION, - BULK_FLUSH_INTERVAL_OPTION, BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION, BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION, + BULK_FLUSH_MAX_SIZE_OPTION, + BULK_FLUSH_INTERVAL_OPTION, CONNECTION_PATH_PREFIX_OPTION, CONNECTION_REQUEST_TIMEOUT, CONNECTION_TIMEOUT, SOCKET_TIMEOUT, - SSL_CERTIFICATE_FINGERPRINT, FORMAT_OPTION, DELIVERY_GUARANTEE_OPTION, PASSWORD_OPTION, @@ -221,7 +239,9 @@ public Set> optionalOptions() { PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + NUM_CANDIDATES, + SSL_CERTIFICATE_FINGERPRINT) .collect(Collectors.toSet()); } @@ -234,10 +254,10 @@ public Set> forwardOptions() { USERNAME_OPTION, KEY_DELIMITER_OPTION, BULK_FLUSH_MAX_ACTIONS_OPTION, - BULK_FLUSH_MAX_SIZE_OPTION, - BULK_FLUSH_INTERVAL_OPTION, BULK_FLUSH_MAX_BUFFERED_ACTIONS_OPTION, BULK_FLUSH_MAX_IN_FLIGHT_ACTIONS_OPTION, + BULK_FLUSH_MAX_SIZE_OPTION, + BULK_FLUSH_INTERVAL_OPTION, CONNECTION_PATH_PREFIX_OPTION, CONNECTION_REQUEST_TIMEOUT, CONNECTION_TIMEOUT, diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java new file mode 100644 index 00000000..516752aa --- /dev/null +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/table/search/ElasticsearchRowDataVectorSearchFunction.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table.search; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.elasticsearch.sink.NetworkConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.VectorSearchFunction; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.json.JsonData; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The {@link VectorSearchFunction} implementation for Elasticsearch 8. */ +public class ElasticsearchRowDataVectorSearchFunction + extends AbstractElasticsearchVectorSearchFunction { + private static final long serialVersionUID = 1L; + + private final int numCandidates; + private final NetworkConfig networkConfig; + + private transient ElasticsearchClient client; + + public ElasticsearchRowDataVectorSearchFunction( + DeserializationSchema deserializationSchema, + int maxRetryTimes, + int numCandidates, + String index, + String searchColumn, + String[] producedNames, + NetworkConfig networkConfig) { + super(deserializationSchema, maxRetryTimes, index, searchColumn, producedNames); + this.numCandidates = numCandidates; + this.networkConfig = checkNotNull(networkConfig, "No networkConfig supplied."); + } + + @Override + protected void doOpen(FunctionContext context) { + this.client = networkConfig.createEsSyncClient(); + } + + @Override + protected void doClose() throws IOException { + if (client != null) { + client._transport().close(); + client = null; + } + } + + @Override + protected SearchResult[] doSearch(int topK, RowData features) throws IOException { + List queryVector = new ArrayList<>(); + for (float feature : features.getArray(0).toFloatArray()) { + queryVector.add(feature); + } + + SearchRequest request = + new SearchRequest.Builder() + .index(index) + .knn( + kb -> + kb.field(searchColumn) + .numCandidates(numCandidates) + .queryVector(queryVector) + .k(topK)) + .source(src -> src.filter(f -> f.includes(Arrays.asList(producedNames)))) + .build(); + + SearchResponse searchResponse = client.search(request, JsonData.class); + + return searchResponse.hits().hits().stream() + .filter(hit -> hit.source() != null) + .map(hit -> new SearchResult(hit.source().toJson().toString(), hit.score())) + .toArray(SearchResult[]::new); + } +} diff --git a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index de87735b..b138912a 100644 --- a/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector-elasticsearch8/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,5 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.connector.elasticsearch.table.ElasticSearch8AsyncDynamicTableFactory - +org.apache.flink.connector.elasticsearch.table.Elasticsearch8DynamicTableFactory diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java index 2381b882..f65e72af 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8DynamicSinkITCase.java @@ -94,8 +94,7 @@ public void testWritingDocuments() throws Exception { LocalDateTime.parse("2012-12-12T12:12:12"))); String index = "writing-documents"; - ElasticSearch8AsyncDynamicTableFactory sinkFactory = - new ElasticSearch8AsyncDynamicTableFactory(); + Elasticsearch8DynamicTableFactory sinkFactory = new Elasticsearch8DynamicTableFactory(); DynamicTableSink.SinkRuntimeProvider runtimeProvider = sinkFactory diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java new file mode 100644 index 00000000..46e5c346 --- /dev/null +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch8VectorSearchITCase.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.table; + +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.mapping.BooleanProperty; +import co.elastic.clients.elasticsearch._types.mapping.ByteNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.DateProperty; +import co.elastic.clients.elasticsearch._types.mapping.DenseVectorIndexOptions; +import co.elastic.clients.elasticsearch._types.mapping.DenseVectorProperty; +import co.elastic.clients.elasticsearch._types.mapping.DoubleNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.FloatNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.IntegerNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.LongNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.Property; +import co.elastic.clients.elasticsearch._types.mapping.ShortNumberProperty; +import co.elastic.clients.elasticsearch._types.mapping.TextProperty; +import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; +import co.elastic.clients.elasticsearch.core.IndexResponse; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import org.apache.commons.codec.binary.Hex; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** {@code VECTOR_SEARCH } ITCase for Elasticsearch 8. */ +@Testcontainers +public class Elasticsearch8VectorSearchITCase { + private static final Logger LOG = + LoggerFactory.getLogger(Elasticsearch8VectorSearchITCase.class); + + private static final int PARALLELISM = 2; + + public static final String ELASTICSEARCH_VERSION = "8.19.0"; + public static final DockerImageName ELASTICSEARCH_IMAGE = + DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch") + .withTag(ELASTICSEARCH_VERSION); + private static final String ES_CLUSTER_USERNAME = "elastic"; + private static final String ES_CLUSTER_PASSWORD = "s3cret"; + + @Container + private static final ElasticsearchContainer ES_CONTAINER = createElasticsearchContainer(); + + private static ElasticsearchContainer createElasticsearchContainer() { + final ElasticsearchContainer container = + new ElasticsearchContainer(ELASTICSEARCH_IMAGE) + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withEnv("logger.org.elasticsearch", "ERROR") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + container.withPassword(ES_CLUSTER_PASSWORD); + + container.setWaitStrategy( + Wait.defaultWaitStrategy().withStartupTimeout(Duration.ofMinutes(3))); + + return container; + } + + private String getEsCertFingerprint() throws Exception { + Preconditions.checkArgument(ES_CONTAINER.caCertAsBytes().isPresent()); + byte[] caCertBytes = ES_CONTAINER.caCertAsBytes().get(); + X509Certificate caCert = + (X509Certificate) + CertificateFactory.getInstance("X.509") + .generateCertificate(new ByteArrayInputStream(caCertBytes)); + byte[] fingerprint = MessageDigest.getInstance("SHA-256").digest(caCert.getEncoded()); + return Hex.encodeHexString(fingerprint); + } + + private ElasticsearchClient getClient() { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(ES_CLUSTER_USERNAME, ES_CLUSTER_PASSWORD)); + RestClient restClient = + RestClient.builder( + new HttpHost( + ES_CONTAINER.getHost(), + ES_CONTAINER.getFirstMappedPort(), + "https")) + .setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setSSLContext( + ES_CONTAINER.createSslContextFromCa())) + .build(); + RestClientTransport transport = + new RestClientTransport(restClient, new JacksonJsonpMapper()); + return new ElasticsearchClient(transport); + } + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + private final List inputData = + Arrays.asList( + Row.of(1L, "Spark", new Float[] {0.2718f, 0.6527f, 0.7076f}), + Row.of(2L, "Flink", new Float[] {-0.2718f, -0.6527f, -0.7076f})); + + private TableEnvironment tEnv; + + @BeforeEach + void beforeEach() { + tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + } + + @Test + public void testSearchFullTypeVectorTable() throws Exception { + String index = "table_with_all_supported_types"; + createFullTypesIndex(index); + + // Insert data using ES client since elasticsearch-8 connector doesn't support sink + Map document = new HashMap<>(); + document.put("id", 1L); + document.put("f1", "ABCDE"); + document.put("f2", true); + document.put("f3", (byte) 127); + document.put("f4", (short) 257); + document.put("f5", 65535); + document.put("f6", LocalDate.ofEpochDay(12345).toString()); + document.put("f7", "2012-12-12 12:12:12"); + document.put("f8", 11.11f); + document.put("f9", 12.22d); + document.put("f10", new float[] {11.11f, 11.12f}); + document.put("f11", new double[] {12.22d, 12.22d}); + document.put("f12", new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE}); + document.put("f13", new long[] {Long.MIN_VALUE, Long.MAX_VALUE}); + + IndexResponse response = getClient().index(i -> i.index(index).id("1").document(document)); + LOG.info("Indexed document with result: {}", response.result()); + + // Wait for es to refresh index + getClient().indices().refresh(r -> r.index(index)); + + String certFingerprint = getEsCertFingerprint(); + + tEnv.executeSql( + "CREATE TABLE esTable (" + + " id BIGINT,\n" + + " f1 STRING,\n" + + " f2 BOOLEAN,\n" + + " f3 TINYINT,\n" + + " f4 SMALLINT,\n" + + " f5 INTEGER,\n" + + " f6 DATE,\n" + + " f7 TIMESTAMP,\n" + + " f8 FLOAT,\n" + + " f9 DOUBLE,\n" + + " f10 ARRAY,\n" + + " f11 ARRAY,\n" + + " f12 ARRAY,\n" + + " f13 ARRAY,\n" + + " PRIMARY KEY (id) NOT ENFORCED\n" + + ")\n" + + "WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-8") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), index) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "https://" + ES_CONTAINER.getHttpHostAddress()) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.USERNAME_OPTION.key(), + ES_CLUSTER_USERNAME) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), + ES_CLUSTER_PASSWORD) + + String.format( + "'%s'='%s'\n", + Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(), + certFingerprint) + + ")"); + + List rows = + CollectionUtil.iteratorToList( + tEnv.executeSql( + "WITH t(id, vector) AS (SELECT * FROM (VALUES (1, CAST(ARRAY[11.11, 1] AS ARRAY))))\n" + + "SELECT * FROM t, LATERAL TABLE(VECTOR_SEARCH(TABLE esTable, DESCRIPTOR(f10), t.vector, 3))\n") + .collect()) + .stream() + .map(Row::toString) + .collect(Collectors.toList()); + assertThat(rows) + .isEqualTo( + Collections.singletonList( + "+I[1, [11.11, 1.0], 1, ABCDE, true, 127, 257, 65535, 2003-10-20, 2012-12-12T12:12:12, 11.11, 12.22, [11.11, 11.12], [12.22, 12.22], [-2147483648, 2147483647], [-9223372036854775808, 9223372036854775807], 0.8836806]")); + } + + @ParameterizedTest + @ValueSource(strings = {"cosine", "l2_norm", "dot_product"}) + void testSearchUsingFloatArray(String metric) throws Exception { + String index = "table_with_multiple_data_with_" + metric.toLowerCase().replace("_", ""); + createSimpleIndex(index, metric); + + // Insert data using ES client since elasticsearch-8 connector doesn't support sink + // For dot_product, vectors must be normalized (unit vectors) + indexSimpleDocument(index, "1", 1L, "Batch", new float[] {0.2718f, 0.6527f, 0.7076f}); + indexSimpleDocument( + index, "2", 2L, "Streaming", new float[] {-0.2718f, -0.6527f, -0.7076f}); + indexSimpleDocument(index, "3", 3L, "Big Data", new float[] {0.7071f, 0.7071f, 0f}); + + // Refresh index to make documents searchable + getClient().indices().refresh(r -> r.index(index)); + + String certFingerprint = getEsCertFingerprint(); + + tEnv.executeSql( + "CREATE TABLE es_table(" + + " id BIGINT," + + " label STRING," + + " vector ARRAY" + + ")\n WITH (\n" + + String.format("'%s'='%s',\n", "connector", "elasticsearch-8") + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.INDEX_OPTION.key(), index) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.HOSTS_OPTION.key(), + "https://" + ES_CONTAINER.getHttpHostAddress()) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.USERNAME_OPTION.key(), + ES_CLUSTER_USERNAME) + + String.format( + "'%s'='%s',\n", + ElasticsearchConnectorOptions.PASSWORD_OPTION.key(), + ES_CLUSTER_PASSWORD) + + String.format( + "'%s'='%s'\n", + Elasticsearch8ConnectorOptions.SSL_CERTIFICATE_FINGERPRINT.key(), + certFingerprint) + + ")"); + + tEnv.executeSql( + String.format( + "CREATE TABLE src(\n" + + " id BIGINT PRIMARY KEY NOT ENFORCED,\n" + + " content STRING,\n" + + " index ARRAY\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '%s'\n" + + ");\n", + TestValuesTableFactory.registerData(inputData))); + assertThat( + CollectionUtil.iteratorToList( + tEnv.executeSql( + "SELECT content, label FROM src, LATERAL TABLE(VECTOR_SEARCH(TABLE es_table, DESCRIPTOR(vector), src.index, 2))") + .collect()) + .stream() + .map(Row::toString) + .collect(Collectors.toList())) + .isEqualTo( + Arrays.asList( + "+I[Spark, Batch]", + "+I[Spark, Big Data]", + "+I[Flink, Streaming]", + "+I[Flink, Big Data]")); + } + + private void createFullTypesIndex(String index) throws IOException { + // In ES 8.x, dense_vector requires index: true and similarity for kNN search + TypeMapping mapping = + TypeMapping.of( + m -> + m.properties( + "id", + Property.of( + p -> + p.long_( + LongNumberProperty.of( + l -> l)))) + .properties( + "f1", + Property.of(p -> p.text(TextProperty.of(t -> t)))) + .properties( + "f2", + Property.of( + p -> + p.boolean_( + BooleanProperty.of( + b -> b)))) + .properties( + "f3", + Property.of( + p -> + p.byte_( + ByteNumberProperty.of( + b -> b)))) + .properties( + "f4", + Property.of( + p -> + p.short_( + ShortNumberProperty.of( + s -> s)))) + .properties( + "f5", + Property.of( + p -> + p.integer( + IntegerNumberProperty.of( + i -> i)))) + .properties( + "f6", + Property.of(p -> p.date(DateProperty.of(d -> d)))) + .properties( + "f7", + Property.of(p -> p.text(TextProperty.of(t -> t)))) + .properties( + "f8", + Property.of( + p -> + p.float_( + FloatNumberProperty.of( + f -> f)))) + .properties( + "f9", + Property.of( + p -> + p.double_( + DoubleNumberProperty.of( + d -> d)))) + .properties( + "f10", + Property.of( + p -> + p.denseVector( + createDenseVectorProperty( + 2, "cosine")))) + .properties( + "f11", + Property.of( + p -> + p.denseVector( + createDenseVectorProperty( + 2, "cosine")))) + .properties( + "f12", + Property.of( + p -> + p.denseVector( + createDenseVectorProperty( + 2, "cosine")))) + .properties( + "f13", + Property.of( + p -> + p.denseVector( + createDenseVectorProperty( + 2, "cosine"))))); + + this.getClient().indices().create(c -> c.index(index).mappings(mapping)); + } + + private void createSimpleIndex(String index, String similarity) throws IOException { + // In ES 8.x, dense_vector requires index: true and similarity for kNN search + TypeMapping mapping = + TypeMapping.of( + m -> + m.properties( + "id", + Property.of( + p -> + p.long_( + LongNumberProperty.of( + l -> l)))) + .properties( + "label", + Property.of(p -> p.text(TextProperty.of(t -> t)))) + .properties( + "vector", + Property.of( + p -> + p.denseVector( + createDenseVectorProperty( + 3, similarity))))); + + this.getClient().indices().create(c -> c.index(index).mappings(mapping)); + } + + private DenseVectorProperty createDenseVectorProperty(int dims, String similarity) { + return DenseVectorProperty.of( + d -> + d.dims(dims) + .index(true) + .similarity(similarity) + .indexOptions( + DenseVectorIndexOptions.of( + o -> o.type("hnsw").m(16).efConstruction(100)))); + } + + private void indexSimpleDocument( + String index, String docId, Long id, String label, float[] vector) throws IOException { + Map document = new HashMap<>(); + document.put("id", id); + document.put("label", label); + document.put("vector", vector); + + IndexResponse response = + getClient().index(i -> i.index(index).id(docId).document(document)); + LOG.info("Indexed document {} with result: {}", docId, response.result()); + } +} diff --git a/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties new file mode 100644 index 00000000..07514cc8 --- /dev/null +++ b/flink-connector-elasticsearch8/src/test/resources/testcontainers.properties @@ -0,0 +1,17 @@ +################################################################################ +# Copyright 2023 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +ryuk.container.image = testcontainers/ryuk:0.6.0 diff --git a/pom.xml b/pom.xml index 98775f6e..98dbf912 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ under the License. 2.2.1 + 2.12 2.15.3 4.13.2