88import java .sql .Connection ;
99import java .sql .PreparedStatement ;
1010import java .sql .ResultSet ;
11+ import java .sql .SQLException ;
1112
1213public abstract class AbstractNodeLeader {
1314
@@ -32,6 +33,8 @@ public void perform(String name, Executable executable) {
3233
3334 if (!lockAcquired ) {
3435 LOG .info (String .format ("Another node is running %s, skipping this one" , name ));
36+ //Might be that there is a lock not cleaned up due to VM crash
37+ this .cleanupStaleLocks (conn , 60 );
3538 return ;
3639 }
3740
@@ -55,30 +58,44 @@ public void perform(String name, Executable executable) {
5558 conn .close ();
5659 } catch (Exception ignored ) {
5760 //Can't do anything about this
61+ LOG .warn (String .format ("Failed to close lock %s" , name ));
5862 }
5963 }
6064 }
6165 }
6266
6367 protected boolean tryGetLock (Connection conn , String name ) throws Exception {
64- try (PreparedStatement ps = conn .prepareStatement ("SELECT GET_LOCK(?, ?)" )) {
65- ps .setString (1 , name );
66- //Use timeout of 0 to have an immediate result
67- ps .setInt (2 , 0 );
68- try (ResultSet rs = ps .executeQuery ()) {
69- if (rs .next ()) {
70- int result = rs .getInt (1 );
71- return !rs .wasNull () && result == 1 ;
72- }
68+ try {
69+ conn .setAutoCommit (false );
70+ try (PreparedStatement ps = conn .prepareStatement (
71+ "INSERT INTO distributed_locks (lock_name, acquired_at) VALUES (?, NOW())" )) {
72+ ps .setString (1 , name );
73+ ps .executeUpdate ();
74+ conn .commit ();
75+ return true ;
76+ } catch (SQLException e ) {
77+ conn .rollback ();
78+ // Duplicate key or other constraint violation means lock is held
7379 return false ;
7480 }
81+ } finally {
82+ conn .setAutoCommit (true );
7583 }
7684 }
7785
7886 private void releaseLock (Connection conn , String name ) throws Exception {
79- try (PreparedStatement ps = conn .prepareStatement ("SELECT RELEASE_LOCK(?)" )) {
87+ try (PreparedStatement ps = conn .prepareStatement (
88+ "DELETE FROM distributed_locks WHERE lock_name = ?" )) {
8089 ps .setString (1 , name );
81- ps .executeQuery (); // ignore result
90+ ps .executeUpdate ();
91+ }
92+ }
93+
94+ private void cleanupStaleLocks (Connection conn , int timeoutMinutes ) throws Exception {
95+ try (PreparedStatement ps = conn .prepareStatement (
96+ "DELETE FROM distributed_locks WHERE acquired_at < NOW() - INTERVAL ? MINUTE" )) {
97+ ps .setInt (1 , timeoutMinutes );
98+ ps .executeUpdate ();
8299 }
83100 }
84101
0 commit comments