Skip to content

Commit 9754483

Browse files
hyperxprojbertram
authored andcommitted
ARTEMIS-4692 Allow export of specific queues
1 parent 810a9e1 commit 9754483

3 files changed

Lines changed: 182 additions & 5 deletions

File tree

artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public final class XmlDataExporter extends DBOption {
8585
@Option(names = "undefined-prefix", description = "In case a queue does not exist, this will define the prefix to be used on the message export. Default: 'UndefinedQueue_'")
8686
private String undefinedPrefix = "UndefinedQueue_";
8787

88+
@Option(names = "--queue", description = "Only export the specified queue(s). Repeatable. When omitted, all queues are exported.")
89+
private List<String> queues;
90+
8891
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
8992
private final Map<Long, Map<Long, ReferenceDescribe>> messageRefs = new HashMap<>();
9093

@@ -114,6 +117,20 @@ public XmlDataExporter setUndefinedPrefix(String undefinedPrefix) {
114117
return this;
115118
}
116119

120+
public List<String> getQueues() {
121+
return queues;
122+
}
123+
124+
public XmlDataExporter setQueues(List<String> queues) {
125+
this.queues = queues;
126+
return this;
127+
}
128+
129+
// when no queue filter is set (null/empty) everything is exported, preserving the default behavior
130+
private boolean includeQueue(String queueName) {
131+
return queues == null || queues.isEmpty() || queues.contains(queueName);
132+
}
133+
117134
@Override
118135
public Object execute(ActionContext context) throws Exception {
119136
super.execute(context);
@@ -164,6 +181,7 @@ protected void writeOutput(OutputStream out) throws Exception {
164181
private void writeXMLData() throws Exception {
165182
long start = System.currentTimeMillis();
166183
getBindings();
184+
warnUnknownQueues();
167185
processMessageJournal();
168186
printDataAsXML();
169187
logger.debug("\n\nProcessing took: {}ms", (System.currentTimeMillis() - start));
@@ -328,6 +346,25 @@ private void getBindings() throws Exception {
328346
bindingsJournal.stop();
329347
}
330348

349+
/**
350+
* Warn about any queue name passed via {@code --queue} that doesn't exist among the real bindings, so a typo doesn't
351+
* silently result in an empty export. Must run after {@link #getBindings()} but before any synthetic
352+
* {@code undefinedPrefix} bindings are created.
353+
*/
354+
private void warnUnknownQueues() {
355+
if (queues == null || queues.isEmpty()) {
356+
return;
357+
}
358+
Set<String> existing = queueBindings.values().stream()
359+
.map(binding -> binding.getQueueConfiguration().getName().toString())
360+
.collect(Collectors.toSet());
361+
for (String queue : queues) {
362+
if (!existing.contains(queue)) {
363+
getActionContext().err.println("Queue '" + queue + "' was not found; nothing will be exported for it.");
364+
}
365+
}
366+
}
367+
331368
private void printDataAsXML() {
332369
try {
333370

@@ -349,9 +386,20 @@ public Throwable getLastError() {
349386
}
350387

351388
private void printBindingsAsXML() throws XMLStreamException {
389+
// when a queue filter is active only export the addresses that host at least one of the requested queues
390+
Set<String> includedAddresses = (queues == null || queues.isEmpty()) ? null :
391+
queueBindings.values().stream()
392+
.map(PersistentQueueBindingEncoding::getQueueConfiguration)
393+
.filter(queueConfig -> includeQueue(queueConfig.getName().toString()))
394+
.map(queueConfig -> queueConfig.getAddress().toString())
395+
.collect(Collectors.toSet());
396+
352397
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
353398
for (Map.Entry<Long, PersistentAddressBindingEncoding> addressBindingEncodingEntry : addressBindings.entrySet()) {
354399
PersistentAddressBindingEncoding bindingEncoding = addressBindings.get(addressBindingEncodingEntry.getKey());
400+
if (includedAddresses != null && !includedAddresses.contains(bindingEncoding.getName().toString())) {
401+
continue;
402+
}
355403
xmlWriter.writeEmptyElement(XmlDataConstants.ADDRESS_BINDINGS_CHILD);
356404
String routingTypes = bindingEncoding.getRoutingTypes().stream().
357405
map(Enum::toString).collect(Collectors.joining(","));
@@ -362,6 +410,9 @@ private void printBindingsAsXML() throws XMLStreamException {
362410
}
363411
for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet()) {
364412
QueueConfiguration queueConfig = queueBindings.get(queueBindingEncodingEntry.getKey()).getQueueConfiguration();
413+
if (!includeQueue(queueConfig.getName().toString())) {
414+
continue;
415+
}
365416
xmlWriter.writeEmptyElement(XmlDataConstants.QUEUE_BINDINGS_CHILD);
366417
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_ADDRESS, queueConfig.getAddress().toString());
367418
xmlWriter.writeAttribute(XmlDataConstants.QUEUE_BINDING_FILTER_STRING, queueConfig.getFilterString() == null ? "" : queueConfig.getFilterString().toString());
@@ -384,7 +435,12 @@ private void printAllMessagesAsXML() throws Exception {
384435
// Order here is important. We must process the messages from the journal before we process those from the page
385436
// files in order to get the messages in the right order.
386437
for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) {
387-
printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
438+
List<String> queueNames = extractQueueNames(messageRefs.get(messageMapEntry.getKey()));
439+
// when a queue filter is active a message with no matching queue is dropped entirely
440+
if (queueNames.isEmpty()) {
441+
continue;
442+
}
443+
printSingleMessageAsXML(messageMapEntry.getValue().toCore(), queueNames);
388444
msgs++;
389445
if (logInterval > 0) {
390446
if (msgs % logInterval == 0) {
@@ -457,7 +513,9 @@ private void printPagedMessagesAsXML() {
457513
PersistentQueueBindingEncoding queueBinding = queueBindings.get(queueID);
458514
if (queueBinding != null) {
459515
SimpleString queueName = queueBinding.getQueueConfiguration().getName();
460-
queueNames.add(queueName.toString());
516+
if (includeQueue(queueName.toString())) {
517+
queueNames.add(queueName.toString());
518+
}
461519
}
462520
}
463521
}
@@ -482,13 +540,17 @@ private void printPagedMessagesAsXML() {
482540
}
483541

484542
private List<String> extractQueueNames(Map<Long, DescribeJournal.ReferenceDescribe> refMap) {
485-
List<String> queues = new ArrayList<>();
543+
List<String> queueList = new ArrayList<>();
486544
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
487545
String queueName;
488546

489547
long id = ref.refEncoding.queueID;
490548
PersistentQueueBindingEncoding persistentQueueBindingEncoding = queueBindings.get(id);
491549
if (persistentQueueBindingEncoding == null) {
550+
// an unknown queue ID has no name that could match a --queue filter, so there's nothing to export for it
551+
if (queues != null && !queues.isEmpty()) {
552+
continue;
553+
}
492554
String name = undefinedPrefix + id;
493555
queueBindings.put(id, new PersistentQueueBindingEncoding(QueueConfiguration.of(name).setAddress(name)));
494556
queueName = String.valueOf(name);
@@ -497,9 +559,11 @@ private List<String> extractQueueNames(Map<Long, DescribeJournal.ReferenceDescri
497559
queueName = String.valueOf(persistentQueueBindingEncoding.getQueueConfiguration().getName());
498560
}
499561

500-
queues.add(queueName);
562+
if (includeQueue(queueName)) {
563+
queueList.add(queueName);
564+
}
501565
}
502-
return queues;
566+
return queueList;
503567
}
504568

505569
/**

docs/user-manual/data-tools.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ SYNOPSIS
158158
[--bindings <binding>] [--jdbc] [--verbose]
159159
[--jdbc-message-table-name <jdbcMessages>]
160160
[--jdbc-node-manager-table-name <jdbcNodeManager>] [--output <output>]
161+
[--queue <queue>]
161162
artemis data imp [--legacy-prefixes] [--password <password>]
162163
[--transaction] [--verbose] [--port <port>] [--user <user>] [--sort]
163164
--input <input> [--host <host>]
@@ -346,6 +347,9 @@ COMMANDS
346347

347348
With --output option, Output name for the file
348349

350+
With --queue option, Only export the specified queue(s). Repeatable.
351+
When omitted, all queues are exported.
352+
349353
imp
350354
Import all message-data using an XML that could be interpreted by
351355
any system.

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.File;
3838
import java.io.FileOutputStream;
3939
import java.util.EnumSet;
40+
import java.util.List;
4041
import java.util.UUID;
4142

4243
import org.apache.activemq.artemis.api.core.Message;
@@ -67,6 +68,7 @@
6768
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
6869
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
6970
import org.apache.activemq.artemis.tests.util.CFUtil;
71+
import org.apache.activemq.artemis.utils.CompositeAddress;
7072
import org.apache.activemq.artemis.utils.RandomUtil;
7173
import org.apache.activemq.artemis.tests.util.Wait;
7274
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -725,6 +727,113 @@ public void testPartialQueue() throws Exception {
725727
assertNotNull(msg);
726728
}
727729

730+
@Test
731+
public void testExportSpecificQueue() throws Exception {
732+
ClientSession session = basicSetUp();
733+
734+
// queueA1 and queueA2 share addressA; queueB lives on its own address
735+
session.createQueue(QueueConfiguration.of("queueA1").setAddress("addressA").setRoutingType(RoutingType.ANYCAST));
736+
session.createQueue(QueueConfiguration.of("queueA2").setAddress("addressA").setRoutingType(RoutingType.ANYCAST));
737+
session.createQueue(QueueConfiguration.of("queueB").setAddress("addressB").setRoutingType(RoutingType.ANYCAST));
738+
739+
// FQQN addressing routes each message to exactly one queue
740+
sendDurableTextMessage(session, CompositeAddress.toFullyQualified("addressA", "queueA1"), "A1");
741+
sendDurableTextMessage(session, CompositeAddress.toFullyQualified("addressA", "queueA2"), "A2");
742+
sendDurableTextMessage(session, CompositeAddress.toFullyQualified("addressB", "queueB"), "B");
743+
744+
session.close();
745+
locator.close();
746+
server.stop();
747+
748+
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
749+
XmlDataExporter xmlDataExporter = new XmlDataExporter();
750+
xmlDataExporter.setQueues(List.of("queueA1"));
751+
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
752+
String xml = new String(xmlOutputStream.toByteArray());
753+
if (logger.isDebugEnabled()) {
754+
logger.debug(xml);
755+
}
756+
757+
// only the requested queue and the address that hosts it are exported
758+
assertTrue(xml.contains("queueA1"), "expected queueA1 in export");
759+
assertTrue(xml.contains("addressA"), "expected addressA in export");
760+
assertFalse(xml.contains("queueA2"), "queueA2 should not be exported");
761+
assertFalse(xml.contains("queueB"), "queueB should not be exported");
762+
assertFalse(xml.contains("addressB"), "addressB should not be exported");
763+
764+
clearDataRecreateServerDirs();
765+
server.start();
766+
forceLong();
767+
locator = createInVMNonHALocator();
768+
factory = createSessionFactory(locator);
769+
session = factory.createSession(false, true, true);
770+
771+
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
772+
XmlDataImporter xmlDataImporter = new XmlDataImporter();
773+
xmlDataImporter.validate(xmlInputStream);
774+
xmlInputStream.reset();
775+
xmlDataImporter.process(xmlInputStream, session);
776+
777+
// round-trip: only queueA1 was recreated, carrying only its own message
778+
assertNotNull(server.locateQueue("queueA1"));
779+
assertNull(server.locateQueue("queueA2"));
780+
assertNull(server.locateQueue("queueB"));
781+
782+
ClientConsumer consumer = session.createConsumer("queueA1");
783+
session.start();
784+
ClientMessage msg = consumer.receive(CONSUMER_TIMEOUT);
785+
assertNotNull(msg);
786+
assertEquals("A1", msg.getBodyBuffer().readString());
787+
assertNull(consumer.receiveImmediate());
788+
consumer.close();
789+
}
790+
791+
@Test
792+
public void testExportUnknownQueueWarns() throws Exception {
793+
ClientSession session = basicSetUp();
794+
795+
session.createQueue(QueueConfiguration.of("realQueue"));
796+
ClientProducer producer = session.createProducer("realQueue");
797+
producer.send(session.createMessage(true));
798+
799+
session.close();
800+
locator.close();
801+
server.stop();
802+
803+
ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
804+
XmlDataExporter xmlDataExporter = new XmlDataExporter();
805+
// filtering on a queue that doesn't exist must not throw and must export nothing for it
806+
xmlDataExporter.setQueues(List.of("ghostQueue"));
807+
xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory());
808+
String xml = new String(xmlOutputStream.toByteArray());
809+
assertNull(xmlDataExporter.getLastError());
810+
assertFalse(xml.contains("realQueue"), "no real queue data should be exported when filtering an unknown queue");
811+
812+
// the produced XML must still be valid and importable; it simply won't recreate any queue
813+
clearDataRecreateServerDirs();
814+
server.start();
815+
forceLong();
816+
locator = createInVMNonHALocator();
817+
factory = createSessionFactory(locator);
818+
session = factory.createSession(false, true, true);
819+
820+
ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
821+
XmlDataImporter xmlDataImporter = new XmlDataImporter();
822+
xmlDataImporter.validate(xmlInputStream);
823+
xmlInputStream.reset();
824+
xmlDataImporter.process(xmlInputStream, session);
825+
826+
assertNull(server.locateQueue("realQueue"));
827+
}
828+
829+
private void sendDurableTextMessage(ClientSession session, String address, String body) throws Exception {
830+
ClientProducer producer = session.createProducer(address);
831+
ClientMessage message = session.createMessage(Message.TEXT_TYPE, true);
832+
message.getBodyBuffer().writeString(body);
833+
producer.send(message);
834+
producer.close();
835+
}
836+
728837
@Test
729838
public void testPagedMessageWithMissingBinding() throws Exception {
730839
final String MY_ADDRESS = "myAddress";

0 commit comments

Comments
 (0)