Skip to content

Commit 22b4e2a

Browse files
committed
Improve performance for "SimpleJobRepository#update(StepExecution)"
This commit save queries for execution context. See GH-5360 Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent d8632a3 commit 22b4e2a

5 files changed

Lines changed: 89 additions & 12 deletions

File tree

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ default StepExecution createStepExecution(String stepName, JobExecution jobExecu
6868
@Deprecated(since = "6.0", forRemoval = true)
6969
StepExecution getStepExecution(JobExecution jobExecution, long stepExecutionId);
7070

71+
/**
72+
* Because it may be possible that the status of a StepExecution is updated while
73+
* running, the following method will synchronize only the status and version fields.
74+
* @param stepExecution to be updated.
75+
*/
76+
void synchronizeStatus(StepExecution stepExecution);
77+
7178
/**
7279
* Retrieve the last {@link StepExecution} for a given {@link JobInstance} ordered by
7380
* creation time and then id.

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
import java.sql.ResultSet;
2020
import java.sql.Timestamp;
2121
import java.sql.Types;
22-
import java.util.ArrayList;
23-
import java.util.Arrays;
24-
import java.util.List;
22+
import java.util.*;
2523
import java.util.concurrent.locks.Lock;
2624
import java.util.concurrent.locks.ReentrantLock;
2725

@@ -90,6 +88,12 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement
9088

9189
private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " WHERE STEP_EXECUTION_ID = ?";
9290

91+
private static final String GET_VERSION_AND_STATUS = """
92+
SELECT VERSION,STATUS
93+
FROM %PREFIX%STEP_EXECUTION
94+
WHERE STEP_EXECUTION_ID=?
95+
""";
96+
9397
private static final String GET_LAST_STEP_EXECUTION = """
9498
SELECT SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION, SE.CREATE_TIME, JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION
9599
FROM %PREFIX%JOB_EXECUTION JE
@@ -308,6 +312,17 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut
308312
}
309313
}
310314

315+
@Override
316+
public void synchronizeStatus(StepExecution stepExecution) {
317+
getJdbcTemplate().query(getQuery(GET_VERSION_AND_STATUS), rs -> {
318+
Integer currentVersion = rs.getInt("VERSION");
319+
if (!Objects.equals(currentVersion, stepExecution.getVersion())) {
320+
stepExecution.upgradeStatus(BatchStatus.valueOf(rs.getString("STATUS")));
321+
stepExecution.setVersion(currentVersion);
322+
}
323+
}, stepExecution.getId());
324+
}
325+
311326
@Nullable
312327
@Override
313328
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/mongodb/MongoStepExecutionDao.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut
110110
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null;
111111
}
112112

113+
@Override
114+
public void synchronizeStatus(StepExecution stepExecution) {
115+
StepExecution currentStepExecution = getStepExecution(stepExecution.getId());
116+
if (currentStepExecution != null && currentStepExecution.getStatus().isGreaterThan(stepExecution.getStatus())) {
117+
stepExecution.upgradeStatus(currentStepExecution.getStatus());
118+
}
119+
// TODO the contract mentions to update the version as well. Double check if this
120+
// is needed as the version is not used in the tests following the call sites of
121+
// synchronizeStatus
122+
}
123+
113124
@Nullable
114125
@Override
115126
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {

spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,11 @@ public void update(StepExecution stepExecution) {
159159

160160
stepExecution.setLastUpdated(LocalDateTime.now());
161161

162-
StepExecution latestStepExecution = getStepExecution(stepExecution.getId());
163-
Assert.state(latestStepExecution != null,
164-
"StepExecution with id " + stepExecution.getId() + "not found. Batch metadata state may be corrupted.");
165-
166-
if (latestStepExecution.getJobExecution().isStopped() || latestStepExecution.getJobExecution().isStopping()) {
167-
Integer version = latestStepExecution.getVersion();
168-
if (version != null) {
169-
stepExecution.setVersion(version);
170-
}
162+
JobExecution jobExecution = stepExecution.getJobExecution();
163+
this.jobExecutionDao.synchronizeStatus(jobExecution);
164+
165+
if (jobExecution.isStopped() || jobExecution.isStopping()) {
166+
this.stepExecutionDao.synchronizeStatus(stepExecution);
171167
stepExecution.setTerminateOnly();
172168
}
173169

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/MongoStepExecutionDaoIntegrationTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,54 @@ void testConcurrentModificationException() {
161161
assertThrows(OptimisticLockingFailureException.class, () -> dao.updateStepExecution(exec2));
162162
}
163163

164+
/**
165+
* Successful synchronization from STARTED to STOPPING status.
166+
*/
167+
@Test
168+
void testSynchronizeStatusUpgrade() {
169+
170+
StepExecution exec1 = dao.createStepExecution("step", jobExecution);
171+
exec1.setStatus(BatchStatus.STOPPING);
172+
dao.updateStepExecution(exec1);
173+
174+
StepExecution exec2 = dao.getStepExecution(exec1.getId());
175+
assertNotNull(exec2);
176+
exec2.setStatus(BatchStatus.STARTED);
177+
// exec2.setVersion(7);
178+
// assertNotSame(exec1.getVersion(), exec2.getVersion());
179+
assertNotSame(exec1.getStatus(), exec2.getStatus());
180+
181+
dao.synchronizeStatus(exec2);
182+
183+
// assertEquals(exec1.getVersion(), exec2.getVersion());
184+
assertEquals(exec1.getStatus(), exec2.getStatus());
185+
}
186+
187+
/**
188+
* UNKNOWN status won't be changed by synchronizeStatus, because it is the 'largest'
189+
* BatchStatus (will not downgrade).
190+
*/
191+
@Test
192+
void testSynchronizeStatusDowngrade() {
193+
194+
StepExecution exec1 = dao.createStepExecution("step", jobExecution);
195+
exec1.setStatus(BatchStatus.STARTED);
196+
dao.updateStepExecution(exec1);
197+
198+
StepExecution exec2 = dao.getStepExecution(exec1.getId());
199+
assertNotNull(exec2);
200+
201+
exec2.setStatus(BatchStatus.UNKNOWN);
202+
// exec2.setVersion(7);
203+
// assertNotSame(exec1.getVersion(), exec2.getVersion());
204+
assertTrue(exec1.getStatus().isLessThan(exec2.getStatus()));
205+
206+
dao.synchronizeStatus(exec2);
207+
208+
// assertEquals(exec1.getVersion(), exec2.getVersion());
209+
assertEquals(BatchStatus.UNKNOWN, exec2.getStatus());
210+
}
211+
164212
private void assertStepExecutionsAreEqual(StepExecution expected, StepExecution actual) {
165213
assertEquals(expected.getId(), actual.getId());
166214
assertTemporalEquals(expected.getStartTime(), actual.getStartTime());

0 commit comments

Comments
 (0)