Skip to content

Commit f30a643

Browse files
committed
work on publish storage
1 parent 8b874fe commit f30a643

8 files changed

Lines changed: 31 additions & 32 deletions

File tree

core-service/src/main/java/javasabr/mqtt/service/impl/InMemoryRetainPublishService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public InMemoryRetainPublishService() {
2121
@Override
2222
public void retain(Publish publish) {
2323
PublishData data = publish.data();
24-
if (data.isEmpty()) {
24+
if (data.isPayloadEmpty()) {
2525
retainedMessageTree.removeRetainedMessage(publish.topicName());
2626
} else {
2727
retainedMessageTree.addRetainedMessage(publish);

core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package javasabr.mqtt.service.message.out.factory;
22

33
import javasabr.mqtt.model.MqttVersion;
4-
import javasabr.mqtt.model.PayloadFormat;
54
import javasabr.mqtt.model.QoS;
65
import javasabr.mqtt.model.data.type.StringPair;
6+
import javasabr.mqtt.model.publish.PublishData;
77
import javasabr.mqtt.model.reason.code.AuthenticateReasonCode;
88
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
99
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
@@ -63,12 +63,10 @@ public MqttOutMessage newPublish(
6363
boolean duplicate,
6464
TopicName topicName,
6565
int topicAlias,
66-
byte[] payload,
67-
PayloadFormat payloadFormat,
66+
PublishData data,
6867
@Nullable TopicName responseTopic,
69-
byte @Nullable [] correlationData,
7068
Array<StringPair> userProperties) {
71-
return new PublishMqtt311OutMessage(messageId, qos, retain, duplicate, topicName, payload);
69+
return new PublishMqtt311OutMessage(messageId, qos, retain, duplicate, topicName, data);
7270
}
7371

7472
@Override

core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import javasabr.mqtt.model.MqttClientConnectionConfig;
44
import javasabr.mqtt.model.MqttVersion;
5-
import javasabr.mqtt.model.PayloadFormat;
65
import javasabr.mqtt.model.QoS;
76
import javasabr.mqtt.model.data.type.StringPair;
7+
import javasabr.mqtt.model.publish.PublishData;
88
import javasabr.mqtt.model.reason.code.AuthenticateReasonCode;
99
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
1010
import javasabr.mqtt.model.reason.code.DisconnectReasonCode;
@@ -76,22 +76,18 @@ public MqttOutMessage newPublish(
7676
boolean duplicate,
7777
TopicName topicName,
7878
int topicAlias,
79-
byte[] payload,
80-
PayloadFormat payloadFormat,
79+
PublishData data,
8180
@Nullable TopicName responseTopic,
82-
byte @Nullable [] correlationData,
8381
Array<StringPair> userProperties) {
8482
return new PublishMqtt5OutMessage(
8583
messageId,
8684
qos,
8785
retain,
8886
duplicate,
8987
topicName,
90-
payload,
88+
data,
9189
topicAlias,
92-
payloadFormat,
9390
responseTopic,
94-
correlationData,
9591
userProperties);
9692
}
9793

core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryPublishData.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,15 @@ public InMemoryPublishData(
4040
}
4141

4242
@Override
43-
public boolean isEmpty() {
43+
public boolean isPayloadEmpty() {
4444
return payloadSize < 1;
4545
}
4646

47+
@Override
48+
public boolean isCorrelationDataEmpty() {
49+
return correlationDataSize < 1;
50+
}
51+
4752
@Override
4853
public void writePayloadTo(ByteBuffer buffer) {
4954
buffer.put(payload);

core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import java.util.concurrent.locks.StampedLock;
44
import javasabr.mqtt.model.MqttUser;
55
import javasabr.mqtt.model.message.TrackableMqttMessage;
6-
import javasabr.mqtt.model.publishing.Publish;
6+
import javasabr.mqtt.model.publish.Publish;
77
import javasabr.mqtt.model.session.MqttSession;
88
import javasabr.mqtt.model.session.ProcessingPublishes;
99
import javasabr.mqtt.model.session.PublishRetryer;

model/src/main/java/javasabr/mqtt/model/publish/PublishData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ public interface PublishData {
1616
int payloadSize();
1717
int correlationDataSize();
1818

19-
boolean isEmpty();
19+
boolean isPayloadEmpty();
20+
boolean isCorrelationDataEmpty();
2021

2122
void writePayloadTo(ByteBuffer buffer);
2223
void writeCorrelationDataTo(ByteBuffer buffer);

network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import javasabr.mqtt.base.util.DebugUtils;
55
import javasabr.mqtt.model.QoS;
66
import javasabr.mqtt.model.message.MqttMessageType;
7+
import javasabr.mqtt.model.publish.PublishData;
78
import javasabr.mqtt.model.topic.TopicName;
89
import javasabr.mqtt.network.MqttConnection;
910
import lombok.AccessLevel;
@@ -23,7 +24,7 @@ public class PublishMqtt311OutMessage extends TrackableMqttOutMessage {
2324
}
2425

2526
QoS qos;
26-
byte[] payload;
27+
PublishData data;
2728
TopicName topicName;
2829

2930
boolean retain;
@@ -35,12 +36,12 @@ public PublishMqtt311OutMessage(
3536
boolean retain,
3637
boolean duplicate,
3738
TopicName topicName,
38-
byte[] payload) {
39+
PublishData data) {
3940
super(messageId);
4041
this.qos = qos;
4142
this.retain = retain;
4243
this.duplicate = duplicate;
43-
this.payload = payload;
44+
this.data = data;
4445
this.topicName = topicName;
4546
}
4647

@@ -56,7 +57,7 @@ public MqttMessageType messageType() {
5657

5758
@Override
5859
public int expectedLength(MqttConnection connection) {
59-
return 7 + payload.length;
60+
return 7 + data.payloadSize();
6061
}
6162

6263
@Override
@@ -83,6 +84,6 @@ protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer)
8384
@Override
8485
protected void writePayload(MqttConnection connection, ByteBuffer buffer) {
8586
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800413
86-
buffer.put(payload);
87+
data.writePayloadTo(buffer);
8788
}
8889
}

network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt5OutMessage.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
import javasabr.mqtt.base.util.DebugUtils;
77
import javasabr.mqtt.model.MqttMessageProperty;
88
import javasabr.mqtt.model.MqttProperties;
9-
import javasabr.mqtt.model.PayloadFormat;
109
import javasabr.mqtt.model.QoS;
1110
import javasabr.mqtt.model.data.type.StringPair;
11+
import javasabr.mqtt.model.publish.PublishData;
1212
import javasabr.mqtt.model.topic.TopicName;
1313
import javasabr.mqtt.network.MqttConnection;
1414
import javasabr.rlib.collections.array.Array;
@@ -148,10 +148,8 @@ public class PublishMqtt5OutMessage extends PublishMqtt311OutMessage {
148148

149149
@Nullable
150150
TopicName responseTopic;
151-
byte @Nullable [] correlationData;
152151

153152
Array<StringPair> userProperties;
154-
PayloadFormat payloadFormat;
155153

156154
int topicAlias;
157155

@@ -161,17 +159,13 @@ public PublishMqtt5OutMessage(
161159
boolean retain,
162160
boolean duplicate,
163161
TopicName topicName,
164-
byte[] payload,
162+
PublishData data,
165163
int topicAlias,
166-
PayloadFormat payloadFormat,
167164
@Nullable TopicName responseTopic,
168-
byte @Nullable [] correlationData,
169165
Array<StringPair> userProperties) {
170-
super(messageId, qos, retain, duplicate, topicName, payload);
166+
super(messageId, qos, retain, duplicate, topicName, data);
171167
this.topicAlias = topicAlias;
172-
this.payloadFormat = payloadFormat;
173168
this.responseTopic = responseTopic;
174-
this.correlationData = correlationData;
175169
this.userProperties = userProperties;
176170
}
177171

@@ -189,15 +183,19 @@ protected boolean isPropertiesSupported(MqttConnection connection) {
189183
protected void writeProperties(MqttConnection connection, ByteBuffer buffer) {
190184
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc511988586
191185
String rawResponseTopic = responseTopic == null ? null : responseTopic.rawTopic();
192-
writeProperty(buffer, MqttMessageProperty.PAYLOAD_FORMAT_INDICATOR, payloadFormat.code());
186+
writeProperty(buffer, MqttMessageProperty.PAYLOAD_FORMAT_INDICATOR, data.payloadFormat().code());
193187
writeProperty(
194188
buffer,
195189
MqttMessageProperty.MESSAGE_EXPIRY_INTERVAL,
196190
0,
197191
MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET);
198192
writeProperty(buffer, MqttMessageProperty.TOPIC_ALIAS, topicAlias, MqttProperties.TOPIC_ALIAS_NOT_SET);
199193
writeNotEmptyProperty(buffer, MqttMessageProperty.RESPONSE_TOPIC, rawResponseTopic);
200-
writeNotEmptyProperty(buffer, MqttMessageProperty.CORRELATION_DATA, correlationData);
201194
writeStringPairProperties(buffer, MqttMessageProperty.USER_PROPERTY, userProperties);
195+
196+
if (!data.isCorrelationDataEmpty()) {
197+
buffer.put(MqttMessageProperty.CORRELATION_DATA.id());
198+
data.writeCorrelationDataTo(buffer);
199+
}
202200
}
203201
}

0 commit comments

Comments
 (0)