Skip to content

Commit d4a95a1

Browse files
committed
1 parent 609196f commit d4a95a1

2 files changed

Lines changed: 96 additions & 0 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package invite.cron;
2+
3+
import lombok.SneakyThrows;
4+
import org.apache.commons.logging.Log;
5+
import org.apache.commons.logging.LogFactory;
6+
7+
import javax.sql.DataSource;
8+
import java.sql.Connection;
9+
import java.sql.PreparedStatement;
10+
import java.sql.ResultSet;
11+
12+
public abstract class AbstractNodeLeader {
13+
14+
private static final Log LOG = LogFactory.getLog(AbstractNodeLeader.class);
15+
16+
private final String lockName;
17+
private final int timeoutSeconds;
18+
private final DataSource dataSource;
19+
20+
protected AbstractNodeLeader(String lockName, int timeoutSeconds, DataSource dataSource) {
21+
this.lockName = lockName;
22+
this.timeoutSeconds = timeoutSeconds;
23+
this.dataSource = dataSource;
24+
}
25+
26+
@SneakyThrows
27+
public void perform(String name, Executable executable) {
28+
29+
30+
Connection conn = null;
31+
boolean lockAcquired = false;
32+
33+
try {
34+
conn = dataSource.getConnection();
35+
lockAcquired = tryGetLock(conn, lockName, timeoutSeconds);
36+
37+
if (!lockAcquired) {
38+
LOG.info(String.format("Another node is running %s, skipping this one", name));
39+
return;
40+
}
41+
42+
LOG.info(String.format("Lock acquired for %s", name));
43+
executable.execute();
44+
LOG.info(String.format("Executable %s completed successfully", name));
45+
} catch (Throwable e) {
46+
LOG.error(String.format("Error occurred in %s", name), e);
47+
} finally {
48+
if (lockAcquired) {
49+
try {
50+
releaseLock(conn, lockName);
51+
LOG.info(String.format("Lock released for %s", name));
52+
} catch (Exception e) {
53+
LOG.error(String.format("Failed to release lock %s", name), e);
54+
}
55+
}
56+
57+
if (conn != null) {
58+
try {
59+
conn.close();
60+
} catch (Exception ignored) {
61+
//Can't do anything about this
62+
}
63+
}
64+
}
65+
}
66+
67+
private boolean tryGetLock(Connection conn, String name, int timeoutSec) throws Exception {
68+
try (PreparedStatement ps = conn.prepareStatement("SELECT GET_LOCK(?, ?)")) {
69+
ps.setString(1, name);
70+
ps.setInt(2, timeoutSec);
71+
try (ResultSet rs = ps.executeQuery()) {
72+
if (rs.next()) {
73+
int result = rs.getInt(1);
74+
return !rs.wasNull() && result == 1;
75+
}
76+
return false;
77+
}
78+
}
79+
}
80+
81+
private void releaseLock(Connection conn, String name) throws Exception {
82+
try (PreparedStatement ps = conn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
83+
ps.setString(1, name);
84+
ps.executeQuery(); // ignore result
85+
}
86+
}
87+
88+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package invite.cron;
2+
3+
@FunctionalInterface
4+
public interface Executable {
5+
6+
void execute() throws Throwable;
7+
8+
}

0 commit comments

Comments
 (0)