From 707367b5b266abd3810be9ffd437c3c2ba244374 Mon Sep 17 00:00:00 2001 From: Ankit Joshi Date: Fri, 15 May 2026 18:15:24 +0530 Subject: [PATCH] Improved stale JobSandbox recovery handling in multi-node environments OFBIZ-13383 Added heartbeat-based job tracking and recovery of stale jobs that are still owned by terminated or replaced nodes. Used atomic updates for the job heartbeat and recovery logic to avoid race conditions and state being overwritten between multiple nodes. --- framework/service/config/serviceengine.xml | 5 +- framework/service/entitydef/entitymodel.xml | 1 + .../service/config/model/ThreadPool.java | 24 +++++++ .../apache/ofbiz/service/job/JobManager.java | 67 +++++++++++++++++++ .../apache/ofbiz/service/job/JobPoller.java | 37 +++++++++- .../service/job/PersistedServiceJob.java | 25 ++++++- 6 files changed, 156 insertions(+), 3 deletions(-) diff --git a/framework/service/config/serviceengine.xml b/framework/service/config/serviceengine.xml index d5613533df6..dda2d5688ac 100644 --- a/framework/service/config/serviceengine.xml +++ b/framework/service/config/serviceengine.xml @@ -36,7 +36,10 @@ under the License. min-threads="2" max-threads="5" poll-enabled="true" - poll-db-millis="30000"> + poll-db-millis="30000" + lease-refresh-millis="300000" + lease-validation-millis="480000" + lease-expiry-millis="600000"> diff --git a/framework/service/entitydef/entitymodel.xml b/framework/service/entitydef/entitymodel.xml index 83ffa19436a..cd802f6d506 100644 --- a/framework/service/entitydef/entitymodel.xml +++ b/framework/service/entitydef/entitymodel.xml @@ -67,6 +67,7 @@ under the License. 
+ diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java index 3b98af79a4a..da93086e281 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java @@ -44,6 +44,9 @@ public final class ThreadPool { public static final int PURGE_JOBS_DAYS = 30; public static final int QUEUE_SIZE = 100; public static final int THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. + public static final int LEASE_REFRESH_MILLIS = 300000; // Heartbeat interval - 5 minutes. + public static final int LEASE_VALIDATION_MILLIS = 480000; // Stale-job scan interval - 8 minutes. + public static final int LEASE_EXPIRY_MILLIS = 600000; // Lease expiry threshold - 10 minutes. private final int failedRetryMin; private final int jobs; @@ -55,6 +58,9 @@ public final class ThreadPool { private final List runFromPools; private final String sendToPool; private final int ttl; + private final int leaseRefreshMillis; + private final int leaseValidationMillis; + private final int leaseExpiryMillis; ThreadPool(Element poolElement) throws ServiceConfigException, NumberFormatException { String sendToPool = poolElement.getAttribute("send-to-pool").intern(); @@ -170,6 +176,12 @@ public final class ThreadPool { } this.runFromPools = Collections.unmodifiableList(runFromPools); } + String leaseRefreshMillis = poolElement.getAttribute("lease-refresh-millis").intern(); + this.leaseRefreshMillis = leaseRefreshMillis.isEmpty() ? LEASE_REFRESH_MILLIS : Integer.parseInt(leaseRefreshMillis); + String leaseValidationMillis = poolElement.getAttribute("lease-validation-millis").intern(); + this.leaseValidationMillis = leaseValidationMillis.isEmpty() ? 
LEASE_VALIDATION_MILLIS : Integer.parseInt(leaseValidationMillis); + String leaseExpiryMillis = poolElement.getAttribute("lease-expiry-millis").intern(); + this.leaseExpiryMillis = leaseExpiryMillis.isEmpty() ? LEASE_EXPIRY_MILLIS : Integer.parseInt(leaseExpiryMillis); } public int getFailedRetryMin() { @@ -211,4 +223,16 @@ public String getSendToPool() { public int getTtl() { return ttl; } + + public int getLeaseRefreshMillis() { + return leaseRefreshMillis; + } + + public int getLeaseValidationMillis() { + return leaseValidationMillis; + } + + public int getLeaseExpiryMillis() { + return leaseExpiryMillis; + } } diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java index 768f2d52473..9ff92b031c3 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java @@ -298,6 +298,73 @@ protected List poll(int limit) { return poll; } + /** + * Bulk-renews the lease (leaseUpdatedStamp) for all RUNNING and QUEUED jobs owned by this instance. + * Called periodically from the {@link JobPoller} main loop. 
+ */ + protected void heartbeatRunningJobs() { + assertIsRunning(); + List conditions = UtilMisc.toList( + EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, INSTANCE_ID), + EntityCondition.makeCondition( + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"), + EntityOperator.OR, + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"))); + try { + int updated = delegator.storeByCondition("JobSandbox", + UtilMisc.toMap("leaseUpdatedStamp", UtilDateTime.nowTimestamp()), + EntityCondition.makeCondition(conditions)); + if (Debug.verboseOn()) { + Debug.logVerbose("Heartbeat: lease renewed for " + updated + " job(s) on instance [" + INSTANCE_ID + "]", MODULE); + } + } catch (GenericEntityException e) { + Debug.logWarning(e, "Exception thrown while renewing job leases: ", MODULE); + } + } + + /** + * Scans all RUNNING/QUEUED jobs across all nodes and resets any whose leaseUpdatedStamp + * is missing or older than the configured lease-expiry threshold, releasing them back to SERVICE_PENDING + * so a healthy node can pick them up. + * Uses storeByCondition for atomicity to avoid race conditions in multi-node deployments. + * Called periodically from the {@link JobPoller} main loop. 
+ */ + protected int recoverStaleJobs() { + assertIsRunning(); + long leaseExpiryMillis; + try { + leaseExpiryMillis = ServiceConfigUtil.getServiceEngine().getThreadPool().getLeaseExpiryMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Unable to read lease-expiry-millis; using default.", MODULE); + leaseExpiryMillis = org.apache.ofbiz.service.config.model.ThreadPool.LEASE_EXPIRY_MILLIS; + } + Timestamp expiryThreshold = new Timestamp(System.currentTimeMillis() - leaseExpiryMillis); + // Match QUEUED or RUNNING jobs owned by any instance with an expired (or missing) lease stamp + List conditions = UtilMisc.toList( + EntityCondition.makeCondition( + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), + EntityOperator.OR, + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")), + EntityCondition.makeCondition("runByInstanceId", EntityOperator.NOT_EQUAL, null), + EntityCondition.makeCondition( + EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.LESS_THAN, expiryThreshold), + EntityOperator.OR, + EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.EQUALS, null))); + int recovered = 0; + try { + recovered = delegator.storeByCondition("JobSandbox", + UtilMisc.toMap("statusId", "SERVICE_PENDING", "runByInstanceId", null, "startDateTime", null, "leaseUpdatedStamp", null), + EntityCondition.makeCondition(conditions)); + if (recovered > 0) { + Debug.logInfo("Stale job recovery: reset " + recovered + + " expired job(s) to SERVICE_PENDING on instance [" + INSTANCE_ID + "]", MODULE); + } + } catch (GenericEntityException e) { + Debug.logWarning(e, "Exception thrown while recovering stale jobs: ", MODULE); + } + return recovered; + } + public static List getJobsToPurge(Delegator delegator, String poolId, String instanceId, int limit, Timestamp purgeTime) throws GenericEntityException { List purgeCondition = UtilMisc.toList( diff --git 
a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java index a1edb54e130..7b1b55f584f 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java @@ -115,6 +115,24 @@ private static int pollWaitTime() { } } + private static int leaseRefreshTime() { + try { + return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseRefreshMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Exception thrown while getting lease-refresh-millis, using default: ", MODULE); + return ThreadPool.LEASE_REFRESH_MILLIS; + } + } + + private static int leaseValidationTime() { + try { + return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseValidationMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Exception thrown while getting lease-validation-millis, using default: ", MODULE); + return ThreadPool.LEASE_VALIDATION_MILLIS; + } + } + static int queueSize() { try { ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool(); @@ -245,7 +263,7 @@ public Thread newThread(Runnable runnable) { } } - // Polls all registered JobManagers for jobs to queue. + // Polls all registered JobManagers for jobs to queue, and runs periodic lease heartbeat and stale-job recovery. private final class JobManagerPoller implements Runnable { // Do not check for interrupts in this method. 
The design requires the @@ -253,6 +271,8 @@ private final class JobManagerPoller implements Runnable { @Override public void run() { Debug.logInfo("JobPoller thread started.", MODULE); + long lastHeartbeatTime = 0; + long lastRecoveryTime = 0; try { while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) { Thread.sleep(1000); @@ -295,6 +315,21 @@ public void run() { } } } + long now = System.currentTimeMillis(); + // Heartbeat: renew leaseUpdatedStamp for all jobs owned by this instance. + if (now - lastHeartbeatTime > leaseRefreshTime()) { + for (JobManager jm : JOB_MANAGERS.values()) { + jm.heartbeatRunningJobs(); + } + lastHeartbeatTime = now; + } + // Recovery: reset stale jobs from dead nodes back to SERVICE_PENDING. + if (now - lastRecoveryTime > leaseValidationTime()) { + for (JobManager jm : JOB_MANAGERS.values()) { + jm.recoverStaleJobs(); + } + lastRecoveryTime = now; + } Thread.sleep(pollWaitTime()); } } catch (InterruptedException e) { diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java index 32c2a9ccf41..c700d8969b0 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java @@ -44,7 +44,10 @@ import org.apache.ofbiz.entity.GenericValue; import org.apache.ofbiz.entity.serialize.SerializeException; import org.apache.ofbiz.entity.serialize.XmlSerializer; +import org.apache.ofbiz.entity.condition.EntityCondition; +import org.apache.ofbiz.entity.condition.EntityOperator; import org.apache.ofbiz.entity.util.EntityQuery; +import org.apache.ofbiz.base.util.UtilMisc; import org.apache.ofbiz.service.DispatchContext; import org.apache.ofbiz.service.GenericRequester; import org.apache.ofbiz.service.ServiceUtil; @@ -156,6 +159,7 @@ protected void init() throws InvalidJobException { } 
jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); jobValue.set("statusId", "SERVICE_RUNNING"); + jobValue.set("leaseUpdatedStamp", UtilDateTime.nowTimestamp()); try { jobValue.store(); } catch (GenericEntityException e) { @@ -219,13 +223,30 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi /* This solution ensures that the system uses a consistent, UTC-based time for scheduling and rescheduling recurring jobs, even when DST changes affect the local time. - */ + */ ZonedDateTime nextRunTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(next), ZoneId.of("UTC")); if (nextRunTime.toInstant().toEpochMilli() > startTime) { String pJobId = jobValue.getString("parentJobId"); if (pJobId == null) { pJobId = jobValue.getString("jobId"); } + // Check if the next recurrence has already been created (e.g. by a previous failed attempt on another node) + long nextEpoch = nextRunTime.toInstant().toEpochMilli(); + long existingCount = EntityQuery.use(delegator).from("JobSandbox") + .where(EntityCondition.makeCondition(UtilMisc.toList( + EntityCondition.makeCondition("parentJobId", EntityOperator.EQUALS, pJobId), + EntityCondition.makeCondition("runTimeEpoch", EntityOperator.EQUALS, nextEpoch), + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING")))) + .queryCount(); + if (existingCount > 0) { + if (Debug.infoOn()) { + Debug.logInfo("Skipping duplicate recurrence for job [" + getJobId() + + "] - next slot at epoch [" + nextEpoch + "] already exists for parent [" + pJobId + "]", + MODULE); + } + nextRecurrence = next; + return; + } GenericValue newJob = GenericValue.create(jobValue); newJob.remove("jobId"); newJob.set("previousJobId", jobValue.getString("jobId")); @@ -235,6 +256,7 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi newJob.set("runByInstanceId", null); newJob.set("runTime", Timestamp.from(nextRunTime.toInstant())); newJob.set("runTimeEpoch", 
nextRunTime.toInstant().toEpochMilli()); + newJob.set("leaseUpdatedStamp", null); if (isRetryOnFailure) { newJob.set("currentRetryCount", currentRetryCount + 1); } else { @@ -405,6 +427,7 @@ public void deQueue() throws InvalidJobException { jobValue.refresh(); jobValue.set("startDateTime", null); jobValue.set("runByInstanceId", null); + jobValue.set("leaseUpdatedStamp", null); jobValue.set("statusId", "SERVICE_PENDING"); jobValue.store(); } catch (GenericEntityException e) {