Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024-2025 the original author or authors.
* Copyright 2024-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,9 +15,7 @@
*/
package org.springframework.batch.core.repository.dao.mongodb;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import org.jspecify.annotations.Nullable;

Expand All @@ -27,6 +25,7 @@
import org.springframework.batch.core.repository.dao.StepExecutionDao;
import org.springframework.batch.core.repository.persistence.converter.JobExecutionConverter;
import org.springframework.batch.core.repository.persistence.converter.StepExecutionConverter;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
Expand Down Expand Up @@ -113,37 +112,32 @@ public StepExecution getStepExecution(JobExecution jobExecution, long stepExecut
@Nullable
@Override
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
// TODO optimize the query
// get all step executions
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
Query jobExecutionsQuery = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
.find(jobExecutionsQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
.find(query(where("jobExecutionId").in(jobExecutions.stream()
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
.toList())), org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
// sort step executions by creation date then id (see contract) and return the
// last one
Optional<org.springframework.batch.core.repository.persistence.StepExecution> lastStepExecution = stepExecutions
.stream()
.filter(stepExecution -> stepExecution.getName().equals(stepName))
.max(Comparator
.comparing(org.springframework.batch.core.repository.persistence.StepExecution::getCreateTime)
.thenComparing(
org.springframework.batch.core.repository.persistence.StepExecution::getStepExecutionId));
if (lastStepExecution.isPresent()) {
org.springframework.batch.core.repository.persistence.StepExecution stepExecution = lastStepExecution.get();
JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream()
.filter(execution -> execution.getJobExecutionId() == stepExecution.getJobExecutionId())
.findFirst()
.get(), jobInstance);
return this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution);
if (jobExecutions.isEmpty()) {
return null;
}
else {
List<Long> jobExecutionIds = jobExecutions.stream()
.map(org.springframework.batch.core.repository.persistence.JobExecution::getJobExecutionId)
.toList();
// filter by step name and sort by creation date then id (see contract) at the
// database level, then return the most recent step execution
Query stepExecutionQuery = query(where("jobExecutionId").in(jobExecutionIds).and("name").is(stepName))
.with(Sort.by(Sort.Direction.DESC, "createTime", "stepExecutionId"))
.limit(1);
org.springframework.batch.core.repository.persistence.StepExecution lastStepExecution = this.mongoOperations
.findOne(stepExecutionQuery, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
if (lastStepExecution == null) {
return null;
}
JobExecution jobExecution = this.jobExecutionConverter.toJobExecution(jobExecutions.stream()
.filter(execution -> execution.getJobExecutionId() == lastStepExecution.getJobExecutionId())
.findFirst()
.get(), jobInstance);
return this.stepExecutionConverter.toStepExecution(lastStepExecution, jobExecution);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2008-2025 the original author or authors.
* Copyright 2008-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,6 +115,24 @@ void testSaveAndGetLastExecutionWhenSameStartTime() {
assertEquals(lastStepExecution.getId(), retrieved.getId());
}

@Test
void testGetLastExecutionFiltersByStepName() {
dao.createStepExecution("stepA", jobExecution);
dao.createStepExecution("stepB", jobExecution);
StepExecution stepA2 = dao.createStepExecution("stepA", jobExecution);
StepExecution stepB2 = dao.createStepExecution("stepB", jobExecution);

StepExecution lastA = dao.getLastStepExecution(jobInstance, "stepA");
assertNotNull(lastA);
assertEquals(stepA2.getId(), lastA.getId());

StepExecution lastB = dao.getLastStepExecution(jobInstance, "stepB");
assertNotNull(lastB);
assertEquals(stepB2.getId(), lastB.getId());

assertNull(dao.getLastStepExecution(jobInstance, "nonExistentStep"));
}

@Test
void testGetForNotExistingJobExecution() {
assertNull(dao.getStepExecution(45677L));
Expand Down
Loading