Skip to content

Commit a0594f0

Browse files
Simplified ListWriter await logic.
1 parent 9db71d4 commit a0594f0

1 file changed

Lines changed: 47 additions & 53 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/common/writer/ListWriter.java

Lines changed: 47 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,27 @@
11
package datadog.trace.common.writer;
22

3+
import static java.util.concurrent.TimeUnit.SECONDS;
4+
35
import datadog.trace.core.DDSpan;
46
import datadog.trace.core.MetadataConsumer;
5-
import datadog.trace.core.tagprocessor.PeerServiceCalculator;
6-
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.concurrent.CopyOnWriteArrayList;
9-
import java.util.concurrent.CountDownLatch;
109
import java.util.concurrent.TimeUnit;
1110
import java.util.concurrent.TimeoutException;
1211
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.function.BooleanSupplier;
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
1515

1616
/** List writer used by tests mostly */
1717
public class ListWriter extends CopyOnWriteArrayList<List<DDSpan>> implements Writer {
18-
1918
private static final Logger log = LoggerFactory.getLogger(ListWriter.class);
19+
private static final Filter ACCEPT_ALL = trace -> true;
2020

21-
public static final Filter ACCEPT_ALL =
22-
new Filter() {
23-
@Override
24-
public boolean accept(List<DDSpan> trace) {
25-
return true;
26-
}
27-
};
28-
29-
private final List<CountDownLatch> latches = new ArrayList<>();
3021
private final AtomicInteger traceCount = new AtomicInteger();
3122
private final TraceStructureWriter structureWriter = new TraceStructureWriter(true);
23+
private final Object monitor = new Object();
3224

33-
private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator();
3425
private Filter filter = ACCEPT_ALL;
3526

3627
public List<DDSpan> firstTrace() {
@@ -47,30 +38,41 @@ public void write(List<DDSpan> trace) {
4738
// remotely realistic so the test actually test something
4839
span.processTagsAndBaggage(MetadataConsumer.NO_OP);
4940
}
41+
42+
add(trace);
43+
structureWriter.write(trace);
44+
5045
traceCount.incrementAndGet();
51-
synchronized (latches) {
52-
add(trace);
53-
for (final CountDownLatch latch : latches) {
54-
if (size() >= latch.getCount()) {
55-
while (latch.getCount() > 0) {
56-
latch.countDown();
57-
}
58-
}
59-
}
46+
synchronized (monitor) {
47+
monitor.notifyAll();
6048
}
61-
structureWriter.write(trace);
6249
}
6350

64-
public boolean waitForTracesMax(final int number, int seconds)
65-
throws InterruptedException, TimeoutException {
66-
final CountDownLatch latch = new CountDownLatch(number);
67-
synchronized (latches) {
68-
if (size() >= number) {
51+
private boolean awaitUntilDeadline(long timeout, TimeUnit unit, BooleanSupplier predicate)
52+
throws InterruptedException {
53+
long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
54+
55+
while (true) {
56+
if (predicate.getAsBoolean()) {
6957
return true;
7058
}
71-
latches.add(latch);
59+
60+
long now = System.currentTimeMillis();
61+
long waitTime = deadline - now;
62+
if (waitTime <= 0) {
63+
break;
64+
}
65+
66+
synchronized (monitor) {
67+
monitor.wait(waitTime);
68+
}
7269
}
73-
return latch.await(seconds, TimeUnit.SECONDS);
70+
71+
return false;
72+
}
73+
74+
public boolean waitForTracesMax(final int number, int seconds) throws InterruptedException {
75+
return awaitUntilDeadline(seconds, SECONDS, () -> traceCount.get() >= number);
7476
}
7577

7678
public void waitForTraces(final int number) throws InterruptedException, TimeoutException {
@@ -88,24 +90,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout
8890
}
8991

9092
public void waitUntilReported(final DDSpan span) throws InterruptedException, TimeoutException {
91-
waitUntilReported(span, 20, TimeUnit.SECONDS);
93+
waitUntilReported(span, 20, SECONDS);
9294
}
9395

9496
public void waitUntilReported(final DDSpan span, int timeout, TimeUnit unit)
9597
throws InterruptedException, TimeoutException {
96-
while (true) {
97-
final CountDownLatch latch = new CountDownLatch(size() + 1);
98-
synchronized (latches) {
99-
latches.add(latch);
100-
}
101-
if (isReported(span)) {
102-
return;
103-
}
104-
if (!latch.await(timeout, unit)) {
105-
String msg = "Timeout waiting for span to be reported: " + span;
106-
log.warn(msg);
107-
throw new TimeoutException(msg);
108-
}
98+
boolean reported = awaitUntilDeadline(timeout, unit, () -> isReported(span));
99+
100+
if (!reported) {
101+
String msg = "Timeout waiting for span to be reported: " + span;
102+
log.warn(msg);
103+
throw new TimeoutException(msg);
109104
}
110105
}
111106

@@ -142,17 +137,16 @@ public boolean flush() {
142137
return true;
143138
}
144139

140+
@Override
141+
public void clear() {
142+
super.clear();
143+
144+
traceCount.set(0);
145+
}
146+
145147
@Override
146148
public void close() {
147149
clear();
148-
synchronized (latches) {
149-
for (final CountDownLatch latch : latches) {
150-
while (latch.getCount() > 0) {
151-
latch.countDown();
152-
}
153-
}
154-
latches.clear();
155-
}
156150
}
157151

158152
@Override

0 commit comments

Comments
 (0)