diff --git a/framework/service/config/serviceengine.xml b/framework/service/config/serviceengine.xml index d5613533df6..dda2d5688ac 100644 --- a/framework/service/config/serviceengine.xml +++ b/framework/service/config/serviceengine.xml @@ -36,7 +36,10 @@ under the License. min-threads="2" max-threads="5" poll-enabled="true" - poll-db-millis="30000"> + poll-db-millis="30000" + lease-refresh-millis="300000" + lease-validation-millis="480000" + lease-expiry-millis="600000"> diff --git a/framework/service/entitydef/entitymodel.xml b/framework/service/entitydef/entitymodel.xml index 83ffa19436a..cd802f6d506 100644 --- a/framework/service/entitydef/entitymodel.xml +++ b/framework/service/entitydef/entitymodel.xml @@ -67,6 +67,7 @@ under the License. + diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java index 3b98af79a4a..da93086e281 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/config/model/ThreadPool.java @@ -44,6 +44,9 @@ public final class ThreadPool { public static final int PURGE_JOBS_DAYS = 30; public static final int QUEUE_SIZE = 100; public static final int THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes. + public static final int LEASE_REFRESH_MILLIS = 300000; // Heartbeat interval - 5 minutes. + public static final int LEASE_VALIDATION_MILLIS = 480000; // Stale-job scan interval - 8 minutes. + public static final int LEASE_EXPIRY_MILLIS = 600000; // Lease expiry threshold - 10 minutes. private final int failedRetryMin; private final int jobs; @@ -55,6 +58,9 @@ public final class ThreadPool { private final List runFromPools; private final String sendToPool; private final int ttl; + private final int leaseRefreshMillis; + private final int leaseValidationMillis; + private final int leaseExpiryMillis; ThreadPool(Element poolElement) throws ServiceConfigException, NumberFormatException { String sendToPool = poolElement.getAttribute("send-to-pool").intern(); @@ -170,6 +176,12 @@ public final class ThreadPool { } this.runFromPools = Collections.unmodifiableList(runFromPools); } + String leaseRefreshMillis = poolElement.getAttribute("lease-refresh-millis").intern(); + this.leaseRefreshMillis = leaseRefreshMillis.isEmpty() ? LEASE_REFRESH_MILLIS : Integer.parseInt(leaseRefreshMillis); + String leaseValidationMillis = poolElement.getAttribute("lease-validation-millis").intern(); + this.leaseValidationMillis = leaseValidationMillis.isEmpty() ? LEASE_VALIDATION_MILLIS : Integer.parseInt(leaseValidationMillis); + String leaseExpiryMillis = poolElement.getAttribute("lease-expiry-millis").intern(); + this.leaseExpiryMillis = leaseExpiryMillis.isEmpty() ? LEASE_EXPIRY_MILLIS : Integer.parseInt(leaseExpiryMillis); } public int getFailedRetryMin() { @@ -211,4 +223,16 @@ public String getSendToPool() { public int getTtl() { return ttl; } + + public int getLeaseRefreshMillis() { + return leaseRefreshMillis; + } + + public int getLeaseValidationMillis() { + return leaseValidationMillis; + } + + public int getLeaseExpiryMillis() { + return leaseExpiryMillis; + } } diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java index 768f2d52473..9ff92b031c3 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java @@ -298,6 +298,73 @@ protected List poll(int limit) { return poll; } + /** + * Bulk-renews the lease (leaseUpdatedStamp) for all RUNNING and QUEUED jobs owned by this instance. + * Called periodically from the {@link JobPoller} main loop. + */ + protected void heartbeatRunningJobs() { + assertIsRunning(); + List conditions = UtilMisc.toList( + EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, INSTANCE_ID), + EntityCondition.makeCondition( + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"), + EntityOperator.OR, + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"))); + try { + int updated = delegator.storeByCondition("JobSandbox", + UtilMisc.toMap("leaseUpdatedStamp", UtilDateTime.nowTimestamp()), + EntityCondition.makeCondition(conditions)); + if (Debug.verboseOn()) { + Debug.logVerbose("Heartbeat: lease renewed for " + updated + " job(s) on instance [" + INSTANCE_ID + "]", MODULE); + } + } catch (GenericEntityException e) { + Debug.logWarning(e, "Exception thrown while renewing job leases: ", MODULE); + } + } + + /** + * Scans all RUNNING/QUEUED jobs across all nodes and resets any whose leaseUpdatedStamp + * is older than the configured lease-expiry threshold, releasing them back to SERVICE_PENDING + * so a healthy node can pick them up. + * Uses storeByCondition for atomicity to avoid race conditions in multi-node deployments. + * Called periodically from the {@link JobPoller} main loop. + */ + protected int recoverStaleJobs() { + assertIsRunning(); + long leaseExpiryMillis; + try { + leaseExpiryMillis = ServiceConfigUtil.getServiceEngine().getThreadPool().getLeaseExpiryMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Unable to read lease-expiry-millis; using default.", MODULE); + leaseExpiryMillis = org.apache.ofbiz.service.config.model.ThreadPool.LEASE_EXPIRY_MILLIS; + } + Timestamp expiryThreshold = new Timestamp(System.currentTimeMillis() - leaseExpiryMillis); + // Match QUEUED or RUNNING jobs owned by any instance with an expired (or missing) lease stamp + List conditions = UtilMisc.toList( + EntityCondition.makeCondition( + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), + EntityOperator.OR, + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")), + EntityCondition.makeCondition("runByInstanceId", EntityOperator.NOT_EQUAL, null), + EntityCondition.makeCondition( + EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.LESS_THAN, expiryThreshold), + EntityOperator.OR, + EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.EQUALS, null))); + int recovered = 0; + try { + recovered = delegator.storeByCondition("JobSandbox", + UtilMisc.toMap("statusId", "SERVICE_PENDING", "runByInstanceId", null, "startDateTime", null, "leaseUpdatedStamp", null), + EntityCondition.makeCondition(conditions)); + if (recovered > 0) { + Debug.logInfo("Stale job recovery: reset " + recovered + + " expired job(s) to SERVICE_PENDING on instance [" + INSTANCE_ID + "]", MODULE); + } + } catch (GenericEntityException e) { + Debug.logWarning(e, "Exception thrown while recovering stale jobs: ", MODULE); + } + return recovered; + } + public static List getJobsToPurge(Delegator delegator, String poolId, String instanceId, int limit, Timestamp purgeTime) throws GenericEntityException { List purgeCondition = UtilMisc.toList( diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java index a1edb54e130..7b1b55f584f 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java @@ -115,6 +115,24 @@ private static int pollWaitTime() { } } + private static int leaseRefreshTime() { + try { + return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseRefreshMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Exception thrown while getting lease-refresh-millis, using default: ", MODULE); + return ThreadPool.LEASE_REFRESH_MILLIS; + } + } + + private static int leaseValidationTime() { + try { + return ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool().getLeaseValidationMillis(); + } catch (GenericConfigException e) { + Debug.logWarning(e, "Exception thrown while getting lease-validation-millis, using default: ", MODULE); + return ThreadPool.LEASE_VALIDATION_MILLIS; + } + } + static int queueSize() { try { ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool(); @@ -245,7 +263,7 @@ public Thread newThread(Runnable runnable) { } } - // Polls all registered JobManagers for jobs to queue. + // Polls all registered JobManagers for jobs to queue, and runs periodic lease heartbeat and stale-job recovery. private final class JobManagerPoller implements Runnable { // Do not check for interrupts in this method. The design requires the @@ -253,6 +271,8 @@ private final class JobManagerPoller implements Runnable { @Override public void run() { Debug.logInfo("JobPoller thread started.", MODULE); + long lastHeartbeatTime = 0; + long lastRecoveryTime = 0; try { while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) { Thread.sleep(1000); @@ -295,6 +315,21 @@ public void run() { } } } + long now = System.currentTimeMillis(); + // Heartbeat: renew leaseUpdatedStamp for all jobs owned by this instance. + if (now - lastHeartbeatTime > leaseRefreshTime()) { + for (JobManager jm : JOB_MANAGERS.values()) { + jm.heartbeatRunningJobs(); + } + lastHeartbeatTime = now; + } + // Recovery: reset stale jobs from dead nodes back to SERVICE_PENDING. + if (now - lastRecoveryTime > leaseValidationTime()) { + for (JobManager jm : JOB_MANAGERS.values()) { + jm.recoverStaleJobs(); + } + lastRecoveryTime = now; + } Thread.sleep(pollWaitTime()); } } catch (InterruptedException e) { diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java index 32c2a9ccf41..c700d8969b0 100644 --- a/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java +++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java @@ -44,7 +44,10 @@ import org.apache.ofbiz.entity.GenericValue; import org.apache.ofbiz.entity.serialize.SerializeException; import org.apache.ofbiz.entity.serialize.XmlSerializer; +import org.apache.ofbiz.entity.condition.EntityCondition; +import org.apache.ofbiz.entity.condition.EntityOperator; import org.apache.ofbiz.entity.util.EntityQuery; +import org.apache.ofbiz.base.util.UtilMisc; import org.apache.ofbiz.service.DispatchContext; import org.apache.ofbiz.service.GenericRequester; import org.apache.ofbiz.service.ServiceUtil; @@ -156,6 +159,7 @@ protected void init() throws InvalidJobException { } jobValue.set("startDateTime", UtilDateTime.nowTimestamp()); jobValue.set("statusId", "SERVICE_RUNNING"); + jobValue.set("leaseUpdatedStamp", UtilDateTime.nowTimestamp()); try { jobValue.store(); } catch (GenericEntityException e) { @@ -219,13 +223,30 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi /* This solution ensures that the system uses a consistent, UTC-based time for scheduling and rescheduling recurring jobs, even when DST changes affect the local time. - */ + */ ZonedDateTime nextRunTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(next), ZoneId.of("UTC")); if (nextRunTime.toInstant().toEpochMilli() > startTime) { String pJobId = jobValue.getString("parentJobId"); if (pJobId == null) { pJobId = jobValue.getString("jobId"); } + // Check if the next recurrence has already been created (e.g. by a previous failed attempt on another node) + long nextEpoch = nextRunTime.toInstant().toEpochMilli(); + long existingCount = EntityQuery.use(delegator).from("JobSandbox") + .where(EntityCondition.makeCondition(UtilMisc.toList( + EntityCondition.makeCondition("parentJobId", EntityOperator.EQUALS, pJobId), + EntityCondition.makeCondition("runTimeEpoch", EntityOperator.EQUALS, nextEpoch), + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING")))) + .queryCount(); + if (existingCount > 0) { + if (Debug.infoOn()) { + Debug.logInfo("Skipping duplicate recurrence for job [" + getJobId() + + "] - next slot at epoch [" + nextEpoch + "] already exists for parent [" + pJobId + "]", + MODULE); + } + nextRecurrence = next; + return; + } GenericValue newJob = GenericValue.create(jobValue); newJob.remove("jobId"); newJob.set("previousJobId", jobValue.getString("jobId")); @@ -235,6 +256,7 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi newJob.set("runByInstanceId", null); newJob.set("runTime", Timestamp.from(nextRunTime.toInstant())); newJob.set("runTimeEpoch", nextRunTime.toInstant().toEpochMilli()); + newJob.set("leaseUpdatedStamp", null); if (isRetryOnFailure) { newJob.set("currentRetryCount", currentRetryCount + 1); } else { @@ -405,6 +427,7 @@ public void deQueue() throws InvalidJobException { jobValue.refresh(); jobValue.set("startDateTime", null); jobValue.set("runByInstanceId", null); + jobValue.set("leaseUpdatedStamp", null); jobValue.set("statusId", "SERVICE_PENDING"); jobValue.store(); } catch (GenericEntityException e) {