diff --git a/framework/service/config/serviceengine.xml b/framework/service/config/serviceengine.xml
index d5613533df6..dda2d5688ac 100644
--- a/framework/service/config/serviceengine.xml
+++ b/framework/service/config/serviceengine.xml
@@ -36,7 +36,10 @@ under the License.
min-threads="2"
max-threads="5"
poll-enabled="true"
- poll-db-millis="30000">
+ poll-db-millis="30000"
+ lease-refresh-millis="300000"
+ lease-validation-millis="480000"
+ lease-expiry-millis="600000">
diff --git a/framework/service/entitydef/entitymodel.xml b/framework/service/entitydef/entitymodel.xml
index 83ffa19436a..cd802f6d506 100644
--- a/framework/service/entitydef/entitymodel.xml
+++ b/framework/service/entitydef/entitymodel.xml
@@ -67,6 +67,7 @@ under the License.
+
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java
index 3b98af79a4a..da93086e281 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java
@@ -44,6 +44,9 @@ public final class ThreadPool {
public static final int PURGE_JOBS_DAYS = 30;
public static final int QUEUE_SIZE = 100;
public static final int THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
+ public static final int LEASE_REFRESH_MILLIS = 300000; // Heartbeat interval - 5 minutes.
+ public static final int LEASE_VALIDATION_MILLIS = 480000; // Stale-job scan interval - 8 minutes.
+ public static final int LEASE_EXPIRY_MILLIS = 600000; // Lease expiry threshold - 10 minutes.
private final int failedRetryMin;
private final int jobs;
@@ -55,6 +58,9 @@ public final class ThreadPool {
private final List runFromPools;
private final String sendToPool;
private final int ttl;
+ private final int leaseRefreshMillis;
+ private final int leaseValidationMillis;
+ private final int leaseExpiryMillis;
ThreadPool(Element poolElement) throws ServiceConfigException, NumberFormatException {
String sendToPool = poolElement.getAttribute("send-to-pool").intern();
@@ -170,6 +176,12 @@ public final class ThreadPool {
}
this.runFromPools = Collections.unmodifiableList(runFromPools);
}
+ String leaseRefreshMillis = poolElement.getAttribute("lease-refresh-millis").intern();
+ this.leaseRefreshMillis = leaseRefreshMillis.isEmpty() ? LEASE_REFRESH_MILLIS : Integer.parseInt(leaseRefreshMillis);
+ String leaseValidationMillis = poolElement.getAttribute("lease-validation-millis").intern();
+ this.leaseValidationMillis = leaseValidationMillis.isEmpty() ? LEASE_VALIDATION_MILLIS : Integer.parseInt(leaseValidationMillis);
+ String leaseExpiryMillis = poolElement.getAttribute("lease-expiry-millis").intern();
+ this.leaseExpiryMillis = leaseExpiryMillis.isEmpty() ? LEASE_EXPIRY_MILLIS : Integer.parseInt(leaseExpiryMillis);
}
public int getFailedRetryMin() {
@@ -211,4 +223,16 @@ public String getSendToPool() {
public int getTtl() {
return ttl;
}
+
+ public int getLeaseRefreshMillis() {
+ return leaseRefreshMillis;
+ }
+
+ public int getLeaseValidationMillis() {
+ return leaseValidationMillis;
+ }
+
+ public int getLeaseExpiryMillis() {
+ return leaseExpiryMillis;
+ }
}
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
index 768f2d52473..9ff92b031c3 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
@@ -298,6 +298,73 @@ protected List poll(int limit) {
return poll;
}
+ /**
+ * Bulk-renews the lease (leaseUpdatedStamp) for all RUNNING and QUEUED jobs owned by this instance.
+ * Called periodically from the {@link JobPoller} main loop.
+ */
+ protected void heartbeatRunningJobs() {
+ assertIsRunning();
+ List conditions = UtilMisc.toList(
+ EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, INSTANCE_ID),
+ EntityCondition.makeCondition(
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"),
+ EntityOperator.OR,
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED")));
+ try {
+ int updated = delegator.storeByCondition("JobSandbox",
+ UtilMisc.toMap("leaseUpdatedStamp", UtilDateTime.nowTimestamp()),
+ EntityCondition.makeCondition(conditions));
+ if (Debug.verboseOn()) {
+ Debug.logVerbose("Heartbeat: lease renewed for " + updated + " job(s) on instance [" + INSTANCE_ID + "]", MODULE);
+ }
+ } catch (GenericEntityException e) {
+ Debug.logWarning(e, "Exception thrown while renewing job leases: ", MODULE);
+ }
+ }
+
+ /**
+ * Scans all RUNNING/QUEUED jobs across all nodes and resets any whose leaseUpdatedStamp
+ * is older than the configured lease-expiry threshold, releasing them back to SERVICE_PENDING
+ * so a healthy node can pick them up.
+ * Uses storeByCondition for atomicity to avoid race conditions in multi-node deployments.
+ * Called periodically from the {@link JobPoller} main loop.
+ */
+ protected int recoverStaleJobs() {
+ assertIsRunning();
+ long leaseExpiryMillis;
+ try {
+ leaseExpiryMillis = ServiceConfigUtil.getServiceEngine().getThreadPool().getLeaseExpiryMillis();
+ } catch (GenericConfigException e) {
+ Debug.logWarning(e, "Unable to read lease-expiry-millis; using default.", MODULE);
+ leaseExpiryMillis = org.apache.ofbiz.service.config.model.ThreadPool.LEASE_EXPIRY_MILLIS;
+ }
+ Timestamp expiryThreshold = new Timestamp(System.currentTimeMillis() - leaseExpiryMillis);
+ // Match QUEUED or RUNNING jobs owned by any instance with an expired (or missing) lease stamp
+ List conditions = UtilMisc.toList(
+ EntityCondition.makeCondition(
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
+ EntityOperator.OR,
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")),
+ EntityCondition.makeCondition("runByInstanceId", EntityOperator.NOT_EQUAL, null),
+ EntityCondition.makeCondition(
+ EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.LESS_THAN, expiryThreshold),
+ EntityOperator.OR,
+ EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.EQUALS, null)));
+ int recovered = 0;
+ try {
+ recovered = delegator.storeByCondition("JobSandbox",
+ UtilMisc.toMap("statusId", "SERVICE_PENDING", "runByInstanceId", null, "startDateTime", null, "leaseUpdatedStamp", null),
+ EntityCondition.makeCondition(conditions));
+ if (recovered > 0) {
+ Debug.logInfo("Stale job recovery: reset " + recovered
+ + " expired job(s) to SERVICE_PENDING on instance [" + INSTANCE_ID + "]", MODULE);
+ }
+ } catch (GenericEntityException e) {
+ Debug.logWarning(e, "Exception thrown while recovering stale jobs: ", MODULE);
+ }
+ return recovered;
+ }
+
public static List getJobsToPurge(Delegator delegator, String poolId, String instanceId, int limit, Timestamp purgeTime)
throws GenericEntityException {
List purgeCondition = UtilMisc.toList(
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
index a1edb54e130..7b1b55f584f 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
@@ -115,6 +115,24 @@ private static int pollWaitTime() {
}
}
+ private static int leaseRefreshTime() {
+ try {
+ return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseRefreshMillis();
+ } catch (GenericConfigException e) {
+ Debug.logWarning(e, "Exception thrown while getting lease-refresh-millis, using default: ", MODULE);
+ return ThreadPool.LEASE_REFRESH_MILLIS;
+ }
+ }
+
+ private static int leaseValidationTime() {
+ try {
+ return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseValidationMillis();
+ } catch (GenericConfigException e) {
+ Debug.logWarning(e, "Exception thrown while getting lease-validation-millis, using default: ", MODULE);
+ return ThreadPool.LEASE_VALIDATION_MILLIS;
+ }
+ }
+
static int queueSize() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
@@ -245,7 +263,7 @@ public Thread newThread(Runnable runnable) {
}
}
- // Polls all registered JobManagers for jobs to queue.
+ // Polls all registered JobManagers for jobs to queue, and runs periodic lease heartbeat and stale-job recovery.
private final class JobManagerPoller implements Runnable {
// Do not check for interrupts in this method. The design requires the
@@ -253,6 +271,8 @@ private final class JobManagerPoller implements Runnable {
@Override
public void run() {
Debug.logInfo("JobPoller thread started.", MODULE);
+ long lastHeartbeatTime = 0;
+ long lastRecoveryTime = 0;
try {
while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) {
Thread.sleep(1000);
@@ -295,6 +315,21 @@ public void run() {
}
}
}
+ long now = System.currentTimeMillis();
+ // Heartbeat: renew leaseUpdatedStamp for all jobs owned by this instance.
+ if (now - lastHeartbeatTime > leaseRefreshTime()) {
+ for (JobManager jm : JOB_MANAGERS.values()) {
+ jm.heartbeatRunningJobs();
+ }
+ lastHeartbeatTime = now;
+ }
+ // Recovery: reset stale jobs from dead nodes back to SERVICE_PENDING.
+ if (now - lastRecoveryTime > leaseValidationTime()) {
+ for (JobManager jm : JOB_MANAGERS.values()) {
+ jm.recoverStaleJobs();
+ }
+ lastRecoveryTime = now;
+ }
Thread.sleep(pollWaitTime());
}
} catch (InterruptedException e) {
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
index 32c2a9ccf41..c700d8969b0 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
@@ -44,7 +44,10 @@
import org.apache.ofbiz.entity.GenericValue;
import org.apache.ofbiz.entity.serialize.SerializeException;
import org.apache.ofbiz.entity.serialize.XmlSerializer;
+import org.apache.ofbiz.entity.condition.EntityCondition;
+import org.apache.ofbiz.entity.condition.EntityOperator;
import org.apache.ofbiz.entity.util.EntityQuery;
+import org.apache.ofbiz.base.util.UtilMisc;
import org.apache.ofbiz.service.DispatchContext;
import org.apache.ofbiz.service.GenericRequester;
import org.apache.ofbiz.service.ServiceUtil;
@@ -156,6 +159,7 @@ protected void init() throws InvalidJobException {
}
jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
jobValue.set("statusId", "SERVICE_RUNNING");
+ jobValue.set("leaseUpdatedStamp", UtilDateTime.nowTimestamp());
try {
jobValue.store();
} catch (GenericEntityException e) {
@@ -219,13 +223,30 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi
/*
This solution ensures that the system uses a consistent,
UTC-based time for scheduling and rescheduling recurring jobs, even when DST changes affect the local time.
- */
+ */
ZonedDateTime nextRunTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(next), ZoneId.of("UTC"));
if (nextRunTime.toInstant().toEpochMilli() > startTime) {
String pJobId = jobValue.getString("parentJobId");
if (pJobId == null) {
pJobId = jobValue.getString("jobId");
}
+ // Check if the next recurrence has already been created (e.g. by a previous failed attempt on another node)
+ long nextEpoch = nextRunTime.toInstant().toEpochMilli();
+ long existingCount = EntityQuery.use(delegator).from("JobSandbox")
+ .where(EntityCondition.makeCondition(UtilMisc.toList(
+ EntityCondition.makeCondition("parentJobId", EntityOperator.EQUALS, pJobId),
+ EntityCondition.makeCondition("runTimeEpoch", EntityOperator.EQUALS, nextEpoch),
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"))))
+ .queryCount();
+ if (existingCount > 0) {
+ if (Debug.infoOn()) {
+ Debug.logInfo("Skipping duplicate recurrence for job [" + getJobId()
+ + "] - next slot at epoch [" + nextEpoch + "] already exists for parent [" + pJobId + "]",
+ MODULE);
+ }
+ nextRecurrence = next;
+ return;
+ }
GenericValue newJob = GenericValue.create(jobValue);
newJob.remove("jobId");
newJob.set("previousJobId", jobValue.getString("jobId"));
@@ -235,6 +256,7 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi
newJob.set("runByInstanceId", null);
newJob.set("runTime", Timestamp.from(nextRunTime.toInstant()));
newJob.set("runTimeEpoch", nextRunTime.toInstant().toEpochMilli());
+ newJob.set("leaseUpdatedStamp", null);
if (isRetryOnFailure) {
newJob.set("currentRetryCount", currentRetryCount + 1);
} else {
@@ -405,6 +427,7 @@ public void deQueue() throws InvalidJobException {
jobValue.refresh();
jobValue.set("startDateTime", null);
jobValue.set("runByInstanceId", null);
+ jobValue.set("leaseUpdatedStamp", null);
jobValue.set("statusId", "SERVICE_PENDING");
jobValue.store();
} catch (GenericEntityException e) {