Skip to content

[FLINK-38721] Support vector search for es connector.#137

Merged
wenjin272 merged 7 commits into
apache:mainfrom
wenjin272:support-vector-search
May 19, 2026
Merged

[FLINK-38721] Support vector search for es connector.#137
wenjin272 merged 7 commits into
apache:mainfrom
wenjin272:support-vector-search

Conversation

@wenjin272
Copy link
Copy Markdown
Contributor

No description provided.

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented Dec 24, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

this.networkClientConfig = networkClientConfig;
this.hosts = hosts;
this.callBridge = callBridge;
this.cosineSimilarity =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does es7 only support cosine as the metric?

private final ElasticsearchConfiguration config;
protected final DecodingFormat<DeserializationSchema<RowData>> format;
protected final ElasticsearchConfiguration config;
private final int lookupMaxRetryTimes;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use same retry times for lookup and vector search?

@wenjin272 wenjin272 force-pushed the support-vector-search branch from d7abd81 to 17b9210 Compare February 2, 2026 07:01
Copy link
Copy Markdown

@xishuaidelin xishuaidelin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @wenjin272 , thanks for your contribution. I've left some comments below.

ElasticsearchApiCallBridge<RestHighLevelClient> callBridge) {

checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check type 'int'.

NetworkConfig networkConfig) {

checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check type int.

if (hit.source() != null) {
return new SearchResult(hit.source().toJson().toString(), hit.score());
} else {
return new SearchResult(null, hit.score());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should filter the data with source=null to avoid NPE in method parseSearchResult.

searchHits.stream() .filter(hit -> hit.source() != null) .map(hit -> new SearchResult(hit.source().toJson().toString(), hit.score())) .toArray(SearchResult[]::new);

private SearchSourceBuilder searchSourceBuilder;

private final ElasticsearchApiCallBridge<RestHighLevelClient> callBridge;
private final NetworkClientConfig networkClientConfig;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function in ES8 uses NetworkConfig here. Are there any differences that require us to use a different config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES7 and ES8 use different client stacks, so the two configs live on opposite sides of the ApiCallBridge boundary:

ES7 ES8
Target client RestHighLevelClient co.elastic.clients.elasticsearch.ElasticsearchClient
Client creation path ElasticsearchApiCallBridge.createClient(NetworkClientConfig, hosts) NetworkConfig.createEsSyncClient() / createEsClient()
Config role Pure data holder Data holder + client factory
Already used by ES7 lookup function ES8 sink and lookup

The two configs carry roughly the same fields, but:

  • NetworkClientConfig is consumed by ApiCallBridge, which only ES7 implements.
  • NetworkConfig self-builds the ES8 Java API Client and is what the existing ES8 sink/lookup already depend on.

So this PR follows each module's existing convention rather than introducing a new split.

return VectorSearchFunctionProvider.of(vectorSearchFunction);
}

private String getSearchColumn(VectorSearchContext vectorSearchContext) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function appears to be identical to the one in ES8. Would it make sense to extract a base class to avoid duplication?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still seems unresolved.

import static org.apache.flink.util.Preconditions.checkNotNull;

/** The {@link VectorSearchFunction} implementation for Elasticsearch. */
public class ElasticsearchRowDataVectorSearchFunction extends VectorSearchFunction {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we introduce an AbstractElasticsearchVectorSearchFunction base class for both the ES7 and ES8 implementations? This would allow us to extract and reuse the shared logic.

wenjin272 added a commit to wenjin272/flink-connector-elasticsearch that referenced this pull request Apr 19, 2026
…class.

Address PR apache#137 review feedback from xishuaidelin:
 - Introduce AbstractElasticsearchVectorSearchFunction in the base module
   so ES7 / ES8 share the retry loop, result decoding and SearchResult
   type; each subclass now only supplies client initialization and the
   version-specific search call.
 - Filter hits whose source is null to avoid NPE when deserializing.
 - Drop redundant checkNotNull on the primitive maxRetryTimes parameter.

Also apply spotless formatting drift picked up on adjacent files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-connector-elasticsearch that referenced this pull request Apr 21, 2026
…class.

Address PR apache#137 review feedback from xishuaidelin:
 - Introduce AbstractElasticsearchVectorSearchFunction in the base module
   so ES7 / ES8 share the retry loop, result decoding and SearchResult
   type; each subclass now only supplies client initialization and the
   version-specific search call.
 - Filter hits whose source is null to avoid NPE when deserializing.
 - Drop redundant checkNotNull on the primitive maxRetryTimes parameter.

Also apply spotless formatting drift picked up on adjacent files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272 wenjin272 force-pushed the support-vector-search branch from 17b75f1 to ae37638 Compare April 21, 2026 06:30
wenjin272 added a commit to wenjin272/flink-connector-elasticsearch that referenced this pull request Apr 21, 2026
…class.

Address PR apache#137 review feedback from xishuaidelin:
 - Introduce AbstractElasticsearchVectorSearchFunction in the base module
   so ES7 / ES8 share the retry loop, result decoding and SearchResult
   type; each subclass now only supplies client initialization and the
   version-specific search call.
 - Filter hits whose source is null to avoid NPE when deserializing.
 - Drop redundant checkNotNull on the primitive maxRetryTimes parameter.

Also apply spotless formatting drift picked up on adjacent files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272 wenjin272 force-pushed the support-vector-search branch from ae37638 to 39368cb Compare April 21, 2026 07:19
@wenjin272 wenjin272 force-pushed the support-vector-search branch 2 times, most recently from 722bc8c to 736b411 Compare April 21, 2026 09:26
@wenjin272 wenjin272 requested a review from xishuaidelin April 22, 2026 10:45
wenjin272 and others added 2 commits May 15, 2026 16:26
@wenjin272 wenjin272 force-pushed the support-vector-search branch from 736b411 to 5a3737b Compare May 15, 2026 08:27
@wenjin272
Copy link
Copy Markdown
Contributor Author

hi, @reswqa. Sorry to bother you. I noticed that you have made significant contributions to the flink-connector-elasticsearch project and have recently been preparing for a release, so I’d like to ask you a question.

FLINK-38721 introduces VectorSearchFunction usage which is only available since Flink 2.2 (FLIP-548). Currently, we aim to support vector search in the flink-connector-elasticsearch module. Although the Flink version bumped on the main branch is 2.2.1, I noticed that CI runs against both Flink 2.1 and 2.2 on the main branch, which causes compilation errors when building against Flink 2.1. I’d like to ask how the community typically resolves this issue.

@reswqa
Copy link
Copy Markdown
Member

reswqa commented May 15, 2026

Hi @wenjin272, Thanks for informing me.

As one ES connector version can be compatible with multiple flink versions. Currently, ES 3.1 supports Flink 1.18, 1.19 and 1.20. And ES 4.0 is the first version that supports flink 2.0.

I originally planned to release 4.1 which would support both 2.1, 2.2 and 2.3 simultaneously. But considering that vector search is a relatively important feature, I want to include it in this round of release.

I think it could be like this: synchronize other important features from main branch to the v4.0 branch, and then release version 4.0.1 (supporting Flink 2.0 and 2.1) and version 4.1 or 5.0 (supporting Flink >= 2.2) at the same time. In this case, the testing version of flink for the main branch are only 2.2 and 2.3.

What do you think?

@wenjin272
Copy link
Copy Markdown
Contributor Author

wenjin272 commented May 16, 2026

I think it could be like this: synchronize other important features from main branch to the v4.0 branch, and then release version 4.0.1 (supporting Flink 2.0 and 2.1) and version 4.1 (supporting Flink >= 2.2) at the same time. In this case, the testing version of flink for the main branch are only 2.2 and 2.3.

Ty for your kind reply @reswqa. It makes sense to me. I will do my best to push this PR forward in an effort to get it merged next week.

package org.apache.flink.connector.elasticsearch.table.search;

/** Metric for vector search. */
public enum SearchMetric {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Apache license header. Several other files have the same issue.


@Override
protected void doOpen(FunctionContext context) {
this.client = callBridge.createClient(networkClientConfig, hosts);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client is created but never closed.

* A {@link DynamicTableSource} that describes how to create a {@link Elasticsearch7DynamicSource}
* from a logical description.
*/
public class Elasticsearch7DynamicSource extends ElasticsearchDynamicSource
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class does not override copy(). If copied, new source would lose the vectorSearch capability.

return VectorSearchFunctionProvider.of(vectorSearchFunction);
}

private String getSearchColumn(VectorSearchContext vectorSearchContext) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still seems unresolved.

wenjin272 and others added 4 commits May 19, 2026 12:58
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Deduplicate identical getSearchColumn logic from Elasticsearch7DynamicSource
and Elasticsearch8DynamicSource into a shared static helper in the base module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Without overriding copy(), a copied source would fall back to the base
ElasticsearchDynamicSource and lose its VectorSearchTableSource capability.
Promote lookupCache / docType / summaryString in the base class to protected
so the subclass can reconstruct an Elasticsearch7DynamicSource.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xishuaidelin
Copy link
Copy Markdown

LGTM!

@wenjin272 wenjin272 merged commit 08b7d66 into apache:main May 19, 2026
4 checks passed
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 19, 2026

Awesome work, congrats on your first merged pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants