Skip to content

Commit f138eb2

Browse files
authored
Merge pull request #41 from vdoleans01/master
Possibility to reset connection to redis
2 parents 29c53d8 + c75b374 commit f138eb2

17 files changed

Lines changed: 693 additions & 117 deletions
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.amadeus.session;
2+
3+
import java.util.concurrent.ConcurrentLinkedQueue;
4+
5+
/**
6+
* The class is used to determine if the application is in error or not
7+
*
8+
* If the number of error stored during the delay reach the max , the application is in error
9+
*
10+
* @param period
11+
* Time in milliseconds while items are kept
12+
* @param max
13+
* If the number of items is more than the max value the Tracker is considered in error
14+
*
15+
*/
16+
17+
public class ErrorTracker {
18+
public ErrorTracker(int period, int max) {
19+
super();
20+
this.period = period;
21+
this.max = max;
22+
}
23+
24+
private ConcurrentLinkedQueue<Long> list = new ConcurrentLinkedQueue<Long>();
25+
26+
/**
27+
* When a new event it added , all the event that is older that ( event time - period ) are removed from the tracker
28+
*/
29+
final int period;
30+
31+
/**
32+
* After the limits the Tracker is considering in error
33+
*/
34+
final int max;
35+
36+
/**
37+
* The parameter is a time with the format millisecond from 1900 ( System.currentTimeMillis() ) This method add an
38+
* event into the Tracker and remove all the old event with the followinf criteria
39+
*
40+
* now - oldevent > period will be
41+
*
42+
* removed
43+
*
44+
* @param now
45+
*/
46+
47+
public void addError(long now) {
48+
list.add(new Long(now));
49+
boolean cont = true;
50+
while (cont) {
51+
Long last = list.peek();
52+
if (now - last > period) {
53+
list.poll();
54+
} else {
55+
cont = false;
56+
}
57+
}
58+
}
59+
60+
public boolean reachLimits() {
61+
return list.size() >= max;
62+
}
63+
64+
public int size() {
65+
return list.size();
66+
}
67+
68+
public void reset() {
69+
list.clear();
70+
}
71+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.amadeus.session;
2+
3+
import static com.codahale.metrics.MetricRegistry.name;
4+
5+
import com.codahale.metrics.JmxReporter;
6+
import com.codahale.metrics.Meter;
7+
import com.codahale.metrics.MetricRegistry;
8+
9+
public class ResetManager {
10+
private final MetricRegistry monitoring;
11+
12+
private JmxReporter reporter;
13+
14+
private final ErrorTracker errorTracker;
15+
16+
private final Meter create_meter;
17+
18+
private final Meter notConnected_meter;
19+
20+
private final Meter connected_meter;
21+
22+
private final SessionConfiguration configuration;
23+
24+
protected final ExecutorFacade executors;
25+
26+
final String SESSIONS_METRIC_MANAGER_PREFIX = "com.amadeus.session.manager";
27+
28+
final String RESETMANAGER_CREATED_METRIC = name(SESSIONS_METRIC_MANAGER_PREFIX, "initialized");
29+
30+
final String RESETMANAGER_NOTCONNECTED_METRIC = name(SESSIONS_METRIC_MANAGER_PREFIX, "notConnected");
31+
32+
final String RESETMANAGER_CONNECTED_METRIC = name(SESSIONS_METRIC_MANAGER_PREFIX, "connected");
33+
34+
public ResetManager(ExecutorFacade executors, SessionConfiguration configuration) {
35+
this.executors = executors;
36+
37+
monitoring = new MetricRegistry();
38+
39+
this.configuration = configuration;
40+
41+
create_meter = monitoring.meter(RESETMANAGER_CREATED_METRIC);
42+
notConnected_meter = monitoring.meter(RESETMANAGER_NOTCONNECTED_METRIC);
43+
connected_meter = monitoring.meter(RESETMANAGER_CONNECTED_METRIC);
44+
45+
errorTracker = new ErrorTracker(configuration.getTrackerInterval(), configuration.getTrackerLimits());
46+
47+
startMonitoring();
48+
49+
create_meter.mark();
50+
}
51+
52+
/**
53+
* Starts monitoring this session manager. The method will expose all metrics through JMX.
54+
*/
55+
private void startMonitoring() {
56+
executors.startMetrics(monitoring);
57+
reporter = JmxReporter.forRegistry(monitoring).inDomain(getJmxDomain()).build();
58+
reporter.start();
59+
}
60+
61+
/**
62+
* Returns JMX domain for metrics for this session manager.
63+
*
64+
* @return JMX domain for metrics
65+
*/
66+
private String getJmxDomain() {
67+
return "metrics.session." + configuration.getNamespace();
68+
}
69+
70+
public ErrorTracker getErrorTracker() {
71+
return errorTracker;
72+
}
73+
74+
public void reset() {
75+
errorTracker.reset();
76+
create_meter.mark();
77+
}
78+
79+
public void connected() {
80+
connected_meter.mark();
81+
}
82+
83+
public void notConnected() {
84+
notConnected_meter.mark();
85+
}
86+
87+
}

session-replacement/src/main/java/com/amadeus/session/SessionConfiguration.java

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,22 @@ public interface AttributeProvider {
130130
Object source();
131131
}
132132

133+
134+
135+
136+
137+
138+
/**
139+
* Indicates the interval when error are count.
140+
*/
141+
public static final int DEFAULT_TRACKER_ERROR_INTERVAL_MILLISECONDS_NUM = 60000;
142+
public static final String TRACKER_ERROR_INTERVAL_MILLISECONDS_KEY = "com.amadeus.session.tracker.interval" ;
143+
/**
144+
* Indicates the maximun number of errors before request the reset of redis connection.
145+
*/
146+
public static final int DEFAULT_TRACKER_ERROR_LIMITS_NUMBER = 50;
147+
public static final String TRACKER_ERROR_LIMITS_NUMBER_KEY = "com.amadeus.session.tracker.limits" ;
148+
133149
/**
134150
* Indicates if sessions can be distributed or not.
135151
*/
@@ -303,6 +319,8 @@ public interface AttributeProvider {
303319
*/
304320
public static final String DELEGATE_WRITER = "com.amadeus.session.delegate.writer";
305321

322+
private int trackerInterval;
323+
private int trackerLimits;
306324
private int maxInactiveInterval;
307325
private boolean distributable;
308326
private boolean sticky;
@@ -362,10 +380,31 @@ public SessionConfiguration() {
362380
logger.error("`{}` system property was not an integer: {}, using default {}", DEFAULT_SESSION_TIMEOUT,
363381
inactiveValue, maxInactiveInterval);
364382
}
383+
384+
trackerInterval = init(TRACKER_ERROR_INTERVAL_MILLISECONDS_KEY,DEFAULT_TRACKER_ERROR_INTERVAL_MILLISECONDS_NUM);
385+
trackerLimits = init(TRACKER_ERROR_LIMITS_NUMBER_KEY,DEFAULT_TRACKER_ERROR_LIMITS_NUMBER);
386+
logger.error("trackerInterval:" + trackerInterval );
387+
logger.error("trackerLimits :" + trackerLimits );
388+
389+
365390
node = initNode();
366391
setEncryptionKey(getPropertySecured(SESSION_ENCRYPTION_KEY, null));
367392
}
368393

394+
private int init( String key , int def) {
395+
int intVal = def;
396+
String strVal = getPropertySecured(key , String.valueOf(def));
397+
try {
398+
if (nonEmpty(strVal)) {
399+
intVal = Integer.parseInt(strVal);
400+
}
401+
} catch (NumberFormatException e) {
402+
logger.error("`{}` system property was not an integer: {}, using default {}", key,
403+
strVal, intVal);
404+
}
405+
return intVal ;
406+
}
407+
369408
/**
370409
* Parses value as comma-separated list of session propagators. Used to
371410
* specify several propagator in order of priority.
@@ -435,19 +474,26 @@ public void initializeFrom(AttributeProvider provider) {
435474
if (nonEmpty(value)) {
436475
setNonCacheable(value);
437476
}
438-
initMaxInactiveInterval(provider);
477+
maxInactiveInterval = initInt(provider,DEFAULT_SESSION_TIMEOUT,maxInactiveInterval);
478+
trackerLimits = initInt(provider,TRACKER_ERROR_LIMITS_NUMBER_KEY ,trackerLimits);
479+
trackerInterval = initInt(provider,TRACKER_ERROR_INTERVAL_MILLISECONDS_KEY,trackerInterval);
480+
481+
logger.error("trackerInterval:" + trackerInterval );
482+
logger.error("trackerLimits :" + trackerLimits );
483+
439484
}
440485

441-
private void initMaxInactiveInterval(AttributeProvider provider) {
442-
String val = provider.getAttribute(DEFAULT_SESSION_TIMEOUT);
486+
private int initInt(AttributeProvider provider , String name , int maxInactiveInterval) {
487+
String val = provider.getAttribute(name);
443488
if (nonEmpty(val)) {
444489
try {
445-
maxInactiveInterval = Integer.parseInt(val);
490+
return Integer.parseInt(val);
446491
} catch (NumberFormatException e) {
447-
logger.warn("`{}` configuration attribute was not an integer: {} for source {}", DEFAULT_SESSION_TIMEOUT, val,
492+
logger.warn("`{}` configuration attribute was not an integer: {} for source {}", name, val,
448493
provider.source());
449494
}
450495
}
496+
return maxInactiveInterval;
451497
}
452498

453499
private boolean read(String key, boolean defaultValue) {
@@ -1046,4 +1092,20 @@ public String toString() {
10461092
.append(commitOnAllConcurrent).append(", timestamp=").append(timestampSufix).append("]");
10471093
return builder.toString();
10481094
}
1095+
1096+
public int getTrackerInterval() {
1097+
return trackerInterval;
1098+
}
1099+
1100+
public void setTrackerInterval(int trackerInterval) {
1101+
this.trackerInterval = trackerInterval;
1102+
}
1103+
1104+
public int getTrackerLimits() {
1105+
return trackerLimits;
1106+
}
1107+
1108+
public void setTrackerLimits(int trackerLimits) {
1109+
this.trackerLimits = trackerLimits;
1110+
}
10491111
}

session-replacement/src/main/java/com/amadeus/session/SessionManager.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public class SessionManager implements Closeable {
5555
private static final Logger logger = LoggerFactory.getLogger(SessionManager.class);
5656

5757
static final String SESSIONS_METRIC_PREFIX = "com.amadeus.session";
58+
5859

60+
61+
5962
private static final String COMMIT_TIMER_METRIC = name(SESSIONS_METRIC_PREFIX, "timer", "commit");
6063
private static final String FETCH_TIMER_METRIC = name(SESSIONS_METRIC_PREFIX, "timer", "fetch");
6164
private static final String CREATED_SESSIONS_METRIC = name(SESSIONS_METRIC_PREFIX, "created");
@@ -115,7 +118,7 @@ public class SessionManager implements Closeable {
115118
* the class loader to use
116119
*/
117120
public SessionManager(ExecutorFacade executors, SessionFactory factory, SessionRepository repository,
118-
SessionTracking tracking, SessionNotifier notifier, SessionConfiguration configuration, ClassLoader classLoader) {
121+
SessionTracking tracking, SessionNotifier notifier, SessionConfiguration configuration, ClassLoader classLoader ) {
119122

120123
this.repository = repository;
121124
this.tracking = tracking;
@@ -126,6 +129,7 @@ public SessionManager(ExecutorFacade executors, SessionFactory factory, SessionR
126129
this.classLoader = classLoader;
127130

128131
monitoring = new MetricRegistry();
132+
129133
createdSessions = monitoring.meter(CREATED_SESSIONS_METRIC);
130134
deletedSessions = monitoring.meter(DELETED_SESSIONS_METRIC);
131135
retrievedSessions = monitoring.meter(RETRIEVED_SESSIONS_METRIC);
@@ -134,7 +138,7 @@ public SessionManager(ExecutorFacade executors, SessionFactory factory, SessionR
134138
invalidationExpiryErrors = monitoring.meter(INVALIDATION_ON_EXPIRY_ERRORS_METRIC);
135139
commitTimer = monitoring.timer(COMMIT_TIMER_METRIC);
136140
fetchTimer = monitoring.timer(FETCH_TIMER_METRIC);
137-
141+
138142
serializerDeserializer = configuration.isUsingEncryption() ?
139143
new EncryptingSerializerDeserializer() :
140144
new JdkSerializerDeserializer();
@@ -570,6 +574,21 @@ public String encodeUrl(RequestWithSession request, String url) {
570574
return tracking.encodeUrl(request, url);
571575
}
572576

577+
578+
579+
/**
580+
* Called to shutdown the session manager and perform needed cleanup.
581+
*/
582+
583+
public void reset() {
584+
if (reporter != null) {
585+
reporter.stop();
586+
reporter.close();
587+
}
588+
589+
repository.reset();
590+
executors.shutdown();
591+
}
573592
/**
574593
* Called to shutdown the session manager and perform needed cleanup.
575594
*/
@@ -637,4 +656,9 @@ public void remove(SessionData sessionData) {
637656
markSessionDeletion( sessionData.getId() );
638657
getRepository().remove(sessionData);
639658
}
659+
660+
public boolean isConnected(){
661+
return this.repository.isConnected();
662+
}
663+
640664
}

session-replacement/src/main/java/com/amadeus/session/SessionRepository.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public interface SessionRepository extends Closeable {
139139
*/
140140
Collection<String> getOwnedSessionIds();
141141

142+
/**
143+
* Called to reset the repository and release resources.
144+
*/
145+
public void reset() ;
146+
142147
/**
143148
* Called to shutdown the repository and release resources.
144149
*/
@@ -210,6 +215,11 @@ interface CommitTransaction {
210215
* data being distributed to remote store.
211216
*/
212217
boolean isDistributing();
218+
219+
220+
213221
}
214222

223+
boolean isConnected();
224+
215225
}

0 commit comments

Comments
 (0)