Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
658c201
Support FIFO consume accelerator for Push consumer, recalling timed/d…
zhaohai666 Apr 22, 2026
bbc3500
update format
zhaohai666 Apr 22, 2026
db7839e
update format
zhaohai666 Apr 22, 2026
206bdc1
update format
zhaohai666 Apr 22, 2026
a23baa3
update version
zhaohai666 Apr 22, 2026
b8900ef
update version
zhaohai666 Apr 23, 2026
d6ebabc
update version
zhaohai666 Apr 23, 2026
59b70eb
update version
zhaohai666 Apr 23, 2026
a985b70
fix test
zhaohai666 Apr 23, 2026
225b8de
Merge branch 'master' into feature/support-fifo-accelerator-recall-de…
zhaohai666 Apr 28, 2026
543c254
update proto
zhaohai666 Apr 29, 2026
545e891
update message type
zhaohai666 Apr 29, 2026
f6c88a6
add lite consumer
zhaohai666 Apr 29, 2026
9cbf724
fix format
zhaohai666 Apr 29, 2026
357b9ac
fix format
zhaohai666 Apr 29, 2026
d3dbcfd
fix format
zhaohai666 Apr 29, 2026
6035f2a
fix lite push consumer
zhaohai666 Apr 29, 2026
490fec7
fix format
zhaohai666 Apr 29, 2026
757fe31
fix lite consumer type
zhaohai666 Apr 29, 2026
a80382a
fix lite push consumer
zhaohai666 Apr 30, 2026
44ec55d
fix format
zhaohai666 Apr 30, 2026
42f94da
fix lite push consumer
zhaohai666 Apr 30, 2026
6899fc7
optimize logs
zhaohai666 Apr 30, 2026
fe305eb
update lite push consumer
zhaohai666 May 6, 2026
19582c6
fix lite push consumer
zhaohai666 May 6, 2026
0ca57ab
fix lite_topic
zhaohai666 May 6, 2026
01403db
fix format
zhaohai666 May 6, 2026
cf649e8
Merge remote-tracking branch 'origin/master' into feature/support-fif…
zhaohai666 May 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | | ✅ | ✅ | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | | ✅ | ✅ | 🚧 |
| Priority Message | ✅ | 🚧 | ✅ | ✅ | | ✅ | ✅ | 🚧 |

## 先决条件和构建

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | | ✅ | ✅ | 🚧 |
| Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 |
| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | | ✅ | ✅ | 🚧 |
| Priority Message | ✅ | 🚧 | ✅ | ✅ | | ✅ | ✅ | 🚧 |

## Prerequisite and Build

Expand Down
8 changes: 7 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ minitrace = "0.4"
byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
time = { version = "0.3", features = ["formatting", "local-offset"] }
time = { version = ">=0.3.20, <0.3.37", features = ["formatting", "local-offset"] }
once_cell = "1.18.0"

mockall = "0.11.4"
Expand All @@ -64,12 +64,18 @@ mockall_double = "0.3.0"
siphasher = "0.3.10"
ring = "0.16.20"
tokio-util = { version = "=0.7.10", features = ["rt"] }
# Pin indexmap to avoid hashbrown 0.17+ which requires Rust 2024 edition
indexmap = ">=2.0, <2.8"
# Explicitly pin hashbrown to versions compatible with Rust 1.74
hashbrown = ">=0.12, <0.16"

[build-dependencies]
tonic-build = "0.10.2"
which = "7.0.3"
version_check = "0.9.4"
regex = "1.7.3"
# Pin tempfile to avoid getrandom 0.4+ which requires Rust 2024 edition
tempfile = ">=3.0, <3.15"

[dev-dependencies]
awaitility = "0.3.0"
Expand Down
112 changes: 112 additions & 0 deletions rust/examples/lite_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Example demonstrating how to send lite messages using RocketMQ Rust client.
//!
//! Lite topics provide reduced metadata and storage overhead compared to regular topics.
//! This example shows how to create and send messages to lite topics.

use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Initialize tracing for logging
tracing_subscriber::fmt::init();

// Configure client options
let mut client_option = ClientOption::default();
client_option.set_access_url("http://localhost:8080");

// Configure producer options
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["parent_topic".to_string()]);

// Create and start the producer
let mut producer = Producer::new(producer_option, client_option)?;
producer.start().await?;

println!("Producer started successfully");

// Define your message body
let body = b"This is a lite message for Apache RocketMQ";

// Build a lite message
// Note: lite_topic cannot be used with message_group, delivery_timestamp, or priority
let message = MessageBuilder::lite_message_builder(
"parent_topic", // Parent topic name
body.to_vec(), // Message body
"lite-topic-1", // Lite topic name
)
.set_keys(vec!["yourMessageKey-3ee439f945d7".to_string()])
.build()?;

println!("Sending lite message to lite-topic-1...");

// Send the message
match producer.send(message).await {
Ok(receipt) => {
println!(
"Send message successfully, message_id={}",
receipt.message_id()
);
}
Err(e) => {
eprintln!("Failed to send message: {:?}", e);
// Handle quota exceeded error if needed
// if e.kind() == ErrorKind::LiteTopicQuotaExceeded {
// eprintln!("Lite topic quota exceeded. Consider increasing the limit.");
// }
}
}

// Send multiple lite messages to different lite topics
for i in 1..=3 {
let lite_topic = format!("lite-topic-{}", i);
let message_body = format!("Lite message {} content", i);

let message = MessageBuilder::lite_message_builder(
"parent_topic",
message_body.into_bytes(),
&lite_topic,
)
.build()?;

match producer.send(message).await {
Ok(receipt) => {
println!(
"Sent message to {}, message_id={}",
lite_topic,
receipt.message_id()
);
}
Err(e) => {
eprintln!("Failed to send to {}: {:?}", lite_topic, e);
}
}
}

println!("\nAll messages sent successfully!");

// Shutdown the producer when done
producer.shutdown().await?;
println!("Producer shutdown");

Ok(())
}
118 changes: 118 additions & 0 deletions rust/examples/lite_push_consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//! Example demonstrating how to consume lite messages using RocketMQ Rust client.
//!
//! LitePushConsumer provides efficient consumption of lite topics with reduced overhead.
//! This example shows how to subscribe to lite topics and handle messages.

use rocketmq::conf::{ClientOption, PushConsumerOption};
use rocketmq::model::message::MessageView;
use rocketmq::{ConsumeResult, LitePushConsumer, LitePushConsumerTrait};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Initialize tracing for logging
tracing_subscriber::fmt::init();

#[cfg(not(test))]
{
// Configure client options
let mut client_option = ClientOption::default();
client_option.set_access_url("http://localhost:8080");

// Configure push consumer options
let mut option = PushConsumerOption::default();
option.set_consumer_group("yourConsumerGroup");

// Create message listener
let message_listener = Box::new(|message: &MessageView| {
println!("Received message: {:?}", message);
println!(" Message ID: {}", message.message_id());
println!(" Topic: {:?}", message.topic());
if let Some(lite_topic) = message.lite_topic() {
println!(" Lite Topic: {}", lite_topic);
}
println!(" Body: {:?}", String::from_utf8_lossy(message.body()));

// Handle the received message and return consume result
ConsumeResult::SUCCESS
});

// Create and start the LitePushConsumer
// Note: bind_topic is the parent topic that lite topics belong to
let mut consumer = LitePushConsumer::new(
client_option,
option,
"yourParentTopic".to_string(),
message_listener,
)?;

consumer.start().await?;
println!("LitePushConsumer started successfully");

// Subscribe to lite topics
// The subscribe_lite() method initiates network requests and performs quota verification,
// so it may fail. It's important to check the result of this call.
//
// Possible failure scenarios:
// 1. Network request errors - can be retried
// 2. Quota verification failures (LiteSubscriptionQuotaExceededException) -
// evaluate whether quota is insufficient and unsubscribe unused topics

match consumer.subscribe_lite("lite-topic-1".to_string()).await {
Ok(_) => println!("Subscribed to lite-topic-1"),
Err(e) => eprintln!("Failed to subscribe to lite-topic-1: {:?}", e),
}

match consumer.subscribe_lite("lite-topic-2".to_string()).await {
Ok(_) => println!("Subscribed to lite-topic-2"),
Err(e) => eprintln!("Failed to subscribe to lite-topic-2: {:?}", e),
}

match consumer.subscribe_lite("lite-topic-3".to_string()).await {
Ok(_) => println!("Subscribed to lite-topic-3"),
Err(e) => eprintln!("Failed to subscribe to lite-topic-3: {:?}", e),
}

// Optionally unsubscribe from a lite topic when no longer needed
// This frees up quota resources
// consumer.unsubscribe_lite("lite-topic-3".to_string()).await?;

// Get current subscribed lite topics
let topics = consumer.get_lite_topic_set();
println!("\nCurrently subscribed lite topics: {:?}", topics);

println!("\nConsumer is running. Press Ctrl+C to stop...");

// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
println!("\nReceived shutdown signal");

// Shutdown the consumer
consumer.shutdown().await?;
println!("LitePushConsumer shutdown");
}

#[cfg(test)]
{
println!("This example is not available in test mode");
}

Ok(())
}
93 changes: 93 additions & 0 deletions rust/examples/priority_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq::conf::{ClientOption, ProducerOption};
use rocketmq::model::message::MessageBuilder;
use rocketmq::Producer;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

// It's recommended to specify the topics that applications will publish messages to
// because the producer will prefetch topic routes for them on start and fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
producer_option.set_topics(vec!["priority_test"]);

// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");

// build and start producer
let mut producer = Producer::new(producer_option, client_option).unwrap();
let start_result = producer.start().await;
if start_result.is_err() {
eprintln!("producer start failed: {:?}", start_result.unwrap_err());
return;
}

// Build priority message using priority_message_builder
let message = MessageBuilder::priority_message_builder(
"priority_test",
"This is a priority message".as_bytes().to_vec(),
1, // priority level, higher value means higher priority
)
.set_tag("PriorityTag")
.set_keys(vec!["priority-key"])
.build()
.unwrap();

// send message to rocketmq proxy
let send_result = producer.send(message).await;
if send_result.is_err() {
eprintln!("send message failed: {:?}", send_result.unwrap_err());
return;
}
println!(
"Send priority message success, message_id={}",
send_result.unwrap().message_id()
);

// Alternatively, you can use builder pattern with set_priority
let message2 = MessageBuilder::builder()
.set_topic("priority_test")
.set_body("Another priority message".as_bytes().to_vec())
.set_tag("PriorityTag")
.set_keys(vec!["priority-key-2"])
.set_priority(5) // Set priority level
.build()
.unwrap();

let send_result2 = producer.send(message2).await;
if send_result2.is_err() {
eprintln!("send message failed: {:?}", send_result2.unwrap_err());
return;
}
println!(
"Send priority message success, message_id={}",
send_result2.unwrap().message_id()
);

// shutdown the producer when you don't need it anymore.
// recommend shutdown manually to gracefully stop and unregister from server
let shutdown_result = producer.shutdown().await;
if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
);
}
}
5 changes: 5 additions & 0 deletions rust/examples/push_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ async fn main() {
option.set_consumer_group("test");
option.subscribe("test_topic", FilterExpression::new(FilterType::Tag, "*"));

// Enable FIFO consume accelerator for parallel consumption by messageGroup
// This allows messages with different messageGroups to be consumed in parallel
// while maintaining FIFO order within the same messageGroup
option.set_enable_fifo_consume_accelerator(true);

let callback: MessageListener = Box::new(|message| {
println!("Receive message: {:?}", message);
ConsumeResult::SUCCESS
Expand Down
Loading
Loading