diff --git a/bitsail-connectors/connector-elasticsearch/pom.xml b/bitsail-connectors/connector-elasticsearch/pom.xml index 770f87c0f..818ab8dc5 100644 --- a/bitsail-connectors/connector-elasticsearch/pom.xml +++ b/bitsail-connectors/connector-elasticsearch/pom.xml @@ -63,6 +63,13 @@ ${revision} test + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + \ No newline at end of file diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java index a21ab2a4f..1f446ba4d 100644 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/base/EsConstants.java @@ -53,4 +53,6 @@ public class EsConstants { public static final String DEFAULT_OPERATION_TYPE = OPERATION_TYPE_INDEX; public static final String ES_CONNECTOR_NAME = "elasticsearch"; + + public static final String SPLIT_COMMA = ",\\s*"; } diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java new file mode 100644 index 000000000..6255abe8d --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/error/ElasticsearchErrorCode.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.error; + +import com.bytedance.bitsail.common.exception.ErrorCode; + +public enum ElasticsearchErrorCode implements ErrorCode { + + REQUIRED_VALUE("Elasticsearch-00", "The configuration file is lack of necessary options"), + VALID_INDEX_FAILED("Elasticsearch-01", "Try to connect index failed."), + NOT_SUPPORT_SPLIT_STRATEGY("Elasticsearch-02", "Split strategy not support yet."), + FETCH_DATA_FAILED("Elasticsearch-03", "Fetch data from elasticsearch cluster failed."), + DESERIALIZE_FAILED("Elasticsearch-04", "Deserialize data from elasticsearch cluster failed."); + + private final String code; + + private final String describe; + + ElasticsearchErrorCode(String code, String describe) { + this.code = code; + this.describe = describe; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return describe; + } + + @Override + public String toString() { + return String.format("Code:[%s], Describe:[%s]", this.code, + this.describe); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java new file mode 100644 index 000000000..d98da16a5 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/option/ElasticsearchReaderOptions.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.option; + +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.ReaderOptions; + +import com.alibaba.fastjson.TypeReference; + +import java.util.List; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; + +public interface ElasticsearchReaderOptions extends ReaderOptions.BaseReaderOptions { + + ConfigOption> ES_HOSTS = + key(ReaderOptions.READER_PREFIX + "es_hosts") + .onlyReference(new TypeReference>() { + }); + + ConfigOption ES_INDEX = + key(ReaderOptions.READER_PREFIX + "es_index") + .noDefaultValue(String.class); + + ConfigOption CONNECTION_REQUEST_TIMEOUT_MS = + key(ReaderOptions.READER_PREFIX + "connection_request_timeout_ms") + .defaultValue(10000); + + ConfigOption CONNECTION_TIMEOUT_MS = + key(ReaderOptions.READER_PREFIX + "connection_timeout_ms") + .defaultValue(10000); + + ConfigOption SOCKET_TIMEOUT_MS = + key(ReaderOptions.READER_PREFIX + "socket_timeout_ms") + .defaultValue(60000); + + ConfigOption SCROLL_TIME = + key(ReaderOptions.READER_PREFIX + "scroll_time") + .defaultValue("1m"); + + ConfigOption SCROLL_SIZE = + key(ReaderOptions.READER_PREFIX + "scroll_size") + .defaultValue(100); + + ConfigOption SPLIT_STRATEGY = + key(ReaderOptions.READER_PREFIX + "split_strategy") + .defaultValue("round_robin"); +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java index f5721fd20..03671d114 100644 --- a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/EsRestClientBuilder.java @@ -19,8 +19,11 @@ import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.exception.CommonErrorCode; +import com.bytedance.bitsail.common.option.ReaderOptions; +import com.bytedance.bitsail.common.option.WriterOptions; import com.bytedance.bitsail.common.util.Preconditions; import com.bytedance.bitsail.connector.elasticsearch.base.NetUtil; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; import org.apache.commons.lang.StringUtils; @@ -46,7 +49,13 @@ public class EsRestClientBuilder { private final RestClientBuilder builder; public EsRestClientBuilder(BitSailConfiguration jobConf) { - List hostAddressList = jobConf.get(ElasticsearchWriterOptions.ES_HOSTS); + List hostAddressList = null; + if (jobConf.fieldExists(ReaderOptions.READER_PREFIX)) { + hostAddressList = jobConf.get(ElasticsearchReaderOptions.ES_HOSTS); + } else if (jobConf.fieldExists(WriterOptions.WRITER_PREFIX)) { + hostAddressList = jobConf.get(ElasticsearchWriterOptions.ES_HOSTS); + } + List hosts = parseHostsAddress(hostAddressList); Preconditions.checkState(!hosts.isEmpty(), "cannot find any valid host from configurations."); LOG.info("Elasticsearch http client hosts: {}", hosts); diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java new file mode 100644 index 000000000..e790a6dfa --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.rest.source; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode; + +import org.apache.commons.compress.utils.Lists; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.core.CountRequest; +import org.elasticsearch.client.core.CountResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class EsSourceRequest { + + private static final Logger LOG = LoggerFactory.getLogger(EsSourceRequest.class); + private final RestHighLevelClient restHighLevelClient; + + public EsSourceRequest(RestHighLevelClient restHighLevelClient) { + this.restHighLevelClient = restHighLevelClient; + } + + public Long validateIndex(String index) { + CountRequest countRequest = new CountRequest(index); + countRequest.query(QueryBuilders.matchAllQuery()); + CountResponse response = null; + try { + response = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT); + if (response == null) { + throw new BitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED, + "GET " + index + " metadata failed"); + } + if (response.status() != RestStatus.OK) { + throw new BitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED, + String.format("Get %s response status code = %d", index, response.status().getStatus())); + } + } catch (IOException e) { + throw new BitSailException(ElasticsearchErrorCode.VALID_INDEX_FAILED, e.getMessage()); + } + return response.getCount(); + } + + /** + * Returns a list of all records, Uses scroll API for pagination. + * According to: ... + * + * @param index index name in Elasticsearch cluster + * @param columnNames source field names + * @param scrollSize scroll size + * @param scrollTime scroll time + * @return list of documents + * @throws IOException throws IOException if Elasticsearch request fails + */ + public List> getAllDocuments(String index, List columnNames, int scrollSize, String scrollTime) throws IOException { + LOG.info("Start get all records from index: {}, scroll size: {}, scroll time: {}", index, scrollSize, scrollTime); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(scrollSize); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + searchSourceBuilder.fetchSource(columnNames.toArray(new String[0]), null); + searchSourceBuilder.sort("_doc"); + + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.scroll(scrollTime); + searchRequest.source(searchSourceBuilder); + + List> allData = Lists.newArrayList(); + + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + String scrollId = searchResponse.getScrollId(); + SearchHit[] hits = addSearchHits(searchResponse, allData); + LOG.info("Finish first time search, get {} hits.", Objects.isNull(hits) ? 0 : hits.length); + + while (Objects.nonNull(hits) && hits.length > 0) { + LOG.info("Continue scroll query with scrollId: {}", scrollId); + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId); + searchScrollRequest.scroll(scrollTime); + searchResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT); + scrollId = searchResponse.getScrollId(); + hits = addSearchHits(searchResponse, allData); + } + + clearScroll(scrollId); + return allData; + } + + private void clearScroll(String scrollId) throws IOException { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + boolean succeeded = clearScrollResponse.isSucceeded(); + if (succeeded) { + LOG.info("Scroll response cleared successfully"); + } else { + LOG.error("Fail to clear scroll response"); + } + } + + private SearchHit[] addSearchHits(SearchResponse searchResponse, List> allData) { + SearchHit[] hits = searchResponse.getHits().getHits(); + if (Objects.isNull(hits)) { + return null; + } + for (SearchHit hit : hits) { + allData.add(hit.getSourceAsMap()); + } + return hits; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/ElasticsearchSource.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/ElasticsearchSource.java new file mode 100644 index 000000000..1b06b5ac3 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/ElasticsearchSource.java @@ -0,0 +1,94 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; +import com.bytedance.bitsail.connector.elasticsearch.base.EsConstants; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.source.coordinator.ElasticsearchSourceSplitCoordinator; +import com.bytedance.bitsail.connector.elasticsearch.source.reader.ElasticsearchReader; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSourceSplit; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitByIndexStrategy; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class ElasticsearchSource implements Source, ParallelismComputable { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSource.class); + + private BitSailConfiguration jobConf; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException { + this.jobConf = readerConfiguration; + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new ElasticsearchReader(jobConf, readerContext); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator( + SourceSplitCoordinator.Context coordinatorContext) { + return new ElasticsearchSourceSplitCoordinator(coordinatorContext, jobConf); + } + + @Override + public String getReaderName() { + return EsConstants.ES_CONNECTOR_NAME; + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new FileMappingTypeInfoConverter(getReaderName()); + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception { + int parallelism; + if (selfConf.fieldExists(ElasticsearchReaderOptions.READER_PARALLELISM_NUM)) { + parallelism = selfConf.get(ElasticsearchReaderOptions.READER_PARALLELISM_NUM); + LOG.info("Use user-defined reader parallelism: {}", parallelism); + } else { + ElasticsearchSplitStrategy splitStrategy = new ElasticsearchSplitByIndexStrategy(); + parallelism = splitStrategy.estimateSplitNum(jobConf); + LOG.info("Use index number as parallelism: {}", parallelism); + } + return new ParallelismAdvice(false, parallelism); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinator.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinator.java new file mode 100644 index 000000000..0e2884948 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinator.java @@ -0,0 +1,165 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; +import com.bytedance.bitsail.connector.elasticsearch.source.reader.selector.HashReaderSelector; +import com.bytedance.bitsail.connector.elasticsearch.source.reader.selector.ReaderSelector; +import com.bytedance.bitsail.connector.elasticsearch.source.reader.selector.RoundRobinReaderSelector; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSourceSplit; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitByIndexStrategy; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitStrategy; + +import com.google.common.collect.Maps; +import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.bytedance.bitsail.connector.elasticsearch.source.reader.selector.SelectorType.HASH; +import static com.bytedance.bitsail.connector.elasticsearch.source.reader.selector.SelectorType.ROUND_ROBIN; + +public class ElasticsearchSourceSplitCoordinator implements SourceSplitCoordinator { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSourceSplitCoordinator.class); + + private final SourceSplitCoordinator.Context context; + + private final BitSailConfiguration jobConf; + + @Getter + private final Map> splitAssignmentPlan; + + private RestHighLevelClient restClient; + + private ReaderSelector selector; + + public ElasticsearchSourceSplitCoordinator(SourceSplitCoordinator.Context context, + BitSailConfiguration jobConf) { + this.context = context; + this.jobConf = jobConf; + this.splitAssignmentPlan = Maps.newConcurrentMap(); + } + + @Override + public void start() { + this.restClient = new EsRestClientBuilder(this.jobConf).build(); + + ElasticsearchSplitStrategy elasticsearchSplitStrategy = new ElasticsearchSplitByIndexStrategy(restClient); + List splitList = elasticsearchSplitStrategy.getElasticsearchSplits(jobConf); + + int readerNum = context.totalParallelism(); + LOG.info("Found {} readers and {} splits.", readerNum, splitList.size()); + if (readerNum > splitList.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size()); + } + + selector = getReaderSelector(jobConf, readerNum); + for (ElasticsearchSourceSplit split : splitList) { + int readerIndex = selector.getReaderIndex(split.getIndex()); + splitAssignmentPlan.computeIfAbsent(readerIndex, r -> new ArrayList<>()).add(split); + LOG.info("Will assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}.", subtaskId); + tryToAssignSplitsToReader(subtaskId); + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + + for (ElasticsearchSourceSplit split : splits) { + int readerIndex = selector.getReaderIndex(split.getIndex()); + splitAssignmentPlan.computeIfAbsent(readerIndex, r -> new ArrayList<>()).add(split); + LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex); + } + + tryToAssignSplitsToReader(subtaskId); + } + + private void tryToAssignSplitsToReader(int readerIndex) { + List splits = splitAssignmentPlan.get(readerIndex); + if (CollectionUtils.isEmpty(splits) && !context.registeredReaders().contains(readerIndex)) { + return; + } + + LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex, + splits.stream().map(ElasticsearchSourceSplit::uniqSplitId).collect(Collectors.toList())); + splitAssignmentPlan.remove(readerIndex); + context.assignSplit(readerIndex, splits); + context.signalNoMoreSplits(readerIndex); + LOG.info("Finish assign splits for reader {}.", readerIndex); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + // empty + } + + @Override + public EmptyState snapshotState() throws Exception { + return new EmptyState(); + } + + @Override + public void close() { + try { + this.restClient.close(); + } catch (IOException e) { + LOG.error("Fail to close {}.", this.restClient.getClass().getSimpleName(), e); + } + } + + private ReaderSelector getReaderSelector(BitSailConfiguration jobConf, int readerNum) { + String splitStrategy = jobConf.getUnNecessaryOption( + ElasticsearchReaderOptions.SPLIT_STRATEGY, "round_robin"); + String upper = StringUtils.upperCase(splitStrategy); + ReaderSelector selector = null; + if (ROUND_ROBIN.name().equals(upper)) { + selector = new RoundRobinReaderSelector(readerNum); + } else if (HASH.name().equals(upper)) { + selector = new HashReaderSelector(readerNum); + } else { + throw BitSailException.asBitSailException( + ElasticsearchErrorCode.NOT_SUPPORT_SPLIT_STRATEGY, + String.format("Split strategy %s is not supported yet.", splitStrategy)); + } + + LOG.info("Select {} as split strategy.", selector.toString()); + return selector; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/ElasticsearchReader.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/ElasticsearchReader.java new file mode 100644 index 000000000..6b2d2debe --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/ElasticsearchReader.java @@ -0,0 +1,166 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.option.ReaderOptions; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; +import com.bytedance.bitsail.connector.elasticsearch.rest.source.EsSourceRequest; +import com.bytedance.bitsail.connector.elasticsearch.source.reader.deserializer.ElasticsearchRowDeserializer; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSourceSplit; + +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class ElasticsearchReader implements SourceReader { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchReader.class); + + private final int subTaskId; + + private int totalSplitNum = 0; + + private boolean hasNoMoreSplits = false; + + private final Deque splits; + + private final RestHighLevelClient restHighLevelClient; + + private EsSourceRequest esSourceRequest; + + private final List columnInfos; + + private final String scrollTime; + + private final int scrollSize; + + private final List columnNames; + + private final ElasticsearchRowDeserializer deserializer; + + private List> curResult; + + private ElasticsearchSourceSplit curSplit; + + private AtomicInteger readIndex = new AtomicInteger(0); + + public ElasticsearchReader(BitSailConfiguration jobConf, SourceReader.Context readerContext) { + this.subTaskId = readerContext.getIndexOfSubtask(); + + this.columnInfos = jobConf.getNecessaryOption( + ReaderOptions.BaseReaderOptions.COLUMNS, ElasticsearchErrorCode.REQUIRED_VALUE); + columnNames = columnInfos.stream().map(ColumnInfo::getName).collect(Collectors.toList()); + this.scrollTime = jobConf.get(ElasticsearchReaderOptions.SCROLL_TIME); + this.scrollSize = jobConf.get(ElasticsearchReaderOptions.SCROLL_SIZE); + + this.splits = new ConcurrentLinkedDeque<>(); + + this.restHighLevelClient = new EsRestClientBuilder(jobConf).build(); + this.esSourceRequest = new EsSourceRequest(this.restHighLevelClient); + this.deserializer = new ElasticsearchRowDeserializer( + readerContext.getTypeInfos(), readerContext.getFieldNames(), jobConf); + + LOG.info("Elasticsearch source reader {} is initialized.", subTaskId); + } + + @Override + public void start() { + LOG.info("Task {} started.", subTaskId); + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + if (Objects.isNull(curResult) && splits.isEmpty()) { + return; + } + + if (Objects.isNull(curResult)) { + this.curSplit = splits.poll(); + try { + this.curResult = esSourceRequest.getAllDocuments( + this.curSplit.getIndex(), columnNames, scrollSize, scrollTime); + } catch (IOException e) { + throw BitSailException.asBitSailException( + ElasticsearchErrorCode.FETCH_DATA_FAILED, + "Failed to fetch document data." + ); + } + LOG.info("Task {} finish read split: {}=[{}]", subTaskId, this.curSplit.getSplitId(), this.curSplit); + } + + if (this.curResult.size() > 0) { + for (int i = 0; i < curResult.size(); i++) { + Map doc = this.curResult.get(this.readIndex.getAndIncrement()); + Row row = deserializer.deserialize(doc); + pipeline.output(row); + } + } else { + curResult = null; + LOG.info("Task {} finishes reading rows from split: {}", subTaskId, curSplit.uniqSplitId()); + } + } + + @Override + public void addSplits(List splits) { + totalSplitNum += splits.size(); + this.splits.addAll(splits); + } + + @Override + public boolean hasMoreElements() { + if (hasNoMoreSplits && splits.isEmpty()) { + LOG.info("Finish reading all {} splits.", totalSplitNum); + return false; + } + return true; + } + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void close() throws Exception { + this.restHighLevelClient.close(); + LOG.info("Task {} is closed.", subTaskId); + } + + @Override + public void notifyNoMoreSplits() { + this.hasNoMoreSplits = true; + LOG.info("No more splits will be assigned."); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializer.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializer.java new file mode 100644 index 000000000..4365e088a --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializer.java @@ -0,0 +1,194 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.deserializer; + +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.exception.CommonErrorCode; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.BasicTypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfos; +import com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class ElasticsearchRowDeserializer implements DeserializationSchema, Row> { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDeserializer.class); + private final TypeInfo[] typeInfos; + + private final String[] fieldNames; + + private final int fieldSize; + + private final BitSailConfiguration deserializerConf; + + public ElasticsearchRowDeserializer(TypeInfo[] typeInfos, String[] fieldNames, BitSailConfiguration deserializerConf) { + this.typeInfos = typeInfos; + this.fieldNames = fieldNames; + this.deserializerConf = deserializerConf; + this.fieldSize = this.typeInfos.length; + } + + @Override + public Row deserialize(Map message) { + Row row = new Row(this.fieldSize); + for (int i = 0; i < this.fieldSize; i++) { + String fieldName = this.fieldNames[i]; + Object value = message.get(fieldName); + if (Objects.nonNull(value)) { + TypeInfo typeInfo = this.typeInfos[i]; + try { + row.setField(i, convert(typeInfo, value.toString())); + } catch (ParseException e) { + LOG.error("Parse value {} with type {} failed.", value, typeInfo); + throw new RuntimeException(e); + } + } + } + return row; + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + private Object convert(TypeInfo typeInfo, String value) throws ParseException { + if (!(typeInfo instanceof BasicTypeInfo)) { + throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE, typeInfo.getTypeClass().getName() + " is not supported yet."); + } + Class curClass = typeInfo.getTypeClass(); + if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) { + return Boolean.parseBoolean(value); + } + if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) { + return Short.parseShort(value); + } + if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) { + return Integer.parseInt(value); + } + if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) { + return Long.parseLong(value); + } + if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) { + return Float.parseFloat(value); + } + if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) { + return Double.parseDouble(value); + } + if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) { + return new BigInteger(value); + } + if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) { + return new BigDecimal(value); + } + if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) { + return value; + } + if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) { + return Boolean.parseBoolean(value); + } + if (TypeInfos.LOCAL_DATE_TYPE_INFO.getTypeClass() == curClass) { + return LocalDate.parse(value); + } + if (TypeInfos.LOCAL_TIME_TYPE_INFO.getTypeClass() == curClass) { + return LocalTime.parse(value); + } + if (TypeInfos.LOCAL_DATE_TIME_TYPE_INFO.getTypeClass() == curClass) { + return LocalDateTime.parse(value); + } + if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) { + String format = determineDateFormat(value); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format); + return simpleDateFormat.parse(value); + } + if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) { + return Timestamp.parse(value); + } + if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) { + return Timestamp.parse(value); + } + if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) { + return null; + } + throw new UnsupportedOperationException("Unsupported data type: " + typeInfo); + } + + /** + * Determine SimpleDateFormat pattern matching with the given date string. Returns null if + * format is unknown. You can simply extend DateUtil with more formats if needed. + * ... + * + * @param dateString The date string to determine the SimpleDateFormat pattern for. + * @return The matching SimpleDateFormat pattern, or null if format is unknown. + * @see SimpleDateFormat + */ + public static String determineDateFormat(String dateString) { + for (String regexp : DATE_FORMAT_REGEXPS.keySet()) { + if (dateString.toLowerCase().matches(regexp)) { + return DATE_FORMAT_REGEXPS.get(regexp); + } + } + // Unknown format. + throw BitSailException.asBitSailException( + ElasticsearchErrorCode.DESERIALIZE_FAILED, + "Format not recognized." + ); + } + + private static final Map DATE_FORMAT_REGEXPS = new HashMap() {{ + put("^\\d{8}$", "yyyyMMdd"); + put("^\\d{1,2}-\\d{1,2}-\\d{4}$", "dd-MM-yyyy"); + put("^\\d{4}-\\d{1,2}-\\d{1,2}$", "yyyy-MM-dd"); + put("^\\d{1,2}/\\d{1,2}/\\d{4}$", "MM/dd/yyyy"); + put("^\\d{4}/\\d{1,2}/\\d{1,2}$", "yyyy/MM/dd"); + put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}$", "dd MMM yyyy"); + put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}$", "dd MMMM yyyy"); + put("^\\d{12}$", "yyyyMMddHHmm"); + put("^\\d{8}\\s\\d{4}$", "yyyyMMdd HHmm"); + put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}$", "dd-MM-yyyy HH:mm"); + put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy-MM-dd HH:mm"); + put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}$", "MM/dd/yyyy HH:mm"); + put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy/MM/dd HH:mm"); + put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMM yyyy HH:mm"); + put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMMM yyyy HH:mm"); + put("^\\d{14}$", "yyyyMMddHHmmss"); + put("^\\d{8}\\s\\d{6}$", "yyyyMMdd HHmmss"); + put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd-MM-yyyy HH:mm:ss"); + put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy-MM-dd HH:mm:ss"); + put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "MM/dd/yyyy HH:mm:ss"); + put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy/MM/dd HH:mm:ss"); + put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMM yyyy HH:mm:ss"); + put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMMM yyyy HH:mm:ss"); + }}; +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelector.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelector.java new file mode 100644 index 000000000..c31edea10 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelector.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class HashReaderSelector implements ReaderSelector { + + private int totalReaderNum; + + public int getReaderIndex(String index) { + return (index.hashCode() & Integer.MAX_VALUE) % totalReaderNum; + } + + public String toString() { + return "HashReaderSelector"; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/ReaderSelector.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/ReaderSelector.java new file mode 100644 index 000000000..a21d679c3 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/ReaderSelector.java @@ -0,0 +1,29 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +public interface ReaderSelector { + + /** + * Use selector to determine correspond reader idx + * + * @param index es index + * @return reader idx + */ + int getReaderIndex(String index); + +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelector.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelector.java new file mode 100644 index 000000000..867cfc5c9 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelector.java @@ -0,0 +1,38 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +import lombok.Data; + +@Data +public class RoundRobinReaderSelector implements ReaderSelector { + private long startIndex = 0; + + private int totalReaderNum; + + public RoundRobinReaderSelector(int totalReaderNum) { + this.totalReaderNum = totalReaderNum; + } + + public int getReaderIndex(String index) { + return (int) startIndex++ % totalReaderNum; + } + + public String toString() { + return "RoundRobinReaderSelector"; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/SelectorType.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/SelectorType.java new file mode 100644 index 000000000..b4ccaee69 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/SelectorType.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +public enum SelectorType { + + ROUND_ROBIN, + + HASH +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSourceSplit.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSourceSplit.java new file mode 100644 index 000000000..fd49c7b6e --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSourceSplit.java @@ -0,0 +1,44 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Setter +@Getter +@ToString +public class ElasticsearchSourceSplit implements SourceSplit { + + public static final String SOURCE_SPLIT_PREFIX = "elasticsearch_source_split_"; + + private final String splitId; + + private String index; + + public ElasticsearchSourceSplit(int splitId) { + this.splitId = SOURCE_SPLIT_PREFIX + splitId; + } + + @Override + public String uniqSplitId() { + return this.splitId; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitByIndexStrategy.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitByIndexStrategy.java new file mode 100644 index 000000000..f984a8076 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitByIndexStrategy.java @@ -0,0 +1,80 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.split; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.rest.source.EsSourceRequest; +import com.bytedance.bitsail.connector.elasticsearch.util.SplitStringUtil; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.compress.utils.Lists; +import org.elasticsearch.client.RestHighLevelClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Objects; + +import static com.bytedance.bitsail.connector.elasticsearch.error.ElasticsearchErrorCode.REQUIRED_VALUE; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ElasticsearchSplitByIndexStrategy implements ElasticsearchSplitStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSplitByIndexStrategy.class); + private RestHighLevelClient restHighLevelClient; + + public List getElasticsearchSplits(BitSailConfiguration jobConf) { + List splits = Lists.newArrayList(); + + String indices = jobConf.getNecessaryOption(ElasticsearchReaderOptions.ES_INDEX, REQUIRED_VALUE); + EsSourceRequest esSourceRequest = new EsSourceRequest(restHighLevelClient); + + String[] splitIndices = SplitStringUtil.splitString(indices); + int idx = 0; + for (String index : splitIndices) { + if (check(index, esSourceRequest)) { + ElasticsearchSourceSplit split = new ElasticsearchSourceSplit(idx++); + split.setIndex(index); + splits.add(split); + } + } + return splits; + } + + private boolean check(String index, EsSourceRequest esSourceRequest) { + Long count = esSourceRequest.validateIndex(index); + return Objects.nonNull(count) && count > 0; + } + + /** + * Used for determine parallelism num, simply based on indices num. + * + * @return estimate split num + */ + @Override + public int estimateSplitNum(BitSailConfiguration configuration) { + String indices = configuration.getNecessaryOption(ElasticsearchReaderOptions.ES_INDEX, REQUIRED_VALUE); + int estimatedSplitNum = SplitStringUtil.splitString(indices).length; + LOG.info("Estimated split num is: {}", estimatedSplitNum); + return estimatedSplitNum; + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitStrategy.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitStrategy.java new file mode 100644 index 000000000..a59eaef1c --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/source/split/ElasticsearchSplitStrategy.java @@ -0,0 +1,40 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.split; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; + +import java.util.List; + +public interface ElasticsearchSplitStrategy { + + /** + * Get source split by elasticsearch cluster metadata + * + * @param jobConf user conf + * @return es-source-split collection + */ + List getElasticsearchSplits(BitSailConfiguration jobConf); + + /** + * Estimate split num base on conf + * + * @param jobConf user conf + * @return estimate split num + */ + int estimateSplitNum(BitSailConfiguration jobConf); +} diff --git a/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtil.java b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtil.java new file mode 100644 index 000000000..9c6ab01b9 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/main/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtil.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.util; + +import java.util.ArrayList; +import java.util.List; + +import static com.bytedance.bitsail.connector.elasticsearch.base.EsConstants.SPLIT_COMMA; + +public class SplitStringUtil { + + public static String[] splitString(String indices) { + String[] splits = indices.split(SPLIT_COMMA); + List validNames = new ArrayList<>(); + for (String name : splits) { + if (name.trim().length() > 0) { + validNames.add(name.trim()); + } + } + return validNames.toArray(new String[0]); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/ElasticsearchSourceITCase.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/ElasticsearchSourceITCase.java new file mode 100644 index 000000000..49d8ba2bf --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/ElasticsearchSourceITCase.java @@ -0,0 +1,88 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; +import com.bytedance.bitsail.connector.elasticsearch.util.SourceSetupUtil; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.testcontainers.elasticsearch.ElasticsearchCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +public class ElasticsearchSourceITCase { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSourceITCase.class); + + private ElasticsearchCluster esCluster; + + private final String indices = "test1, test2"; + + private final int count = 10; + + private SourceSetupUtil sourceEnv; + + @Before + public void prepareEsCluster() throws Exception { + esCluster = new ElasticsearchCluster(); + esCluster.startService(); + esCluster.checkClusterHealth(); + + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, + Collections.singletonList(esCluster.getHttpHostAddress())); + + this.sourceEnv = + SourceSetupUtil.builder() + .esCluster(esCluster) + .jobConf(jobConf) + .indices(Arrays.asList("test1", "test2")) + .count(count) + .build(); + + sourceEnv.start(); + } + + @Test + public void testSource() throws Exception { + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("es_source_test.json"); + + jobConf.set(ElasticsearchReaderOptions.ES_INDEX, indices); + jobConf.set(ElasticsearchReaderOptions.ES_HOSTS, + Collections.singletonList(esCluster.getHttpHostAddress())); + jobConf.set(ElasticsearchReaderOptions.SCROLL_SIZE, 3); + jobConf.set(ElasticsearchReaderOptions.SCROLL_TIME, "1m"); + + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @After + public void closeEsCluster() throws IOException { + this.sourceEnv.client.close(); + esCluster.close(); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequestITCase.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequestITCase.java new file mode 100644 index 000000000..2a61c83dd --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/rest/source/EsSourceRequestITCase.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.rest.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchWriterOptions; +import com.bytedance.bitsail.connector.elasticsearch.util.SourceSetupUtil; +import com.bytedance.bitsail.test.connector.test.testcontainers.elasticsearch.ElasticsearchCluster; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class EsSourceRequestITCase { + + private static final Logger LOG = LoggerFactory.getLogger(EsSourceRequestITCase.class); + private SourceSetupUtil sourceEnv; + + private ElasticsearchCluster esCluster; + + private EsSourceRequest esSourceRequest; + + private final String index = "test1"; + + private final int count = 10; + + @Before + public void setup() throws IOException, InterruptedException { + esCluster = new ElasticsearchCluster(); + esCluster.startService(); + esCluster.checkClusterHealth(); + + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(ElasticsearchWriterOptions.ES_HOSTS, + Collections.singletonList(esCluster.getHttpHostAddress())); + + this.sourceEnv = + SourceSetupUtil.builder() + .esCluster(esCluster) + .jobConf(jobConf) + .indices(Collections.singletonList(index)) + .count(count) + .build(); + + sourceEnv.start(); + esSourceRequest = new EsSourceRequest(this.sourceEnv.client); + } + + @Test + public void testGetAllDocuments() throws IOException { + List> allDocuments = esSourceRequest.getAllDocuments(index, + Arrays.asList("id", "text_type", "keyword_type", "long_type", "date_type"), + 3, "30s"); + LOG.info("All documents: {}", allDocuments); + assertEquals("Get All Document through scroll api wrong.", count, allDocuments.size()); + } + + @Test + public void testValidateIndex() { + Long getCount = esSourceRequest.validateIndex(index); + assertEquals("Document count don't match.", count, getCount.intValue()); + } + + @After + public void tearDown() throws IOException { + this.sourceEnv.client.close(); + this.esCluster.close(); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinatorITCase.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinatorITCase.java new file mode 100644 index 000000000..f48b3624f --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/coordinator/ElasticsearchSourceSplitCoordinatorITCase.java @@ -0,0 +1,142 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSourceSplit; +import com.bytedance.bitsail.connector.elasticsearch.util.SourceSetupUtil; +import com.bytedance.bitsail.test.connector.test.testcontainers.elasticsearch.ElasticsearchCluster; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +import static org.junit.Assert.assertEquals; + +public class ElasticsearchSourceSplitCoordinatorITCase { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSourceSplitCoordinatorITCase.class); + + private ElasticsearchCluster esCluster; + + private SourceSetupUtil sourceEnv; + + private BitSailConfiguration jobConf; + + @Before + public void setup() throws IOException, InterruptedException { + esCluster = new ElasticsearchCluster(); + esCluster.startService(); + esCluster.checkClusterHealth(); + + jobConf = BitSailConfiguration.newDefault(); + jobConf.set(ElasticsearchReaderOptions.ES_HOSTS, + Collections.singletonList(esCluster.getHttpHostAddress())); + jobConf.set(ElasticsearchReaderOptions.ES_INDEX, "test1, test2, test3"); + jobConf.set(ElasticsearchReaderOptions.READER_PARALLELISM_NUM, 2); + jobConf.set(ElasticsearchReaderOptions.SPLIT_STRATEGY, "round_robin"); + + this.sourceEnv = + SourceSetupUtil.builder() + .esCluster(esCluster) + .jobConf(jobConf) + .indices(Arrays.asList("test1", "test2", "test3")) + .count(10) + .build(); + + sourceEnv.start(); + } + + @After + public void tearDown() throws IOException { + this.sourceEnv.client.close(); + esCluster.close(); + } + + @Test + public void testConstructSplit() { + + SourceSplitCoordinator.Context context = new SourceSplitCoordinator.Context() { + @Override + public boolean isRestored() { + return false; + } + + @Override + public EmptyState getRestoreState() { + return new EmptyState(); + } + + @Override + public int totalParallelism() { + return 2; + } + + @Override + public Set registeredReaders() { + return null; + } + + @Override + public void assignSplit(int subtaskId, List splits) { + // pass + } + + @Override + public void signalNoMoreSplits(int subtask) { + // pass + } + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + // pass + } + + @Override + public void runAsync(Callable callable, BiConsumer handler, int initialDelay, long interval) { + // pass + } + + @Override + public void runAsyncOnce(Callable callable, BiConsumer handler) { + // pass + } + }; + ElasticsearchSourceSplitCoordinator elasticsearchSourceSplitCoordinator = new ElasticsearchSourceSplitCoordinator(context, jobConf); + elasticsearchSourceSplitCoordinator.start(); + Map> splitAssignmentPlan = elasticsearchSourceSplitCoordinator.getSplitAssignmentPlan(); + LOG.info("Get split assignment plan: {}", splitAssignmentPlan); + assertEquals("Reader parallelism wrong", 2, splitAssignmentPlan.size()); + assertEquals("Reader-1 accept splits wrong", 2, splitAssignmentPlan.get(0).size()); + assertEquals("Reader-2 accept splits wrong", 1, splitAssignmentPlan.get(1).size()); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializerTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializerTest.java new file mode 100644 index 000000000..47ece6a73 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/deserializer/ElasticsearchRowDeserializerTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.deserializer; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.typeinfo.TypeInfo; +import com.bytedance.bitsail.common.typeinfo.TypeInfos; + +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +public class ElasticsearchRowDeserializerTest { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDeserializerTest.class); + + private ElasticsearchRowDeserializer deserializer; + + @Before + public void setup() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + TypeInfo[] typeInfos = new TypeInfo[] { + TypeInfos.INT_TYPE_INFO, + TypeInfos.STRING_TYPE_INFO, + TypeInfos.STRING_TYPE_INFO, + TypeInfos.LONG_TYPE_INFO, + TypeInfos.SQL_DATE_TYPE_INFO + }; + String[] fieldNames = new String[] { + "id", "text_type", "keyword_type", "long_type", "date_type" + }; + deserializer = new ElasticsearchRowDeserializer(typeInfos, fieldNames, jobConf); + } + + @Test + public void testDeserialize() throws ParseException { + Date date = new SimpleDateFormat("yyyy-MM-dd").parse("2013-01-17"); + Row testRow = new Row(new Object[]{1, "test1-text-1", "test1-keyword-1", 2023011700L, date}); + + Map map = Maps.newHashMap(); + map.put("id", 1); + map.put("text_type", "test1-text-1"); + map.put("keyword_type", "test1-keyword-1"); + map.put("long_type", 2023011700L); + map.put("date_type", "2013-01-17"); + Row row = deserializer.deserialize(map); + Assert.assertEquals(row, testRow); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelectorTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelectorTest.java new file mode 100644 index 000000000..e5794974a --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/HashReaderSelectorTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +import com.google.common.collect.Maps; +import org.apache.commons.compress.utils.Lists; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class HashReaderSelectorTest { + + private ReaderSelector selector; + + private final int readerNumber = 3; + + private final int splitNumber = 10; + + private final String indexPrefix = "test_index_"; + + @Before + public void setup() { + selector = new HashReaderSelector(readerNumber); + } + + @Test + public void testGetReaderIndex() { + Map> map = Maps.newHashMap(); + for (int i = 0; i < splitNumber; i++) { + int idx = selector.getReaderIndex(indexPrefix + i); + List list = map.getOrDefault(idx, Lists.newArrayList()); + list.add(i); + map.put(idx, list); + } + assertEquals(3, map.get(0).size()); + assertEquals(4, map.get(1).size()); + assertEquals(3, map.get(2).size()); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelectorTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelectorTest.java new file mode 100644 index 000000000..e1b13720b --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/source/reader/selector/RoundRobinReaderSelectorTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.source.reader.selector; + +import com.google.common.collect.Maps; +import org.apache.commons.compress.utils.Lists; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class RoundRobinReaderSelectorTest { + + private ReaderSelector selector; + + private final int readerNumber = 3; + + private final int splitNumber = 10; + + @Before + public void setup() { + selector = new RoundRobinReaderSelector(readerNumber); + } + + @Test + public void testGetReaderIndex() { + Map> map = Maps.newHashMap(); + for (int i = 0; i < splitNumber; i++) { + int idx = selector.getReaderIndex(""); + List list = map.getOrDefault(idx, Lists.newArrayList()); + list.add(i); + map.put(idx, list); + } + assertEquals(4, map.get(0).size()); + assertEquals(3, map.get(1).size()); + assertEquals(3, map.get(2).size()); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/split/ElasticsearchSplitByIndexStrategyTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/split/ElasticsearchSplitByIndexStrategyTest.java new file mode 100644 index 000000000..dc9c6e28d --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/split/ElasticsearchSplitByIndexStrategyTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.split; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.option.ElasticsearchReaderOptions; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitByIndexStrategy; +import com.bytedance.bitsail.connector.elasticsearch.source.split.ElasticsearchSplitStrategy; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ElasticsearchSplitByIndexStrategyTest { + + private ElasticsearchSplitStrategy elasticsearchSplitStrategy; + + @Before + public void setup() { + elasticsearchSplitStrategy = new ElasticsearchSplitByIndexStrategy(); + } + + @Test + public void testEstimateSplitNum() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + jobConf.set(ElasticsearchReaderOptions.ES_INDEX, "test1, test2,test3"); + int estimateSplitNum = elasticsearchSplitStrategy.estimateSplitNum(jobConf); + assertEquals("Estimate split number error.", 3, estimateSplitNum); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SourceSetupUtil.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SourceSetupUtil.java new file mode 100644 index 000000000..a30b7b51c --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SourceSetupUtil.java @@ -0,0 +1,149 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.util; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.elasticsearch.rest.EsRestClientBuilder; +import com.bytedance.bitsail.test.connector.test.testcontainers.elasticsearch.ElasticsearchCluster; + +import lombok.Builder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Builder +public class SourceSetupUtil { + + private static final Logger LOG = LoggerFactory.getLogger(SourceSetupUtil.class); + + private final String mappings = "{\n" + + " \"properties\": {\n" + + " \"id\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"text_type\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"keyword_type\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"long_type\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"date_type\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + "}"; + + public RestHighLevelClient client; + + public BitSailConfiguration jobConf; + + private ElasticsearchCluster esCluster; + + private int count; + + private List indices; + + public void start() throws IOException, InterruptedException { + client = new EsRestClientBuilder(jobConf).build(); + for (String index : indices) { + createIndex(index); + insertDocuments(index); + TimeUnit.SECONDS.sleep(2); + esCluster.flush(); + searchDocuments(index); + } + } + + @SuppressWarnings("checkstyle:MagicNumber") + private void createIndex(String index) throws IOException { + CreateIndexRequest request = new CreateIndexRequest(index); + request.settings(Settings.builder() + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 0) + .build()); + request.mapping(mappings, XContentType.JSON); + CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT); + assertTrue("Create index failed.", response.isShardsAcknowledged()); + LOG.info("Create {} index successfully. \n {}", response.index(), mappings); + } + + @SuppressWarnings("checkstyle:MagicNumber") + private void insertDocuments(String index) throws IOException { + Map jsonMap = new HashMap<>(); + for (int i = 0; i < count; i++) { + jsonMap.put("id", i); + jsonMap.put("text_type", appendString("text", index, i)); + jsonMap.put("keyword_type", appendString("keyword", index, i)); + jsonMap.put("long_type", 20230116L + i); + jsonMap.put("date_type", LocalDate.now()); + IndexRequest request = new IndexRequest(index).id(String.valueOf(i)).source(jsonMap); + IndexResponse response = client.index(request, RequestOptions.DEFAULT); + if (response.status() != RestStatus.CREATED) { + LOG.error("Insert into index: {}, document {} failed", index, i); + throw new RuntimeException("Insert document into index failed"); + } + } + LOG.info("Add {} documents to {} index successfully, mappings: ", count, index); + } + + private String appendString(String type, String index, int idx) { + return String.join("-", index, type, String.valueOf(idx)); + } + + private void searchDocuments(String index) throws IOException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + SearchRequest request = new SearchRequest(); + request.indices(index); + request.source(searchSourceBuilder); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); + SearchHits hits = response.getHits(); + LOG.info("Totally get {}", hits.getTotalHits()); + SearchHit[] searchHits = hits.getHits(); + assertEquals(String.format("Index: %s, doc count wrong.", index), count, searchHits.length); + for (int i = 0; i < searchHits.length; i++) { + LOG.info("Index: {}, hit-{}: {}", index, i, searchHits[i].getSourceAsString()); + } + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtilTest.java b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtilTest.java new file mode 100644 index 000000000..496b37809 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/java/com/bytedance/bitsail/connector/elasticsearch/util/SplitStringUtilTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.elasticsearch.util; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +public class SplitStringUtilTest { + + private static final Logger LOG = LoggerFactory.getLogger(SplitStringUtilTest.class); + + @Test + public void testSplitString() { + String[] splitNames = SplitStringUtil.splitString(" test1, test2, , test3 , test4 "); + LOG.info("split names: {}", Arrays.toString(splitNames)); + Assert.assertEquals("Index names parse error.", 4, splitNames.length); + } +} diff --git a/bitsail-connectors/connector-elasticsearch/src/test/resources/es_source_test.json b/bitsail-connectors/connector-elasticsearch/src/test/resources/es_source_test.json new file mode 100644 index 000000000..1116217f5 --- /dev/null +++ b/bitsail-connectors/connector-elasticsearch/src/test/resources/es_source_test.json @@ -0,0 +1,48 @@ +{ + "job": { + "common": { + "cid": 0, + "domain": "test", + "job_id": -24, + "job_name": "bitsail_connector_es_batch_source_test", + "instance_id": -720, + "user_name": "root" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.elasticsearch.source.ElasticsearchSource", + "scroll_size": 3, + "scroll_time": "1m", + "columns": [ + { + "index": 0, + "name": "id", + "type": "integer" + }, + { + "index": 1, + "name": "text_type", + "type": "text" + }, + { + "index": 2, + "name": "keyword_type", + "type": "keyword" + }, + { + "index": 3, + "name": "long_type", + "type": "long" + }, + { + "index": 4, + "name": "date_type", + "type": "date" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink", + "writer_parallelism_num": 2 + } + } +}