Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion framework/service/config/serviceengine.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ under the License.
min-threads="2"
max-threads="5"
poll-enabled="true"
poll-db-millis="30000">
poll-db-millis="30000"
lease-refresh-millis="300000"
lease-validation-millis="480000"
lease-expiry-millis="600000">
<run-from-pool name="pool"/>
</thread-pool>

Expand Down
1 change: 1 addition & 0 deletions framework/service/entitydef/entitymodel.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ under the License.
<field name="cancelDateTime" type="date-time"></field>
<field name="jobResult" type="value"></field>
<field name="recurrenceTimeZone" type="id-long"/>
<field name="leaseUpdatedStamp" type="date-time"/><!-- Last heartbeat timestamp written by the owning node; used for lease-based stale job recovery in multi-node ASG deployments -->
<prim-key field="jobId"/>
<relation type="one" fk-name="JOB_SNDBX_RECINFO" rel-entity-name="RecurrenceInfo">
<key-map field-name="recurrenceInfoId"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public final class ThreadPool {
public static final int PURGE_JOBS_DAYS = 30;
public static final int QUEUE_SIZE = 100;
public static final int THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
public static final int LEASE_REFRESH_MILLIS = 300000; // Heartbeat interval - 5 minutes.
public static final int LEASE_VALIDATION_MILLIS = 480000; // Stale-job scan interval - 8 minutes.
public static final int LEASE_EXPIRY_MILLIS = 600000; // Lease expiry threshold - 10 minutes.

private final int failedRetryMin;
private final int jobs;
Expand All @@ -55,6 +58,9 @@ public final class ThreadPool {
private final List<RunFromPool> runFromPools;
private final String sendToPool;
private final int ttl;
private final int leaseRefreshMillis;
private final int leaseValidationMillis;
private final int leaseExpiryMillis;

ThreadPool(Element poolElement) throws ServiceConfigException, NumberFormatException {
String sendToPool = poolElement.getAttribute("send-to-pool").intern();
Expand Down Expand Up @@ -170,6 +176,12 @@ public final class ThreadPool {
}
this.runFromPools = Collections.unmodifiableList(runFromPools);
}
String leaseRefreshMillis = poolElement.getAttribute("lease-refresh-millis").intern();
this.leaseRefreshMillis = leaseRefreshMillis.isEmpty() ? LEASE_REFRESH_MILLIS : Integer.parseInt(leaseRefreshMillis);
String leaseValidationMillis = poolElement.getAttribute("lease-validation-millis").intern();
this.leaseValidationMillis = leaseValidationMillis.isEmpty() ? LEASE_VALIDATION_MILLIS : Integer.parseInt(leaseValidationMillis);
String leaseExpiryMillis = poolElement.getAttribute("lease-expiry-millis").intern();
this.leaseExpiryMillis = leaseExpiryMillis.isEmpty() ? LEASE_EXPIRY_MILLIS : Integer.parseInt(leaseExpiryMillis);
}

public int getFailedRetryMin() {
Expand Down Expand Up @@ -211,4 +223,16 @@ public String getSendToPool() {
public int getTtl() {
return ttl;
}

/**
 * Returns the lease heartbeat interval in milliseconds.
 *
 * @return the value of the <code>lease-refresh-millis</code> attribute, or
 *         {@link #LEASE_REFRESH_MILLIS} when that attribute was empty
 */
public int getLeaseRefreshMillis() {
    return this.leaseRefreshMillis;
}

/**
 * Returns the stale-job scan interval in milliseconds.
 *
 * @return the value of the <code>lease-validation-millis</code> attribute, or
 *         {@link #LEASE_VALIDATION_MILLIS} when that attribute was empty
 */
public int getLeaseValidationMillis() {
    return this.leaseValidationMillis;
}

/**
 * Returns the lease expiry threshold in milliseconds.
 *
 * @return the value of the <code>lease-expiry-millis</code> attribute, or
 *         {@link #LEASE_EXPIRY_MILLIS} when that attribute was empty
 */
public int getLeaseExpiryMillis() {
    return this.leaseExpiryMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,73 @@ protected List<Job> poll(int limit) {
return poll;
}

/**
 * Refreshes the lease timestamp (leaseUpdatedStamp) of every JobSandbox record whose
 * runByInstanceId equals this instance and whose status is SERVICE_RUNNING or
 * SERVICE_QUEUED, using a single bulk update.
 * A {@link GenericEntityException} raised by the update is logged as a warning and
 * not propagated.
 * Invoked periodically from the {@link JobPoller} main loop.
 */
protected void heartbeatRunningJobs() {
    assertIsRunning();
    // Only jobs claimed by this instance are touched.
    EntityExpr ownedByThisInstance =
            EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, INSTANCE_ID);
    // Both in-flight states hold a lease: running jobs and jobs still waiting in the queue.
    EntityExpr runningOrQueued = EntityCondition.makeCondition(
            EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"),
            EntityOperator.OR,
            EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"));
    List<EntityExpr> leaseFilter = UtilMisc.toList(ownedByThisInstance, runningOrQueued);
    try {
        int renewedCount = delegator.storeByCondition("JobSandbox",
                UtilMisc.toMap("leaseUpdatedStamp", UtilDateTime.nowTimestamp()),
                EntityCondition.makeCondition(leaseFilter));
        if (Debug.verboseOn()) {
            Debug.logVerbose("Heartbeat: lease renewed for " + renewedCount + " job(s) on instance [" + INSTANCE_ID + "]", MODULE);
        }
    } catch (GenericEntityException e) {
        Debug.logWarning(e, "Exception thrown while renewing job leases: ", MODULE);
    }
}

/**
* Scans all RUNNING/QUEUED jobs across all nodes and resets any whose leaseUpdatedStamp
* is missing or older than the configured lease-expiry threshold, releasing them back to
* SERVICE_PENDING so a healthy node can pick them up.
* Uses storeByCondition for atomicity to avoid race conditions in multi-node deployments.
* Called periodically from the {@link JobPoller} main loop.
*/
protected int recoverStaleJobs() {
assertIsRunning();
long leaseExpiryMillis;
try {
leaseExpiryMillis = ServiceConfigUtil.getServiceEngine().getThreadPool().getLeaseExpiryMillis();
} catch (GenericConfigException e) {
Debug.logWarning(e, "Unable to read lease-expiry-millis; using default.", MODULE);
leaseExpiryMillis = org.apache.ofbiz.service.config.model.ThreadPool.LEASE_EXPIRY_MILLIS;
}
Timestamp expiryThreshold = new Timestamp(System.currentTimeMillis() - leaseExpiryMillis);
// Match QUEUED or RUNNING jobs owned by any instance with an expired (or missing) lease stamp
List<EntityCondition> conditions = UtilMisc.toList(
EntityCondition.makeCondition(
EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
EntityOperator.OR,
EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING")),
EntityCondition.makeCondition("runByInstanceId", EntityOperator.NOT_EQUAL, null),
EntityCondition.makeCondition(
EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.LESS_THAN, expiryThreshold),
EntityOperator.OR,
EntityCondition.makeCondition("leaseUpdatedStamp", EntityOperator.EQUALS, null)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @Ankit-Joshi11
In an environment with multiple pools, would it be useful to tell the JobManager to update only its dedicated pool?

So add the condition
EntityCondition.makeCondition("poolId", EntityOperator.IN, getRunPools())

Or do we just ignore the collision risk?

int recovered = 0;
try {
recovered = delegator.storeByCondition("JobSandbox",
UtilMisc.toMap("statusId", "SERVICE_PENDING", "runByInstanceId", null, "startDateTime", null, "leaseUpdatedStamp", null),
EntityCondition.makeCondition(conditions));
if (recovered > 0) {
Debug.logInfo("Stale job recovery: reset " + recovered
+ " expired job(s) to SERVICE_PENDING on instance [" + INSTANCE_ID + "]", MODULE);
}
} catch (GenericEntityException e) {
Debug.logWarning(e, "Exception thrown while recovering stale jobs: ", MODULE);
}
return recovered;
}

public static List<GenericValue> getJobsToPurge(Delegator delegator, String poolId, String instanceId, int limit, Timestamp purgeTime)
throws GenericEntityException {
List<EntityCondition> purgeCondition = UtilMisc.toList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ private static int pollWaitTime() {
}
}

/**
 * Reads the lease heartbeat interval (milliseconds) from the service engine's thread-pool
 * configuration. If the configuration cannot be read, a warning is logged and
 * {@link ThreadPool#LEASE_REFRESH_MILLIS} is returned instead.
 */
private static int leaseRefreshTime() {
    int refreshMillis;
    try {
        ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
        refreshMillis = threadPool.getLeaseRefreshMillis();
    } catch (GenericConfigException e) {
        Debug.logWarning(e, "Exception thrown while getting lease-refresh-millis, using default: ", MODULE);
        refreshMillis = ThreadPool.LEASE_REFRESH_MILLIS;
    }
    return refreshMillis;
}

/**
 * Reads the stale-job scan interval (milliseconds) from the service engine's thread-pool
 * configuration. If the configuration cannot be read, a warning is logged and
 * {@link ThreadPool#LEASE_VALIDATION_MILLIS} is returned instead.
 */
private static int leaseValidationTime() {
    int validationMillis;
    try {
        ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
        validationMillis = threadPool.getLeaseValidationMillis();
    } catch (GenericConfigException e) {
        Debug.logWarning(e, "Exception thrown while getting lease-validation-millis, using default: ", MODULE);
        validationMillis = ThreadPool.LEASE_VALIDATION_MILLIS;
    }
    return validationMillis;
}

static int queueSize() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
Expand Down Expand Up @@ -245,14 +263,16 @@ public Thread newThread(Runnable runnable) {
}
}

// Polls all registered JobManagers for jobs to queue.
// Polls all registered JobManagers for jobs to queue, and runs periodic lease heartbeat and stale-job recovery.
private final class JobManagerPoller implements Runnable {

// Do not check for interrupts in this method. The design requires the
// thread to complete the job manager poll uninterrupted.
@Override
public void run() {
Debug.logInfo("JobPoller thread started.", MODULE);
long lastHeartbeatTime = 0;
long lastRecoveryTime = 0;
try {
while (Start.getInstance().getCurrentState() != Start.ServerState.RUNNING) {
Thread.sleep(1000);
Expand Down Expand Up @@ -295,6 +315,21 @@ public void run() {
}
}
}
long now = System.currentTimeMillis();
// Heartbeat: renew leaseUpdatedStamp for all jobs owned by this instance.
if (now - lastHeartbeatTime > leaseRefreshTime()) {
for (JobManager jm : JOB_MANAGERS.values()) {
jm.heartbeatRunningJobs();
}
lastHeartbeatTime = now;
}
// Recovery: reset stale jobs from dead nodes back to SERVICE_PENDING.
if (now - lastRecoveryTime > leaseValidationTime()) {
for (JobManager jm : JOB_MANAGERS.values()) {
jm.recoverStaleJobs();
}
lastRecoveryTime = now;
}
Thread.sleep(pollWaitTime());
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import org.apache.ofbiz.entity.GenericValue;
import org.apache.ofbiz.entity.serialize.SerializeException;
import org.apache.ofbiz.entity.serialize.XmlSerializer;
import org.apache.ofbiz.entity.condition.EntityCondition;
import org.apache.ofbiz.entity.condition.EntityOperator;
import org.apache.ofbiz.entity.util.EntityQuery;
import org.apache.ofbiz.base.util.UtilMisc;
import org.apache.ofbiz.service.DispatchContext;
import org.apache.ofbiz.service.GenericRequester;
import org.apache.ofbiz.service.ServiceUtil;
Expand Down Expand Up @@ -156,6 +159,7 @@ protected void init() throws InvalidJobException {
}
jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
jobValue.set("statusId", "SERVICE_RUNNING");
jobValue.set("leaseUpdatedStamp", UtilDateTime.nowTimestamp());
try {
jobValue.store();
} catch (GenericEntityException e) {
Expand Down Expand Up @@ -219,13 +223,30 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi
/*
This solution ensures that the system uses a consistent,
UTC-based time for scheduling and rescheduling recurring jobs, even when DST changes affect the local time.
*/
*/
ZonedDateTime nextRunTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(next), ZoneId.of("UTC"));
if (nextRunTime.toInstant().toEpochMilli() > startTime) {
String pJobId = jobValue.getString("parentJobId");
if (pJobId == null) {
pJobId = jobValue.getString("jobId");
}
// Check if the next recurrence has already been created (e.g. by a previous failed attempt on another node)
long nextEpoch = nextRunTime.toInstant().toEpochMilli();
long existingCount = EntityQuery.use(delegator).from("JobSandbox")
.where(EntityCondition.makeCondition(UtilMisc.toList(
EntityCondition.makeCondition("parentJobId", EntityOperator.EQUALS, pJobId),
EntityCondition.makeCondition("runTimeEpoch", EntityOperator.EQUALS, nextEpoch),
EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"))))
.queryCount();
if (existingCount > 0) {
if (Debug.infoOn()) {
Debug.logInfo("Skipping duplicate recurrence for job [" + getJobId()
+ "] - next slot at epoch [" + nextEpoch + "] already exists for parent [" + pJobId + "]",
MODULE);
}
nextRecurrence = next;
return;
}
GenericValue newJob = GenericValue.create(jobValue);
newJob.remove("jobId");
newJob.set("previousJobId", jobValue.getString("jobId"));
Expand All @@ -235,6 +256,7 @@ private void createRecurrence(long next, boolean isRetryOnFailure) throws Generi
newJob.set("runByInstanceId", null);
newJob.set("runTime", Timestamp.from(nextRunTime.toInstant()));
newJob.set("runTimeEpoch", nextRunTime.toInstant().toEpochMilli());
newJob.set("leaseUpdatedStamp", null);
if (isRetryOnFailure) {
newJob.set("currentRetryCount", currentRetryCount + 1);
} else {
Expand Down Expand Up @@ -405,6 +427,7 @@ public void deQueue() throws InvalidJobException {
jobValue.refresh();
jobValue.set("startDateTime", null);
jobValue.set("runByInstanceId", null);
jobValue.set("leaseUpdatedStamp", null);
jobValue.set("statusId", "SERVICE_PENDING");
jobValue.store();
} catch (GenericEntityException e) {
Expand Down
Loading