Skip to content

Commit c17b975

Browse files
committed
US1065453: Support BE preprocessor rules (#3)
- Adds support for new preprocessor rule objects - Updates logging from log4j to JUL - Bump java-lib to 2026-2.1
1 parent 6941fa9 commit c17b975

103 files changed

Lines changed: 409 additions & 8168 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

proxy/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>com.wavefront</groupId>
66
<artifactId>proxy</artifactId>
7-
<version>13.8-RC1-SNAPSHOT</version>
7+
<version>14.0-SNAPSHOT</version>
88

99
<name>Wavefront Proxy</name>
1010
<description>Service for batching and relaying metric traffic to Wavefront</description>
@@ -452,7 +452,7 @@
452452
<dependency>
453453
<groupId>com.wavefront</groupId>
454454
<artifactId>java-lib</artifactId>
455-
<version>2023-08.2</version>
455+
<version>2026-2.1</version>
456456
</dependency>
457457
<dependency>
458458
<groupId>com.fasterxml.jackson.module</groupId>

proxy/src/main/java/com/wavefront/agent/AbstractAgent.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import com.wavefront.agent.data.EntityPropertiesFactoryImpl;
2121
import com.wavefront.agent.logsharvesting.InteractiveLogsTester;
2222
import com.wavefront.agent.preprocessor.InteractivePreprocessorTester;
23-
import com.wavefront.agent.preprocessor.LineBasedAllowFilter;
24-
import com.wavefront.agent.preprocessor.LineBasedBlockFilter;
25-
import com.wavefront.agent.preprocessor.PreprocessorConfigManager;
26-
import com.wavefront.agent.preprocessor.PreprocessorRuleMetrics;
23+
import com.wavefront.api.agent.preprocessor.LineBasedAllowFilter;
24+
import com.wavefront.api.agent.preprocessor.LineBasedBlockFilter;
25+
import com.wavefront.agent.preprocessor.ProxyPreprocessorConfigManager;
26+
import com.wavefront.api.agent.preprocessor.PreprocessorRuleMetrics;
2727
import com.wavefront.agent.queueing.QueueExporter;
2828
import com.wavefront.agent.queueing.SQSQueueFactoryImpl;
2929
import com.wavefront.agent.queueing.TaskQueueFactory;
@@ -71,7 +71,8 @@ public abstract class AbstractAgent {
7171
protected APIContainer apiContainer;
7272
protected final List<ExecutorService> managedExecutors = new ArrayList<>();
7373
protected final List<Runnable> shutdownTasks = new ArrayList<>();
74-
protected final PreprocessorConfigManager preprocessors = new PreprocessorConfigManager();
74+
protected final ProxyPreprocessorConfigManager preprocessors =
75+
new ProxyPreprocessorConfigManager();
7576
protected final ValidationConfiguration validationConfiguration = new ValidationConfiguration();
7677
protected final Map<String, EntityPropertiesFactory> entityPropertiesFactoryMap =
7778
Maps.newHashMap();
@@ -142,15 +143,18 @@ void initSslContext() throws SSLException {
142143

143144
private void initPreprocessors() {
144145
String configFileName = proxyConfig.getPreprocessorConfigFile();
145-
if (configFileName != null) {
146+
if (ProxyCheckInScheduler.isRulesSetInFE.get()) {
147+
logger.info("Preprocessor configuration was set in FE. Skipping reading from file " + configFileName);
148+
} else if (configFileName != null) {
146149
try {
147150
preprocessors.loadFile(configFileName);
148151
preprocessors.setUpConfigFileMonitoring(configFileName, 5000); // check every 5s
149152
} catch (FileNotFoundException ex) {
150153
throw new RuntimeException(
151154
"Unable to load preprocessor rules - file does not exist: " + configFileName);
152155
}
153-
logger.info("Preprocessor configuration loaded from " + configFileName);
156+
logger.log(Level.INFO, () -> String.format("Preprocessor configuration loaded from %s",
157+
(ProxyCheckInScheduler.isRulesSetInFE.get()) ? "FE rule" : configFileName));
154158
}
155159

156160
// convert block/allow list fields to filters for full backwards compatibility.

proxy/src/main/java/com/wavefront/agent/ProxyCheckInScheduler.java

Lines changed: 91 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
55

66
import com.fasterxml.jackson.databind.JsonNode;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
78
import com.google.common.annotations.VisibleForTesting;
89
import com.google.common.base.Strings;
910
import com.google.common.base.Throwables;
1011
import com.google.common.collect.Maps;
1112
import com.wavefront.agent.api.APIContainer;
12-
import com.wavefront.agent.preprocessor.PreprocessorConfigManager;
13+
import com.wavefront.agent.preprocessor.ProxyPreprocessorConfigManager;
1314
import com.wavefront.api.agent.AgentConfiguration;
1415
import com.wavefront.api.agent.ValidationConfiguration;
1516
import com.wavefront.common.Clock;
@@ -32,8 +33,8 @@
3233
import javax.ws.rs.ClientErrorException;
3334
import javax.ws.rs.ProcessingException;
3435
import org.apache.commons.lang.StringUtils;
35-
import org.apache.logging.log4j.LogManager;
36-
import org.apache.logging.log4j.Logger;
36+
import java.util.logging.Level;
37+
import java.util.logging.Logger;
3738

3839
/**
3940
* Registers the proxy with the back-end, sets up regular "check-ins" (every minute), transmits
@@ -42,7 +43,7 @@
4243
* @author vasily@wavefront.com
4344
*/
4445
public class ProxyCheckInScheduler {
45-
private static final Logger logger = LogManager.getLogger("proxy");
46+
private static final Logger logger = Logger.getLogger("proxy");
4647
private static final int MAX_CHECKIN_ATTEMPTS = 5;
4748

4849
/**
@@ -68,7 +69,10 @@ public class ProxyCheckInScheduler {
6869
private volatile JsonNode agentMetrics;
6970
private boolean retryImmediately = false;
7071

72+
// check if preprocessor rules need to be sent to update BE
7173
public static AtomicBoolean preprocessorRulesNeedUpdate = new AtomicBoolean(false);
74+
// check if rules are set from FE/API
75+
public static AtomicBoolean isRulesSetInFE = new AtomicBoolean(false);
7276

7377
/**
7478
* @param proxyId Proxy UUID.
@@ -134,19 +138,60 @@ public void shutdown() {
134138
executor.shutdown();
135139
}
136140

141+
/**
142+
* Initial sending of preprocessor rules. Will always check local location for preprocessor rule.
143+
*/
144+
public void sendPreprocessorRules() {
145+
if (preprocessorRulesNeedUpdate.getAndSet(false)) {
146+
try {
147+
JsonNode rulesNode = createRulesNode(ProxyPreprocessorConfigManager.getProxyConfigRules(), null);
148+
apiContainer
149+
.getProxyV2APIForTenant(APIContainer.CENTRAL_TENANT_NAME)
150+
.proxySavePreprocessorRules(
151+
proxyId,
152+
rulesNode
153+
);
154+
} catch (javax.ws.rs.NotFoundException ex) {
155+
logger.warning("'proxySavePreprocessorRules' api end point not found");
156+
}
157+
}
158+
}
159+
137160
/** Send preprocessor rules */
138-
private void sendPreprocessorRules() {
161+
private void sendPreprocessorRules(AgentConfiguration agentConfiguration) {
139162
if (preprocessorRulesNeedUpdate.getAndSet(false)) {
163+
String preprocessorRules = null;
164+
if (agentConfiguration.getPreprocessorRules() != null) {
165+
// reading rules from BE if sent from BE
166+
preprocessorRules = agentConfiguration.getPreprocessorRules();
167+
} else {
168+
// reading local file's rule
169+
preprocessorRules = ProxyPreprocessorConfigManager.getProxyConfigRules();
170+
}
140171
try {
172+
JsonNode rulesNode = createRulesNode(preprocessorRules, agentConfiguration.getPreprocessorRulesId());
141173
apiContainer
142-
.getProxyV2APIForTenant(APIContainer.CENTRAL_TENANT_NAME)
143-
.proxySavePreprocessorRules(proxyId, PreprocessorConfigManager.getJsonRules());
174+
.getProxyV2APIForTenant(APIContainer.CENTRAL_TENANT_NAME)
175+
.proxySavePreprocessorRules(
176+
proxyId,
177+
rulesNode
178+
);
144179
} catch (javax.ws.rs.NotFoundException ex) {
145-
logger.debug("'proxySavePreprocessorRules' api end point not found", ex);
180+
logger.warning("'proxySavePreprocessorRules' api end point not found");
146181
}
147182
}
148183
}
149184

185+
private JsonNode createRulesNode(String preprocessorRules, String proxyId) {
186+
Map<String, String> fieldsMap = new HashMap<>();
187+
fieldsMap.put("proxyRules", preprocessorRules);
188+
if (proxyConfig.getPreprocessorConfigFile() != null) fieldsMap.put("proxyRulesFilePath", proxyConfig.getPreprocessorConfigFile());
189+
if (proxyId != null) fieldsMap.put("proxyRulesId", proxyId);
190+
191+
ObjectMapper objectMapper = new ObjectMapper();
192+
return objectMapper.valueToTree(fieldsMap);
193+
}
194+
150195
/**
151196
* Perform agent check-in and fetch configuration of the daemon from remote server.
152197
*
@@ -335,15 +380,15 @@ private Map<String, AgentConfiguration> checkin() {
335380
if (StringUtils.isBlank(logServerIngestionURL)
336381
|| StringUtils.isBlank(logServerIngestionToken)) {
337382
proxyConfig.setReceivedLogServerDetails(false);
338-
logger.error(
383+
logger.severe(
339384
WARNING_MSG
340385
+ " To ingest logs to the log server, please provide "
341386
+ "logServerIngestionToken & logServerIngestionURL in the proxy configuration.");
342387
}
343388
}
344389
} else if (StringUtils.isBlank(logServerIngestionURL)
345390
|| StringUtils.isBlank(logServerIngestionToken)) {
346-
logger.warn(
391+
logger.severe(
347392
WARNING_MSG
348393
+ " Proxy will not be ingesting data to the log server as it did "
349394
+ "not receive at least one of the values during check-in.");
@@ -358,7 +403,6 @@ private Map<String, AgentConfiguration> checkin() {
358403
void updateConfiguration() {
359404
try {
360405
Map<String, AgentConfiguration> configList = checkin();
361-
sendPreprocessorRules();
362406
if (configList != null && !configList.isEmpty()) {
363407
AgentConfiguration config;
364408
for (Map.Entry<String, AgentConfiguration> configEntry : configList.entrySet()) {
@@ -368,27 +412,53 @@ void updateConfiguration() {
368412
continue;
369413
}
370414
if (configEntry.getKey().equals(APIContainer.CENTRAL_TENANT_NAME)) {
371-
if (logger.isDebugEnabled()) {
372-
logger.debug("Server configuration getShutOffAgents: " + config.getShutOffAgents());
373-
logger.debug("Server configuration isTruncateQueue: " + config.isTruncateQueue());
415+
if (logger.isLoggable(Level.FINE)) {
416+
logger.fine("Server configuration getShutOffAgents: " + config.getShutOffAgents());
417+
logger.fine("Server configuration isTruncateQueue: " + config.isTruncateQueue());
374418
}
375419
if (config.getShutOffAgents()) {
376-
logger.warn(
420+
logger.severe(
377421
firstNonNull(
378422
config.getShutOffMessage(),
379423
"Shutting down: Server side flag indicating proxy has to shut down."));
380424
shutdownHook.run();
381425
} else if (config.isTruncateQueue()) {
382-
logger.warn(
426+
logger.severe(
383427
"Truncating queue: Server side flag indicating proxy queue has to be truncated.");
384428
truncateBacklog.run();
385429
}
386430
}
387431
agentConfigurationConsumer.accept(configEntry.getKey(), config);
432+
433+
// Check if preprocessor rules were set on server side
434+
String checkPreprocessorRules = config.getPreprocessorRules();
435+
if (checkPreprocessorRules != null && !checkPreprocessorRules.isEmpty()) {
436+
AgentConfiguration finalConfig = config;
437+
logger.log(Level.INFO, () -> String.format("New preprocessor rules detected during checkin. Setting new preprocessor rule %s",
438+
(finalConfig.getPreprocessorRulesId() != null && !finalConfig.getPreprocessorRulesId().isEmpty()) ? finalConfig.getPreprocessorRulesId() : ""));
439+
// future implementation, can send timestamp through AgentConfig and skip reloading if rule unchanged
440+
isRulesSetInFE.set(true);
441+
// indicates will need to sendPreprocessorRules()
442+
preprocessorRulesNeedUpdate.set(true);
443+
} else {
444+
// was previously reading from BE
445+
if (isRulesSetInFE.get()) {
446+
if (proxyConfig.getPreprocessorConfigFile() == null || proxyConfig.getPreprocessorConfigFile().isEmpty()) {
447+
logger.info("No preprocessor rules detected during checkin, and no rules file found.");
448+
} else {
449+
logger.log(Level.INFO, () -> String.format("Reverting back to reading rules from file %s", proxyConfig.getPreprocessorConfigFile()));
450+
}
451+
// indicates that previously read from BE, now switching back to reading from file.
452+
isRulesSetInFE.set(false);
453+
preprocessorRulesNeedUpdate.set(true);
454+
}
455+
}
456+
// will always send to BE in order to update Agent with latest rule
457+
sendPreprocessorRules(config);
388458
}
389459
}
390460
} catch (Exception e) {
391-
logger.error("Exception occurred during configuration update", e);
461+
logger.log(Level.SEVERE, "Exception occurred during configuration update", e);
392462
}
393463
}
394464

@@ -405,13 +475,13 @@ void updateProxyMetrics() {
405475
retries.set(0);
406476
}
407477
} catch (Exception ex) {
408-
logger.error("Could not generate proxy metrics", ex);
478+
logger.log(Level.SEVERE, "Could not generate proxy metrics", ex);
409479
}
410480
}
411481

412482
private void checkinError(String errMsg) {
413-
if (successfulCheckIns.get() == 0) logger.error(Strings.repeat("*", errMsg.length()));
414-
logger.error(errMsg);
415-
if (successfulCheckIns.get() == 0) logger.error(Strings.repeat("*", errMsg.length()));
483+
if (successfulCheckIns.get() == 0) logger.severe(Strings.repeat("*", errMsg.length()));
484+
logger.severe(errMsg);
485+
if (successfulCheckIns.get() == 0) logger.severe(Strings.repeat("*", errMsg.length()));
416486
}
417487
}

proxy/src/main/java/com/wavefront/agent/ProxySendConfigScheduler.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55
import java.util.concurrent.Executors;
66
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.TimeUnit;
8-
import org.apache.logging.log4j.Level;
9-
import org.apache.logging.log4j.LogManager;
10-
import org.apache.logging.log4j.Logger;
8+
import java.util.logging.Level;
9+
import java.util.logging.Logger;
1110

1211
public class ProxySendConfigScheduler {
1312
private static final Logger logger =
14-
LogManager.getLogger(ProxySendConfigScheduler.class.getCanonicalName());
13+
Logger.getLogger(ProxySendConfigScheduler.class.getCanonicalName());
1514
private boolean successful = false;
1615
private final ScheduledExecutorService executor;
1716
private final Runnable task;
@@ -28,13 +27,13 @@ public ProxySendConfigScheduler(
2827
successful = true;
2928
logger.info("Configuration sent to the server successfully.");
3029
} catch (javax.ws.rs.NotFoundException ex) {
31-
logger.debug("'proxySaveConfig' api end point not found", ex);
30+
logger.log(Level.FINE, "'proxySaveConfig' api end point not found", ex);
3231
successful = true;
3332
} catch (Throwable e) {
34-
logger.warn(
33+
logger.severe(
3534
"Can't send the Proxy configuration to the server, retrying in 60 seconds. "
3635
+ e.getMessage());
37-
logger.log(Level.DEBUG, "Exception: ", e);
36+
logger.log(Level.FINE, "Exception: ", e);
3837
}
3938

4039
if (successful) {

proxy/src/main/java/com/wavefront/agent/PushAgent.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@
4141
import com.wavefront.agent.listeners.tracing.*;
4242
import com.wavefront.agent.logsharvesting.FilebeatIngester;
4343
import com.wavefront.agent.logsharvesting.LogsIngester;
44-
import com.wavefront.agent.preprocessor.PreprocessorRuleMetrics;
45-
import com.wavefront.agent.preprocessor.ReportPointAddPrefixTransformer;
46-
import com.wavefront.agent.preprocessor.ReportPointTimestampInRangeFilter;
47-
import com.wavefront.agent.preprocessor.SpanSanitizeTransformer;
44+
import com.wavefront.api.agent.preprocessor.PreprocessorRuleMetrics;
45+
import com.wavefront.api.agent.preprocessor.ReportPointAddPrefixTransformer;
46+
import com.wavefront.api.agent.preprocessor.ReportPointTimestampInRangeFilter;
47+
import com.wavefront.api.agent.preprocessor.SpanSanitizeTransformer;
4848
import com.wavefront.agent.queueing.*;
4949
import com.wavefront.agent.sampler.SpanSampler;
5050
import com.wavefront.agent.sampler.SpanSamplerUtils;
@@ -71,6 +71,7 @@
7171
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
7272
import io.netty.handler.ssl.SslContext;
7373
import java.io.File;
74+
import java.io.FileNotFoundException;
7475
import java.net.BindException;
7576
import java.net.InetAddress;
7677
import java.nio.ByteOrder;
@@ -79,6 +80,7 @@
7980
import java.util.concurrent.Executors;
8081
import java.util.concurrent.ScheduledExecutorService;
8182
import java.util.concurrent.TimeUnit;
83+
import java.util.concurrent.atomic.AtomicBoolean;
8284
import java.util.function.Function;
8385
import java.util.function.Supplier;
8486
import java.util.logging.Level;
@@ -157,6 +159,7 @@ public class PushAgent extends AbstractAgent {
157159
private Logger blockedHistogramsLogger;
158160
private Logger blockedSpansLogger;
159161
private Logger blockedLogsLogger;
162+
private AtomicBoolean usingLocalFileRules = new AtomicBoolean(true);
160163

161164
public static void main(String[] args) {
162165
// Start the ssh daemon
@@ -1881,6 +1884,18 @@ private void registerPrefixFilter(String strPort) {
18811884
@Override
18821885
protected void processConfiguration(String tenantName, AgentConfiguration config) {
18831886
try {
1887+
boolean configHasPreprocessorRules = config.getPreprocessorRules() != null && !config.getPreprocessorRules().isEmpty();
1888+
// apply new preprocessor rules after checkin
1889+
if (ProxyCheckInScheduler.isRulesSetInFE.get() && configHasPreprocessorRules) {
1890+
loadPreprocessors(config.getPreprocessorRules(), true);
1891+
}
1892+
// check if we want to read local file
1893+
else if (!ProxyCheckInScheduler.isRulesSetInFE.get() &&
1894+
proxyConfig.getPreprocessorConfigFile() != null &&
1895+
!usingLocalFileRules.get()) { // are we already reading local file
1896+
loadPreprocessors(proxyConfig.getPreprocessorConfigFile(), false);
1897+
}
1898+
18841899
Long pointsPerBatch = config.getPointsPerBatch();
18851900
EntityPropertiesFactory tenantSpecificEntityProps =
18861901
entityPropertiesFactoryMap.get(tenantName);
@@ -2140,4 +2155,20 @@ protected void stopListener(int port) {
21402155
protected void truncateBacklog() {
21412156
senderTaskFactory.truncateBuffers();
21422157
}
2158+
2159+
private void loadPreprocessors(String preprocessorStr, boolean readfromFE) {
2160+
try {
2161+
if (readfromFE && preprocessorStr != null) {
2162+
// preprocessorStr is str of rules itself
2163+
preprocessors.loadFERules(preprocessorStr);
2164+
usingLocalFileRules.set(false);
2165+
} else {
2166+
// preprocessorStr is file path
2167+
preprocessors.loadFile(preprocessorStr);
2168+
usingLocalFileRules.set(true);
2169+
}
2170+
} catch (FileNotFoundException e) {
2171+
throw new RuntimeException("Unable to load preprocessor rules - file does not exist: " + preprocessorStr);
2172+
}
2173+
}
21432174
}

0 commit comments

Comments
 (0)