Skip to content

Commit a6b3802

Browse files
committed
1 parent 1afb1f0 commit a6b3802

4 files changed

Lines changed: 143 additions & 0 deletions

File tree

dashjoin-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,10 @@
259259
<groupId>io.quarkus</groupId>
260260
<artifactId>quarkus-arc</artifactId>
261261
</dependency>
262+
<dependency>
263+
<groupId>io.quarkus</groupId>
264+
<artifactId>quarkus-quartz</artifactId>
265+
</dependency>
262266
<dependency>
263267
<groupId>io.quarkus</groupId>
264268
<artifactId>quarkus-junit5</artifactId>
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package org.dashjoin.service;
2+
3+
import java.security.Principal;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.logging.Level;
7+
import org.dashjoin.function.FunctionService;
8+
import org.dashjoin.model.Table;
9+
import io.quarkus.runtime.Startup;
10+
import io.quarkus.runtime.StartupEvent;
11+
import io.quarkus.scheduler.Scheduled;
12+
import io.quarkus.scheduler.Scheduler;
13+
import io.quarkus.scheduler.Trigger;
14+
import jakarta.enterprise.context.ApplicationScoped;
15+
import jakarta.enterprise.event.Observes;
16+
import jakarta.inject.Inject;
17+
import jakarta.ws.rs.core.SecurityContext;
18+
import lombok.extern.java.Log;
19+
20+
@Log
21+
@Startup
22+
@ApplicationScoped
23+
public class Cron {
24+
25+
@Inject
26+
Scheduler scheduler;
27+
28+
@Inject
29+
Services services;
30+
31+
@Inject
32+
FunctionService function;
33+
34+
/**
35+
* startup bean must not throw exceptions
36+
*/
37+
public void onStart(@Observes StartupEvent ev) {
38+
try {
39+
resetSchedule();
40+
} catch (Exception e) {
41+
log.log(Level.SEVERE, "Error while starting scheduler", e);
42+
}
43+
}
44+
45+
public Map<String, Object> resetSchedule() throws Exception {
46+
47+
// map of job to error / next runtime
48+
Map<String, Object> res = new HashMap<>();
49+
50+
// unschedule all jobs
51+
for (Trigger t : scheduler.getScheduledJobs())
52+
scheduler.unscheduleJob(t.getId());
53+
54+
// jobs run as "scheduler"
55+
SecurityContext sc = new SecurityContext() {
56+
@Override
57+
public boolean isUserInRole(String role) {
58+
return role.equals("scheduler");
59+
}
60+
61+
@Override
62+
public boolean isSecure() {
63+
return false;
64+
}
65+
66+
@Override
67+
public Principal getUserPrincipal() {
68+
return null;
69+
}
70+
71+
@Override
72+
public String getAuthenticationScheme() {
73+
return null;
74+
}
75+
};
76+
77+
// schedule any function with "cron" or "every"
78+
for (Map<String, Object> f : services.getConfig().getConfigDatabase()
79+
.all(Table.ofName("dj-function"), null, null, null, false, null)) {
80+
String id = (String) f.get("ID");
81+
String cron = (String) f.get("cron");
82+
String every = (String) f.get("every");
83+
if (cron != null) {
84+
try {
85+
log.info("scheduling " + id + " " + cron);
86+
Trigger t = scheduler.newJob((String) id).setCron(cron).setTask(ctx -> {
87+
try {
88+
log.info("running cron scheduled " + id);
89+
function.call(sc, ctx.getTrigger().getId(), null);
90+
log.info(ctx.getTrigger().getId() + " complete");
91+
} catch (Exception e) {
92+
log.log(Level.SEVERE, "error running scheduled job " + id, e);
93+
}
94+
}).setConcurrentExecution(Scheduled.ConcurrentExecution.SKIP).schedule();
95+
res.put(t.getId(), "" + t.getNextFireTime());
96+
} catch (Exception cronEx) {
97+
log.warning("cron syntax error: " + cronEx);
98+
res.put(id, cronEx.toString());
99+
}
100+
}
101+
if (every != null) {
102+
try {
103+
log.info("scheduling " + id + " " + every);
104+
Trigger t = scheduler.newJob((String) id).setInterval(every).setTask(ctx -> {
105+
try {
106+
log.info("running interval scheduled " + id);
107+
function.call(sc, ctx.getTrigger().getId(), null);
108+
log.info(ctx.getTrigger().getId() + " complete");
109+
} catch (Exception e) {
110+
log.log(Level.SEVERE, "error running scheduled job " + id, e);
111+
}
112+
}).setConcurrentExecution(Scheduled.ConcurrentExecution.SKIP).schedule();
113+
res.put(t.getId(), "" + t.getNextFireTime());
114+
} catch (Exception cronEx) {
115+
log.warning("cron syntax error: " + cronEx);
116+
res.put(id, cronEx.toString());
117+
}
118+
}
119+
}
120+
return res;
121+
}
122+
}

dashjoin-core/src/main/java/org/dashjoin/service/Manage.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ public class Manage {
107107
@Inject
108108
Data data;
109109

110+
@Inject
111+
Cron cron;
112+
110113
/**
111114
* describes a column when client calls detect
112115
*/
@@ -982,6 +985,16 @@ public List<FunctionVersion> getConfigurableFunctions() {
982985
(java.util.function.Function<FunctionVersion, String>) x -> x.name);
983986
}
984987

988+
@GET
989+
@Path("/resetSchedule")
990+
@Operation(summary = "reloads function schedules")
991+
@APIResponse(description = "job to next execution time or cron syntax error")
992+
public Map<String, Object> resetSchedule(@Context SecurityContext sc) throws Exception {
993+
if (!sc.isUserInRole("admin"))
994+
throw new Exception("must be admin to reset scheduled");
995+
return cron.resetSchedule();
996+
}
997+
985998
/**
986999
* returns the version of the Dashjoin platform
9871000
*/
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"ID": "scheduler",
3+
"description": "Role to run scheduled functions"
4+
}

0 commit comments

Comments
 (0)