Skip to content

Commit 7a347be

Browse files
authored
[feat_1193][taier-jobGraph] Expand the new scheduleJobs T+1 allocation strategy algorithm #1193
1 parent aa10a54 commit 7a347be

10 files changed

Lines changed: 785 additions & 56 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.taier.common.enums;
20+
21+
/**
 * Distribution strategy used when assigning generated cycle (periodic) job
 * instances to scheduler nodes.
 *
 * @author xingyi
 */
public enum EScheduleJobDistributeType {

    /**
     * Legacy default strategy: the initial assignment is made by building
     * schedule_engine_job_cache, and jobs are distributed according to total
     * data volume plus node load.
     */
    DEFAULT(0),

    /**
     * Distributes evenly according to the ScheduleJob -> taskType_cycTime
     * strategy and does not take schedule_engine_job_cache into account.
     */
    TASK_TYPE_CYCTIME(1);

    /** Numeric code of this strategy; immutable once the constant is created. */
    private final int value;

    EScheduleJobDistributeType(int value) {
        this.value = value;
    }

    /**
     * @return the numeric code of this strategy
     */
    public int getValue() {
        return value;
    }

    /**
     * Resolves a strategy from its numeric code.
     *
     * @param value numeric code to look up
     * @return the constant whose code equals {@code value}, or {@link #DEFAULT}
     *         when no constant matches
     */
    public static EScheduleJobDistributeType getDistributeType(int value) {
        for (EScheduleJobDistributeType type : EScheduleJobDistributeType.values()) {
            if (type.value == value) {
                return type;
            }
        }
        // Unknown codes deliberately fall back to the legacy behaviour.
        return EScheduleJobDistributeType.DEFAULT;
    }
}

taier-common/src/main/java/com/dtstack/taier/common/env/EnvironmentContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,15 @@ public long getPrometheusPushGatewayInterval() {
687687
return Long.parseLong(environment.getProperty("taier.monitor.metrics.prometheus.pushgateway.interval", "60"));
688688
}
689689

690+
/**
691+
* 周期实例生成策略
692+
* 0: 默认策略
693+
* 1: 按照taskType_cycTime 进行均分配策略
694+
*/
695+
public int getJobGraphDistributeType() {
696+
return Integer.parseInt(environment.getProperty("engine.job.graph.distribute.type", "0"));
697+
}
698+
690699

691700

692701

taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/CycleJobBuilder.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
package com.dtstack.taier.scheduler.server.builder;
2020

2121
import com.dtstack.taier.common.enums.Deleted;
22+
import com.dtstack.taier.common.enums.EScheduleJobDistributeType;
2223
import com.dtstack.taier.common.enums.EScheduleStatus;
2324
import com.dtstack.taier.common.enums.EScheduleType;
2425
import com.dtstack.taier.common.exception.TaierDefineException;
2526
import com.dtstack.taier.dao.domain.ScheduleTaskShade;
2627
import com.dtstack.taier.pluginapi.util.DateUtil;
2728
import com.dtstack.taier.pluginapi.util.RetryUtil;
2829
import com.dtstack.taier.scheduler.server.ScheduleJobDetails;
30+
import com.dtstack.taier.scheduler.server.distribute.ScheduleJobDistributeContext;
2931
import com.dtstack.taier.scheduler.service.JobGraphTriggerService;
3032
import com.dtstack.taier.scheduler.utils.JobExecuteOrderUtil;
3133
import com.google.common.collect.Lists;
@@ -98,6 +100,8 @@ public void buildTaskJobGraph(String triggerDay) {
98100
CountDownLatch ctl = new CountDownLatch(totalBatch);
99101
AtomicJobSortWorker sortWorker = new AtomicJobSortWorker();
100102

103+
ScheduleJobDistributeContext distributeContext = new ScheduleJobDistributeContext();
104+
101105
// 3. 查询db多线程生成周期实例
102106
Long startId = 0L;
103107
for (int i = 0; i < totalBatch; i++) {
@@ -124,7 +128,7 @@ public void buildTaskJobGraph(String triggerDay) {
124128
List<ScheduleJobDetails> scheduleJobDetails = RetryUtil.executeWithRetry(() -> buildJob(batchTaskShade, triggerDay, sortWorker),
125129
environmentContext.getBuildJobErrorRetry(), 200, false);
126130
// 插入周期实例
127-
savaJobList(scheduleJobDetails);
131+
savaJobList(scheduleJobDetails, distributeContext);
128132
} catch (Throwable e) {
129133
LOGGER.error("build task failure taskId:{}", batchTaskShade.getTaskId(), e);
130134
}
@@ -166,7 +170,7 @@ private void clearInterruptJob(Timestamp triggerDay) {
166170
* @param scheduleJobDetails 实例详情
167171
*/
168172
@Transactional(rollbackFor = Exception.class)
169-
public void savaJobList(List<ScheduleJobDetails> scheduleJobDetails) {
173+
public void savaJobList(List<ScheduleJobDetails> scheduleJobDetails, ScheduleJobDistributeContext distributeContext) {
170174
List<ScheduleJobDetails> savaJobDetails = Lists.newArrayList();
171175
for (ScheduleJobDetails scheduleJobDetail : scheduleJobDetails) {
172176
savaJobDetails.add(scheduleJobDetail);
@@ -177,7 +181,8 @@ public void savaJobList(List<ScheduleJobDetails> scheduleJobDetails) {
177181
}
178182
}
179183

180-
scheduleJobService.insertJobList(savaJobDetails, getType());
184+
EScheduleJobDistributeType distributeType = EScheduleJobDistributeType.getDistributeType(environmentContext.getJobGraphDistributeType());
185+
scheduleJobService.insertJobList(savaJobDetails, EScheduleType.NORMAL_SCHEDULE.getType(), distributeType, distributeContext);
181186
}
182187

183188
/**
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.taier.scheduler.server.distribute;
20+
21+
import com.dtstack.taier.common.enums.EScheduleJobDistributeType;
22+
import com.dtstack.taier.common.exception.TaierDefineException;
23+
import com.dtstack.taier.scheduler.server.JobPartitioner;
24+
import com.dtstack.taier.scheduler.server.ScheduleJobDetails;
25+
import com.google.common.collect.Maps;
26+
import org.apache.commons.collections.MapUtils;
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.stereotype.Component;
29+
30+
import java.util.Iterator;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/**
35+
* 历史默认策略,按调度节点当前负载计算每个节点应接收的实例数。
36+
* @author xingyi
37+
*/
38+
@Component
39+
public class DefaultScheduleJobDistributeStrategy implements ScheduleJobDistributeStrategy {
40+
41+
@Autowired
42+
private JobPartitioner jobPartitioner;
43+
44+
@Override
45+
public EScheduleJobDistributeType distributeType() {
46+
return EScheduleJobDistributeType.DEFAULT;
47+
}
48+
49+
@Override
50+
public Map<ScheduleJobDetails, String> distribute(List<ScheduleJobDetails> scheduleJobDetails,
51+
Integer scheduleType,
52+
ScheduleJobDistributeContext distributeContext) {
53+
Map<ScheduleJobDetails, String> jobNodeMap = Maps.newHashMap();
54+
Map<String, Integer> nodeJobSize = jobPartitioner.computeBatchJobSize(scheduleType, scheduleJobDetails.size());
55+
if (MapUtils.isEmpty(nodeJobSize)) {
56+
throw new TaierDefineException("No available node to distribute schedule jobs");
57+
}
58+
59+
Iterator<ScheduleJobDetails> batchJobIterator = scheduleJobDetails.iterator();
60+
for (Map.Entry<String, Integer> nodeJobSizeEntry : nodeJobSize.entrySet()) {
61+
String nodeAddress = nodeJobSizeEntry.getKey();
62+
int nodeSize = nodeJobSizeEntry.getValue();
63+
while (nodeSize > 0 && batchJobIterator.hasNext()) {
64+
jobNodeMap.put(batchJobIterator.next(), nodeAddress);
65+
nodeSize--;
66+
}
67+
}
68+
return jobNodeMap;
69+
}
70+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.taier.scheduler.server.distribute;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
24+
/**
 * Distribution context for cycle job instances; it lets a strategy share state
 * across the batches in which instances are generated.
 *
 * <p>Both caches are {@link ConcurrentHashMap} instances and are exposed
 * directly (not copied), so callers mutate the shared state in place.
 *
 * @author xingyi
 */
public class ScheduleJobDistributeContext {

    /** Load counters keyed by a string — presumably the node address; confirm against the strategies. */
    private final Map<String, Long> globalNodeLoadCache;

    /** Per-group load counters: outer key presumably a group id, inner map as above — confirm against the strategies. */
    private final Map<String, Map<String, Long>> globalGroupNodeLoadCache;

    public ScheduleJobDistributeContext() {
        this.globalNodeLoadCache = new ConcurrentHashMap<>();
        this.globalGroupNodeLoadCache = new ConcurrentHashMap<>();
    }

    /**
     * @return the live, mutable global load cache
     */
    public Map<String, Long> getGlobalNodeLoadCache() {
        return this.globalNodeLoadCache;
    }

    /**
     * @return the live, mutable grouped load cache
     */
    public Map<String, Map<String, Long>> getGlobalGroupNodeLoadCache() {
        return this.globalGroupNodeLoadCache;
    }
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.taier.scheduler.server.distribute;
20+
21+
import com.dtstack.taier.common.enums.EScheduleJobDistributeType;
22+
import com.dtstack.taier.scheduler.server.ScheduleJobDetails;
23+
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
/**
28+
* 周期实例节点分配策略。
29+
* @author xingyi
30+
*/
31+
public interface ScheduleJobDistributeStrategy {
32+
33+
EScheduleJobDistributeType distributeType();
34+
35+
Map<ScheduleJobDetails, String> distribute(List<ScheduleJobDetails> scheduleJobDetails,
36+
Integer scheduleType,
37+
ScheduleJobDistributeContext distributeContext);
38+
}

0 commit comments

Comments
 (0)