Skip to content

Commit ce93c37

Browse files
committed
CheckRequestAggregator: remove unused, leaking, out queue.
Check requests are already sent when doing a request. Even if it would work on App Engine (no background threads allowed), there is no point in sending check requests again at some later point. Check requests are still cached (and expired).
1 parent ff4fe8a commit ce93c37

5 files changed

Lines changed: 54 additions & 294 deletions

File tree

endpoints-control/src/main/java/com/google/api/control/Client.java

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public Client(String serviceName, CheckAggregationOptions checkOptions,
9191
ServiceControl transport, ThreadFactory threads,
9292
SchedulerFactory schedulers, int statsLogFrequency, @Nullable Ticker ticker) {
9393
ticker = ticker == null ? Ticker.systemTicker() : ticker;
94-
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, null, ticker);
94+
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, ticker);
9595
this.reportAggregator = new ReportRequestAggregator(serviceName, reportOptions, null, ticker);
9696
this.quotaAggregator = new QuotaRequestAggregator(serviceName, quotaOptions, ticker);
9797
this.serviceName = serviceName;
@@ -134,6 +134,7 @@ public void run() {
134134
scheduleFlushes();
135135
}
136136
});
137+
// Note: this is not supported on App Engine Standard.
137138
schedulerThread.start();
138139
} catch (RuntimeException e) {
139140
log.atInfo().log(BACKGROUND_THREAD_ERROR);
@@ -305,7 +306,6 @@ private synchronized void initializeFlushing() {
305306
this.scheduler = schedulers.create(ticker);
306307
this.scheduler.setStatistics(statistics);
307308
log.atInfo().log("scheduling the initial check, report, and quota");
308-
flushAndScheduleChecks();
309309
flushAndScheduleReports();
310310
flushAndScheduleQuota();
311311
}
@@ -323,51 +323,6 @@ private synchronized boolean resetIfStopped() {
323323
return true;
324324
}
325325

326-
private void flushAndScheduleChecks() {
327-
if (resetIfStopped()) {
328-
log.atFine().log("did not schedule check flush: client is stopped");
329-
return;
330-
}
331-
int interval = checkAggregator.getFlushIntervalMillis();
332-
if (interval < 0) {
333-
log.atFine().log("did not schedule check flush: caching is disabled");
334-
return; // cache is disabled, so no flushing it
335-
}
336-
337-
if (isRunningSchedulerDirectly()) {
338-
log.atFine().log("did not schedule check flush: no scheduler thread is running");
339-
return;
340-
}
341-
342-
log.atFine().log("flushing the check aggregator");
343-
Stopwatch w = Stopwatch.createUnstarted(ticker);
344-
for (CheckRequest req : checkAggregator.flush()) {
345-
try {
346-
statistics.recachedChecks.incrementAndGet();
347-
w.reset().start();
348-
CheckResponse resp = transport.services().check(serviceName, req).execute();
349-
statistics.totalCheckTransportTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
350-
w.reset().start();
351-
checkAggregator.addResponse(req, resp);
352-
statistics.totalCheckCacheUpdateTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
353-
} catch (IOException e) {
354-
log.atSevere().withCause(e).log("direct send of a check request %s failed", req);
355-
}
356-
}
357-
// copy scheduler into a local variable to avoid data races beween this method and stop()
358-
Scheduler currentScheduler = scheduler;
359-
if (resetIfStopped()) {
360-
log.atFine().log("did not schedule succeeding check flush: client is stopped");
361-
return;
362-
}
363-
currentScheduler.enter(new Runnable() {
364-
@Override
365-
public void run() {
366-
flushAndScheduleChecks(); // Do this again after the interval
367-
}
368-
}, interval, 0 /* high priority */);
369-
}
370-
371326
private void flushAndScheduleReports() {
372327
if (resetIfStopped()) {
373328
log.atFine().log("did not schedule report flush: client is stopped");

endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import com.google.common.base.Ticker;
2121
import com.google.common.cache.Cache;
2222
import com.google.common.cache.CacheBuilder;
23-
import com.google.common.cache.RemovalListener;
24-
import com.google.common.cache.RemovalNotification;
2523

26-
import java.util.concurrent.ConcurrentLinkedDeque;
2724
import java.util.concurrent.TimeUnit;
2825

2926
import javax.annotation.Nullable;
@@ -42,13 +39,7 @@ public class CheckAggregationOptions {
4239
*/
4340
public static final int DEFAULT_RESPONSE_EXPIRATION_MILLIS = 4000;
4441

45-
/**
46-
* The default flush cache entry interval.
47-
*/
48-
public static final int DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS = 2000;
49-
5042
private final int numEntries;
51-
private final int flushCacheEntryIntervalMillis;
5243
private final int expirationMillis;
5344

5445
/**
@@ -58,21 +49,13 @@ public class CheckAggregationOptions {
5849
* is the maximum number of cache entries that can be kept in the
5950
* aggregation cache. The cache is disabled if this value is
6051
* negative.
61-
* @param flushCacheEntryIntervalMillis
62-
* the maximum interval before an aggregated check request is
63-
* flushed to the server. The cache entry is deleted after the
64-
* flush
6552
* @param expirationMillis
6653
* is the maximum interval in milliseconds before a cached check
67-
* response is invalidated. This value should be greater than
68-
* {@code flushCacheEntryIntervalMillis}. If not, it is ignored,
69-
* and a value of {@code flushCacheEntryIntervalMillis} is used
70-
* instead.
54+
* response is invalidated.
7155
*/
72-
public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis, int expirationMillis) {
56+
public CheckAggregationOptions(int numEntries, int expirationMillis) {
7357
this.numEntries = numEntries;
74-
this.flushCacheEntryIntervalMillis = flushCacheEntryIntervalMillis;
75-
this.expirationMillis = Math.max(expirationMillis, flushCacheEntryIntervalMillis + 1);
58+
this.expirationMillis = expirationMillis;
7659
}
7760

7861
/**
@@ -81,7 +64,7 @@ public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis
8164
* Creates an instance initialized with the default values.
8265
*/
8366
public CheckAggregationOptions() {
84-
this(DEFAULT_NUM_ENTRIES, DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
67+
this(DEFAULT_NUM_ENTRIES, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
8568
}
8669

8770
/**
@@ -92,18 +75,9 @@ public int getNumEntries() {
9275
return numEntries;
9376
}
9477

95-
/**
96-
* @return the maximum interval before aggregated report requests are
97-
* flushed to the server
98-
*/
99-
public int getFlushCacheEntryIntervalMillis() {
100-
return flushCacheEntryIntervalMillis;
101-
}
102-
10378
/**
10479
* @return the maximum interval before a cached check response should be
105-
* deleted. This value will not be greater than
106-
* {@link #getFlushCacheEntryIntervalMillis()}
80+
* deleted.
10781
*/
10882
public int getExpirationMillis() {
10983
return expirationMillis;
@@ -115,45 +89,29 @@ public int getExpirationMillis() {
11589
* @param <T>
11690
* the type of the instance being cached
11791
*
118-
* @param out
119-
* a concurrent {@code Deque} to which previously cached items
120-
* are added as they expire
12192
* @return a {@link Cache} corresponding to this instance's values or
12293
* {@code null} unless {@link #numEntries} is positive.
12394
*/
12495
@Nullable
125-
public <T> Cache<String, T> createCache(ConcurrentLinkedDeque<T> out) {
126-
return createCache(out, Ticker.systemTicker());
96+
public <T> Cache<String, T> createCache() {
97+
return createCache(Ticker.systemTicker());
12798
}
12899

129100
/**
130101
* Creates a {@link Cache} configured by this instance.
131102
*
132-
* @param <T>
133-
* the type of the value stored in the Cache
134-
* @param out
135-
* a concurrent {@code Deque} to which the cached values are
136-
* added as they are removed from the cache
137-
* @param ticker
138-
* the time source used to determine expiration
103+
* @param <T> the type of the value stored in the Cache
104+
* @param ticker the time source used to determine expiration
139105
* @return a {@link Cache} corresponding to this instance's values or
140-
* {@code null} unless {@code #numEntries} is positive.
106+
* {@code null} unless {@code #numEntries} is positive.
141107
*/
142108
@Nullable
143-
public <T> Cache<String, T> createCache(final ConcurrentLinkedDeque<T> out, Ticker ticker) {
144-
Preconditions.checkNotNull(out, "The out deque cannot be null");
109+
public <T> Cache<String, T> createCache(Ticker ticker) {
145110
Preconditions.checkNotNull(ticker, "The ticker cannot be null");
146111
if (numEntries <= 0) {
147112
return null;
148113
}
149-
final RemovalListener<String, T> listener = new RemovalListener<String, T>() {
150-
@Override
151-
public void onRemoval(RemovalNotification<String, T> notification) {
152-
out.addFirst(notification.getValue());
153-
}
154-
};
155-
CacheBuilder<String, T> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker)
156-
.removalListener(listener);
114+
CacheBuilder<Object, Object> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker);
157115
if (expirationMillis >= 0) {
158116
b.expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS);
159117
}

0 commit comments

Comments
 (0)