diff --git a/bitsail-connectors/connector-elasticsearch/pom.xml b/bitsail-connectors/connector-elasticsearch/pom.xml
index 770f87c0f..818ab8dc5 100644
--- a/bitsail-connectors/connector-elasticsearch/pom.xml
+++ b/bitsail-connectors/connector-elasticsearch/pom.xml
@@ -63,6 +63,13 @@
${revision}
test
+
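+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-connector-print</artifactId>
+      <version>${revision}</version>
+      <scope>test</scope>
+    </dependency>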
\ No newline at end of file
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java
index a21ab2a4f..1f446ba4d 100644
--- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java
@@ -53,4 +53,6 @@ public class EsConstants {
public static final String DEFAULT_OPERATION_TYPE = OPERATION_TYPE_INDEX;
public static final String ES_CONNECTOR_NAME = "elasticsearch";
+
+ /** Regular expression matching a comma followed by zero or more whitespace characters. */
+ public static final String SPLIT_COMMA = ",\\s*";
}
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java
new file mode 100644
index 000000000..6255abe8d
--- /dev/null
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.elasticsearch.error;
+
+import com.bytedance.bitsail.common.exception.ErrorCode;
+
+/**
+ * Error codes raised by the Elasticsearch connector.
+ *
+ * <p>Every constant carries a code string and a human-readable description, both exposed
+ * through the {@link ErrorCode} accessors.
+ */
+public enum ElasticsearchErrorCode implements ErrorCode {
+
+  REQUIRED_VALUE("Elasticsearch-00", "The configuration file is lack of necessary options"),
+  VALID_INDEX_FAILED("Elasticsearch-01", "Try to connect index failed."),
+  NOT_SUPPORT_SPLIT_STRATEGY("Elasticsearch-02", "Split strategy not support yet."),
+  FETCH_DATA_FAILED("Elasticsearch-03", "Fetch data from elasticsearch cluster failed."),
+  DESERIALIZE_FAILED("Elasticsearch-04", "Deserialize data from elasticsearch cluster failed.");
+
+  /** Code string of this error, for example {@code Elasticsearch-00}. */
+  private final String errorCode;
+
+  /** Human-readable description of this error. */
+  private final String errorDescription;
+
+  ElasticsearchErrorCode(String errorCode, String errorDescription) {
+    this.errorCode = errorCode;
+    this.errorDescription = errorDescription;
+  }
+
+  /** Returns the code string of this error. */
+  @Override
+  public String getCode() {
+    return errorCode;
+  }
+
+  /** Returns the description of this error. */
+  @Override
+  public String getDescription() {
+    return errorDescription;
+  }
+
+  /** Renders the code and the description in a single line. */
+  @Override
+  public String toString() {
+    return String.format("Code:[%s], Describe:[%s]", errorCode, errorDescription);
+  }
+}
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java
new file mode 100644
index 000000000..d98da16a5
--- /dev/null
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.elasticsearch.option;
+
+import com.bytedance.bitsail.common.option.ConfigOption;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+
+import com.alibaba.fastjson.TypeReference;
+
+import java.util.List;
+
+import static com.bytedance.bitsail.common.option.ConfigOptions.key;
+
+public interface ElasticsearchReaderOptions extends ReaderOptions.BaseReaderOptions {
+
+ ConfigOption> ES_HOSTS =
+ key(ReaderOptions.READER_PREFIX + "es_hosts")
+ .onlyReference(new TypeReference>() {
+ });
+
+ ConfigOption ES_INDEX =
+ key(ReaderOptions.READER_PREFIX + "es_index")
+ .noDefaultValue(String.class);
+
+ ConfigOption CONNECTION_REQUEST_TIMEOUT_MS =
+ key(ReaderOptions.READER_PREFIX + "connection_request_timeout_ms")
+ .defaultValue(10000);
+
+ ConfigOption CONNECTION_TIMEOUT_MS =
+ key(ReaderOptions.READER_PREFIX + "connection_timeout_ms")
+ .defaultValue(10000);
+
+ ConfigOption SOCKET_TIMEOUT_MS =
+ key(ReaderOptions.READER_PREFIX + "socket_timeout_ms")
+ .defaultValue(60000);
+
+ ConfigOption SCROLL_TIME =
+ key(ReaderOptions.READER_PREFIX + "scroll_time")
+ .defaultValue("1m");
+
+ ConfigOption SCROLL_SIZE =
+ key(ReaderOptions.READER_PREFIX + "scroll_size")
+ .defaultValue(100);
+
+ ConfigOption SPLIT_STRATEGY =
+ key(ReaderOptions.READER_PREFIX + "split_strategy")
+ .defaultValue("round_robin");
+}
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java
index f5721fd20..03671d114 100644
--- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java
@@ -19,8 +19,11 @@
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.exception.CommonErrorCode;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+import com.bytedance.bitsail.common.option.WriterOptions;
import com.bytedance.bitsail.common.util.Preconditions;
import com.bytedance.bitsail.connector.elasticsearch.base.NetUtil;
+import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions;
import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions;
import org.apache.commons.lang.StringUtils;
@@ -46,7 +49,13 @@ public class EsRestClientBuilder {
private final RestClientBuilder builder;
public EsRestClientBuilder(BitSailConfiguration jobConf) {
- List hostAddressList = jobConf.get(ElasticsearchWriterOptions.ES_HOSTS);
+ List hostAddressList = null;
+ if (jobConf.fieldExists(ReaderOptions.READER_PREFIX)) {
+ hostAddressList = jobConf.get(ElasticsearchReaderOptions.ES_HOSTS);
+ } else if (jobConf.fieldExists(WriterOptions.WRITER_PREFIX)) {
+ hostAddressList = jobConf.get(ElasticsearchWriterOptions.ES_HOSTS);
+ }
+
List hosts = parseHostsAddress(hostAddressList);
Preconditions.checkState(!hosts.isEmpty(), "cannot find any valid host from configurations.");
LOG.info("Elasticsearch http client hosts: {}", hosts);
diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java
new file mode 100644
index 000000000..e790a6dfa
--- /dev/null
+++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.elasticsearch.rest.source;
+
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode;
+
+import org.apache.commons.compress.utils.Lists;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.core.CountRequest;
+import org.elasticsearch.client.core.CountResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class EsSourceRequest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EsSourceRequest.class);
+ private final RestHighLevelClient restHighLevelClient;
+
+ /**
+  * Creates a request helper bound to the given client.
+  *
+  * @param restHighLevelClient client used for every request issued by this helper; must not be null
+  * @throws NullPointerException if {@code restHighLevelClient} is null
+  */
+ public EsSourceRequest(RestHighLevelClient restHighLevelClient) {
+   // Fail at construction time instead of with a late NullPointerException on the first request.
+   this.restHighLevelClient = Objects.requireNonNull(restHighLevelClient, "restHighLevelClient");
+ }
+
+ /**
+  * Sends a match-all count request for {@code index} and returns the reported document count.
+  *
+  * @param index name of the index to count
+  * @return the count carried by the count response
+  * @throws BitSailException with {@code VALID_INDEX_FAILED} if the request fails with an
+  *     {@link IOException}, the response is null, or the response status is not OK
+  */
+ public Long validateIndex(String index) {
+   CountRequest countRequest = new CountRequest(index);
+   countRequest.query(QueryBuilders.matchAllQuery());
+
+   final CountResponse response;
+   try {
+     response = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
+   } catch (IOException e) {
+     // Chain the IOException as the cause so the original stack trace is not lost.
+     throw BitSailException.asBitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED,
+         String.format("Count request for index %s failed", index), e);
+   }
+
+   if (response == null) {
+     throw new BitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED,
+         "GET " + index + " metadata failed");
+   }
+   if (response.status() != RestStatus.OK) {
+     throw new BitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED,
+         String.format("Get %s response status code = %d", index, response.status().getStatus()));
+   }
+   return response.getCount();
+ }
+
+ /**
+ * Returns a list of all records, using the scroll API for pagination.
+ * According to: ...
+ *
+ * @param index index name in Elasticsearch cluster
+ * @param columnNames source field names
+ * @param scrollSize scroll size
+ * @param scrollTime scroll time
+ * @return list of documents
+ * @throws IOException throws IOException if Elasticsearch request fails
+ */
+ public List