|
37 | 37 | import org.slf4j.LoggerFactory; |
38 | 38 |
|
39 | 39 | import java.time.Duration; |
| 40 | +import java.util.HashSet; |
40 | 41 | import java.util.Iterator; |
41 | 42 | import java.util.List; |
42 | 43 | import java.util.Set; |
@@ -71,11 +72,21 @@ public class ManagementLogger implements MessageReader { |
71 | 72 | private final AtomicInteger evictionTriggerCounter = new AtomicInteger(0); |
72 | 73 | private final ConcurrentMap<Long,EvictionTrigger> evictionTriggerMap = new ConcurrentHashMap<>(); |
73 | 74 |
|
| 75 | + private final Duration ackTimeout; |
| 76 | + private final boolean autoCloseStaleInstances; |
| 77 | + |
74 | 78 | public ManagementLogger(StandardJanusGraph graph, Log sysLog, SchemaCache schemaCache, TimestampProvider times) { |
| 79 | + this(graph, sysLog, schemaCache, times, Duration.ofSeconds(120), true); |
| 80 | + } |
| 81 | + |
| 82 | + public ManagementLogger(StandardJanusGraph graph, Log sysLog, SchemaCache schemaCache, TimestampProvider times, |
| 83 | + Duration ackTimeout, boolean autoCloseStaleInstances) { |
75 | 84 | this.graph = graph; |
76 | 85 | this.schemaCache = schemaCache; |
77 | 86 | this.sysLog = sysLog; |
78 | 87 | this.times = times; |
| 88 | + this.ackTimeout = ackTimeout; |
| 89 | + this.autoCloseStaleInstances = autoCloseStaleInstances; |
79 | 90 | Preconditions.checkNotNull(times); |
80 | 91 | } |
81 | 92 |
|
@@ -163,11 +174,13 @@ private class EvictionTrigger { |
163 | 174 | final List<Callable<Boolean>> updatedTypeTriggers; |
164 | 175 | final StandardJanusGraph graph; |
165 | 176 | final Set<String> instancesToBeAcknowledged; |
| 177 | + final long createdAtMillis; |
166 | 178 |
|
167 | 179 | private EvictionTrigger(long evictionId, List<Callable<Boolean>> updatedTypeTriggers, StandardJanusGraph graph) { |
168 | 180 | this.graph = graph; |
169 | 181 | this.evictionId = evictionId; |
170 | 182 | this.updatedTypeTriggers = updatedTypeTriggers; |
| 183 | + this.createdAtMillis = System.currentTimeMillis(); |
171 | 184 | final JanusGraphManagement mgmt = graph.openManagement(); |
172 | 185 | this.instancesToBeAcknowledged = ConcurrentHashMap.newKeySet(); |
173 | 186 | instancesToBeAcknowledged.addAll(((ManagementSystem) mgmt).getOpenInstancesInternal()); |
@@ -204,8 +217,34 @@ int removeDroppedInstances() { |
204 | 217 | final String instanceRemovedMsg = "Instance [{}] was removed list of open instances and therefore dropped from list of instances to be acknowledged."; |
205 | 218 | instancesToBeAcknowledged.stream().filter(it -> !updatedInstances.contains(it)).filter(instancesToBeAcknowledged::remove).forEach(it -> log.debug(instanceRemovedMsg, it)); |
206 | 219 | mgmt.rollback(); |
| 220 | + |
| 221 | + if (autoCloseStaleInstances && !instancesToBeAcknowledged.isEmpty() |
| 222 | + && (System.currentTimeMillis() - createdAtMillis) > ackTimeout.toMillis()) { |
| 223 | + log.warn("ACK timeout ({}s) exceeded for eviction [{}]. Force-closing unresponsive instances: {}", |
| 224 | + ackTimeout.getSeconds(), evictionId, instancesToBeAcknowledged); |
| 225 | + forceCloseStaleInstances(new HashSet<>(instancesToBeAcknowledged)); |
| 226 | + instancesToBeAcknowledged.clear(); |
| 227 | + } |
| 228 | + |
207 | 229 | return instancesToBeAcknowledged.size(); |
208 | 230 | } |
| 231 | + |
| 232 | + private void forceCloseStaleInstances(Set<String> staleInstances) { |
| 233 | + try { |
| 234 | + final JanusGraphManagement mgmt = graph.openManagement(); |
| 235 | + for (String instanceId : staleInstances) { |
| 236 | + try { |
| 237 | + mgmt.forceCloseInstance(instanceId); |
| 238 | + log.warn("Force-closed stale instance [{}]", instanceId); |
| 239 | + } catch (IllegalArgumentException e) { |
| 240 | + log.debug("Could not force-close instance [{}]: {}", instanceId, e.getMessage()); |
| 241 | + } |
| 242 | + } |
| 243 | + mgmt.commit(); |
| 244 | + } catch (Exception e) { |
| 245 | + log.error("Failed to force-close stale instances", e); |
| 246 | + } |
| 247 | + } |
209 | 248 | } |
210 | 249 |
|
211 | 250 | private class SendAckOnTxClose implements Runnable { |
|
0 commit comments