-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathAgentSink.java
More file actions
147 lines (133 loc) · 5.53 KB
/
Copy pathAgentSink.java
File metadata and controls
147 lines (133 loc) · 5.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.cloudwatchlogs.emf.sinks;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.cloudwatchlogs.emf.Constants;
import software.amazon.cloudwatchlogs.emf.exception.EMFClientException;
import software.amazon.cloudwatchlogs.emf.model.MetricsContext;
import software.amazon.cloudwatchlogs.emf.sinks.retry.RetryStrategy;
import software.amazon.cloudwatchlogs.emf.util.StringUtils;
/**
 * A sink that forwards serialized EMF events to an agent over a socket.
 *
 * <p>Events are handed to a single worker thread through a bounded queue. When the queue is full,
 * the executor's {@link ThreadPoolExecutor.DiscardOldestPolicy} drops the oldest queued event to
 * make room for the newest one.
 */
@Slf4j
public class AgentSink implements ISink {
    private final String logGroupName;
    private final String logStreamName;
    private final SocketClient client;
    private final ExecutorService executor;
    private final Supplier<RetryStrategy> retryStrategyFactory;
    private final LinkedBlockingQueue<Runnable> queue;

    /**
     * Creates a sink that writes to the agent reachable at {@code endpoint}.
     *
     * @param logGroupName log group name added to each context's metadata; skipped when null or
     *     empty
     * @param logStreamName log stream name added to each context's metadata; skipped when null or
     *     empty
     * @param endpoint the agent endpoint handed to {@code clientFactory}
     * @param clientFactory factory used to obtain the socket client for {@code endpoint}
     * @param asyncQueueDepth capacity of the queue holding events that are waiting to be sent
     * @param retryStrategy supplies a fresh retry strategy for each event that fails to send
     */
    public AgentSink(
            String logGroupName,
            String logStreamName,
            Endpoint endpoint,
            SocketClientFactory clientFactory,
            int asyncQueueDepth,
            Supplier<RetryStrategy> retryStrategy) {
        this.logGroupName = logGroupName;
        this.logStreamName = logStreamName;
        client = clientFactory.getClient(endpoint);
        // The queue must exist before the executor is built, because the executor wraps it.
        queue = new LinkedBlockingQueue<>(asyncQueueDepth);
        executor = createSingleThreadedExecutor();
        this.retryStrategyFactory = retryStrategy;
    }

    /**
     * Builds the executor that drains {@link #queue} with exactly one worker thread, discarding
     * the oldest queued task whenever the queue is full.
     */
    private ExecutorService createSingleThreadedExecutor() {
        return new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                queue,
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * Serializes the given context and queues every resulting event for asynchronous delivery.
     *
     * <p>The configured log group and log stream names, when non-empty, are written into the
     * context's metadata before serialization. A serialization failure is logged and the context
     * is dropped; it is not propagated to the caller.
     *
     * @param context the metrics context to flush
     * @throws EMFClientException if this sink has already been shut down
     */
    public void accept(MetricsContext context) {
        if (executor.isShutdown()) {
            throw new EMFClientException(
                    "Attempted to write data to a sink that has been previously shutdown.");
        }

        if (!StringUtils.isNullOrEmpty(logGroupName)) {
            context.putMetadata("LogGroupName", logGroupName);
        }

        if (!StringUtils.isNullOrEmpty(logStreamName)) {
            context.putMetadata("LogStreamName", logStreamName);
        }

        try {
            for (String event : context.serialize()) {
                executor.submit(new Sender(event, client, retryStrategyFactory));
            }
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize the metrics with the exception: ", e);
        }
    }

    /**
     * Stops accepting new events and waits, on another thread, for queued events to be sent.
     *
     * @return a future that completes once the executor has terminated, or once the waiting thread
     *     has been interrupted
     */
    @Override
    public CompletableFuture<Void> shutdown() {
        executor.shutdown();
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                            // we add 1 because we assume that at least one task is running if the
                            // queue is blocked
                            log.debug(
                                    "Waiting for graceful shutdown to complete. {} tasks pending.",
                                    queue.size() + 1);
                        }
                    } catch (InterruptedException e) {
                        log.warn("Thread terminated while awaiting shutdown.");
                        // Catching InterruptedException clears the interrupt status; restore it so
                        // code further up the stack can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                });
    }

    /** Task that writes a single serialized event to the socket, retrying on failure. */
    @AllArgsConstructor
    private static class Sender implements Runnable {
        private final String event;
        private final SocketClient client;
        private final Supplier<RetryStrategy> retryStrategyFactory;

        /** Sends the event unless it is null or empty. */
        @Override
        public void run() {
            if (StringUtils.isNullOrEmpty(event)) {
                return;
            }

            try {
                sendMessageForMaxAttempts();
            } catch (InterruptedException e) {
                log.warn("Thread was interrupted while sending EMF event.");
                // Restore the interrupt status that was cleared when the exception was thrown.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Tries to send the event up to {@code Constants.MAX_ATTEMPTS_PER_MESSAGE} times, sleeping
         * between attempts for the delay returned by the retry strategy.
         *
         * @throws InterruptedException if the thread is interrupted while backing off
         */
        private void sendMessageForMaxAttempts() throws InterruptedException {
            // Created lazily so that a first-attempt success never asks the factory for a strategy.
            RetryStrategy backoff = null;
            for (int i = 0; i < Constants.MAX_ATTEMPTS_PER_MESSAGE; i++) {
                try {
                    client.sendMessage(event + "\n");
                    return;
                } catch (Exception e) {
                    log.debug(
                            "Failed to write the message to the socket. Backing off and trying again.",
                            e);
                    backoff = backoff != null ? backoff : retryStrategyFactory.get();
                    Thread.sleep(backoff.next());
                }
            }

            log.warn("Failed all attempts to write message to the socket.");
        }
    }
}