|
16 | 16 | package com.github.sonus21.rqueue.spring.boot.integration; |
17 | 17 |
|
18 | 18 | import com.github.sonus21.rqueue.converter.MessageConverterProvider; |
19 | | -import java.io.ByteArrayOutputStream; |
20 | | -import java.nio.charset.StandardCharsets; |
| 19 | +import java.io.IOException; |
21 | 20 | import java.util.Base64; |
| 21 | +import org.msgpack.core.MessageBufferPacker; |
| 22 | +import org.msgpack.core.MessagePack; |
| 23 | +import org.msgpack.core.MessageUnpacker; |
22 | 24 | import org.springframework.messaging.Message; |
23 | 25 | import org.springframework.messaging.MessageHeaders; |
24 | 26 | import org.springframework.messaging.converter.MessageConversionException; |
@@ -78,89 +80,36 @@ private static final class MsgPackCodec { |
78 | 80 | private MsgPackCodec() {} |
79 | 81 |
|
80 | 82 | static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { |
81 | | - ByteArrayOutputStream out = new ByteArrayOutputStream(); |
82 | | - out.write(0x82); |
83 | | - writeString(out, "backend"); |
84 | | - writeString(out, payload.getBackend()); |
85 | | - writeString(out, "body"); |
86 | | - writeString(out, payload.getBody()); |
87 | | - return out.toByteArray(); |
| 83 | + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { |
| 84 | + packer.packMapHeader(2); |
| 85 | + packer.packString("backend"); |
| 86 | + packer.packString(payload.getBackend()); |
| 87 | + packer.packString("body"); |
| 88 | + packer.packString(payload.getBody()); |
| 89 | + return packer.toByteArray(); |
| 90 | + } catch (IOException e) { |
| 91 | + throw new MessageConversionException("MsgPack encoding failed", e); |
| 92 | + } |
88 | 93 | } |
89 | 94 |
|
90 | 95 | static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { |
91 | | - Cursor cursor = new Cursor(bytes); |
92 | | - int mapHeader = cursor.readUnsignedByte(); |
93 | | - int entries; |
94 | | - if ((mapHeader & 0xf0) == 0x80) { |
95 | | - entries = mapHeader & 0x0f; |
96 | | - } else { |
97 | | - throw new MessageConversionException("Expected MsgPack fixmap"); |
98 | | - } |
99 | | - String backend = null; |
100 | | - String body = null; |
101 | | - for (int i = 0; i < entries; i++) { |
102 | | - String key = readString(cursor); |
103 | | - String value = readString(cursor); |
104 | | - if ("backend".equals(key)) { |
105 | | - backend = value; |
106 | | - } else if ("body".equals(key)) { |
107 | | - body = value; |
| 96 | + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { |
| 97 | + int entries = unpacker.unpackMapHeader(); |
| 98 | + String backend = null; |
| 99 | + String body = null; |
| 100 | + for (int i = 0; i < entries; i++) { |
| 101 | + String key = unpacker.unpackString(); |
| 102 | + String value = unpacker.unpackString(); |
| 103 | + if ("backend".equals(key)) { |
| 104 | + backend = value; |
| 105 | + } else if ("body".equals(key)) { |
| 106 | + body = value; |
| 107 | + } |
108 | 108 | } |
| 109 | + return new MessagePackageListenerTest.ListenerPayload(backend, body); |
| 110 | + } catch (IOException e) { |
| 111 | + throw new MessageConversionException("MsgPack decoding failed", e); |
109 | 112 | } |
110 | | - return new MessagePackageListenerTest.ListenerPayload(backend, body); |
111 | | - } |
112 | | - |
113 | | - private static void writeString(ByteArrayOutputStream out, String value) { |
114 | | - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); |
115 | | - if (bytes.length <= 31) { |
116 | | - out.write(0xa0 | bytes.length); |
117 | | - } else if (bytes.length <= 255) { |
118 | | - out.write(0xd9); |
119 | | - out.write(bytes.length); |
120 | | - } else { |
121 | | - throw new MessageConversionException("Test MsgPack codec supports strings up to 255 bytes"); |
122 | | - } |
123 | | - out.writeBytes(bytes); |
124 | | - } |
125 | | - |
126 | | - private static String readString(Cursor cursor) { |
127 | | - int header = cursor.readUnsignedByte(); |
128 | | - int length; |
129 | | - if ((header & 0xe0) == 0xa0) { |
130 | | - length = header & 0x1f; |
131 | | - } else if (header == 0xd9) { |
132 | | - length = cursor.readUnsignedByte(); |
133 | | - } else { |
134 | | - throw new MessageConversionException("Expected MsgPack string"); |
135 | | - } |
136 | | - return new String(cursor.readBytes(length), StandardCharsets.UTF_8); |
137 | | - } |
138 | | - } |
139 | | - |
140 | | - private static final class Cursor { |
141 | | - |
142 | | - private final byte[] bytes; |
143 | | - private int index; |
144 | | - |
145 | | - Cursor(byte[] bytes) { |
146 | | - this.bytes = bytes; |
147 | | - } |
148 | | - |
149 | | - int readUnsignedByte() { |
150 | | - if (index >= bytes.length) { |
151 | | - throw new MessageConversionException("Unexpected end of MsgPack payload"); |
152 | | - } |
153 | | - return bytes[index++] & 0xff; |
154 | | - } |
155 | | - |
156 | | - byte[] readBytes(int length) { |
157 | | - if (index + length > bytes.length) { |
158 | | - throw new MessageConversionException("Unexpected end of MsgPack payload"); |
159 | | - } |
160 | | - byte[] value = new byte[length]; |
161 | | - System.arraycopy(bytes, index, value, 0, length); |
162 | | - index += length; |
163 | | - return value; |
164 | 113 | } |
165 | 114 | } |
166 | 115 | } |
0 commit comments