diff --git a/taier-common/src/main/java/com/dtstack/taier/common/enums/EScheduleJobDistributeType.java b/taier-common/src/main/java/com/dtstack/taier/common/enums/EScheduleJobDistributeType.java new file mode 100644 index 0000000000..ea630961da --- /dev/null +++ b/taier-common/src/main/java/com/dtstack/taier/common/enums/EScheduleJobDistributeType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.common.enums; + +/** + * 周期实例分配策略 + * @author xingyi + */ +public enum EScheduleJobDistributeType { + + /** + * 历史默认策略,通过构建schedule_engine_job_cache 进行初始化分配,依据数据总量+节点负载 来分配 + */ + DEFAULT(0), + + /** + * 依据ScheduleJob -> taskType_cycTime 策略进行平均分配,不关注schedule_engine_job_cache + */ + TASK_TYPE_CYCTIME(1); + + private int value; + + EScheduleJobDistributeType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static EScheduleJobDistributeType getDistributeType(int value) { + for (EScheduleJobDistributeType type : EScheduleJobDistributeType.values()) { + if (type.value == value) { + return type; + } + } + return EScheduleJobDistributeType.DEFAULT; + } +} diff --git a/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java b/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java index 70af802b7d..971f123347 100644 --- a/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java +++ b/taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java @@ -687,6 +687,15 @@ public long getPrometheusPushGatewayInterval() { return Long.parseLong(environment.getProperty("taier.monitor.metrics.prometheus.pushgateway.interval", "60")); } + /** + * 周期实例生成策略 + * 0: 默认策略 + * 1: 按照taskType_cycTime 进行均分配策略 + */ + public int getJobGraphDistributeType() { + return Integer.parseInt(environment.getProperty("engine.job.graph.distribute.type", "0")); + } + diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/CycleJobBuilder.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/CycleJobBuilder.java index 598196234f..4471491ab9 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/CycleJobBuilder.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/CycleJobBuilder.java @@ -19,6 +19,7 @@ package com.dtstack.taier.scheduler.server.builder; import com.dtstack.taier.common.enums.Deleted; +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; import com.dtstack.taier.common.enums.EScheduleStatus; import com.dtstack.taier.common.enums.EScheduleType; import com.dtstack.taier.common.exception.TaierDefineException; @@ -26,6 +27,7 @@ import com.dtstack.taier.pluginapi.util.DateUtil; import com.dtstack.taier.pluginapi.util.RetryUtil; import com.dtstack.taier.scheduler.server.ScheduleJobDetails; +import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeContext; import com.dtstack.taier.scheduler.service.JobGraphTriggerService; import com.dtstack.taier.scheduler.utils.JobExecuteOrderUtil; import com.google.common.collect.Lists; @@ -98,6 +100,8 @@ public void buildTaskJobGraph(String triggerDay) { CountDownLatch ctl = new CountDownLatch(totalBatch); AtomicJobSortWorker sortWorker = new AtomicJobSortWorker(); + ScheduleJobDistributeContext distributeContext = new ScheduleJobDistributeContext(); + // 3. 查询db多线程生成周期实例 Long startId = 0L; for (int i = 0; i < totalBatch; i++) { @@ -124,7 +128,7 @@ public void buildTaskJobGraph(String triggerDay) { List scheduleJobDetails = RetryUtil.executeWithRetry(() -> buildJob(batchTaskShade, triggerDay, sortWorker), environmentContext.getBuildJobErrorRetry(), 200, false); // 插入周期实例 - savaJobList(scheduleJobDetails); + savaJobList(scheduleJobDetails, distributeContext); } catch (Throwable e) { LOGGER.error("build task failure taskId:{}", batchTaskShade.getTaskId(), e); } @@ -166,7 +170,7 @@ private void clearInterruptJob(Timestamp triggerDay) { * @param scheduleJobDetails 实例详情 */ @Transactional(rollbackFor = Exception.class) - public void savaJobList(List scheduleJobDetails) { + public void savaJobList(List scheduleJobDetails, ScheduleJobDistributeContext distributeContext) { List savaJobDetails = Lists.newArrayList(); for (ScheduleJobDetails scheduleJobDetail : scheduleJobDetails) { savaJobDetails.add(scheduleJobDetail); @@ -177,7 +181,8 @@ public void savaJobList(List scheduleJobDetails) { } } - scheduleJobService.insertJobList(savaJobDetails, getType()); + EScheduleJobDistributeType distributeType = EScheduleJobDistributeType.getDistributeType(environmentContext.getJobGraphDistributeType()); + scheduleJobService.insertJobList(savaJobDetails, EScheduleType.NORMAL_SCHEDULE.getType(), distributeType, distributeContext); } /** diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/DefaultScheduleJobDistributeStrategy.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/DefaultScheduleJobDistributeStrategy.java new file mode 100644 index 0000000000..9e598beafa --- /dev/null +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/DefaultScheduleJobDistributeStrategy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.scheduler.server.distribute; + +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; +import com.dtstack.taier.common.exception.TaierDefineException; +import com.dtstack.taier.scheduler.server.JobPartitioner; +import com.dtstack.taier.scheduler.server.ScheduleJobDetails; +import com.google.common.collect.Maps; +import org.apache.commons.collections.MapUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * 历史默认策略,按调度节点当前负载计算每个节点应接收的实例数。 + * @author xingyi + */ +@Component +public class DefaultScheduleJobDistributeStrategy implements ScheduleJobDistributeStrategy { + + @Autowired + private JobPartitioner jobPartitioner; + + @Override + public EScheduleJobDistributeType distributeType() { + return EScheduleJobDistributeType.DEFAULT; + } + + @Override + public Map distribute(List scheduleJobDetails, + Integer scheduleType, + ScheduleJobDistributeContext distributeContext) { + Map jobNodeMap = Maps.newHashMap(); + Map nodeJobSize = jobPartitioner.computeBatchJobSize(scheduleType, scheduleJobDetails.size()); + if (MapUtils.isEmpty(nodeJobSize)) { + throw new TaierDefineException("No available node to distribute schedule jobs"); + } + + Iterator batchJobIterator = scheduleJobDetails.iterator(); + for (Map.Entry nodeJobSizeEntry : nodeJobSize.entrySet()) { + String nodeAddress = nodeJobSizeEntry.getKey(); + int nodeSize = nodeJobSizeEntry.getValue(); + while (nodeSize > 0 && batchJobIterator.hasNext()) { + jobNodeMap.put(batchJobIterator.next(), nodeAddress); + nodeSize--; + } + } + return jobNodeMap; + } +} diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeContext.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeContext.java new file mode 100644 index 0000000000..647d4a4204 --- /dev/null +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.scheduler.server.distribute; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 周期实例分配上下文,用于在分批生成实例时共享策略状态。 + * @author xingyi + */ +public class ScheduleJobDistributeContext { + + private final Map globalNodeLoadCache = new ConcurrentHashMap<>(); + + private final Map> globalGroupNodeLoadCache = new ConcurrentHashMap<>(); + + public Map getGlobalNodeLoadCache() { + return globalNodeLoadCache; + } + + public Map> getGlobalGroupNodeLoadCache() { + return globalGroupNodeLoadCache; + } +} diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeStrategy.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeStrategy.java new file mode 100644 index 0000000000..b105d4844b --- /dev/null +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/ScheduleJobDistributeStrategy.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.scheduler.server.distribute; + +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; +import com.dtstack.taier.scheduler.server.ScheduleJobDetails; + +import java.util.List; +import java.util.Map; + +/** + * 周期实例节点分配策略。 + * @author xingyi + */ +public interface ScheduleJobDistributeStrategy { + + EScheduleJobDistributeType distributeType(); + + Map distribute(List scheduleJobDetails, + Integer scheduleType, + ScheduleJobDistributeContext distributeContext); +} diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/TaskTypeCycTimeScheduleJobDistributeStrategy.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/TaskTypeCycTimeScheduleJobDistributeStrategy.java new file mode 100644 index 0000000000..ff8ac01580 --- /dev/null +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/distribute/TaskTypeCycTimeScheduleJobDistributeStrategy.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.scheduler.server.distribute; + +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; +import com.dtstack.taier.common.exception.TaierDefineException; +import com.dtstack.taier.dao.domain.ScheduleJob; +import com.dtstack.taier.scheduler.server.ScheduleJobDetails; +import com.dtstack.taier.scheduler.zookeeper.ZkService; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * 按 taskType + cycTime 分组的周期实例分配策略。 + * @author xingyi + */ +@Component +public class TaskTypeCycTimeScheduleJobDistributeStrategy implements ScheduleJobDistributeStrategy { + + @Autowired + private ZkService zkService; + + private final Object nodeLoadAssignLock = new Object(); + + @Override + public EScheduleJobDistributeType distributeType() { + return EScheduleJobDistributeType.TASK_TYPE_CYCTIME; + } + + @Override + public Map distribute(List batchJobs, + Integer scheduleType, + ScheduleJobDistributeContext distributeContext) { + Map jobNodeMap = Maps.newHashMap(); + if (CollectionUtils.isEmpty(batchJobs)) { + return jobNodeMap; + } + + List aliveNodes = zkService.getAliveBrokersChildren(); + if (CollectionUtils.isEmpty(aliveNodes)) { + throw new TaierDefineException("No available node to distribute schedule jobs"); + } + + synchronized (nodeLoadAssignLock) { + Map globalNodeLoadCache = distributeContext.getGlobalNodeLoadCache(); + Map> globalGroupNodeLoadCache = distributeContext.getGlobalGroupNodeLoadCache(); + cleanOfflineNodes(aliveNodes, globalNodeLoadCache, globalGroupNodeLoadCache); + + Map nodeLoad = Maps.newHashMapWithExpectedSize(aliveNodes.size()); + for (String aliveNode : aliveNodes) { + nodeLoad.put(aliveNode, globalNodeLoadCache.getOrDefault(aliveNode, 0L)); + } + + Map> groupedJobs = groupJobs(batchJobs); + if (MapUtils.isEmpty(groupedJobs)) { + return jobNodeMap; + } + + Map> groupLoadSnapshot = buildGroupLoadSnapshot(aliveNodes, groupedJobs, globalGroupNodeLoadCache); + distributeGroupedJobs(aliveNodes, nodeLoad, groupLoadSnapshot, groupedJobs, jobNodeMap); + writeBackLoadSnapshot(aliveNodes, nodeLoad, groupLoadSnapshot, globalNodeLoadCache, globalGroupNodeLoadCache); + return jobNodeMap; + } + } + + private void cleanOfflineNodes(List aliveNodes, + Map globalNodeLoadCache, + Map> globalGroupNodeLoadCache) { + globalNodeLoadCache.keySet().removeIf(node -> !aliveNodes.contains(node)); + for (String aliveNode : aliveNodes) { + globalNodeLoadCache.putIfAbsent(aliveNode, 0L); + } + globalGroupNodeLoadCache.values().forEach(groupLoad -> { + groupLoad.keySet().removeIf(node -> !aliveNodes.contains(node)); + for (String aliveNode : aliveNodes) { + groupLoad.putIfAbsent(aliveNode, 0L); + } + }); + } + + private Map> groupJobs(List batchJobs) { + Map> groupedJobs = Maps.newLinkedHashMap(); + for (ScheduleJobDetails batchJob : batchJobs) { + String groupKey = buildTaskTypeCycTimeGroupKey(batchJob.getScheduleJob()); + groupedJobs.computeIfAbsent(groupKey, key -> Lists.newArrayList()).add(batchJob); + } + return groupedJobs; + } + + private Map> buildGroupLoadSnapshot(List aliveNodes, + Map> groupedJobs, + Map> globalGroupNodeLoadCache) { + Map> groupLoadSnapshot = Maps.newHashMapWithExpectedSize(groupedJobs.size()); + for (String groupKey : groupedJobs.keySet()) { + Map cacheGroupLoad = globalGroupNodeLoadCache.computeIfAbsent(groupKey, k -> new ConcurrentHashMap<>()); + for (String aliveNode : aliveNodes) { + cacheGroupLoad.putIfAbsent(aliveNode, 0L); + } + Map localGroupLoad = Maps.newHashMapWithExpectedSize(aliveNodes.size()); + for (String aliveNode : aliveNodes) { + localGroupLoad.put(aliveNode, cacheGroupLoad.getOrDefault(aliveNode, 0L)); + } + groupLoadSnapshot.put(groupKey, localGroupLoad); + } + return groupLoadSnapshot; + } + + private void distributeGroupedJobs(List aliveNodes, + Map nodeLoad, + Map> groupLoadSnapshot, + Map> groupedJobs, + Map jobNodeMap) { + List>> sortedGroups = groupedJobs.entrySet().stream() + .sorted((left, right) -> { + int compare = Integer.compare(right.getValue().size(), left.getValue().size()); + if (compare != 0) { + return compare; + } + return left.getKey().compareTo(right.getKey()); + }) + .collect(Collectors.toList()); + + List pendingSmallGroupJobs = Lists.newArrayList(); + for (Map.Entry> groupedJobsEntry : sortedGroups) { + String groupKey = groupedJobsEntry.getKey(); + List groupJobs = groupedJobsEntry.getValue(); + if (groupJobs.size() >= aliveNodes.size()) { + if (CollectionUtils.isNotEmpty(pendingSmallGroupJobs)) { + distributeSmallGroupJobs(pendingSmallGroupJobs, aliveNodes, nodeLoad, groupLoadSnapshot, jobNodeMap); + pendingSmallGroupJobs.clear(); + } + distributeLargeGroupJobs(groupKey, groupJobs, aliveNodes, nodeLoad, groupLoadSnapshot.get(groupKey), jobNodeMap); + } else { + pendingSmallGroupJobs.addAll(groupJobs); + if (pendingSmallGroupJobs.size() >= aliveNodes.size()) { + distributeSmallGroupJobs(pendingSmallGroupJobs, aliveNodes, nodeLoad, groupLoadSnapshot, jobNodeMap); + pendingSmallGroupJobs.clear(); + } + } + } + + if (CollectionUtils.isNotEmpty(pendingSmallGroupJobs)) { + distributeSmallGroupJobs(pendingSmallGroupJobs, aliveNodes, nodeLoad, groupLoadSnapshot, jobNodeMap); + } + } + + private void writeBackLoadSnapshot(List aliveNodes, + Map nodeLoad, + Map> groupLoadSnapshot, + Map globalNodeLoadCache, + Map> globalGroupNodeLoadCache) { + for (String aliveNode : aliveNodes) { + globalNodeLoadCache.put(aliveNode, nodeLoad.getOrDefault(aliveNode, 0L)); + } + for (Map.Entry> groupLoadEntry : groupLoadSnapshot.entrySet()) { + globalGroupNodeLoadCache.put(groupLoadEntry.getKey(), new ConcurrentHashMap<>(groupLoadEntry.getValue())); + } + } + + private String buildTaskTypeCycTimeGroupKey(ScheduleJob scheduleJob) { + Integer taskType = scheduleJob.getTaskType() == null ? -1 : scheduleJob.getTaskType(); + String cycTime = StringUtils.defaultString(scheduleJob.getCycTime()); + String cycMinute = cycTime.length() > 12 ? cycTime.substring(0, 12) : cycTime; + return taskType + "_" + cycMinute; + } + + private void distributeSmallGroupJobs(List jobs, + List aliveNodes, + Map nodeLoad, + Map> groupLoadSnapshot, + Map jobNodeMap) { + for (ScheduleJobDetails scheduleBatchJob : jobs) { + String groupKey = buildTaskTypeCycTimeGroupKey(scheduleBatchJob.getScheduleJob()); + Map groupLoad = groupLoadSnapshot.computeIfAbsent(groupKey, key -> { + Map initLoad = Maps.newHashMapWithExpectedSize(aliveNodes.size()); + for (String node : aliveNodes) { + initLoad.put(node, 0L); + } + return initLoad; + }); + String nodeAddress = findMinLoadNodeForGroup(aliveNodes, groupLoad, nodeLoad); + jobNodeMap.put(scheduleBatchJob, nodeAddress); + nodeLoad.put(nodeAddress, nodeLoad.get(nodeAddress) + 1L); + groupLoad.put(nodeAddress, groupLoad.get(nodeAddress) + 1L); + } + } + + private void distributeLargeGroupJobs(String groupKey, + List groupJobs, + List aliveNodes, + Map nodeLoad, + Map groupLoad, + Map jobNodeMap) { + for (ScheduleJobDetails scheduleBatchJob : groupJobs) { + String nodeAddress = findMinLoadNodeForGroup(aliveNodes, groupLoad, nodeLoad); + jobNodeMap.put(scheduleBatchJob, nodeAddress); + nodeLoad.put(nodeAddress, nodeLoad.get(nodeAddress) + 1L); + groupLoad.put(nodeAddress, groupLoad.get(nodeAddress) + 1L); + } + } + + private String findMinLoadNodeForGroup(List aliveNodes, Map groupLoad, Map nodeLoad) { + String minLoadNode = aliveNodes.get(0); + long minGroupLoad = groupLoad.get(minLoadNode); + long minGlobalLoad = nodeLoad.get(minLoadNode); + for (int i = 1; i < aliveNodes.size(); i++) { + String currentNode = aliveNodes.get(i); + long currentGroupLoad = groupLoad.get(currentNode); + long currentGlobalLoad = nodeLoad.get(currentNode); + if (currentGroupLoad < minGroupLoad + || (currentGroupLoad == minGroupLoad && currentGlobalLoad < minGlobalLoad) + || (currentGroupLoad == minGroupLoad && currentGlobalLoad == minGlobalLoad && currentNode.compareTo(minLoadNode) < 0)) { + minLoadNode = currentNode; + minGroupLoad = currentGroupLoad; + minGlobalLoad = currentGlobalLoad; + } + } + return minLoadNode; + } +} diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/service/ScheduleJobService.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/service/ScheduleJobService.java index 14eff52b11..154b84b264 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/service/ScheduleJobService.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/service/ScheduleJobService.java @@ -23,6 +23,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.dtstack.taier.common.enums.Deleted; +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; import com.dtstack.taier.common.enums.EScheduleType; import com.dtstack.taier.common.enums.ForceCancelFlag; import com.dtstack.taier.common.enums.OperatorType; @@ -43,10 +44,10 @@ import com.dtstack.taier.scheduler.enums.JobPhaseStatus; import com.dtstack.taier.scheduler.impl.pojo.ParamActionExt; import com.dtstack.taier.scheduler.mapstruct.ScheduleJobMapStruct; -import com.dtstack.taier.scheduler.server.JobPartitioner; import com.dtstack.taier.scheduler.server.ScheduleJobDetails; +import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeContext; +import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeStrategy; import com.dtstack.taier.scheduler.server.pipeline.operator.UnnecessaryPreprocessJobPipeline; -import com.dtstack.taier.scheduler.zookeeper.ZkService; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -62,9 +63,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -81,15 +80,9 @@ public class ScheduleJobService extends ServiceImpl distributeStrategies; + /** * 开始运行实例 * @@ -193,15 +189,23 @@ public void restartScheduleJob(Map resumeBatchJobs) { */ @Transactional(rollbackFor = Exception.class) public Long insertJobList(Collection jobBuilderBeanCollection, Integer scheduleType) { + return insertJobList(jobBuilderBeanCollection, scheduleType, EScheduleJobDistributeType.DEFAULT, new ScheduleJobDistributeContext()); + } + + @Transactional(rollbackFor = Exception.class) + public Long insertJobList(Collection jobBuilderBeanCollection, + Integer scheduleType, + EScheduleJobDistributeType distributeType, + ScheduleJobDistributeContext distributeContext) { if (CollectionUtils.isEmpty(jobBuilderBeanCollection)) { return null; } - Iterator batchJobIterator = jobBuilderBeanCollection.iterator(); + List batchJobs = Lists.newArrayList(jobBuilderBeanCollection); + ScheduleJobDistributeStrategy distributeStrategy = getDistributeStrategy(distributeType); + ScheduleJobDistributeContext context = distributeContext == null ? new ScheduleJobDistributeContext() : distributeContext; + Map jobNodeMap = distributeStrategy.distribute(batchJobs, scheduleType, context); - //count%20 为一批 - //1: 批量插入BatchJob - //2: 批量插入BatchJobJobList int count = 0; int jobBatchSize = environmentContext.getBatchInsertSize(); int jobJobBatchSize = environmentContext.getBatchJobJobInsertSize(); @@ -209,55 +213,46 @@ public Long insertJobList(Collection jobBuilderBeanCollectio List jobWaitForSave = Lists.newArrayList(); List jobJobWaitForSave = Lists.newArrayList(); - Map nodeJobSize = computeJobSizeForNode(jobBuilderBeanCollection.size(), scheduleType); - for (Map.Entry nodeJobSizeEntry : nodeJobSize.entrySet()) { - String nodeAddress = nodeJobSizeEntry.getKey(); - int nodeSize = nodeJobSizeEntry.getValue(); - final int finalBatchNodeSize = nodeSize; - while (nodeSize > 0 && batchJobIterator.hasNext()) { - nodeSize--; - count++; - - ScheduleJobDetails jobBuilderBean = batchJobIterator.next(); - - ScheduleJob scheduleJob = jobBuilderBean.getScheduleJob(); - scheduleJob.setNodeAddress(nodeAddress); + for (ScheduleJobDetails scheduleJobDetails : batchJobs) { + count++; + ScheduleJob scheduleJob = scheduleJobDetails.getScheduleJob(); + String nodeAddress = jobNodeMap.get(scheduleJobDetails); + if (StringUtils.isBlank(nodeAddress)) { + throw new TaierDefineException(String.format("No node address assigned for job: %s", scheduleJob.getJobId())); + } + scheduleJob.setNodeAddress(nodeAddress); - jobWaitForSave.add(scheduleJob); - jobJobWaitForSave.addAll(jobBuilderBean.getJobJobList()); + jobWaitForSave.add(scheduleJob); + jobJobWaitForSave.addAll(scheduleJobDetails.getJobJobList()); - LOGGER.debug("insertJobList count:{} batchJobs:{} finalBatchNodeSize:{}", count, jobBuilderBeanCollection.size(), finalBatchNodeSize); - if (count % jobBatchSize == 0 || count == (jobBuilderBeanCollection.size() - 1) || jobJobWaitForSave.size() > jobJobBatchSize) { - minJobId = persistJobs(jobWaitForSave, jobJobWaitForSave, minJobId, jobJobBatchSize); - LOGGER.info("insertJobList nodeAddress: {} count:{} batchJobs:{} finalBatchNodeSize:{} jobJobSize:{}", nodeAddress, count, jobBuilderBeanCollection.size(), finalBatchNodeSize, jobJobWaitForSave.size()); - } + LOGGER.debug("insertJobList count:{} batchJobs:{} nodeAddress:{} strategy:{}", + count, batchJobs.size(), nodeAddress, distributeStrategy.distributeType()); + if (count % jobBatchSize == 0 || count == batchJobs.size() || jobJobWaitForSave.size() > jobJobBatchSize) { + minJobId = persistJobs(jobWaitForSave, jobJobWaitForSave, minJobId, jobJobBatchSize); + LOGGER.info("insertJobList count:{} batchJobs:{} jobJobSize:{} strategy:{}", + count, batchJobs.size(), jobJobWaitForSave.size(), distributeStrategy.distributeType()); } - LOGGER.info("insertJobList nodeAddress: {} count:{} batchJobs:{} finalBatchNodeSize:{}", nodeAddress, count, jobBuilderBeanCollection.size(), finalBatchNodeSize); - //结束前persist一次,flush所有jobs + } + if (CollectionUtils.isNotEmpty(jobWaitForSave) || CollectionUtils.isNotEmpty(jobJobWaitForSave)) { minJobId = persistJobs(jobWaitForSave, jobJobWaitForSave, minJobId, jobJobBatchSize); - } return minJobId; } - /** - * 获得调度各个节点的ip - * - * @param jobSize 实例数 - * @param scheduleType 调度类型 正常调度 和 补数据 - */ - private Map computeJobSizeForNode(int jobSize, int scheduleType) { - Map jobSizeInfo = jobPartitioner.computeBatchJobSize(scheduleType, jobSize); - if (jobSizeInfo == null) { - //if empty - List aliveNodes = zkService.getAliveBrokersChildren(); - jobSizeInfo = new HashMap<>(aliveNodes.size()); - int size = jobSize / aliveNodes.size() + 1; - for (String aliveNode : aliveNodes) { - jobSizeInfo.put(aliveNode, size); + private ScheduleJobDistributeStrategy getDistributeStrategy(EScheduleJobDistributeType distributeType) { + EScheduleJobDistributeType targetType = distributeType == null ? EScheduleJobDistributeType.DEFAULT : distributeType; + for (ScheduleJobDistributeStrategy distributeStrategy : distributeStrategies) { + if (distributeStrategy.distributeType() == targetType) { + return distributeStrategy; + } + } + for (ScheduleJobDistributeStrategy distributeStrategy : distributeStrategies) { + if (distributeStrategy.distributeType() == EScheduleJobDistributeType.DEFAULT) { + LOGGER.warn("Distribute strategy {} not found, fallback to DEFAULT", targetType); + return distributeStrategy; } } - return jobSizeInfo; + throw new TaierDefineException(String.format("Distribute strategy %s not found", targetType)); } /** diff --git a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/zookeeper/ZkService.java b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/zookeeper/ZkService.java index f6e77e747d..eb504abefd 100644 --- a/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/zookeeper/ZkService.java +++ b/taier-scheduler/src/main/java/com/dtstack/taier/scheduler/zookeeper/ZkService.java @@ -282,6 +282,10 @@ public List getAliveBrokersChildren() { } catch (Exception e) { LOGGER.error("getBrokersChildren error:", e); } + // when zk down alive is empty,at least contains self node + if (alives.isEmpty()) { + alives.add(localAddress); + } return alives; } diff --git a/taier-scheduler/src/test/java/com/dtstack/taier/scheduler/service/ScheduleJobServiceTest.java b/taier-scheduler/src/test/java/com/dtstack/taier/scheduler/service/ScheduleJobServiceTest.java new file mode 100644 index 0000000000..448087ca5f --- /dev/null +++ b/taier-scheduler/src/test/java/com/dtstack/taier/scheduler/service/ScheduleJobServiceTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.taier.scheduler.service; + +import com.dtstack.taier.common.enums.EScheduleJobDistributeType; +import com.dtstack.taier.common.enums.EScheduleType; +import com.dtstack.taier.common.env.EnvironmentContext; +import com.dtstack.taier.common.exception.TaierDefineException; +import com.dtstack.taier.dao.domain.ScheduleJob; +import com.dtstack.taier.dao.domain.ScheduleJobJob; +import com.dtstack.taier.scheduler.server.ScheduleJobDetails; +import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeContext; +import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeStrategy; +import com.dtstack.taier.scheduler.server.distribute.TaskTypeCycTimeScheduleJobDistributeStrategy; +import com.dtstack.taier.scheduler.zookeeper.ZkService; +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ScheduleJobServiceTest { + + private ScheduleJobService scheduleJobService; + + private ScheduleJobJobService scheduleJobJobService; + + private ScheduleJobExpandService scheduleJobExpandService; + + private StubDistributeStrategy defaultStrategy; + + private StubDistributeStrategy taskTypeCycTimeStrategy; + + @Before + public void setUp() { + scheduleJobService = spy(new ScheduleJobService()); + scheduleJobJobService = mock(ScheduleJobJobService.class); + scheduleJobExpandService = mock(ScheduleJobExpandService.class); + EnvironmentContext environmentContext = mock(EnvironmentContext.class); + + when(environmentContext.getBatchInsertSize()).thenReturn(10); + when(environmentContext.getBatchJobJobInsertSize()).thenReturn(10); + when(environmentContext.getBuildJobErrorRetry()).thenReturn(1); + doReturn(true).when(scheduleJobService).saveBatch(any(Collection.class)); + when(scheduleJobJobService.saveBatch(any(Collection.class))).thenReturn(true); + when(scheduleJobExpandService.saveBatch(any(Collection.class))).thenReturn(true); + + defaultStrategy = new StubDistributeStrategy(EScheduleJobDistributeType.DEFAULT, "default-node"); + taskTypeCycTimeStrategy = new StubDistributeStrategy(EScheduleJobDistributeType.TASK_TYPE_CYCTIME, "cyc-node"); + + ReflectionTestUtils.setField(scheduleJobService, "environmentContext", environmentContext); + ReflectionTestUtils.setField(scheduleJobService, "scheduleJobJobService", scheduleJobJobService); + ReflectionTestUtils.setField(scheduleJobService, "scheduleJobExpandService", scheduleJobExpandService); + ReflectionTestUtils.setField(scheduleJobService, "distributeStrategies", Lists.newArrayList(defaultStrategy, taskTypeCycTimeStrategy)); + } + + @Test + public void testInsertJobListUseTaskTypeCycTimeStrategy() { + ScheduleJobDetails jobDetails = buildJobDetails("job-1"); + ScheduleJobDistributeContext distributeContext = new ScheduleJobDistributeContext(); + + Long minJobId = scheduleJobService.insertJobList(Lists.newArrayList(jobDetails), + EScheduleType.NORMAL_SCHEDULE.getType(), + EScheduleJobDistributeType.TASK_TYPE_CYCTIME, + distributeContext); + + Assert.assertEquals("cyc-node", jobDetails.getScheduleJob().getNodeAddress()); + Assert.assertEquals(Long.valueOf(1L), minJobId); + Assert.assertEquals(0, defaultStrategy.getInvokeCount()); + Assert.assertEquals(1, taskTypeCycTimeStrategy.getInvokeCount()); + Assert.assertSame(distributeContext, taskTypeCycTimeStrategy.getLastContext()); + verify(scheduleJobService, times(1)).saveBatch(any(Collection.class)); + verify(scheduleJobJobService, times(1)).saveBatch(any(Collection.class)); + verify(scheduleJobExpandService, times(1)).saveBatch(any(Collection.class)); + } + + @Test + public void testInsertJobListUseDefaultStrategyByCompatibleMethod() { + ScheduleJobDetails jobDetails = buildJobDetails("job-2"); + + scheduleJobService.insertJobList(Lists.newArrayList(jobDetails), EScheduleType.FILL_DATA.getType()); + + Assert.assertEquals("default-node", jobDetails.getScheduleJob().getNodeAddress()); + Assert.assertEquals(1, defaultStrategy.getInvokeCount()); + Assert.assertEquals(0, taskTypeCycTimeStrategy.getInvokeCount()); + } + + @Test + public void testInsertJobListFallbackToDefaultWhenStrategyNotFound() { + ReflectionTestUtils.setField(scheduleJobService, "distributeStrategies", Lists.newArrayList(defaultStrategy)); + ScheduleJobDetails jobDetails = buildJobDetails("job-3"); + + scheduleJobService.insertJobList(Lists.newArrayList(jobDetails), + EScheduleType.NORMAL_SCHEDULE.getType(), + EScheduleJobDistributeType.TASK_TYPE_CYCTIME, + new ScheduleJobDistributeContext()); + + Assert.assertEquals("default-node", jobDetails.getScheduleJob().getNodeAddress()); + Assert.assertEquals(1, defaultStrategy.getInvokeCount()); + } + + @Test(expected = TaierDefineException.class) + public void testInsertJobListThrowWhenStrategyDoesNotAssignNode() { + ReflectionTestUtils.setField(scheduleJobService, "distributeStrategies", + Lists.newArrayList(new StubDistributeStrategy(EScheduleJobDistributeType.TASK_TYPE_CYCTIME, null))); + + scheduleJobService.insertJobList(Lists.newArrayList(buildJobDetails("job-4")), + EScheduleType.NORMAL_SCHEDULE.getType(), + EScheduleJobDistributeType.TASK_TYPE_CYCTIME, + new ScheduleJobDistributeContext()); + } + + @Test + public void testInsertJobListKeepTaskTypeCycTimeBalancedAcrossMultipleCalls() { + TaskTypeCycTimeScheduleJobDistributeStrategy realStrategy = new TaskTypeCycTimeScheduleJobDistributeStrategy(); + ZkService zkService = mock(ZkService.class); + List aliveNodes = Lists.newArrayList("nodeAddress1", "nodeAddress2", "nodeAddress3", "nodeAddress4"); + when(zkService.getAliveBrokersChildren()).thenReturn(aliveNodes); + ReflectionTestUtils.setField(realStrategy, "zkService", zkService); + ReflectionTestUtils.setField(scheduleJobService, "distributeStrategies", Lists.newArrayList(defaultStrategy, realStrategy)); + + ScheduleJobDistributeContext distributeContext = new ScheduleJobDistributeContext(); + List allJobs = new ArrayList<>(); + long jobId = 1L; + for (int i = 0; i < 3; i++) { + List batchJobs = Lists.newArrayList(); + for (int j = 0; j < 5; j++) { + batchJobs.add(buildJobDetails("job-group-a-" + i + "-" + j, jobId++, 1, "20260617000000")); + } + for (int j = 0; j < 3; j++) { + batchJobs.add(buildJobDetails("job-group-b-" + i + "-" + j, jobId++, 2, "20260617010000")); + } + + scheduleJobService.insertJobList(batchJobs, + EScheduleType.NORMAL_SCHEDULE.getType(), + EScheduleJobDistributeType.TASK_TYPE_CYCTIME, + distributeContext); + allJobs.addAll(batchJobs); + } + + assertGroupNodeCountBalanced(allJobs, aliveNodes, 1, "202606170000"); + assertGroupNodeCountBalanced(allJobs, aliveNodes, 2, "202606170100"); + Assert.assertEquals(0, defaultStrategy.getInvokeCount()); + } + + private static ScheduleJobDetails buildJobDetails(String jobId) { + return buildJobDetails(jobId, 1L, 1, "20260617000000"); + } + + private static ScheduleJobDetails buildJobDetails(String jobId, Long id, Integer taskType, String cycTime) { + ScheduleJob scheduleJob = new ScheduleJob(); + scheduleJob.setId(id); + scheduleJob.setJobId(jobId); + scheduleJob.setJobName(jobId); + scheduleJob.setCycTime(cycTime); + scheduleJob.setTaskType(taskType); + + ScheduleJobJob scheduleJobJob = new ScheduleJobJob(); + scheduleJobJob.setJobKey(jobId); + + ScheduleJobDetails scheduleJobDetails = new ScheduleJobDetails(); + scheduleJobDetails.setScheduleJob(scheduleJob); + scheduleJobDetails.setJobJobList(Lists.newArrayList(scheduleJobJob)); + return scheduleJobDetails; + } + + private static void assertGroupNodeCountBalanced(List allJobs, + List aliveNodes, + Integer taskType, + String cycMinute) { + Map nodeJobCount = new HashMap<>(); + for (String aliveNode : aliveNodes) { + nodeJobCount.put(aliveNode, 0); + } + + for (ScheduleJobDetails scheduleJobDetails : allJobs) { + ScheduleJob scheduleJob = scheduleJobDetails.getScheduleJob(); + if (taskType.equals(scheduleJob.getTaskType()) && scheduleJob.getCycTime().startsWith(cycMinute)) { + String nodeAddress = scheduleJob.getNodeAddress(); + Assert.assertTrue("unexpected node address: " + nodeAddress, nodeJobCount.containsKey(nodeAddress)); + nodeJobCount.put(nodeAddress, nodeJobCount.get(nodeAddress) + 1); + } + } + + int minCount = Integer.MAX_VALUE; + int maxCount = Integer.MIN_VALUE; + for (Integer count : nodeJobCount.values()) { + minCount = Math.min(minCount, count); + maxCount = Math.max(maxCount, count); + } + Assert.assertTrue("node count should be balanced within 1, actual: " + nodeJobCount, maxCount - minCount <= 1); + } + + private static class StubDistributeStrategy implements ScheduleJobDistributeStrategy { + + private final EScheduleJobDistributeType distributeType; + + private final String nodeAddress; + + private int invokeCount; + + private ScheduleJobDistributeContext lastContext; + + private StubDistributeStrategy(EScheduleJobDistributeType distributeType, String nodeAddress) { + this.distributeType = distributeType; + this.nodeAddress = nodeAddress; + } + + @Override + public EScheduleJobDistributeType distributeType() { + return distributeType; + } + + @Override + public Map distribute(List scheduleJobDetails, + Integer scheduleType, + ScheduleJobDistributeContext distributeContext) { + invokeCount++; + lastContext = distributeContext; + Map jobNodeMap = new IdentityHashMap<>(); + for (ScheduleJobDetails scheduleJobDetail : scheduleJobDetails) { + jobNodeMap.put(scheduleJobDetail, nodeAddress); + } + return jobNodeMap; + } + + private int getInvokeCount() { + return invokeCount; + } + + private ScheduleJobDistributeContext getLastContext() { + return lastContext; + } + } +}