Skip to content

Commit 840b18c

Browse files
authored
[ISSUE #10260] Reject delayed transactional messages in gRPC send path (#10261)
1 parent 9879968 commit 840b18c

2 files changed

Lines changed: 33 additions & 0 deletions

File tree

proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ protected Map<String, String> buildMessageProperty(ProxyContext context, apache.
249249
// set transaction property
250250
MessageType messageType = message.getSystemProperties().getMessageType();
251251
if (messageType.equals(MessageType.TRANSACTION)) {
252+
if (message.getSystemProperties().hasDeliveryTimestamp()) {
253+
throw new GrpcProxyException(Code.BAD_REQUEST, "transaction message cannot set delivery timestamp");
254+
}
252255
MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
253256

254257
if (message.getSystemProperties().hasOrphanedTransactionRecoveryDuration()) {

proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,36 @@ public void testParameterValidate() {
925925
}
926926
});
927927

928+
// transaction message cannot be delay message
929+
assertThrows(GrpcProxyException.class, () -> {
930+
try {
931+
this.sendMessageActivity.sendMessage(
932+
createContext(),
933+
SendMessageRequest.newBuilder()
934+
.addMessages(Message.newBuilder()
935+
.setTopic(Resource.newBuilder()
936+
.setName(TOPIC)
937+
.build())
938+
.setSystemProperties(SystemProperties.newBuilder()
939+
.setMessageId("id")
940+
.setDeliveryTimestamp(Timestamps.fromMillis(System.currentTimeMillis() + Duration.ofSeconds(5).toMillis()))
941+
.setQueueId(0)
942+
.setMessageType(MessageType.TRANSACTION)
943+
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
944+
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
945+
.build())
946+
.setBody(ByteString.copyFrom(new byte[3]))
947+
.build())
948+
.build()
949+
).get();
950+
} catch (ExecutionException t) {
951+
GrpcProxyException e = (GrpcProxyException) t.getCause();
952+
assertEquals(Code.BAD_REQUEST, e.getCode());
953+
assertEquals("transaction message cannot set delivery timestamp", e.getMessage());
954+
throw e;
955+
}
956+
});
957+
928958
// transactionRecoverySecond
929959
assertThrows(GrpcProxyException.class, () -> {
930960
try {

0 commit comments

Comments
 (0)