Skip to content

Commit bf1d116

Browse files
ludochgae-java-bot
authored andcommitted
Upgrade App Engine dev task queue emulation to Quartz 2.4.1 which is the supported version going forward.
PiperOrigin-RevId: 899078838 Change-Id: Id81b6a5e26d3b587b715692edbb288063247ac39
1 parent f3e3646 commit bf1d116

10 files changed

Lines changed: 343 additions & 205 deletions

File tree

api_dev/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
<artifactId>jsoup</artifactId>
113113
</dependency>
114114
<dependency>
115-
<groupId>quartz</groupId>
115+
<groupId>org.quartz-scheduler</groupId>
116116
<artifactId>quartz</artifactId>
117117
</dependency>
118118
<dependency>
@@ -244,3 +244,4 @@
244244
</plugins>
245245
</build>
246246
</project>
247+

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/DevPushQueue.java

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
package com.google.appengine.api.taskqueue.dev;
1818

19+
import static org.quartz.JobBuilder.newJob;
20+
import static org.quartz.JobKey.jobKey;
21+
import static org.quartz.TriggerBuilder.newTrigger;
22+
import static org.quartz.impl.matchers.GroupMatcher.jobGroupEquals;
23+
import static org.quartz.impl.matchers.GroupMatcher.triggerGroupEquals;
24+
1925
import com.google.appengine.api.taskqueue.dev.QueueStateInfo.TaskStateInfo;
2026
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest;
2127
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest.Header;
@@ -28,19 +34,24 @@
2834
import com.google.apphosting.utils.config.QueueXml;
2935
import com.google.common.flogger.GoogleLogger;
3036
import com.google.protobuf.ByteString;
37+
import java.time.Instant;
3138
import java.util.ArrayList;
32-
import java.util.Arrays;
3339
import java.util.Collections;
3440
import java.util.Date;
3541
import java.util.List;
42+
import java.util.Set;
3643
import org.quartz.Job;
44+
import org.quartz.JobDataMap;
3745
import org.quartz.JobDetail;
3846
import org.quartz.JobExecutionContext;
3947
import org.quartz.JobExecutionException;
48+
import org.quartz.JobKey;
4049
import org.quartz.Scheduler;
4150
import org.quartz.SchedulerException;
4251
import org.quartz.SimpleTrigger;
4352
import org.quartz.Trigger;
53+
import org.quartz.impl.JobExecutionContextImpl;
54+
import org.quartz.spi.OperableTrigger;
4455
import org.quartz.spi.TriggerFiredBundle;
4556

4657
/**
@@ -89,7 +100,7 @@ Mode getMode() {
89100
// to the contrary, pausing a job group only pauses jobs that already
90101
// exist. We need to make sure all future jobs are paused, and that
91102
// works if we pause the trigger group.
92-
scheduler.pauseTriggerGroup(getQueueName());
103+
scheduler.pauseTriggers(triggerGroupEquals(getQueueName()));
93104
} catch (SchedulerException e) {
94105
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE, e.getMessage());
95106
}
@@ -112,7 +123,7 @@ private synchronized String scheduleTask(TaskQueueAddRequest.Builder addRequest)
112123
taskName = genTaskName();
113124
}
114125
try {
115-
if (scheduler.getJobDetail(taskName, getQueueName()) != null) {
126+
if (scheduler.checkExists(jobKey(taskName, getQueueName()))) {
116127
throw new ApiProxy.ApplicationException(ErrorCode.TASK_ALREADY_EXISTS_VALUE);
117128
}
118129
} catch (SchedulerException e) {
@@ -121,19 +132,32 @@ private synchronized String scheduleTask(TaskQueueAddRequest.Builder addRequest)
121132

122133
TaskQueueRetryParameters retryParams = getRetryParameters(addRequest);
123134
long etaMillis = addRequest.getEtaUsec() / 1000L;
124-
SimpleTrigger trigger = new SimpleTrigger(taskName, getQueueName());
125-
trigger.setStartTime(new Date(etaMillis));
126-
JobDetail jd = newUrlFetchJobDetail(taskName, getQueueName(), addRequest, retryParams);
135+
136+
JobDataMap jobDataMap =
137+
newUrlFetchJobDataMap(taskName, getQueueName(), addRequest, retryParams);
138+
139+
JobDetail job =
140+
newJob(UrlFetchJob.class)
141+
.withIdentity(taskName, getQueueName())
142+
.usingJobData(jobDataMap)
143+
.build();
144+
145+
Trigger trigger =
146+
newTrigger()
147+
.withIdentity(taskName, getQueueName())
148+
.startAt(Date.from(Instant.ofEpochMilli(etaMillis)))
149+
.build();
150+
127151
try {
128-
scheduler.scheduleJob(jd, trigger);
152+
scheduler.scheduleJob(job, trigger);
129153
} catch (SchedulerException e) {
130154
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE, e.getMessage());
131155
}
132156
return taskName;
133157
}
134158

135159
// broken out to support testing
136-
JobDetail newUrlFetchJobDetail(
160+
JobDataMap newUrlFetchJobDataMap(
137161
String taskName,
138162
String queueName,
139163
TaskQueueAddRequest.Builder addRequest,
@@ -143,18 +167,20 @@ JobDetail newUrlFetchJobDetail(
143167
String host = header.getValue().toStringUtf8();
144168
if (host.startsWith("localhost:")) {
145169
return new UrlFetchJobDetail(
146-
taskName,
147-
queueName,
148-
addRequest,
149-
"http://" + host,
150-
callback,
151-
queueXmlEntry,
152-
retryParams);
170+
taskName,
171+
queueName,
172+
addRequest,
173+
"http://" + host,
174+
callback,
175+
queueXmlEntry,
176+
retryParams)
177+
.getJobDataMap();
153178
}
154179
}
155180
}
156181
return new UrlFetchJobDetail(
157-
taskName, queueName, addRequest, baseUrl, callback, queueXmlEntry, retryParams);
182+
taskName, queueName, addRequest, baseUrl, callback, queueXmlEntry, retryParams)
183+
.getJobDataMap();
158184
}
159185

160186
@Override
@@ -177,8 +203,11 @@ TaskQueueAddResponse add(TaskQueueAddRequest.Builder addRequest) {
177203
}
178204

179205
List<String> getSortedJobNames() throws SchedulerException {
180-
String[] jobNames = scheduler.getJobNames(getQueueName());
181-
List<String> jobNameList = Arrays.asList(jobNames);
206+
Set<JobKey> jobKeys = scheduler.getJobKeys(jobGroupEquals(getQueueName()));
207+
List<String> jobNameList = new ArrayList<>();
208+
for (JobKey jobKey : jobKeys) {
209+
jobNameList.add(jobKey.getName());
210+
}
182211
Collections.sort(jobNameList);
183212
return jobNameList;
184213
}
@@ -191,23 +220,27 @@ QueueStateInfo getStateInfo() {
191220
// Get the names of all jobs belonging to this queue (group).
192221
for (String jobName : getSortedJobNames()) {
193222
// Now get job details
194-
UrlFetchJobDetail jd = (UrlFetchJobDetail) scheduler.getJobDetail(jobName, getQueueName());
195-
if (jd == null) {
223+
JobDetail jobDetail = scheduler.getJobDetail(jobKey(jobName, getQueueName()));
224+
if (jobDetail == null) {
196225
// oops, gone, must have already run
197226
continue;
198227
}
199-
Trigger[] triggers = scheduler.getTriggersOfJob(jobName, getQueueName());
200-
if (triggers.length == 0) {
228+
229+
UrlFetchJobDetail jd = new UrlFetchJobDetail(jobDetail.getJobDataMap());
230+
231+
List<? extends Trigger> triggers =
232+
scheduler.getTriggersOfJob(jobKey(jobName, getQueueName()));
233+
if (triggers.isEmpty()) {
201234
// must have run in between the time we fetched the job detail and the time we fetched the
202235
// trigger
203236
continue;
204237
}
205-
if (triggers.length != 1) {
238+
if (triggers.size() != 1) {
206239
throw new IllegalStateException(
207240
"Multiple triggers for task " + jobName + " in queue " + getQueueName());
208241
}
209-
long execTime = triggers[0].getStartTime().getTime();
210-
taskInfoList.add(new TaskStateInfo(jd.getName(), execTime, jd.getAddRequest(), clock));
242+
long execTime = triggers.get(0).getStartTime().toInstant().toEpochMilli();
243+
taskInfoList.add(new TaskStateInfo(jd.getTaskName(), execTime, jd.getAddRequest(), clock));
211244
}
212245
} catch (SchedulerException e) {
213246
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
@@ -231,7 +264,7 @@ QueueStateInfo getStateInfo() {
231264
@Override
232265
boolean deleteTask(String taskName) {
233266
try {
234-
return scheduler.deleteJob(taskName, getQueueName());
267+
return scheduler.deleteJob(jobKey(taskName, getQueueName()));
235268
} catch (SchedulerException e) {
236269
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
237270
}
@@ -241,20 +274,18 @@ boolean deleteTask(String taskName) {
241274
@Override
242275
void flush() {
243276
try {
244-
for (String name : scheduler.getJobNames(getQueueName())) {
245-
scheduler.deleteJob(name, getQueueName());
246-
}
277+
Set<JobKey> jobKeys = scheduler.getJobKeys(jobGroupEquals(getQueueName()));
278+
scheduler.deleteJobs(new ArrayList<>(jobKeys));
247279
} catch (SchedulerException e) {
248280
throw new ApiProxy.ApplicationException(ErrorCode.INTERNAL_ERROR_VALUE);
249281
}
250282
}
251283

252-
private JobExecutionContext getExecutionContext(UrlFetchJobDetail jobDetail) {
253-
Trigger trigger = new SimpleTrigger(jobDetail.getTaskName(), jobDetail.getQueueName());
254-
trigger.setJobDataMap(jobDetail.getJobDataMap());
284+
private JobExecutionContext getExecutionContext(JobDetail jobDetail, SimpleTrigger trigger) {
255285
TriggerFiredBundle bundle =
256-
new TriggerFiredBundle(jobDetail, trigger, null, false, null, null, null, null);
257-
return new JobExecutionContext(scheduler, bundle, null);
286+
new TriggerFiredBundle(
287+
jobDetail, (OperableTrigger) trigger, null, false, null, null, null, null);
288+
return new JobExecutionContextImpl(scheduler, bundle, null);
258289
}
259290

260291
/**
@@ -269,11 +300,19 @@ boolean runTask(String taskName) {
269300
Job job;
270301
JobExecutionContext context;
271302
try {
272-
UrlFetchJobDetail jd = (UrlFetchJobDetail) scheduler.getJobDetail(taskName, getQueueName());
303+
JobDetail jd = scheduler.getJobDetail(jobKey(taskName, getQueueName()));
273304
if (jd == null) {
274305
return false;
275306
}
276-
context = getExecutionContext(jd);
307+
// Reconstruct trigger for execution context - just needs to hold data map
308+
SimpleTrigger trigger =
309+
(SimpleTrigger)
310+
newTrigger()
311+
.withIdentity(taskName, getQueueName())
312+
.usingJobData(jd.getJobDataMap())
313+
.build();
314+
315+
context = getExecutionContext(jd, trigger);
277316
job = (Job) jd.getJobClass().newInstance();
278317
} catch (SchedulerException e) {
279318
return false;

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/LocalTaskQueue.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.List;
6464
import java.util.Map;
6565
import java.util.Map.Entry;
66+
import java.util.Properties;
6667
import java.util.Random;
6768
import java.util.TreeMap;
6869
import org.jspecify.annotations.Nullable;
@@ -690,7 +691,13 @@ static Scheduler startScheduler(boolean disableAutoTaskExecution) {
690691
// TODO: Investigate config options for the scheduler like
691692
// threadpool size.
692693
try {
693-
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
694+
StdSchedulerFactory factory = new StdSchedulerFactory();
695+
Properties props = new Properties();
696+
props.setProperty("org.quartz.scheduler.instanceName", "AppEngineLocalTaskQueue");
697+
props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
698+
props.setProperty("org.quartz.threadPool.threadCount", "10");
699+
factory.initialize(props);
700+
Scheduler scheduler = factory.getScheduler();
694701
// When a scheduler is first created it is in standby mode, which means
695702
// it will accept and schedule tasks but won't ever run them.
696703
if (!disableAutoTaskExecution) {

api_dev/src/main/java/com/google/appengine/api/taskqueue/dev/UrlFetchJob.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,29 @@
1616

1717
package com.google.appengine.api.taskqueue.dev;
1818

19+
import static org.quartz.JobBuilder.newJob;
20+
import static org.quartz.TriggerBuilder.newTrigger;
21+
1922
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueAddRequest;
2023
import com.google.appengine.api.taskqueue_bytes.TaskQueuePb.TaskQueueRetryParameters;
2124
import com.google.appengine.api.urlfetch.URLFetchServicePb.URLFetchRequest;
22-
import com.google.appengine.api.urlfetch.dev.LocalURLFetchService;
2325
import com.google.appengine.tools.development.Clock;
2426
import com.google.appengine.tools.development.LocalServerEnvironment;
2527
import com.google.apphosting.utils.config.QueueXml;
2628
import com.google.common.flogger.GoogleLogger;
2729
import java.text.DecimalFormat;
30+
import java.time.Instant;
2831
import java.util.Date;
2932
import org.quartz.Job;
33+
import org.quartz.JobDataMap;
34+
import org.quartz.JobDetail;
3035
import org.quartz.JobExecutionContext;
3136
import org.quartz.JobExecutionException;
3237
import org.quartz.Scheduler;
3338
import org.quartz.SchedulerException;
3439
import org.quartz.SimpleTrigger;
3540
import org.quartz.Trigger;
41+
import org.quartz.TriggerKey;
3642

3743
/**
3844
* Quartz {@link Job} implementation that hits a url. The url to hit, the http method to invoke,
@@ -85,8 +91,9 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
8591
throw new JobExecutionException(
8692
"Interrupted while waiting for server to initialize.", e, false);
8793
}
88-
Trigger trigger = context.getTrigger();
89-
UrlFetchJobDetail jd = (UrlFetchJobDetail) context.getJobDetail();
94+
95+
UrlFetchJobDetail jd = new UrlFetchJobDetail(context.getJobDetail().getJobDataMap());
96+
9097
URLFetchRequest fetchReq =
9198
newFetchRequest(
9299
jd.getTaskName(),
@@ -104,10 +111,11 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
104111
if ((status < 200 || status > 299) && canRetry(jd, firstTryMs)) {
105112
logger.atInfo().log(
106113
"Web hook at %s returned status code %d. Rescheduling...", fetchReq.getUrl(), status);
107-
reschedule(context.getScheduler(), trigger, jd, firstTryMs, status);
114+
reschedule(context.getScheduler(), context.getTrigger(), jd, firstTryMs, status);
108115
} else {
109116
try {
110-
context.getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
117+
Trigger trigger = context.getTrigger();
118+
context.getScheduler().unscheduleJob(trigger.getKey());
111119
} catch (SchedulerException e) {
112120
logger.atSevere().withCause(e).log("Unsubscription of task %s failed.", jd.getAddRequest());
113121
}
@@ -141,16 +149,29 @@ private void reschedule(
141149
long firstTryMs,
142150
int previousResponse) {
143151
// Builds a new job.
144-
UrlFetchJobDetail newJobDetail = jd.retry(firstTryMs, previousResponse);
152+
UrlFetchJobDetail newJobData = jd.retry(firstTryMs, previousResponse);
153+
JobDataMap newJobDataMap = newJobData.getJobDataMap();
145154

146155
// Build the new trigger from the old trigger
147-
SimpleTrigger newTrigger = new SimpleTrigger(trigger.getName(), trigger.getGroup());
148-
newTrigger.setStartTime(new Date(clock.getCurrentTime() + newJobDetail.getRetryDelayMs()));
156+
TriggerKey triggerKey = trigger.getKey();
157+
Instant newStartTime =
158+
Instant.ofEpochMilli(clock.getCurrentTime() + newJobData.getRetryDelayMs());
159+
160+
SimpleTrigger newTrigger =
161+
(SimpleTrigger)
162+
newTrigger().withIdentity(triggerKey).startAt(Date.from(newStartTime)).build();
163+
164+
JobDetail newJob =
165+
newJob(UrlFetchJob.class)
166+
.withIdentity(jd.getTaskName(), jd.getQueueName())
167+
.usingJobData(newJobDataMap)
168+
.build();
169+
149170
try {
150-
// Quartz doesn't allow 2 jobs with the same name so we need to first
171+
// Quartz 2.x doesn't allow 2 jobs with the same name so we need to first
151172
// unschedule the currently executing job before we reschedule
152-
scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());
153-
scheduler.scheduleJob(newJobDetail, newTrigger);
173+
scheduler.unscheduleJob(triggerKey);
174+
scheduler.scheduleJob(newJob, newTrigger);
154175
} catch (SchedulerException e) {
155176
logger.atSevere().withCause(e).log("Reschedule of task %s failed.", jd.getAddRequest());
156177
}

0 commit comments

Comments
 (0)