Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ public class DivertConfiguration implements Serializable, EncodingSupport {
public DivertConfiguration() {
}

public DivertConfiguration(DivertConfiguration config) {
this.name = config.getName();
this.routingName = config.getRoutingName();
this.address = config.getAddress();
this.forwardingAddress = config.getForwardingAddress();
this.exclusive = config.isExclusive();
this.filterString = config.getFilterString();
this.transformerConfiguration = config.getTransformerConfiguration();
this.routingType = config.getRoutingType();
}

/**
* Set the value of a parameter based on its "key" {@code String}. Valid key names and corresponding {@code static}
* {@code final} are:
Expand Down Expand Up @@ -108,7 +119,7 @@ public DivertConfiguration set(String key, String value) {
setTransformerConfiguration(transformerConfiguration);
}
} else if (key.equals(ROUTING_TYPE)) {
setRoutingType(ComponentConfigurationRoutingType.valueOf(value));
setRoutingType(value == null ? null : ComponentConfigurationRoutingType.valueOf(value));
}
}
return this;
Expand Down Expand Up @@ -227,7 +238,9 @@ public String toJSON() {
builder.add(TRANSFORMER_CONFIGURATION, tc.createJsonObjectBuilder());
}

if (getRoutingType() != null) {
if (getRoutingType() == null) {
builder.add(ROUTING_TYPE, JsonValue.NULL);
} else {
builder.add(ROUTING_TYPE, getRoutingType().name());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
Expand Down Expand Up @@ -391,6 +392,8 @@ JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInf

List<PersistedDivertConfiguration> recoverDivertConfigurations();

DivertConfiguration getDivertConfiguration(String name);

void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception;

void deleteBridgeConfiguration(String bridgeName) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
Expand Down Expand Up @@ -845,6 +846,16 @@ public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return new ArrayList<>(mapPersistedDivertConfigurations.values());
}

@Override
public DivertConfiguration getDivertConfiguration(String name) {
PersistedDivertConfiguration persistedDivertConfiguration = mapPersistedDivertConfigurations.get(name);
if (persistedDivertConfiguration != null) {
return new DivertConfiguration(persistedDivertConfiguration.getDivertConfiguration());
} else {
return null;
}
}

@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
storeConfiguration(persistedBridgeConfiguration, mapPersistedBridgeConfigurations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
Expand Down Expand Up @@ -498,6 +499,11 @@ public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}

@Override
public DivertConfiguration getDivertConfiguration(String name) {
return null;
}

@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,4 +1544,7 @@ void slowConsumerDetected(String sessionID,

@LogMessage(id = 224163, value = "Failed to clone SHA256 MessageDigest, falling back to getInstance", level = LogMessage.Level.INFO)
void sha256CloneNotSupported(CloneNotSupportedException cns);

Comment thread
clebertsuconic marked this conversation as resolved.
@LogMessage(id = 224164, value = "Failed to recover stored configuration for divert named '{}': {}. To repair this record create a new divert with the same name via the management API.", level = LogMessage.Level.WARN)
void failedToRecoverStoredDivertConfiguration(String divertName, String divert);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3069,38 +3069,59 @@ public Divert updateDivert(DivertConfiguration config) throws Exception {
return null;
}

final Divert divert = divertBinding.getDivert();
// The divert config may be in defined in the broker config (e.g. XML) or stored in the journal. If it's in the
// journal we want to make sure it's updated propertly otherwise we just update what's in memory.
DivertConfiguration onStorageDivert = storageManager.getDivertConfiguration(config.getName());
final Divert inMemoryDivert = divertBinding.getDivert();

Filter filter = FilterImpl.createFilter(config.getFilterString());
if (filter == null) {
divert.setFilter(null);
inMemoryDivert.setFilter(null);
if (onStorageDivert != null) {
onStorageDivert.setFilterString(null);
}
} else {
if (!filter.equals(divert.getFilter())) {
divert.setFilter(filter);
if (!filter.equals(inMemoryDivert.getFilter())) {
inMemoryDivert.setFilter(filter);
if (onStorageDivert != null) {
onStorageDivert.setFilterString(config.getFilterString());
}
}
}

if (config.getTransformerConfiguration() != null) {
getServiceRegistry().removeDivertTransformer(divert.getUniqueName().toString());
getServiceRegistry().removeDivertTransformer(inMemoryDivert.getUniqueName().toString());
Transformer transformer = getServiceRegistry().getDivertTransformer(
config.getName(), config.getTransformerConfiguration());
divert.setTransformer(transformer);
inMemoryDivert.setTransformer(transformer);
if (onStorageDivert != null) {
onStorageDivert.setTransformerConfiguration(config.getTransformerConfiguration());
}
}

if (config.getForwardingAddress() != null) {
SimpleString forwardAddress = SimpleString.of(config.getForwardingAddress());
if (!forwardAddress.equals(divert.getForwardAddress())) {
divert.setForwardAddress(forwardAddress);
if (!forwardAddress.equals(inMemoryDivert.getForwardAddress())) {
inMemoryDivert.setForwardAddress(forwardAddress);
if (onStorageDivert != null) {
onStorageDivert.setForwardingAddress(config.getForwardingAddress());
}
}
}

if (config.getRoutingType() != null && divert.getRoutingType() != config.getRoutingType()) {
divert.setRoutingType(config.getRoutingType());
if (config.getRoutingType() != null && inMemoryDivert.getRoutingType() != config.getRoutingType()) {
inMemoryDivert.setRoutingType(config.getRoutingType());
if (onStorageDivert != null) {
onStorageDivert.setRoutingType(config.getRoutingType());
}
}

storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(config));
if (onStorageDivert != null) {
// this will replace the existing divert record in the journal using delete + add
storageManager.storeDivertConfiguration(new PersistedDivertConfiguration(onStorageDivert));
}

return divert;
return inMemoryDivert;
}

@Override
Expand Down Expand Up @@ -4465,15 +4486,20 @@ private void recoverStoredDiverts() throws Exception {
if (storageManager.recoverDivertConfigurations() != null) {

for (PersistedDivertConfiguration persistedDivertConfiguration : storageManager.recoverDivertConfigurations()) {
//has it been removed from config
boolean deleted = configuration.getDivertConfigurations().stream().noneMatch(divertConfiguration -> divertConfiguration.getName().equals(persistedDivertConfiguration.getName()));
// if it has remove it if configured to do so
if (deleted) {
if (addressSettingsRepository.getMatch(persistedDivertConfiguration.getDivertConfiguration().getAddress()).getConfigDeleteDiverts() == DeletionPolicy.FORCE) {
storageManager.deleteDivertConfiguration(persistedDivertConfiguration.getName());
} else {
deployDivert(persistedDivertConfiguration.getDivertConfiguration());
try {
//has it been removed from config
boolean deleted = configuration.getDivertConfigurations().stream().noneMatch(divertConfiguration -> divertConfiguration.getName().equals(persistedDivertConfiguration.getName()));
// if it has remove it if configured to do so
if (deleted) {
if (addressSettingsRepository.getMatch(persistedDivertConfiguration.getDivertConfiguration().getAddress()).getConfigDeleteDiverts() == DeletionPolicy.FORCE) {
storageManager.deleteDivertConfiguration(persistedDivertConfiguration.getName());
} else {
deployDivert(persistedDivertConfiguration.getDivertConfiguration());
}
}
} catch (Exception e) {
logger.debug(e.getMessage(), e);
ActiveMQServerLogger.LOGGER.failedToRecoverStoredDivertConfiguration(persistedDivertConfiguration.getName(), String.valueOf(persistedDivertConfiguration.getDivertConfiguration()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.io.SequentialFile;
Expand Down Expand Up @@ -746,6 +747,11 @@ public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}

@Override
public DivertConfiguration getDivertConfiguration(String name) {
return null;
}

@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
Expand Down Expand Up @@ -748,6 +749,11 @@ public List<PersistedDivertConfiguration> recoverDivertConfigurations() {
return null;
}

@Override
public DivertConfiguration getDivertConfiguration(String name) {
return null;
}

@Override
public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
}
Expand Down
Loading