|
| 1 | +package io.arex.inst.mqtt.adapter; |
| 2 | + |
| 3 | +import io.arex.agent.bootstrap.util.StringUtil; |
| 4 | +import io.arex.inst.mqtt.warp.GenericMessageWarp; |
| 5 | +import io.arex.inst.mqtt.warp.MessageHeaderWarp; |
| 6 | +import io.arex.inst.runtime.util.LogUtil; |
| 7 | +import org.springframework.messaging.Message; |
| 8 | +import org.springframework.messaging.MessageChannel; |
| 9 | +import org.springframework.messaging.MessageHeaders; |
| 10 | +import org.springframework.messaging.support.GenericMessage; |
| 11 | + |
| 12 | +import java.lang.reflect.Field; |
| 13 | +import java.nio.charset.StandardCharsets; |
| 14 | + |
| 15 | +/** |
| 16 | + * MessageImpl |
| 17 | + */ |
| 18 | +public class MessageAdapterImpl implements MessageAdapter<MessageChannel, Message> { |
| 19 | + |
| 20 | + private static final MessageAdapterImpl INSTANCE = new MessageAdapterImpl(); |
| 21 | + |
| 22 | + public static MessageAdapterImpl getInstance() { |
| 23 | + return INSTANCE; |
| 24 | + } |
| 25 | + |
| 26 | + @Override |
| 27 | + public byte[] getMsg(MessageChannel messageChannel, Message msg) { |
| 28 | + if (msg == null){ |
| 29 | + return new byte[]{}; |
| 30 | + } |
| 31 | + Object payload = msg.getPayload(); |
| 32 | + if (payload == null){ |
| 33 | + return new byte[]{}; |
| 34 | + } |
| 35 | + if (payload instanceof byte[]){ |
| 36 | + return ((byte[]) payload); |
| 37 | + } |
| 38 | + return payload.toString().getBytes(StandardCharsets.UTF_8); |
| 39 | + } |
| 40 | + |
| 41 | + @Override |
| 42 | + public MessageChannel warpMC(Object messageChannel) { |
| 43 | + if (messageChannel == null){ |
| 44 | + return null; |
| 45 | + } |
| 46 | + if (messageChannel instanceof MessageChannel){ |
| 47 | + return (MessageChannel) messageChannel; |
| 48 | + } |
| 49 | + return null; |
| 50 | + } |
| 51 | + |
| 52 | + @Override |
| 53 | + public Message warpMessage(Object message) { |
| 54 | + if (message == null){ |
| 55 | + return null; |
| 56 | + } |
| 57 | + if (message instanceof GenericMessageWarp){ |
| 58 | + return (GenericMessageWarp) message; |
| 59 | + } |
| 60 | + |
| 61 | + if (message instanceof GenericMessage) { |
| 62 | + GenericMessage messageTemp = (GenericMessage) message; |
| 63 | + MessageHeaders headers = messageTemp.getHeaders(); |
| 64 | + MessageHeaderWarp messageHeaderWarp = new MessageHeaderWarp(headers); |
| 65 | + return new GenericMessageWarp(messageTemp.getPayload(), messageHeaderWarp); |
| 66 | + } |
| 67 | + if (message instanceof Message){ |
| 68 | + return (Message)message; |
| 69 | + } |
| 70 | + return null; |
| 71 | + } |
| 72 | + |
| 73 | + @Override |
| 74 | + public MessageHeaders getHeader(MessageChannel messageChannel, Message msg) { |
| 75 | + if (msg == null){ |
| 76 | + return null; |
| 77 | + } |
| 78 | + if (msg instanceof GenericMessageWarp){ |
| 79 | + GenericMessageWarp messageTemp = (GenericMessageWarp) msg; |
| 80 | + return messageTemp.getMessageHeaderWarp(); |
| 81 | + } |
| 82 | + return msg.getHeaders(); |
| 83 | + } |
| 84 | + |
| 85 | + @Override |
| 86 | + public boolean markProcessed(Message message, String flagKey) { |
| 87 | + if (message == null){ |
| 88 | + return true; |
| 89 | + } |
| 90 | + if (message instanceof GenericMessageWarp){ |
| 91 | + GenericMessageWarp genericMessageWarp = (GenericMessageWarp)message; |
| 92 | + genericMessageWarp.put(flagKey,Boolean.TRUE.toString()); |
| 93 | + } |
| 94 | + return false; |
| 95 | + } |
| 96 | + |
| 97 | + @Override |
| 98 | + public String getHeader(MessageChannel messageChannel, Message message, String key) { |
| 99 | + if (message == null || StringUtil.isEmpty(key)){ |
| 100 | + return null; |
| 101 | + } |
| 102 | + if (message instanceof GenericMessageWarp) { |
| 103 | + GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message; |
| 104 | + Object object = genericMessageWarp.get(key); |
| 105 | + return object != null ? object.toString() : null; |
| 106 | + } |
| 107 | + |
| 108 | + if(message instanceof GenericMessage){ |
| 109 | + Object obj = message.getHeaders().get(key); |
| 110 | + return obj != null ? obj.toString() : null; |
| 111 | + } |
| 112 | + if (message.getHeaders() != null){ |
| 113 | + Object obj = message.getHeaders().get(key); |
| 114 | + return obj != null ? obj.toString() : null ; |
| 115 | + } |
| 116 | + return null; |
| 117 | + } |
| 118 | + |
| 119 | + @Override |
| 120 | + public boolean removeHeader(MessageChannel messageChannel, Message message, String key) { |
| 121 | + if (message == null || StringUtil.isEmpty(key)){ |
| 122 | + return false; |
| 123 | + } |
| 124 | + if (message instanceof GenericMessageWarp){ |
| 125 | + GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message; |
| 126 | + genericMessageWarp.removeHeader(key); |
| 127 | + return true; |
| 128 | + } |
| 129 | + return false; |
| 130 | + } |
| 131 | + |
| 132 | + @Override |
| 133 | + public boolean addHeader(MessageChannel messageChannel, Message message, String key, String value) { |
| 134 | + if (message == null ){ |
| 135 | + return false; |
| 136 | + } |
| 137 | + if (message instanceof GenericMessageWarp){ |
| 138 | + GenericMessageWarp genericMessageWarp = (GenericMessageWarp) message; |
| 139 | + genericMessageWarp.put(key,value); |
| 140 | + return true; |
| 141 | + } |
| 142 | + return false; |
| 143 | + } |
| 144 | + |
| 145 | + @Override |
| 146 | + public Message resetMsg(Message message) { |
| 147 | + if (message == null){ |
| 148 | + return null; |
| 149 | + } |
| 150 | + if (message instanceof GenericMessageWarp){ |
| 151 | + try { |
| 152 | + GenericMessageWarp messageWarp = (GenericMessageWarp) message; |
| 153 | + Field headers = message.getClass().getSuperclass().getDeclaredField("headers"); |
| 154 | + headers.setAccessible(true); |
| 155 | + headers.set(message, messageWarp.getMessageHeaderWarp()); |
| 156 | + } catch (NoSuchFieldException e) { |
| 157 | + LogUtil.warn("MessageAdapterImpl.resetMsg - NoSuchFieldException", e); |
| 158 | + } catch (IllegalAccessException e) { |
| 159 | + LogUtil.warn("MessageAdapterImpl.resetMsg - IllegalAccessException", e); |
| 160 | + } |
| 161 | + } |
| 162 | + return message; |
| 163 | + } |
| 164 | + |
| 165 | + |
| 166 | +} |
0 commit comments