[FLINK-38721] Support vector search for es connector. #137
Thanks for opening this pull request! Please check out our contributing guidelines (https://flink.apache.org/contributing/how-to-contribute.html).
| this.networkClientConfig = networkClientConfig; | ||
| this.hosts = hosts; | ||
| this.callBridge = callBridge; | ||
| this.cosineSimilarity = |
Does ES7 only support cosine as the metric?
| private final ElasticsearchConfiguration config; | ||
| protected final DecodingFormat<DeserializationSchema<RowData>> format; | ||
| protected final ElasticsearchConfiguration config; | ||
| private final int lookupMaxRetryTimes; |
Can we use the same retry times for lookup and vector search?
Force-pushed from d7abd81 to 17b9210.
xishuaidelin left a comment
Hi @wenjin272, thanks for your contribution. I've left some comments below.
| ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) { | ||
|
|
||
| checkNotNull(deserializationSchema, "No DeserializationSchema supplied."); | ||
| checkNotNull(maxRetryTimes, "No maxRetryTimes supplied."); |
| NetworkConfig networkConfig) { | ||
|
|
||
| checkNotNull(deserializationSchema, "No DeserializationSchema supplied."); | ||
| checkNotNull(maxRetryTimes, "No maxRetryTimes supplied."); |
| if (hit.source() != null) { | ||
| return new SearchResult(hit.source().toJson().toString(), hit.score()); | ||
| } else { | ||
| return new SearchResult(null, hit.score()); |
Maybe we should filter out hits whose source is null to avoid an NPE in parseSearchResult:

    searchHits.stream()
            .filter(hit -> hit.source() != null)
            .map(hit -> new SearchResult(hit.source().toJson().toString(), hit.score()))
            .toArray(SearchResult[]::new);
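For illustration, the suggested filtering can be tried in isolation. This is only a minimal sketch: `Hit` and `SearchResult` below are simplified stand-ins invented for the example (plain fields instead of the real client's `source()` / `score()` accessors), not the connector's actual classes.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: Hit and SearchResult are hypothetical stand-ins, not connector classes. */
final class NullSourceFilterSketch {

    /** Simplified search hit: the JSON source may be null, the score is always set. */
    static final class Hit {
        final String source;
        final double score;

        Hit(String source, double score) {
            this.source = source;
            this.score = score;
        }
    }

    /** Simplified result that would later be handed to the row deserializer. */
    static final class SearchResult {
        final String source;
        final double score;

        SearchResult(String source, double score) {
            this.source = source;
            this.score = score;
        }
    }

    /** Keeps only hits that carry a source, so later parsing never sees null. */
    static SearchResult[] toResults(List<Hit> hits) {
        return hits.stream()
                .filter(hit -> hit.source != null)
                .map(hit -> new SearchResult(hit.source, hit.score))
                .toArray(SearchResult[]::new);
    }

    public static void main(String[] args) {
        SearchResult[] results =
                toResults(Arrays.asList(new Hit("{\"id\":1}", 0.9), new Hit(null, 0.4)));
        System.out.println(results.length + " result(s) kept");
    }
}
```

Filtering before mapping means the result array can be shorter than the hit list, so callers should not assume one result per hit.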
| private SearchSourceBuilder searchSourceBuilder; | ||
|
|
||
| private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge; | ||
| private final NetworkClientConfig networkClientConfig; |
This function in ES8 uses NetworkConfig here. Are there any differences that require us to use a different config?
ES7 and ES8 use different client stacks, so the two configs live on opposite sides of the ApiCallBridge boundary:
| | ES7 | ES8 |
|---|---|---|
| Target client | RestHighLevelClient | co.elastic.clients.elasticsearch.ElasticsearchClient |
| Client creation path | ElasticsearchApiCallBridge.createClient(NetworkClientConfig, hosts) | NetworkConfig.createEsSyncClient() / createEsClient() |
| Config role | Pure data holder | Data holder + client factory |
| Already used by | ES7 lookup function | ES8 sink and lookup |
The two configs carry roughly the same fields, but:
- NetworkClientConfig is consumed by ApiCallBridge, which only ES7 implements.
- NetworkConfig self-builds the ES8 Java API Client and is what the existing ES8 sink/lookup already depend on.
So this PR follows each module's existing convention rather than introducing a new split.
| return VectorSearchFunctionProvider.of(vectorSearchFunction); | ||
| } | ||
|
|
||
| private String getSearchColumn(VectorSearchContext vectorSearchContext) { |
This function appears to be identical to the one in ES8. Would it make sense to extract a base class to avoid duplication?
| import static org.apache.flink.util.Preconditions.checkNotNull; | ||
|
|
||
| /** The {@link VectorSearchFunction} implementation for Elasticsearch. */ | ||
| public class ElasticsearchRowDataVectorSearchFunction extends VectorSearchFunction { |
Could we introduce an AbstractElasticsearchVectorSearchFunction base class for both the ES7 and ES8 implementations? This would allow us to extract and reuse the shared logic.
…class.

Address PR apache#137 review feedback from xishuaidelin:
- Introduce AbstractElasticsearchVectorSearchFunction in the base module so ES7 / ES8 share the retry loop, result decoding and SearchResult type; each subclass now only supplies client initialization and the version-specific search call.
- Filter hits whose source is null to avoid NPE when deserializing.
- Drop redundant checkNotNull on the primitive maxRetryTimes parameter.

Also apply spotless formatting drift picked up on adjacent files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
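The split described in this commit message (shared retry loop in the base class, version-specific call in the subclass) follows the template-method pattern. The sketch below is a rough illustration under assumptions, not the merged code: the class names, the `doSearch` signature and the `String` result type are all hypothetical, and any back-off between attempts is omitted.

```java
import java.io.IOException;

/** Hypothetical base class: owns the retry loop, delegates the actual search call. */
abstract class AbstractVectorSearchSketch {

    // A primitive int can never be null, so no checkNotNull is needed here.
    private final int maxRetryTimes;

    protected AbstractVectorSearchSketch(int maxRetryTimes) {
        if (maxRetryTimes < 0) {
            throw new IllegalArgumentException("maxRetryTimes must not be negative");
        }
        this.maxRetryTimes = maxRetryTimes;
    }

    /** Version-specific search call; an ES7 or ES8 subclass would implement this. */
    protected abstract String doSearch(float[] queryVector, int topK) throws IOException;

    /** Shared loop: one initial attempt plus up to maxRetryTimes retries. */
    public final String search(float[] queryVector, int topK) {
        IOException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetryTimes; attempt++) {
            try {
                return doSearch(queryVector, topK);
            } catch (IOException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw new RuntimeException(
                "Vector search failed after " + (maxRetryTimes + 1) + " attempts", lastFailure);
    }
}

/** Demo subclass that fails a fixed number of times before returning a value. */
final class FlakySearchSketch extends AbstractVectorSearchSketch {

    private int remainingFailures;
    int calls;

    FlakySearchSketch(int maxRetryTimes, int failures) {
        super(maxRetryTimes);
        this.remainingFailures = failures;
    }

    @Override
    protected String doSearch(float[] queryVector, int topK) throws IOException {
        calls++;
        if (remainingFailures > 0) {
            remainingFailures--;
            throw new IOException("simulated transient failure");
        }
        return "top-" + topK;
    }
}
```

With this shape, each version-specific subclass only needs to provide client setup and `doSearch`; the retry policy lives in one place.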
Force-pushed from 17b75f1 to ae37638.
Force-pushed from ae37638 to 39368cb.
Force-pushed from 722bc8c to 736b411.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed from 736b411 to 5a3737b.
|
Hi @reswqa, sorry to bother you. I noticed that you have made significant contributions to the flink-connector-elasticsearch project and have recently been preparing a release, so I'd like to ask you a question. FLINK-38721 introduces usage of VectorSearchFunction, which is only available since Flink 2.2 (FLIP-548). We aim to support vector search in the flink-connector-elasticsearch module. Although the main branch has been bumped to Flink 2.2.1, CI on the main branch still runs against both Flink 2.1 and 2.2, which causes compilation errors when building against Flink 2.1. How does the community typically resolve this kind of issue?
|
Hi @wenjin272, thanks for informing me. One ES connector version can be compatible with multiple Flink versions. Currently, ES I originally planned to release I think it could be like this: synchronize other important features from What do you think?
Thank you for your kind reply, @reswqa. That makes sense to me. I will do my best to push this PR forward and get it merged next week.
| package org.apache.flink.connector.elasticsearch.table.search; | ||
|
|
||
| /** Metric for vector search. */ | ||
| public enum SearchMetric { |
Missing Apache license header. Several other files have the same issue.
|
|
||
| @Override | ||
| protected void doOpen(FunctionContext context) { | ||
| this.client = callBridge.createClient(networkClientConfig, hosts); |
The client is created but never closed.
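One way to address this is to release the client in the function's close path. The sketch below is an assumption-laden illustration rather than the actual fix: `ClientFactory` is a hypothetical stand-in for `callBridge.createClient(...)`, and the client is modelled as a plain `java.io.Closeable` (the ES7 `RestHighLevelClient` implements `Closeable`).

```java
import java.io.Closeable;
import java.io.IOException;

/** Sketch of a function that owns a client between open() and close(). */
final class ClientLifecycleSketch {

    /** Hypothetical stand-in for callBridge.createClient(networkClientConfig, hosts). */
    interface ClientFactory {
        Closeable create();
    }

    private final ClientFactory factory;
    private Closeable client;

    ClientLifecycleSketch(ClientFactory factory) {
        this.factory = factory;
    }

    /** Mirrors doOpen(...): the client is created once per function instance. */
    void open() {
        client = factory.create();
    }

    /** Releases the client; safe to call more than once or without open(). */
    void close() throws IOException {
        if (client != null) {
            try {
                client.close();
            } finally {
                client = null; // drop the reference even if close() failed
            }
        }
    }

    boolean isOpen() {
        return client != null;
    }
}
```

Clearing the field in a `finally` block keeps `close()` idempotent, which matters if the runtime calls it again after a failure.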
| * A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch7DynamicSource} | ||
| * from a logical description. | ||
| */ | ||
| public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource |
This class does not override copy(). If copied, the new source would lose the vector search capability.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Deduplicate identical getSearchColumn logic from Elasticsearch7DynamicSource and Elasticsearch8DynamicSource into a shared static helper in the base module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Without overriding copy(), a copied source would fall back to the base ElasticsearchDynamicSource and lose its VectorSearchTableSource capability. Promote lookupCache / docType / summaryString in the base class to protected so the subclass can reconstruct an Elasticsearch7DynamicSource. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
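The copy() problem described here can be reproduced with a small stand-alone example. This is a sketch with invented names (`BaseSourceSketch`, `VectorSearchCapableSketch`, and so on); it only mirrors the shape of the issue and does not use the real Flink or connector classes.

```java
/** Hypothetical marker for the extra capability the subclass adds. */
interface VectorSearchCapableSketch {}

/** Stand-in for the base source; the field is protected so subclasses can rebuild themselves. */
class BaseSourceSketch {

    protected final String index;

    BaseSourceSketch(String index) {
        this.index = index;
    }

    /** Base copy: always yields a plain base instance. */
    BaseSourceSketch copy() {
        return new BaseSourceSketch(index);
    }
}

/** Subclass that forgets to override copy(): a copy silently loses the capability. */
final class ForgetfulSourceSketch extends BaseSourceSketch implements VectorSearchCapableSketch {

    ForgetfulSourceSketch(String index) {
        super(index);
    }
}

/** Subclass that overrides copy() and therefore keeps its own type. */
final class Es7SourceSketch extends BaseSourceSketch implements VectorSearchCapableSketch {

    Es7SourceSketch(String index) {
        super(index);
    }

    @Override
    Es7SourceSketch copy() {
        // Rebuild from the protected state so the copy has the same concrete type.
        return new Es7SourceSketch(index);
    }
}
```

Here `new ForgetfulSourceSketch("idx").copy()` is a plain `BaseSourceSketch`, while `new Es7SourceSketch("idx").copy()` still implements the capability interface.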
|
LGTM! |
|
Awesome work, congrats on your first merged pull request! |
No description provided.