diff --git a/README-CN.md b/README-CN.md index 8947e01a6..67dd9e929 100644 --- a/README-CN.md +++ b/README-CN.md @@ -23,12 +23,12 @@ | Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | -| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | +| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | +| Priority Message | ✅ | 🚧 | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | ## 先决条件和构建 diff --git a/README.md b/README.md index e416805fa..b33aa49d9 100644 --- a/README.md +++ b/README.md @@ -23,12 +23,12 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al | Producer with FIFO messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Producer with transactional messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | +| Producer with recalling timed/delay messages | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Simple consumer | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with concurrent message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | | Push consumer with FIFO message listener | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | -| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | -| Priority Message | ✅ | 🚧 | ✅ | ✅ | 🚧 | ✅ | ✅ | 🚧 | +| Push consumer with FIFO consume accelerator | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | 
+| Priority Message | ✅ | 🚧 | ✅ | ✅ | ✅ | ✅ | ✅ | 🚧 | ## Prerequisite and Build diff --git a/rust/Cargo.toml b/rust/Cargo.toml index e55a0bba9..dc8ca8d7d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -55,7 +55,7 @@ minitrace = "0.4" byteorder = "1" mac_address = "1.1.4" hex = "0.4.3" -time = { version = "0.3", features = ["formatting", "local-offset"] } +time = { version = ">=0.3.20, <0.3.37", features = ["formatting", "local-offset"] } once_cell = "1.18.0" mockall = "0.11.4" @@ -64,12 +64,18 @@ mockall_double = "0.3.0" siphasher = "0.3.10" ring = "0.16.20" tokio-util = { version = "=0.7.10", features = ["rt"] } +# Pin indexmap to avoid hashbrown 0.17+ which requires Rust 2024 edition +indexmap = ">=2.0, <2.8" +# Explicitly pin hashbrown to versions compatible with Rust 1.74 +hashbrown = ">=0.12, <0.16" [build-dependencies] tonic-build = "0.10.2" which = "7.0.3" version_check = "0.9.4" regex = "1.7.3" +# Pin tempfile to avoid getrandom 0.4+ which requires Rust 2024 edition +tempfile = ">=3.0, <3.15" [dev-dependencies] awaitility = "0.3.0" diff --git a/rust/examples/lite_producer.rs b/rust/examples/lite_producer.rs new file mode 100644 index 000000000..69fae57d4 --- /dev/null +++ b/rust/examples/lite_producer.rs @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Example demonstrating how to send lite messages using RocketMQ Rust client. +//! +//! Lite topics provide reduced metadata and storage overhead compared to regular topics. +//! This example shows how to create and send messages to lite topics. + +use rocketmq::conf::{ClientOption, ProducerOption}; +use rocketmq::model::message::MessageBuilder; +use rocketmq::Producer; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing for logging + tracing_subscriber::fmt::init(); + + // Configure client options + let mut client_option = ClientOption::default(); + client_option.set_access_url("http://localhost:8080"); + + // Configure producer options + let mut producer_option = ProducerOption::default(); + producer_option.set_topics(vec!["parent_topic".to_string()]); + + // Create and start the producer + let mut producer = Producer::new(producer_option, client_option)?; + producer.start().await?; + + println!("Producer started successfully"); + + // Define your message body + let body = b"This is a lite message for Apache RocketMQ"; + + // Build a lite message + // Note: lite_topic cannot be used with message_group, delivery_timestamp, or priority + let message = MessageBuilder::lite_message_builder( + "parent_topic", // Parent topic name + body.to_vec(), // Message body + "lite-topic-1", // Lite topic name + ) + .set_keys(vec!["yourMessageKey-3ee439f945d7".to_string()]) + .build()?; + + println!("Sending lite message to lite-topic-1..."); + + // Send the message + match producer.send(message).await { + Ok(receipt) => { + println!( + "Send message successfully, message_id={}", + receipt.message_id() + ); + } + Err(e) => { + eprintln!("Failed to send message: {:?}", e); + // Handle quota exceeded error if needed + // if e.kind() == ErrorKind::LiteTopicQuotaExceeded { + // eprintln!("Lite topic quota exceeded. 
Consider increasing the limit."); + // } + } + } + + // Send multiple lite messages to different lite topics + for i in 1..=3 { + let lite_topic = format!("lite-topic-{}", i); + let message_body = format!("Lite message {} content", i); + + let message = MessageBuilder::lite_message_builder( + "parent_topic", + message_body.into_bytes(), + &lite_topic, + ) + .build()?; + + match producer.send(message).await { + Ok(receipt) => { + println!( + "Sent message to {}, message_id={}", + lite_topic, + receipt.message_id() + ); + } + Err(e) => { + eprintln!("Failed to send to {}: {:?}", lite_topic, e); + } + } + } + + println!("\nAll messages sent successfully!"); + + // Shutdown the producer when done + producer.shutdown().await?; + println!("Producer shutdown"); + + Ok(()) +} diff --git a/rust/examples/lite_push_consumer.rs b/rust/examples/lite_push_consumer.rs new file mode 100644 index 000000000..f73117c81 --- /dev/null +++ b/rust/examples/lite_push_consumer.rs @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Example demonstrating how to consume lite messages using RocketMQ Rust client. +//! +//! LitePushConsumer provides efficient consumption of lite topics with reduced overhead. +//! 
This example shows how to subscribe to lite topics and handle messages. + +use rocketmq::conf::{ClientOption, PushConsumerOption}; +use rocketmq::model::message::MessageView; +use rocketmq::{ConsumeResult, LitePushConsumer, LitePushConsumerTrait}; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize tracing for logging + tracing_subscriber::fmt::init(); + + #[cfg(not(test))] + { + // Configure client options + let mut client_option = ClientOption::default(); + client_option.set_access_url("http://localhost:8080"); + + // Configure push consumer options + let mut option = PushConsumerOption::default(); + option.set_consumer_group("yourConsumerGroup"); + + // Create message listener + let message_listener = Box::new(|message: &MessageView| { + println!("Received message: {:?}", message); + println!(" Message ID: {}", message.message_id()); + println!(" Topic: {:?}", message.topic()); + if let Some(lite_topic) = message.lite_topic() { + println!(" Lite Topic: {}", lite_topic); + } + println!(" Body: {:?}", String::from_utf8_lossy(message.body())); + + // Handle the received message and return consume result + ConsumeResult::SUCCESS + }); + + // Create and start the LitePushConsumer + // Note: bind_topic is the parent topic that lite topics belong to + let mut consumer = LitePushConsumer::new( + client_option, + option, + "yourParentTopic".to_string(), + message_listener, + )?; + + consumer.start().await?; + println!("LitePushConsumer started successfully"); + + // Subscribe to lite topics + // The subscribe_lite() method initiates network requests and performs quota verification, + // so it may fail. It's important to check the result of this call. + // + // Possible failure scenarios: + // 1. Network request errors - can be retried + // 2. 
Quota verification failures (LiteSubscriptionQuotaExceededException) - + // evaluate whether quota is insufficient and unsubscribe unused topics + + match consumer.subscribe_lite("lite-topic-1".to_string()).await { + Ok(_) => println!("Subscribed to lite-topic-1"), + Err(e) => eprintln!("Failed to subscribe to lite-topic-1: {:?}", e), + } + + match consumer.subscribe_lite("lite-topic-2".to_string()).await { + Ok(_) => println!("Subscribed to lite-topic-2"), + Err(e) => eprintln!("Failed to subscribe to lite-topic-2: {:?}", e), + } + + match consumer.subscribe_lite("lite-topic-3".to_string()).await { + Ok(_) => println!("Subscribed to lite-topic-3"), + Err(e) => eprintln!("Failed to subscribe to lite-topic-3: {:?}", e), + } + + // Optionally unsubscribe from a lite topic when no longer needed + // This frees up quota resources + // consumer.unsubscribe_lite("lite-topic-3".to_string()).await?; + + // Get current subscribed lite topics + let topics = consumer.get_lite_topic_set(); + println!("\nCurrently subscribed lite topics: {:?}", topics); + + println!("\nConsumer is running. Press Ctrl+C to stop..."); + + // Wait for shutdown signal + tokio::signal::ctrl_c().await?; + println!("\nReceived shutdown signal"); + + // Shutdown the consumer + consumer.shutdown().await?; + println!("LitePushConsumer shutdown"); + } + + #[cfg(test)] + { + println!("This example is not available in test mode"); + } + + Ok(()) +} diff --git a/rust/examples/priority_producer.rs b/rust/examples/priority_producer.rs new file mode 100644 index 000000000..f15c7b5e1 --- /dev/null +++ b/rust/examples/priority_producer.rs @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use rocketmq::conf::{ClientOption, ProducerOption}; +use rocketmq::model::message::MessageBuilder; +use rocketmq::Producer; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + + // It's recommended to specify the topics that applications will publish messages to + // because the producer will prefetch topic routes for them on start and fail fast in case they do not exist + let mut producer_option = ProducerOption::default(); + producer_option.set_topics(vec!["priority_test"]); + + // set which rocketmq proxy to connect + let mut client_option = ClientOption::default(); + client_option.set_access_url("localhost:8081"); + + // build and start producer + let mut producer = Producer::new(producer_option, client_option).unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } + + // Build priority message using priority_message_builder + let message = MessageBuilder::priority_message_builder( + "priority_test", + "This is a priority message".as_bytes().to_vec(), + 1, // priority level, higher value means higher priority + ) + .set_tag("PriorityTag") + .set_keys(vec!["priority-key"]) + .build() + .unwrap(); + + // send message to rocketmq proxy + let send_result = producer.send(message).await; + if send_result.is_err() { + eprintln!("send message failed: 
{:?}", send_result.unwrap_err()); + return; + } + println!( + "Send priority message success, message_id={}", + send_result.unwrap().message_id() + ); + + // Alternatively, you can use builder pattern with set_priority + let message2 = MessageBuilder::builder() + .set_topic("priority_test") + .set_body("Another priority message".as_bytes().to_vec()) + .set_tag("PriorityTag") + .set_keys(vec!["priority-key-2"]) + .set_priority(5) // Set priority level + .build() + .unwrap(); + + let send_result2 = producer.send(message2).await; + if send_result2.is_err() { + eprintln!("send message failed: {:?}", send_result2.unwrap_err()); + return; + } + println!( + "Send priority message success, message_id={}", + send_result2.unwrap().message_id() + ); + + // shutdown the producer when you don't need it anymore. + // recommend shutdown manually to gracefully stop and unregister from server + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } +} diff --git a/rust/examples/push_consumer.rs b/rust/examples/push_consumer.rs index 3de2691c9..62bf90374 100644 --- a/rust/examples/push_consumer.rs +++ b/rust/examples/push_consumer.rs @@ -34,6 +34,11 @@ async fn main() { option.set_consumer_group("test"); option.subscribe("test_topic", FilterExpression::new(FilterType::Tag, "*")); + // Enable FIFO consume accelerator for parallel consumption by messageGroup + // This allows messages with different messageGroups to be consumed in parallel + // while maintaining FIFO order within the same messageGroup + option.set_enable_fifo_consume_accelerator(true); + let callback: MessageListener = Box::new(|message| { println!("Receive message: {:?}", message); ConsumeResult::SUCCESS diff --git a/rust/examples/recall_producer.rs b/rust/examples/recall_producer.rs new file mode 100644 index 000000000..a761287f1 --- /dev/null +++ b/rust/examples/recall_producer.rs @@ -0,0 +1,106 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::ops::Add; +use std::time::{Duration, SystemTime}; + +use rocketmq::conf::{ClientOption, ProducerOption}; +use rocketmq::model::message::MessageBuilder; +use rocketmq::Producer; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + + // It's recommended to specify the topics that applications will publish messages to + let mut producer_option = ProducerOption::default(); + producer_option.set_topics(vec!["delay_test"]); + + // set which rocketmq proxy to connect + let mut client_option = ClientOption::default(); + client_option.set_access_url("localhost:8081"); + + // build and start producer + let mut producer = Producer::new(producer_option, client_option).unwrap(); + let start_result = producer.start().await; + if start_result.is_err() { + eprintln!("producer start failed: {:?}", start_result.unwrap_err()); + return; + } + + // Build delay message + let message = MessageBuilder::delay_message_builder( + "delay_test", + "This is a delay message that will be recalled" + .as_bytes() + .to_vec(), + // deliver in 60 seconds + SystemTime::now() + .add(Duration::from_secs(60)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as 
i64, + ) + .set_tag("DelayTag") + .set_keys(vec!["delay-key"]) + .build() + .unwrap(); + + // Send delay message to rocketmq proxy + let send_result = producer.send(message).await; + if send_result.is_err() { + eprintln!("send message failed: {:?}", send_result.unwrap_err()); + return; + } + + let receipt = send_result.unwrap(); + println!( + "Send delay message success, message_id={}, recall_handle={:?}", + receipt.message_id(), + receipt.recall_handle() + ); + + // Attempt to recall the message if recall_handle is available + if let Some(recall_handle) = receipt.recall_handle() { + println!("Attempting to recall the message..."); + + let recall_result = producer.recall_message("delay_test", recall_handle).await; + + match recall_result { + Ok(recalled_message_id) => { + println!( + "Successfully recalled message! Recalled message_id: {}", + recalled_message_id + ); + } + Err(e) => { + eprintln!("Failed to recall message: {:?}", e); + println!("Note: Recall operation requires server support."); + } + } + } else { + println!("No recall handle available. This message cannot be recalled."); + } + + // shutdown the producer when you don't need it anymore. + let shutdown_result = producer.shutdown().await; + if shutdown_result.is_err() { + eprintln!( + "producer shutdown failed: {:?}", + shutdown_result.unwrap_err() + ); + } +} diff --git a/rust/proto/apache/rocketmq/v2/definition.proto b/rust/proto/apache/rocketmq/v2/definition.proto index 753bfcebe..516474df4 100644 --- a/rust/proto/apache/rocketmq/v2/definition.proto +++ b/rust/proto/apache/rocketmq/v2/definition.proto @@ -146,6 +146,12 @@ enum MessageType { // Messages that are transactional. Only committed messages are delivered to // subscribers. 
TRANSACTION = 4; + + // lite topic + LITE = 5; + + // Messages with lower priority may need to wait for higher-priority messages to be processed first + PRIORITY = 6; } enum DigestType { @@ -186,6 +192,8 @@ enum ClientType { PUSH_CONSUMER = 2; SIMPLE_CONSUMER = 3; PULL_CONSUMER = 4; + LITE_PUSH_CONSUMER = 5; + LITE_SIMPLE_CONSUMER = 6; } enum Encoding { @@ -270,6 +278,12 @@ message SystemProperties { // Information to identify whether this message is from dead letter queue. optional DeadLetterQueue dead_letter_queue = 20; + + // lite topic + optional string lite_topic = 21; + + // Priority of message, which is optional + optional int32 priority = 22; } message DeadLetterQueue { @@ -346,6 +360,10 @@ enum Code { CLIENT_ID_REQUIRED = 40017; // Polling time is illegal. ILLEGAL_POLLING_TIME = 40018; + // Offset is illegal. + ILLEGAL_OFFSET = 40019; + // Format of lite topic is illegal. + ILLEGAL_LITE_TOPIC = 40020; // Generic code indicates that the client request lacks valid authentication // credentials for the requested resource. @@ -365,6 +383,8 @@ enum Code { TOPIC_NOT_FOUND = 40402; // Consumer group resource does not exist. CONSUMER_GROUP_NOT_FOUND = 40403; + // Offset not found from server. + OFFSET_NOT_FOUND = 40404; // Generic code representing client side timeout when connecting to, reading data from, or write data to server. REQUEST_TIMEOUT = 40800; @@ -373,6 +393,8 @@ enum Code { PAYLOAD_TOO_LARGE = 41300; // Message body size exceeds the threshold. MESSAGE_BODY_TOO_LARGE = 41301; + // Message body is empty. + MESSAGE_BODY_EMPTY = 41302; // Generic code for use cases where pre-conditions are not met. // For example, if a producer instance is used to publish messages without prior start() invocation, @@ -383,6 +405,10 @@ enum Code { // Requests are throttled.
TOO_MANY_REQUESTS = 42900; + // LiteTopic related quota exceeded + LITE_TOPIC_QUOTA_EXCEEDED = 42901; + LITE_SUBSCRIPTION_QUOTA_EXCEEDED = 42902; + // Generic code for the case that the server is unwilling to process the request because its header fields are too large. // The request may be resubmitted after reducing the size of the request header fields. REQUEST_HEADER_FIELDS_TOO_LARGE = 43100; @@ -542,6 +568,21 @@ message Subscription { // Long-polling timeout for `ReceiveMessageRequest`, which is essential for // push consumer. optional google.protobuf.Duration long_polling_timeout = 5; + + // Only lite push consumer + // client-side lite subscription quota limit + optional int32 lite_subscription_quota = 6; + + // Only lite push consumer + // Maximum length limit for lite topic + optional int32 max_lite_topic_size = 7; +} + +enum LiteSubscriptionAction { + PARTIAL_ADD = 0; + PARTIAL_REMOVE = 1; + COMPLETE_ADD = 2; + COMPLETE_REMOVE = 3; } message Metric { @@ -561,4 +602,19 @@ enum QueryOffsetPolicy { // Use this option if time-based seek is targeted. TIMESTAMP = 2; +} + +message OffsetOption { + oneof offset_type { + Policy policy = 1; + int64 offset = 2; + int64 tail_n = 3; + int64 timestamp = 4; + } + + enum Policy { + LAST = 0; + MIN = 1; + MAX = 2; + } } \ No newline at end of file diff --git a/rust/proto/apache/rocketmq/v2/service.proto b/rust/proto/apache/rocketmq/v2/service.proto index f662f769e..633d166a5 100644 --- a/rust/proto/apache/rocketmq/v2/service.proto +++ b/rust/proto/apache/rocketmq/v2/service.proto @@ -66,6 +66,8 @@ message SendResultEntry { string message_id = 2; string transaction_id = 3; int64 offset = 4; + // Unique handle to identify message to recall, support delay message for now. 
+ string recall_handle = 5; } message SendMessageResponse { @@ -97,6 +99,7 @@ message ReceiveMessageRequest { // For message auto renew and clean bool auto_renew = 6; optional google.protobuf.Duration long_polling_timeout = 7; + optional string attempt_id = 8; } message ReceiveMessageResponse { @@ -111,6 +114,7 @@ message ReceiveMessageResponse { message AckMessageEntry { string message_id = 1; string receipt_handle = 2; + optional string lite_topic = 3; } message AckMessageRequest { @@ -145,6 +149,7 @@ message ForwardMessageToDeadLetterQueueRequest { string message_id = 4; int32 delivery_attempt = 5; int32 max_delivery_attempts = 6; + optional string lite_topic = 7; } message ForwardMessageToDeadLetterQueueResponse { Status status = 1; } @@ -169,6 +174,8 @@ message EndTransactionResponse { Status status = 1; } message PrintThreadStackTraceCommand { string nonce = 1; } +message ReconnectEndpointsCommand { string nonce = 1; } + message ThreadStackTrace { string nonce = 1; optional string thread_stack_trace = 2; @@ -188,6 +195,10 @@ message RecoverOrphanedTransactionCommand { string transaction_id = 2; } +message NotifyUnsubscribeLiteCommand { + string lite_topic = 1; +} + message TelemetryCommand { optional Status status = 1; @@ -213,6 +224,12 @@ message TelemetryCommand { // Request client to verify the consumption of the appointed message. VerifyMessageCommand verify_message_command = 7; + + // Request client to reconnect server use the latest endpoints. + ReconnectEndpointsCommand reconnect_endpoints_command = 8; + + // Request client to unsubscribe lite topic. 
+ NotifyUnsubscribeLiteCommand notify_unsubscribe_lite_command = 9; } } @@ -235,6 +252,10 @@ message ChangeInvisibleDurationRequest { // For message tracing string message_id = 5; + + optional string lite_topic = 6; + // If true, server will not increment the retry times for this message + optional bool suspend = 7; } message ChangeInvisibleDurationResponse { @@ -292,6 +313,33 @@ message QueryOffsetResponse { int64 offset = 2; } +message RecallMessageRequest { + Resource topic = 1; + // Refer to SendResultEntry. + string recall_handle = 2; +} + +message RecallMessageResponse { + Status status = 1; + string message_id = 2; +} + +message SyncLiteSubscriptionRequest { + LiteSubscriptionAction action = 1; + // bindTopic for lite push consumer + Resource topic = 2; + // consumer group + Resource group = 3; + // lite subscription set of lite topics + repeated string lite_topic_set = 4; + optional int64 version = 5; + optional OffsetOption offset_option = 6; +} + +message SyncLiteSubscriptionResponse { + Status status = 1; +} + // For all the RPCs in MessagingService, the following error handling policies // apply: // @@ -379,10 +427,15 @@ service MessagingService { rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {} + // Update the consumption progress of the designated queue of the + // consumer group to the remote. rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {} + // Query the consumption progress of the designated queue of the + // consumer group to the remote. rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {} + // Query the offset of the designated queue by the query offset policy. rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {} // Commits or rollback one transactional message. @@ -408,4 +461,13 @@ service MessagingService { // ChangeInvisibleDuration to lengthen invisible duration. 
rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) { } + + // Recall a message, + // for delay message, should recall before delivery time, like the rollback operation of transaction message, + // for normal message, not supported for now. + rpc RecallMessage(RecallMessageRequest) returns (RecallMessageResponse) { + } + + // Sync lite subscription info, lite push consumer only + rpc SyncLiteSubscription(SyncLiteSubscriptionRequest) returns (SyncLiteSubscriptionResponse) {} } \ No newline at end of file diff --git a/rust/src/client.rs b/rust/src/client.rs index 0faefcb41..e17b8617e 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -36,9 +36,10 @@ use crate::model::message::AckMessageEntry; use crate::pb; use crate::pb::receive_message_response::Content; use crate::pb::{ - AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest, FilterExpression, - HeartbeatRequest, HeartbeatResponse, Message, MessageQueue, NotifyClientTerminationRequest, - QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest, Status, + telemetry_command::Command, AckMessageRequest, AckMessageResultEntry, + ChangeInvisibleDurationRequest, FilterExpression, HeartbeatRequest, HeartbeatResponse, Message, + MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest, RecallMessageRequest, + RecallMessageResponse, ReceiveMessageRequest, Resource, SendMessageRequest, Status, TelemetryCommand, }; use crate::session::RPCClient; @@ -113,6 +114,68 @@ impl Client { self.route_manager.clone() } + /// Clone client for LitePushConsumer (shares SessionManager for true Lite mode) + /// + /// This method creates a lightweight clone of the client specifically for LitePushConsumer + /// and LiteSubscriptionManager. The key feature is that it shares the same SessionManager + /// with the original client, enabling true Lite mode where both clients use the same + /// telemetry session. 
+ /// + /// Key design decisions: + /// - **SessionManager Sharing**: Uses Arc::clone to share the same SessionManager instance, + /// ensuring both the original client and the cloned Lite client use the same telemetry + /// session. This is the core of true Lite mode. + /// - **Client Type**: Changed to LitePushConsumer to identify this as a lite consumer. + /// - **FIFO Mode**: Explicitly enabled for LitePushConsumer to ensure ordered message consumption. + /// - **No shutdown_tx Copy**: Sets shutdown_tx to None to avoid duplicate shutdown signals. + /// Only the original client should trigger shutdown to prevent race conditions. + /// - **Lightweight Clone**: Only clones necessary fields, avoiding unnecessary deep copies. + /// + /// Usage: + /// ```ignore + /// let lite_client = original_client.clone_for_lite_consumer(); + /// // Use lite_client for LiteSubscriptionManager + /// // Both clients share the same SessionManager and telemetry session + /// ``` + pub(crate) fn clone_for_lite_consumer(&self) -> Self { + // Update client type to LitePushConsumer + let mut new_option = self.option.clone(); + new_option.client_type = ClientType::LitePushConsumer; + + // Create new settings with LitePushConsumer type and FIFO enabled + let mut new_settings = self.settings.clone(); + if let Some(Command::Settings(ref mut settings)) = &mut new_settings.command { + // Set client type to LitePushConsumer + settings.client_type = Some(pb::ClientType::LitePushConsumer as i32); + + // Ensure FIFO is enabled for LitePushConsumer + // This is critical for maintaining message order in lite mode + if let Some(pb::settings::PubSub::Subscription(ref mut sub)) = settings.pub_sub { + sub.fifo = Some(true); + } + } + + // Share the same SessionManager via Arc::clone + // This is the KEY feature that enables true Lite mode: + // - Both clients use the same telemetry session + // - No duplicate connections to the server + // - Shared state management + let session_manager = 
Arc::clone(&self.session_manager); + + // Create the lightweight clone + // Note: shutdown_tx is set to None to avoid duplicate shutdown signals + Self { + option: new_option, + session_manager, + route_manager: self.route_manager.clone(), + id: self.id.clone(), + access_endpoints: self.access_endpoints.clone(), + settings: new_settings, + telemetry_command_tx: None, // Will be set during start() + shutdown_tx: None, // Intentionally None to avoid duplicate shutdown + } + } + pub(crate) async fn start( &mut self, telemetry_command_tx: mpsc::Sender, @@ -147,10 +210,10 @@ impl Client { select! { _ = heartbeat_interval.tick() => { let sessions = session_manager.get_all_sessions().await; - if sessions.is_err() { + if let Err(e) = sessions { error!( "send heartbeat failed: failed to get sessions: {}", - sessions.unwrap_err() + e ); continue; } @@ -158,19 +221,19 @@ impl Client { for session in sessions.unwrap() { let peer = session.peer().to_string(); let response = Self::heart_beat_inner(session, &group, &namespace, &client_type).await; - if response.is_err() { + if let Err(e) = response { error!( "send heartbeat failed: failed to send heartbeat rpc: {}", - response.unwrap_err() + e ); continue; } let result = handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT); - if result.is_err() { + if let Err(e) = result { error!( "send heartbeat failed: server return error: {}", - result.unwrap_err() + e ); continue; } @@ -179,15 +242,15 @@ impl Client { }, _ = sync_settings_interval.tick() => { let sessions = session_manager.get_all_sessions().await; - if sessions.is_err() { - error!("sync settings failed: failed to get sessions: {}", sessions.unwrap_err()); + if let Err(e) = sessions { + error!("sync settings failed: failed to get sessions: {}", e); continue; } for mut session in sessions.unwrap() { let peer = session.peer().to_string(); let result = session.update_settings(settings.clone()).await; - if result.is_err() { - error!("sync settings failed: failed 
to call rpc: {}", result.unwrap_err()); + if let Err(e) = result { + error!("sync settings failed: failed to call rpc: {}", e); continue; } debug!("sync settings success, peer = {}", peer); @@ -196,8 +259,8 @@ impl Client { }, _ = sync_route_timer.tick() => { let result = route_manager.sync_route_data(&mut rpc_client).await; - if result.is_err() { - error!("sync route failed: {}", result.unwrap_err()); + if let Err(e) = result { + error!("sync route failed: {}", e); } }, _ = &mut shutdown_rx => { @@ -211,7 +274,7 @@ impl Client { Ok(()) } - fn check_started(&self, operation: &'static str) -> Result<(), ClientError> { + pub(crate) fn check_started(&self, operation: &'static str) -> Result<(), ClientError> { if !self.is_started() { return Err(ClientError::new( ErrorKind::ClientIsNotRunning, @@ -240,10 +303,86 @@ impl Client { Ok(()) } + /// Shutdown without consuming self (for internal use) + pub(crate) async fn shutdown_ref(&mut self) -> Result<(), ClientError> { + self.check_started(OPERATION_CLIENT_SHUTDOWN)?; + let mut rpc_client = self.get_session().await?; + self.telemetry_command_tx = None; + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + let group = self.option.group.as_ref().map(|group| Resource { + name: group.to_string(), + resource_namespace: self.option.namespace.to_string(), + }); + let response = rpc_client.notify_shutdown(NotifyClientTerminationRequest { group }); + handle_response_status(response.await?.status, OPERATION_CLIENT_SHUTDOWN)?; + self.session_manager.shutdown().await; + Ok(()) + } + pub(crate) fn client_id(&self) -> &str { &self.id } + /// Get the client type + #[allow(dead_code)] + pub(crate) fn get_client_type(&self) -> ClientType { + self.option.client_type.clone() + } + + /// Check if this is a lite consumer (LitePushConsumer or LiteSimpleConsumer) + #[allow(dead_code)] + pub(crate) fn is_lite_consumer(&self) -> bool { + matches!(self.option.client_type, ClientType::LitePushConsumer) + } + + /// Verify that 
this client shares the same SessionManager with another client + /// This is useful for verifying Lite mode session sharing + #[allow(dead_code)] + pub(crate) fn shares_session_manager_with(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.session_manager, &other.session_manager) + } + + /// Get session for LitePushConsumer without checking if client is started + /// + /// This method is specifically designed for LitePushConsumer which uses a cloned client. + /// The cloned client doesn't have shutdown_tx set (to avoid duplicate shutdown), so + /// is_started() would return false. However, it shares the same SessionManager as the + /// original client, so it can still get sessions. + /// + /// Key features: + /// - Skips the check_started validation (cloned clients don't have shutdown_tx) + /// - Creates a virtual telemetry channel if missing (for cloned clients) + /// - Shares the same SessionManager, ensuring the same underlying connection + /// + /// This enables true Lite mode where both the original and cloned clients + /// use the same telemetry session through shared SessionManager. + pub(crate) async fn get_session_for_lite_consumer(&self) -> Result { + // For LitePushConsumer, we skip the check_started check because: + // 1. The cloned client intentionally has shutdown_tx = None to avoid duplicate shutdown + // 2. But it shares the same SessionManager, so sessions are still valid + // 3. 
The original client manages the lifecycle, not the clone + + // Handle missing telemetry_command_tx by creating a virtual channel + // This is safe because: + // - The cloned client shares SessionManager with the original + // - The original client's telemetry channel is already established + // - The virtual channel won't be used for actual communication + let telemetry_tx = self.telemetry_command_tx.clone().unwrap_or_else(|| { + // Create a virtual/dummy channel for cloned clients + // This channel won't receive actual messages, but satisfies the API requirement + let (tx, _rx) = mpsc::channel(1); + tx + }); + + let session = self + .session_manager + .get_or_create_session(&self.access_endpoints, self.settings.clone(), telemetry_tx) + .await?; + Ok(session) + } + fn generate_client_id() -> String { let host = match hostname::get() { Ok(name) => name, @@ -341,6 +480,18 @@ impl Client { .await } + pub(crate) async fn recall_message( + &self, + endpoints: &Endpoints, + request: RecallMessageRequest, + ) -> Result { + self.recall_message_inner( + self.get_session_with_endpoints(endpoints).await.unwrap(), + request, + ) + .await + } + pub(crate) async fn send_message_inner( &self, mut rpc_client: T, @@ -357,6 +508,15 @@ impl Client { .collect()) } + pub(crate) async fn recall_message_inner( + &self, + mut rpc_client: T, + request: RecallMessageRequest, + ) -> Result { + let response = rpc_client.recall_message(request).await?; + Ok(response) + } + pub(crate) async fn receive_message( &self, endpoints: &Endpoints, @@ -396,6 +556,7 @@ impl Client { long_polling_timeout: Some( Duration::try_from(*self.option.long_polling_timeout()).unwrap(), ), + attempt_id: None, }; let responses = rpc_client.receive_message(request).await?; @@ -444,6 +605,7 @@ impl Client { vec![pb::AckMessageEntry { message_id: ack_entry.message_id(), receipt_handle: ack_entry.receipt_handle(), + lite_topic: None, }], ) .await?; @@ -511,6 +673,8 @@ impl Client { receipt_handle, invisible_duration: 
Some(invisible_duration), message_id, + lite_topic: None, + suspend: None, }; let response = rpc_client.change_invisible_duration(request).await?; handle_response_status(response.status, OPERATION_ACK_MESSAGE)?; @@ -1238,4 +1402,149 @@ pub(crate) mod tests { assert_eq!(error.message, "server return an error"); assert_eq!(error.operation, "client.ack_message"); } + + #[test] + fn test_clone_for_lite_consumer() { + let _m = MTX.lock(); + + // Create original client + let original_client = new_client_for_test(); + + // Clone for lite consumer + let lite_client = original_client.clone_for_lite_consumer(); + + // Verify client type is changed + assert!(matches!( + lite_client.option.client_type, + ClientType::LitePushConsumer + )); + + // Verify settings are updated + if let Some(Command::Settings(ref settings)) = lite_client.settings.command { + assert_eq!( + settings.client_type, + Some(pb::ClientType::LitePushConsumer as i32) + ); + + // Verify FIFO is enabled + if let Some(pb::settings::PubSub::Subscription(ref sub)) = settings.pub_sub { + assert_eq!(sub.fifo, Some(true)); + } + } + + // Verify SessionManager is shared + assert!(original_client.shares_session_manager_with(&lite_client)); + + // Verify other fields are cloned correctly + assert_eq!(original_client.id, lite_client.id); + assert_eq!( + original_client.access_endpoints.endpoint_url(), + lite_client.access_endpoints.endpoint_url() + ); + + // Verify telemetry_command_tx and shutdown_tx are reset + assert!(lite_client.telemetry_command_tx.is_none()); + assert!(lite_client.shutdown_tx.is_none()); + } + + #[test] + fn test_is_lite_consumer() { + let _m = MTX.lock(); + + // Create original client (not lite) + let original_client = new_client_for_test(); + assert!(!original_client.is_lite_consumer()); + + // Clone for lite consumer + let lite_client = original_client.clone_for_lite_consumer(); + assert!(lite_client.is_lite_consumer()); + } + + #[tokio::test] + async fn test_lite_consumer_session_sharing() { + 
let _m = MTX.lock(); + + let context = MockSession::new_context(); + context.expect().returning(|_, _, _| { + let mut session = MockSession::default(); + session.expect_start().returning(|_, _| Ok(())); + session + .expect_shadow_session() + .returning(|| MockSession::default()); + Ok(session) + }); + + // Create original client + let session_manager = new_session_manager(); + let mut original_client = new_client_with_session_manager(session_manager); + let (tx1, _rx1) = mpsc::channel(16); + let _ = original_client.start(tx1).await; + + // Clone for lite consumer + let mut lite_client = original_client.clone_for_lite_consumer(); + let (tx2, _rx2) = mpsc::channel(16); + let _ = lite_client.start(tx2).await; + + // Verify they share the same SessionManager + assert!(original_client.shares_session_manager_with(&lite_client)); + + // Both clients should be able to get sessions + let result1 = original_client.get_session().await; + assert!(result1.is_ok()); + + let result2 = lite_client.get_session().await; + assert!(result2.is_ok()); + } + + #[tokio::test] + async fn test_get_session_for_lite_consumer() { + let _m = MTX.lock(); + + let context = MockSession::new_context(); + context.expect().returning(|_, _, _| { + let mut session = MockSession::default(); + session.expect_start().returning(|_, _| Ok(())); + session + .expect_shadow_session() + .returning(|| MockSession::default()); + Ok(session) + }); + + // Create original client and start it + let session_manager = new_session_manager(); + let mut original_client = new_client_with_session_manager(session_manager); + let (tx1, _rx1) = mpsc::channel(16); + let _ = original_client.start(tx1).await; + + // Clone for lite consumer (this clone has shutdown_tx = None) + let lite_client = original_client.clone_for_lite_consumer(); + + // Verify the lite client is not "started" (shutdown_tx is None) + assert!(!lite_client.is_started()); + + // But it should still be able to get a session using get_session_for_lite_consumer + 
// This method skips check_started and creates a virtual telemetry channel + let result = lite_client.get_session_for_lite_consumer().await; + assert!(result.is_ok()); + + // Verify they share the same SessionManager + assert!(original_client.shares_session_manager_with(&lite_client)); + } + + #[test] + fn test_is_lite_consumer_with_clone() { + let _m = MTX.lock(); + + // Create original client with LitePushConsumer type + let mut option = ClientOption::default(); + option.client_type = ClientType::LitePushConsumer; + option.group = Some("group".to_string()); + + let original_client = Client::new(option, TelemetryCommand::default()).unwrap(); + assert!(original_client.is_lite_consumer()); + + // Clone should also be lite consumer + let cloned_client = original_client.clone_for_lite_consumer(); + assert!(cloned_client.is_lite_consumer()); + } } diff --git a/rust/src/conf.rs b/rust/src/conf.rs index ec61d1cac..697f41fc3 100644 --- a/rust/src/conf.rs +++ b/rust/src/conf.rs @@ -263,6 +263,7 @@ pub struct PushConsumerOption { long_polling_timeout: Duration, subscription_expressions: HashMap, fifo: bool, + enable_fifo_consume_accelerator: bool, batch_size: i32, consumer_worker_count_each_queue: usize, } @@ -276,6 +277,7 @@ impl Default for PushConsumerOption { long_polling_timeout: Duration::from_secs(40), subscription_expressions: HashMap::new(), fifo: false, + enable_fifo_consume_accelerator: false, batch_size: 32, consumer_worker_count_each_queue: 4, } @@ -319,6 +321,11 @@ impl PushConsumerOption { .insert(topic.into(), filter_expression); } + /// Set subscription expressions (replaces all existing subscriptions) + pub fn set_subscription_expressions(&mut self, expressions: HashMap) { + self.subscription_expressions = expressions; + } + pub fn get_filter_expression(&self, topic: &str) -> Option<&FilterExpression> { self.subscription_expressions.get(topic) } @@ -327,10 +334,23 @@ impl PushConsumerOption { self.fifo } - pub(crate) fn set_fifo(&mut self, fifo: bool) { + 
/// Set whether to use FIFO consumer + pub fn set_fifo(&mut self, fifo: bool) { self.fifo = fifo; } + /// Check if FIFO consume accelerator is enabled + pub fn enable_fifo_consume_accelerator(&self) -> bool { + self.enable_fifo_consume_accelerator + } + + /// Enable or disable FIFO consume accelerator. + /// If enabled, the consumer will consume messages in parallel by messageGroup, + /// it may increase the probability of repeatedly consuming the same message. + pub fn set_enable_fifo_consume_accelerator(&mut self, enable: bool) { + self.enable_fifo_consume_accelerator = enable; + } + pub fn batch_size(&self) -> i32 { self.batch_size } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index c12eba674..b73ba6ced 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -117,14 +117,20 @@ //! } //! } //! ``` -//! // Export structs that are part of crate API. +#[cfg(not(test))] +pub use lite_push_consumer::{LitePushConsumer, LitePushConsumerTrait}; +pub use model::common::ConsumeResult; +pub use model::transaction::Transaction; pub use producer::Producer; pub use push_consumer::MessageListener; pub use push_consumer::PushConsumer; pub use simple_consumer::SimpleConsumer; +// Export offset_option types +pub use model::offset_option::{OffsetOption, OffsetPolicy}; + #[allow(dead_code)] pub mod conf; pub mod error; @@ -140,6 +146,10 @@ mod session; pub mod model; mod util; +#[cfg(not(test))] +mod lite_push_consumer; +#[cfg(not(test))] +mod lite_subscription_manager; mod producer; mod push_consumer; mod simple_consumer; diff --git a/rust/src/lite_push_consumer.rs b/rust/src/lite_push_consumer.rs new file mode 100644 index 000000000..b782309f2 --- /dev/null +++ b/rust/src/lite_push_consumer.rs @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! LitePushConsumer - A specialized consumer for lite topics with reduced metadata and storage overhead. +//! +//! Reference Java: LitePushConsumerImpl extends PushConsumerImpl +//! +//! LitePushConsumer is a push consumer variant for lite topics. It shares the same +//! consumption infrastructure as PushConsumer (assignment scanning, message receiving, processing) +//! but adds lite topic subscription management via LiteSubscriptionManager. +//! +//! Key design points (aligned with Java): +//! 1. LitePushConsumer wraps a PushConsumer for message consumption logic +//! 2. LiteSubscriptionManager handles lite topic lifecycle (subscribe/unsubscribe/sync) +//! 3. The lite_client is a lightweight clone of the main client for LiteSubscriptionManager RPC calls +//! 4. Telemetry commands related to lite subscriptions (NotifyUnsubscribeLiteCommand, Settings) +//! 
are forwarded to LiteSubscriptionManager for processing + +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; +use tracing::{debug, error, info, warn}; + +use crate::client::Client; +use crate::conf::{ClientOption, PushConsumerOption}; +use crate::error::{ClientError, ErrorKind}; +use crate::lite_subscription_manager::LiteSubscriptionManager; +use crate::model::common::ClientType; +use crate::model::offset_option::OffsetOption; +use crate::pb; +use crate::push_consumer::{MessageListener, PushConsumer}; +use crate::util::build_push_consumer_settings; + +const OPERATION_NEW_LITE_PUSH_CONSUMER: &str = "lite_push_consumer.new"; + +/// LitePushConsumer trait defining the interface for lite push consumers +#[async_trait] +pub trait LitePushConsumerTrait { + /// Subscribe to a lite topic + async fn subscribe_lite(&self, lite_topic: String) -> Result<(), ClientError>; + + /// Subscribe to a lite topic with offset option + async fn subscribe_lite_with_offset( + &self, + lite_topic: String, + offset_option: OffsetOption, + ) -> Result<(), ClientError>; + + /// Unsubscribe from a lite topic + async fn unsubscribe_lite(&self, lite_topic: String) -> Result<(), ClientError>; + + /// Get the set of subscribed lite topics + fn get_lite_topic_set(&self) -> HashSet; + + /// Get the consumer group name + fn get_consumer_group(&self) -> String; + + /// Shutdown the consumer + async fn shutdown(&mut self) -> Result<(), ClientError>; +} + +/// LitePushConsumer implementation. +/// +/// Reference Java: LitePushConsumerImpl extends PushConsumerImpl +/// +/// Implementation notes: +/// - Java's LitePushConsumerImpl extends PushConsumerImpl, which extends ConsumerImpl extends ClientImpl. +/// All classes share the same underlying client infrastructure. 
+/// - In Rust, since Client doesn't implement Clone for shared ownership, we use clone_for_lite_consumer() +/// to create a lightweight clone for LiteSubscriptionManager. This ensures both the inner PushConsumer +/// and LiteSubscriptionManager have their own Client instances sharing the same SessionManager. +/// - LitePushSubscriptionSettings is intentionally NOT included here because LiteSubscriptionManager +/// already handles settings sync (sync(), subscribeLite(), unsubscribeLite(), quota management). +/// Adding a separate settings wrapper would duplicate the logic from LiteSubscriptionManager. +pub struct LitePushConsumer { + inner: PushConsumer, + lite_client: Arc, + lite_subscription_manager: Arc, + shutdown_token: Option, + task_tracker: Option, +} + +impl LitePushConsumer { + /// Create a new LitePushConsumer + /// + /// Reference Java: + /// 1. LitePushConsumerBuilderImpl sets {bindTopic: SUB_ALL} as subscriptionExpressions + /// 2. LitePushConsumerImpl constructor calls super(...) which creates PushConsumerImpl + /// 3. 
LiteSubscriptionManager is created with (thisConsumerImpl, new Resource(bindTopic), groupResource) + pub fn new( + client_option: ClientOption, + option: PushConsumerOption, + bind_topic: String, + message_listener: MessageListener, + ) -> Result { + if option.consumer_group().is_empty() { + return Err(ClientError::new( + ErrorKind::Config, + "consumer group is required.", + OPERATION_NEW_LITE_PUSH_CONSUMER, + )); + } + + if bind_topic.is_empty() { + return Err(ClientError::new( + ErrorKind::Config, + "bind topic is required.", + OPERATION_NEW_LITE_PUSH_CONSUMER, + )); + } + + // Reference Java: ImmutableMap.of(bindTopic, FilterExpression.SUB_ALL) + // This ensures the subscription_table contains bind_topic so that scan_assignments + // can query assignments and start messageQueueActor for the bind topic + let mut option_with_default_subscription = option.clone(); + option_with_default_subscription.set_subscription_expressions( + std::collections::HashMap::from([( + bind_topic.clone(), + crate::model::common::FilterExpression::sub_all(), + )]), + ); + + // Reference Java: PushSubscriptionSettings.toProtobuf() with clientType = LitePushConsumer + let mut settings = build_push_consumer_settings(&option_with_default_subscription); + if let Some(pb::telemetry_command::Command::Settings(ref mut s)) = settings.command { + s.client_type = Some(pb::ClientType::LitePushConsumer as i32); + } + + let namespace = option.namespace().to_string(); + let consumer_group = option.consumer_group().to_string(); + + // Reference Java: ConsumerImpl constructor + // Create the main client for consumption (inner PushConsumer) + let client_option = ClientOption { + client_type: ClientType::LitePushConsumer, + group: Some(option.consumer_group().to_string()), + namespace: namespace.clone(), + ..client_option + }; + let client = Client::new(client_option, settings)?; + + // Reference Java: LiteSubscriptionManager(consumerImpl, new Resource(bindTopic), groupResource) + // 
clone_for_lite_consumer creates a lightweight Client clone sharing the same SessionManager. + // This simulates Java's approach where LiteSubscriptionManager uses consumerImpl directly. + // + // Dual Client Architecture: + // - inner client: Used by PushConsumer for message consumption (owns the client) + // - lite_client: Used by LiteSubscriptionManager for subscription management (cloned, shares SessionManager) + // Both clients share the same SessionManager via Arc::clone, enabling true Lite mode + // where they use the same underlying telemetry session and connection. + let lite_client = Arc::new(client.clone_for_lite_consumer()); + let lite_subscription_manager = Arc::new(LiteSubscriptionManager::new( + Arc::clone(&lite_client), + bind_topic.clone(), + namespace, + consumer_group, + )); + + // Create inner PushConsumer with option_with_default_subscription + // This is CRITICAL: The inner PushConsumer must use option_with_default_subscription + // to ensure subscription_table contains bind_topic. This allows scan_assignments to + // query assignments for the bind topic and start messageQueueActor. + // + // Reference Java: LitePushConsumerImpl extends PushConsumerImpl, which uses the + // subscriptionExpressions set in the builder (ImmutableMap.of(bindTopic, SUB_ALL)) + let inner = PushConsumer::new_with_client( + client, + option_with_default_subscription, + message_listener, + )?; + + Ok(Self { + inner, + lite_client, + lite_subscription_manager, + shutdown_token: None, + task_tracker: None, + }) + } + + /// Start the LitePushConsumer + /// + /// Reference Java: LitePushConsumerImpl.startUp() + /// + /// Java flow: + /// 1. super.startUp() -> PushConsumerImpl.startUp() -> ConsumerImpl.startUp() -> ClientImpl.startUp() + /// Starts the client, establishes telemetry, fetches topic routes, starts assignment scanning. + /// 2. 
liteSubscriptionManager.startUp() -> syncAllLiteSubscription() + schedule periodic sync + pub async fn start(&mut self) -> Result<(), ClientError> { + let bind_topic = self + .lite_subscription_manager + .get_bind_topic_name() + .to_string(); + let consumer_group = self + .lite_subscription_manager + .get_consumer_group_name() + .to_string(); + let client_id = self.lite_client.client_id().to_string(); + + info!( + "Begin to start the LitePushConsumer, bindTopic={}, consumerGroup={}, clientId={}", + bind_topic, consumer_group, client_id + ); + + // Step 1: Pre-fetch bindTopic route (reference Java: fetchTopicRoute during ClientImpl.startUp()) + // This is done early to validate the bind topic exists before starting consumption + match self.lite_client.topic_route(&bind_topic, false).await { + Ok(route) => { + info!( + "Pre-fetched route for bindTopic={}, message_queues={}", + bind_topic, + route.queue.len() + ); + } + Err(e) => { + warn!( + "Failed to pre-fetch route for bindTopic={}, error={:?}. 
Will retry later.", + bind_topic, e + ); + } + } + + // Step 2: Start inner PushConsumer (reference Java: super.startUp()) + // This starts the client, establishes telemetry, fetches topic routes, and sets up assignment scanning + let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); + if let Err(e) = self.inner.start_with_telemetry(telemetry_command_tx).await { + error!( + "Failed to start LitePushConsumer inner, bindTopic={}, clientId={}, error={:?}", + bind_topic, client_id, e + ); + // Reference Java: startUp() calls shutDown() on failure + if let Err(shutdown_err) = self.shutdown().await { + error!( + "Failed to shutdown after start failure, clientId={}, error={:?}", + client_id, shutdown_err + ); + } + return Err(ClientError::new( + ErrorKind::ClientInternal, + &format!("startUp err={:?}", e), + "lite_push_consumer.start", + )); + } + + // Step 3: Start lite subscription manager (reference Java: liteSubscriptionManager.startUp()) + if let Err(e) = self.lite_subscription_manager.start().await { + error!( + "Failed to start LiteSubscriptionManager, bindTopic={}, clientId={}, error={:?}", + bind_topic, client_id, e + ); + if let Err(shutdown_err) = self.shutdown().await { + error!( + "Failed to shutdown after start failure, clientId={}, error={:?}", + client_id, shutdown_err + ); + } + return Err(ClientError::new( + ErrorKind::ClientInternal, + &format!("startUp err={:?}", e), + "lite_push_consumer.start", + )); + } + + // Step 4: Setup telemetry command handler for lite-specific commands + // Reference Java: LitePushConsumerImpl handles: + // - onSettingsCommand() -> super.onSettingsCommand() + liteSubscriptionManager.sync(settings) + // - onNotifyUnsubscribeLiteCommand() -> liteSubscriptionManager.onNotifyUnsubscribeLiteCommand() + let shutdown_token = CancellationToken::new(); + self.shutdown_token = Some(shutdown_token.clone()); + let task_tracker = TaskTracker::new(); + self.task_tracker = Some(task_tracker.clone()); + + let manager = 
Arc::clone(&self.lite_subscription_manager); + let client_id_clone = client_id.clone(); + task_tracker.spawn(async move { + loop { + tokio::select! { + command = telemetry_command_rx.recv() => { + if let Some(command) = command { + // Reference Java: onNotifyUnsubscribeLiteCommand() + if let Some(pb::telemetry_command::Command::NotifyUnsubscribeLiteCommand(ref cmd)) = + command.command + { + info!( + "Received unsubscribe notification for lite topic: {}, clientId={}", + cmd.lite_topic, client_id_clone + ); + manager.on_notify_unsubscribe_lite_command(cmd.lite_topic.clone()); + } + + // Reference Java: onSettingsCommand() -> liteSubscriptionManager.sync(settings) + // super.onSettingsCommand() is handled by inner PushConsumer's telemetry loop + if let Some(pb::telemetry_command::Command::Settings(ref settings_cmd)) = + command.command + { + debug!( + "Received settings update from server, clientId={}", + client_id_clone + ); + // LiteSubscriptionManager.sync_settings() handles quota and max_topic_size + // Reference Java: liteSubscriptionManager.sync(settings) + manager.sync_settings(settings_cmd); + } + } else { + // Channel closed, exit loop + break; + } + } + _ = shutdown_token.cancelled() => { + break; + } + } + } + info!( + "LitePushConsumer telemetry handler stopped, clientId={}", + client_id_clone + ); + }); + + info!( + "The LitePushConsumer starts successfully, bindTopic={}, consumerGroup={}, clientId={}", + bind_topic, consumer_group, client_id + ); + Ok(()) + } +} + +#[async_trait] +impl LitePushConsumerTrait for LitePushConsumer { + /// Subscribe to a lite topic + /// + /// Reference Java: LitePushConsumerImpl.subscribeLite(String) + async fn subscribe_lite(&self, lite_topic: String) -> Result<(), ClientError> { + // Check if client is started (public API validation) + // Use inner's client because lite_client is a clone without shutdown_tx + self.inner + .check_started("lite_push_consumer.subscribe_lite")?; + + self.lite_subscription_manager + 
.subscribe_lite(lite_topic, None) + .await + } + + /// Subscribe to a lite topic with offset option + /// + /// Reference Java: LitePushConsumerImpl.subscribeLite(String, OffsetOption) + async fn subscribe_lite_with_offset( + &self, + lite_topic: String, + offset_option: OffsetOption, + ) -> Result<(), ClientError> { + // Check if client is started (public API validation) + // Use inner's client because lite_client is a clone without shutdown_tx + self.inner + .check_started("lite_push_consumer.subscribe_lite_with_offset")?; + + self.lite_subscription_manager + .subscribe_lite(lite_topic, Some(offset_option)) + .await + } + + /// Unsubscribe from a lite topic + /// + /// Reference Java: LitePushConsumerImpl.unsubscribeLite(String) + async fn unsubscribe_lite(&self, lite_topic: String) -> Result<(), ClientError> { + // Check if client is started (public API validation) + // Use inner's client because lite_client is a clone without shutdown_tx + self.inner + .check_started("lite_push_consumer.unsubscribe_lite")?; + + self.lite_subscription_manager + .unsubscribe_lite(lite_topic) + .await + } + + /// Get the set of subscribed lite topics + /// + /// Reference Java: LitePushConsumerImpl.getLiteTopicSet() + fn get_lite_topic_set(&self) -> HashSet { + self.lite_subscription_manager.get_lite_topic_set() + } + + /// Get the consumer group name + /// + /// Reference Java: LitePushConsumerImpl.getConsumerGroup() + fn get_consumer_group(&self) -> String { + self.lite_subscription_manager + .get_consumer_group_name() + .to_string() + } + + /// Shutdown the consumer + /// + /// Reference Java: PushConsumerImpl.close() -> this.stopAsync().awaitTerminated() + async fn shutdown(&mut self) -> Result<(), ClientError> { + info!("Shutting down LitePushConsumer..."); + + if let Some(token) = self.shutdown_token.take() { + token.cancel(); + } + + if let Some(tracker) = self.task_tracker.take() { + tracker.close(); + tracker.wait().await; + } + + self.inner.shutdown_ref().await?; + + 
info!("LitePushConsumer shutdown successfully"); + Ok(()) + } +} diff --git a/rust/src/lite_subscription_manager.rs b/rust/src/lite_subscription_manager.rs new file mode 100644 index 000000000..b3e9c9eb4 --- /dev/null +++ b/rust/src/lite_subscription_manager.rs @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Lite subscription manager for managing lite topic subscriptions lifecycle. 
+ +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; + +use parking_lot::Mutex; +use tracing::{error, info}; + +use crate::client::Client; +use crate::error::{ClientError, ErrorKind}; +use crate::model::offset_option::OffsetOption; +use crate::pb; +use crate::pb::{LiteSubscriptionAction, Resource, SyncLiteSubscriptionRequest}; +use crate::session::RPCClient; +use crate::util::handle_response_status; + +const OPERATION_SYNC_LITE_SUBSCRIPTION: &str = "lite_subscription.sync"; + +/// Manages lite topic subscriptions for LitePushConsumer +pub struct LiteSubscriptionManager { + client: Arc, + bind_topic: Resource, + group: Resource, + lite_topic_set: Arc>>, + lite_subscription_quota: Arc>, + max_lite_topic_size: Arc>, +} + +impl LiteSubscriptionManager { + /// Create a new LiteSubscriptionManager + pub fn new( + client: Arc, + bind_topic_name: String, + namespace: String, + consumer_group: String, + ) -> Self { + Self { + client, + bind_topic: Resource { + resource_namespace: namespace.clone(), + name: bind_topic_name, + }, + group: Resource { + resource_namespace: namespace, + name: consumer_group, + }, + lite_topic_set: Arc::new(Mutex::new(HashSet::new())), + lite_subscription_quota: Arc::new(Mutex::new(0)), + max_lite_topic_size: Arc::new(Mutex::new(64)), // default value + } + } + + /// Start the subscription manager - sync all subscriptions after startup + pub async fn start(&self) -> Result<(), ClientError> { + self.sync_all_lite_subscription().await?; + + // Schedule periodic sync every 30 seconds + let manager = self.clone_for_scheduler(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + if let Err(e) = manager.sync_all_lite_subscription().await { + error!("Schedule syncAllLiteSubscription error: {:?}", e); + } + } + }); + + Ok(()) + } + + /// Clone necessary fields for scheduler task + fn clone_for_scheduler(&self) -> Arc { + Arc::new(Self { + client: 
Arc::clone(&self.client), + bind_topic: self.bind_topic.clone(), + group: self.group.clone(), + lite_topic_set: Arc::clone(&self.lite_topic_set), + lite_subscription_quota: Arc::clone(&self.lite_subscription_quota), + max_lite_topic_size: Arc::clone(&self.max_lite_topic_size), + }) + } + + /// Get bind topic name + pub fn get_bind_topic_name(&self) -> &str { + &self.bind_topic.name + } + + /// Get consumer group name + pub fn get_consumer_group_name(&self) -> &str { + &self.group.name + } + + /// Get the set of subscribed lite topics + pub fn get_lite_topic_set(&self) -> HashSet { + self.lite_topic_set.lock().clone() + } + + /// Sync settings from server (reference Java: check hasSubscription first) + pub fn sync_settings(&self, settings: &pb::Settings) { + // Check if settings has subscription (similar to Java's settings.hasSubscription()) + let has_subscription = matches!( + &settings.pub_sub, + Some(pb::settings::PubSub::Subscription(_)) + ); + + if !has_subscription { + return; + } + + // Now we know it's a Subscription variant, extract and sync + if let Some(pb::settings::PubSub::Subscription(subscription)) = &settings.pub_sub { + if let Some(quota) = subscription.lite_subscription_quota { + *self.lite_subscription_quota.lock() = quota; + info!("Updated lite subscription quota to {}", quota); + } + if let Some(max_size) = subscription.max_lite_topic_size { + *self.max_lite_topic_size.lock() = max_size; + info!("Updated max lite topic size to {}", max_size); + } + } + } + + /// Subscribe to a lite topic (reference Java: checkRunning first) + pub async fn subscribe_lite( + &self, + lite_topic: String, + offset_option: Option, + ) -> Result<(), ClientError> { + // For LitePushConsumer, we skip the check_started check because: + // 1. The cloned client intentionally has shutdown_tx = None to avoid duplicate shutdown + // 2. But it shares the same SessionManager, so sessions are still valid + // 3. 
The original client manages the lifecycle, not the clone + // Note: The public API (LitePushConsumerTrait::subscribe_lite) should validate state + + // Check if already subscribed + if self.lite_topic_set.lock().contains(&lite_topic) { + return Ok(()); + } + + // Validate lite topic format and length + let max_size = *self.max_lite_topic_size.lock(); + self.validate_lite_topic(&lite_topic, max_size)?; + + // Check quota before adding new subscription + self.check_lite_subscription_quota(1)?; + + // Sync subscription to server using PartialAdd action + // This adds the new lite topic to the existing set on the server + self.sync_lite_subscription( + LiteSubscriptionAction::PartialAdd, + vec![lite_topic.clone()], + offset_option, + ) + .await?; + + // Add to local set after successful sync + self.lite_topic_set.lock().insert(lite_topic.clone()); + + info!( + "SubscribeLite {}, topic={}, group={}, clientId={}", + lite_topic, + self.get_bind_topic_name(), + self.get_consumer_group_name(), + self.client.client_id() + ); + + Ok(()) + } + + /// Unsubscribe from a lite topic (reference Java: checkRunning first) + pub async fn unsubscribe_lite(&self, lite_topic: String) -> Result<(), ClientError> { + // Check if client is running (reference Java: consumerImpl.checkRunning() line 119) + self.client + .check_started(OPERATION_SYNC_LITE_SUBSCRIPTION)?; + + // Check if subscribed + if !self.lite_topic_set.lock().contains(&lite_topic) { + return Ok(()); + } + + // Sync unsubscription to server + self.sync_lite_subscription( + LiteSubscriptionAction::PartialRemove, + vec![lite_topic.clone()], + None, + ) + .await?; + + // Remove from local set + self.lite_topic_set.lock().remove(&lite_topic); + + info!( + "UnsubscribeLite {}, topic={}, group={}, clientId={}", + lite_topic, + self.get_bind_topic_name(), + self.get_consumer_group_name(), + self.client.client_id() + ); + + Ok(()) + } + + /// Sync all lite subscriptions periodically + async fn sync_all_lite_subscription(&self) -> 
Result<(), ClientError> {
+        // For LitePushConsumer, we skip the check_started check because:
+        // 1. The cloned client intentionally has shutdown_tx = None to avoid duplicate shutdown
+        // 2. But it shares the same SessionManager, so sessions are still valid
+        // 3. The original client manages the lifecycle, not the clone
+        // Note: subscribe_lite and unsubscribe_lite still check started status via the public API
+
+        // Check quota
+        self.check_lite_subscription_quota(0)?;
+
+        let topics: Vec<String> = self.lite_topic_set.lock().iter().cloned().collect();
+        if topics.is_empty() {
+            return Ok(());
+        }
+
+        match self
+            .sync_lite_subscription(LiteSubscriptionAction::CompleteAdd, topics, None)
+            .await
+        {
+            Ok(_) => Ok(()),
+            Err(e) => {
+                error!("Failed to sync all lite subscriptions: {:?}", e);
+                Err(e)
+            }
+        }
+    }
+
+    /// Sync lite subscription to server
+    async fn sync_lite_subscription(
+        &self,
+        action: LiteSubscriptionAction,
+        lite_topics: Vec<String>,
+        offset_option: Option<OffsetOption>,
+    ) -> Result<(), ClientError> {
+        let request = SyncLiteSubscriptionRequest {
+            action: action as i32,
+            topic: Some(self.bind_topic.clone()),
+            group: Some(self.group.clone()),
+            lite_topic_set: lite_topics,
+            version: None,
+            offset_option: offset_option.map(|opt| opt.to_protobuf()),
+        };
+
+        // Use get_session_for_lite_consumer for LitePushConsumer
+        // This method skips check_started and handles missing telemetry channel
+        let mut rpc_client = self.client.get_session_for_lite_consumer().await?;
+
+        let response = rpc_client.sync_lite_subscription(request).await?;
+
+        handle_response_status(response.status, OPERATION_SYNC_LITE_SUBSCRIPTION)?;
+
+        Ok(())
+    }
+
+    /// Handle NotifyUnsubscribeLiteCommand from server
+    pub fn on_notify_unsubscribe_lite_command(&self, lite_topic: String) {
+        info!(
+            "Notify unsubscribe lite liteTopic={} group={} bindTopic={}",
+            lite_topic,
+            self.get_consumer_group_name(),
+            self.get_bind_topic_name()
+        );
+
+        if !lite_topic.is_empty() {
+
self.lite_topic_set.lock().remove(&lite_topic); + } + } + + /// Validate lite topic format and length + fn validate_lite_topic(&self, lite_topic: &str, max_length: i32) -> Result<(), ClientError> { + if lite_topic.trim().is_empty() { + return Err(ClientError::new( + ErrorKind::Config, + "liteTopic is blank", + OPERATION_SYNC_LITE_SUBSCRIPTION, + )); + } + + if lite_topic.len() > max_length as usize { + return Err(ClientError::new( + ErrorKind::Config, + &format!( + "liteTopic length exceeded max length {}, liteTopic: {}", + max_length, lite_topic + ), + OPERATION_SYNC_LITE_SUBSCRIPTION, + )); + } + + Ok(()) + } + + /// Check if adding delta subscriptions would exceed quota + fn check_lite_subscription_quota(&self, delta: i32) -> Result<(), ClientError> { + let current_size = self.lite_topic_set.lock().len() as i32; + let quota = *self.lite_subscription_quota.lock(); + + // If quota is 0, it means the server hasn't returned the quota yet. + // In this case, we should allow subscriptions to proceed. + // Once the server returns the quota, it will be updated via sync_settings(). 
+
+        if quota == 0 {
+            return Ok(());
+        }
+
+        if current_size + delta > quota {
+            return Err(ClientError::new(
+                ErrorKind::Server,
+                &format!(
+                    "Lite subscription quota exceeded: current={}, delta={}, quota={}",
+                    current_size, delta, quota
+                ),
+                OPERATION_SYNC_LITE_SUBSCRIPTION,
+            )
+            .with_context(
+                "code",
+                format!("{}", pb::Code::LiteSubscriptionQuotaExceeded as i32),
+            ));
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn create_test_client() -> Arc<Client> {
+        // Create a minimal client for testing
+        let option = crate::conf::ClientOption::default();
+        let settings = pb::TelemetryCommand {
+            command: None,
+            status: None,
+        };
+        Arc::new(Client::new(option, settings).unwrap())
+    }
+
+    #[test]
+    fn test_validate_lite_topic() {
+        let client = create_test_client();
+        let manager = LiteSubscriptionManager::new(
+            client,
+            "bind_topic".to_string(),
+            "namespace".to_string(),
+            "group".to_string(),
+        );
+
+        // Valid topic
+        assert!(manager.validate_lite_topic("valid-topic", 64).is_ok());
+
+        // Blank topic
+        assert!(manager.validate_lite_topic("", 64).is_err());
+        assert!(manager.validate_lite_topic(" ", 64).is_err());
+
+        // Too long topic
+        let long_topic = "a".repeat(65);
+        assert!(manager.validate_lite_topic(&long_topic, 64).is_err());
+    }
+
+    #[test]
+    fn test_check_quota() {
+        let client = create_test_client();
+        let manager = LiteSubscriptionManager::new(
+            client,
+            "bind_topic".to_string(),
+            "namespace".to_string(),
+            "group".to_string(),
+        );
+
+        // When quota is 0 (not set by server yet), should allow subscriptions
+        assert!(manager.check_lite_subscription_quota(1).is_ok());
+        assert!(manager.check_lite_subscription_quota(100).is_ok());
+
+        // Set quota to 5
+        *manager.lite_subscription_quota.lock() = 5;
+
+        // Should pass when under quota
+        assert!(manager.check_lite_subscription_quota(3).is_ok());
+
+        // Should fail when exceeding quota
+        assert!(manager.check_lite_subscription_quota(6).is_err());
+
+        // Add some topics
+
manager.lite_topic_set.lock().insert("topic1".to_string());
+        manager.lite_topic_set.lock().insert("topic2".to_string());
+
+        // Now only 3 more allowed
+        assert!(manager.check_lite_subscription_quota(3).is_ok());
+        assert!(manager.check_lite_subscription_quota(4).is_err());
+    }
+}
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 9401cb188..b6e93b8da 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -36,6 +36,7 @@ pub(crate) enum ClientType {
     SimpleConsumer = 3,
     #[allow(dead_code)]
     PullConsumer = 4,
+    LitePushConsumer = 5,
 }
 
 #[derive(Debug)]
@@ -209,6 +210,9 @@ pub struct FilterExpression {
 }
 
 impl FilterExpression {
+    /// Wildcard expression that matches all messages
+    pub const SUB_ALL: &'static str = "*";
+
     /// Create a new filter expression
     ///
     /// # Arguments
@@ -222,6 +226,14 @@ impl FilterExpression {
         }
     }
 
+    /// Create a default filter expression that subscribes to all messages (tag="*")
+    pub fn sub_all() -> Self {
+        FilterExpression {
+            filter_type: FilterType::Tag,
+            expression: Self::SUB_ALL.to_string(),
+        }
+    }
+
     /// Get filter type
     pub fn filter_type(&self) -> FilterType {
         self.filter_type
@@ -238,6 +250,7 @@ impl FilterExpression {
 pub struct SendReceipt {
     message_id: String,
     transaction_id: String,
+    recall_handle: Option<String>,
 }
 
 impl SendReceipt {
@@ -245,6 +258,11 @@ impl SendReceipt {
         SendReceipt {
             message_id: entry.message_id.clone(),
             transaction_id: entry.transaction_id.clone(),
+            recall_handle: if entry.recall_handle.is_empty() {
+                None
+            } else {
+                Some(entry.recall_handle.clone())
+            },
         }
     }
 
@@ -257,6 +275,12 @@ impl SendReceipt {
     pub fn transaction_id(&self) -> &str {
         &self.transaction_id
     }
+
+    /// Get recall handle for delay/timed messages
+    /// This handle can be used to recall the message before delivery
+    pub fn recall_handle(&self) -> Option<&str> {
+        self.recall_handle.as_deref()
+    }
 }
 
 #[derive(Debug)]
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index 035861b94..3f3f99328 100644
--- a/rust/src/model/message.rs +++ b/rust/src/model/message.rs @@ -21,16 +21,18 @@ use std::collections::HashMap; use crate::error::{ClientError, ErrorKind}; use crate::model::common::Endpoints; -use crate::model::message::MessageType::{DELAY, FIFO, NORMAL, TRANSACTION}; +use crate::model::message::MessageType::{DELAY, FIFO, LITE, NORMAL, PRIORITY, TRANSACTION}; use crate::model::message_id::UNIQ_ID_GENERATOR; use crate::pb; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum MessageType { NORMAL = 1, FIFO = 2, DELAY = 3, TRANSACTION = 4, + LITE = 5, + PRIORITY = 6, } /// [`Message`] is the data model for sending. @@ -43,6 +45,8 @@ pub trait Message { fn take_properties(&mut self) -> HashMap; fn take_message_group(&mut self) -> Option; fn take_delivery_timestamp(&mut self) -> Option; + fn take_priority(&mut self) -> Option; + fn take_lite_topic(&mut self) -> Option; fn transaction_enabled(&mut self) -> bool; fn get_message_type(&self) -> MessageType; } @@ -68,6 +72,8 @@ pub(crate) struct MessageImpl { pub(crate) delivery_timestamp: Option, pub(crate) transaction_enabled: bool, pub(crate) message_type: MessageType, + pub(crate) priority: Option, + pub(crate) lite_topic: Option, } impl Message for MessageImpl { @@ -103,6 +109,14 @@ impl Message for MessageImpl { self.delivery_timestamp.take() } + fn take_priority(&mut self) -> Option { + self.priority.take() + } + + fn take_lite_topic(&mut self) -> Option { + self.lite_topic.take() + } + fn transaction_enabled(&mut self) -> bool { self.transaction_enabled } @@ -134,6 +148,8 @@ impl MessageBuilder { delivery_timestamp: None, transaction_enabled: false, message_type: NORMAL, + priority: None, + lite_topic: None, }, } } @@ -162,6 +178,8 @@ impl MessageBuilder { delivery_timestamp: None, transaction_enabled: false, message_type: FIFO, + priority: None, + lite_topic: None, }, } } @@ -190,6 +208,8 @@ impl MessageBuilder { delivery_timestamp: Some(delay_time), transaction_enabled: false, 
message_type: DELAY, + priority: None, + lite_topic: None, }, } } @@ -213,6 +233,8 @@ impl MessageBuilder { delivery_timestamp: None, transaction_enabled: true, message_type: TRANSACTION, + priority: None, + lite_topic: None, }, } } @@ -279,6 +301,82 @@ impl MessageBuilder { self } + /// Create a new [`MessageBuilder`] for building a priority message. + /// + /// # Arguments + /// + /// * `topic` - topic of the message + /// * `body` - message body + /// * `priority` - message priority (1-10, higher value means higher priority) + pub fn priority_message_builder( + topic: impl Into, + body: Vec, + priority: i32, + ) -> MessageBuilder { + MessageBuilder { + message: MessageImpl { + message_id: UNIQ_ID_GENERATOR.lock().next_id(), + topic: topic.into(), + body: Some(body), + tag: None, + keys: None, + properties: None, + message_group: None, + delivery_timestamp: None, + transaction_enabled: false, + message_type: PRIORITY, + priority: Some(priority), + lite_topic: None, + }, + } + } + + /// Set message priority (1-10, higher value means higher priority). + /// Only valid for priority messages. + pub fn set_priority(mut self, priority: i32) -> Self { + self.message.priority = Some(priority); + self.message.message_type = PRIORITY; + self + } + + /// Create a new [`MessageBuilder`] for building a lite message. + /// + /// # Arguments + /// + /// * `topic` - parent topic of the message + /// * `body` - message body + /// * `lite_topic` - lite topic name + pub fn lite_message_builder( + topic: impl Into, + body: Vec, + lite_topic: impl Into, + ) -> MessageBuilder { + MessageBuilder { + message: MessageImpl { + message_id: UNIQ_ID_GENERATOR.lock().next_id(), + topic: topic.into(), + body: Some(body), + tag: None, + keys: None, + properties: None, + message_group: None, + delivery_timestamp: None, + transaction_enabled: false, + message_type: LITE, + priority: None, + lite_topic: Some(lite_topic.into()), + }, + } + } + + /// Set lite topic for lite message. 
+ /// Lite topic cannot be set with message_group, delivery_timestamp, or priority at the same time. + pub fn set_lite_topic(mut self, lite_topic: impl Into) -> Self { + self.message.lite_topic = Some(lite_topic.into()); + self.message.message_type = LITE; + self + } + fn check_message(&self) -> Result<(), String> { if self.message.topic.is_empty() { return Err("Topic is empty.".to_string()); @@ -299,6 +397,23 @@ impl MessageBuilder { .to_string(), ); } + // Lite topic validation + if self.message.lite_topic.is_some() { + if self.message.message_group.is_some() { + return Err( + "lite_topic and message_group can not be set at the same time.".to_string(), + ); + } + if self.message.delivery_timestamp.is_some() { + return Err( + "lite_topic and delivery_timestamp can not be set at the same time." + .to_string(), + ); + } + if self.message.priority.is_some() { + return Err("lite_topic and priority can not be set at the same time.".to_string()); + } + } Ok(()) } @@ -338,6 +453,8 @@ pub struct MessageView { pub(crate) born_timestamp: i64, pub(crate) delivery_attempt: i32, pub(crate) endpoints: Endpoints, + pub(crate) priority: Option, + pub(crate) lite_topic: Option, } impl AckMessageEntry for MessageView { @@ -377,6 +494,8 @@ impl MessageView { born_timestamp: system_properties.born_timestamp.map_or(0, |t| t.seconds), delivery_attempt: system_properties.delivery_attempt.unwrap_or(0), endpoints, + priority: system_properties.priority, + lite_topic: system_properties.lite_topic, }) } @@ -439,6 +558,16 @@ impl MessageView { pub fn delivery_attempt(&self) -> i32 { self.delivery_attempt } + + /// Get message priority (1-10, higher value means higher priority) + pub fn priority(&self) -> Option { + self.priority + } + + /// Get lite topic of lite message + pub fn lite_topic(&self) -> Option<&str> { + self.lite_topic.as_deref() + } } #[cfg(test)] @@ -499,6 +628,63 @@ mod tests { let message = MessageBuilder::transaction_message_builder("test", vec![1, 2, 3]).build(); let mut 
message = message.unwrap(); assert!(message.transaction_enabled()); + + // Test lite message builder + let message = + MessageBuilder::lite_message_builder("parent_topic", vec![1, 2, 3], "lite_topic_001") + .build(); + assert!(message.is_ok()); + let mut message = message.unwrap(); + assert_eq!( + message.take_lite_topic(), + Some("lite_topic_001".to_string()) + ); + assert_eq!(message.get_message_type(), LITE); + + // Test lite topic conflicts with message_group + let message = MessageBuilder::builder() + .set_topic("test") + .set_body(vec![1, 2, 3]) + .set_lite_topic("lite_topic") + .set_message_group("group") + .build(); + assert!(message.is_err()); + let err = message.err().unwrap(); + assert_eq!(err.kind, ErrorKind::InvalidMessage); + assert_eq!( + err.message, + "lite_topic and message_group can not be set at the same time." + ); + + // Test lite topic conflicts with delivery_timestamp + let message = MessageBuilder::builder() + .set_topic("test") + .set_body(vec![1, 2, 3]) + .set_lite_topic("lite_topic") + .set_delivery_timestamp(123456789) + .build(); + assert!(message.is_err()); + let err = message.err().unwrap(); + assert_eq!(err.kind, ErrorKind::InvalidMessage); + assert_eq!( + err.message, + "lite_topic and delivery_timestamp can not be set at the same time." + ); + + // Test lite topic conflicts with priority + let message = MessageBuilder::builder() + .set_topic("test") + .set_body(vec![1, 2, 3]) + .set_lite_topic("lite_topic") + .set_priority(5) + .build(); + assert!(message.is_err()); + let err = message.err().unwrap(); + assert_eq!(err.kind, ErrorKind::InvalidMessage); + assert_eq!( + err.message, + "lite_topic and priority can not be set at the same time." 
+ ); } #[test] @@ -565,4 +751,100 @@ mod tests { "localhost:8081" ); } + + #[test] + fn test_lite_message_view() { + // Test MessageView with lite_topic + let message_view = MessageView::from_pb_message( + pb::Message { + topic: Some(pb::Resource { + name: "parent_topic".to_string(), + ..Default::default() + }), + body: vec![1, 2, 3], + user_properties: HashMap::new(), + system_properties: Some(pb::SystemProperties { + message_id: "lite_msg_id".to_string(), + receipt_handle: Some("receipt_handle".to_string()), + lite_topic: Some("lite_topic_001".to_string()), + message_type: LITE as i32, + ..Default::default() + }), + }, + Endpoints::from_url("localhost:8081").unwrap(), + ) + .unwrap(); + + assert_eq!(message_view.message_id(), "lite_msg_id"); + assert_eq!(message_view.topic(), "parent_topic"); + assert_eq!(message_view.lite_topic(), Some("lite_topic_001")); + assert_eq!(message_view.body(), &[1, 2, 3]); + } + + #[test] + fn test_lite_message_without_lite_topic() { + // Test MessageView without lite_topic + let message_view = MessageView::from_pb_message( + pb::Message { + topic: Some(pb::Resource { + name: "normal_topic".to_string(), + ..Default::default() + }), + body: vec![4, 5, 6], + user_properties: HashMap::new(), + system_properties: Some(pb::SystemProperties { + message_id: "normal_msg_id".to_string(), + receipt_handle: Some("receipt_handle".to_string()), + lite_topic: None, + message_type: NORMAL as i32, + ..Default::default() + }), + }, + Endpoints::from_url("localhost:8081").unwrap(), + ) + .unwrap(); + + assert_eq!(message_view.message_id(), "normal_msg_id"); + assert_eq!(message_view.topic(), "normal_topic"); + assert_eq!(message_view.lite_topic(), None); + assert_eq!(message_view.body(), &[4, 5, 6]); + } + + #[test] + fn test_lite_message_builder_with_all_fields() { + // Test lite message builder with additional fields + let message = + MessageBuilder::lite_message_builder("parent_topic", vec![7, 8, 9], "lite_topic_full") + .set_tag("test-tag") + 
.set_keys(vec!["key1".to_string(), "key2".to_string()]) + .set_properties({ + let mut props = HashMap::new(); + props.insert("prop1".to_string(), "value1".to_string()); + props + }) + .build(); + + assert!(message.is_ok()); + let mut msg = message.unwrap(); + assert_eq!(msg.take_topic(), "parent_topic"); + assert_eq!(msg.take_lite_topic(), Some("lite_topic_full".to_string())); + assert_eq!(msg.take_body(), vec![7, 8, 9]); + assert_eq!(msg.take_tag(), Some("test-tag".to_string())); + assert_eq!( + msg.take_keys(), + vec!["key1".to_string(), "key2".to_string()] + ); + assert_eq!(msg.get_message_type(), LITE); + } + + #[test] + fn test_lite_message_type_enum_value() { + // Verify MessageType enum values match proto definition + assert_eq!(NORMAL as i32, 1); + assert_eq!(FIFO as i32, 2); + assert_eq!(DELAY as i32, 3); + assert_eq!(TRANSACTION as i32, 4); + assert_eq!(LITE as i32, 5); // LITE should be 5 + assert_eq!(PRIORITY as i32, 6); // PRIORITY should be 6 + } } diff --git a/rust/src/model/mod.rs b/rust/src/model/mod.rs index ca9a13c18..03f1364e4 100644 --- a/rust/src/model/mod.rs +++ b/rust/src/model/mod.rs @@ -20,4 +20,5 @@ pub mod common; pub mod message; pub(crate) mod message_id; +pub mod offset_option; pub mod transaction; diff --git a/rust/src/model/offset_option.rs b/rust/src/model/offset_option.rs new file mode 100644 index 000000000..9b8365e40 --- /dev/null +++ b/rust/src/model/offset_option.rs @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//! OffsetOption for specifying consume from offset in Lite subscription.
+
+use crate::pb;
+
+/// Policy for determining where to start consuming messages
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum OffsetPolicy {
+    /// Start from the last consumed offset (default behavior)
+    Last = 0,
+    /// Start from the minimum available offset (earliest message)
+    Min = 1,
+    /// Start from the maximum available offset (latest message)
+    Max = 2,
+}
+
+impl From<OffsetPolicy> for i32 {
+    fn from(policy: OffsetPolicy) -> Self {
+        policy as i32
+    }
+}
+
+impl TryFrom<i32> for OffsetPolicy {
+    type Error = String;
+
+    fn try_from(value: i32) -> Result<Self, Self::Error> {
+        match value {
+            0 => Ok(OffsetPolicy::Last),
+            1 => Ok(OffsetPolicy::Min),
+            2 => Ok(OffsetPolicy::Max),
+            _ => Err(format!("Invalid offset policy value: {}", value)),
+        }
+    }
+}
+
+/// Offset option for specifying where to start consuming messages
+#[derive(Debug, Clone)]
+pub struct OffsetOption {
+    offset_type: OffsetType,
+}
+
+#[derive(Debug, Clone)]
+enum OffsetType {
+    Policy(OffsetPolicy),
+    Offset(i64),
+    TailN(i64),
+    Timestamp(i64),
+}
+
+impl OffsetOption {
+    /// Create an OffsetOption with a policy
+    pub fn from_policy(policy: OffsetPolicy) -> Self {
+        Self {
+            offset_type: OffsetType::Policy(policy),
+        }
+    }
+
+    /// Create an OffsetOption from a specific offset
+    pub fn from_offset(offset: i64) -> Self {
+        Self {
+            offset_type: OffsetType::Offset(offset),
+        }
+    }
+
+    /// Create an OffsetOption from tail N messages
+    pub fn from_tail_n(n: i64) -> Self {
+        Self {
+            offset_type: OffsetType::TailN(n),
+        }
+    }
+ + /// Create an OffsetOption from a timestamp (milliseconds since epoch) + pub fn from_timestamp(timestamp: i64) -> Self { + Self { + offset_type: OffsetType::Timestamp(timestamp), + } + } + + /// Convert to protobuf OffsetOption + pub(crate) fn to_protobuf(&self) -> pb::OffsetOption { + let offset_type = match &self.offset_type { + OffsetType::Policy(policy) => pb::offset_option::OffsetType::Policy(*policy as i32), + OffsetType::Offset(offset) => pb::offset_option::OffsetType::Offset(*offset), + OffsetType::TailN(n) => pb::offset_option::OffsetType::TailN(*n), + OffsetType::Timestamp(timestamp) => { + pb::offset_option::OffsetType::Timestamp(*timestamp) + } + }; + + pb::OffsetOption { + offset_type: Some(offset_type), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_offset_option_from_policy() { + let option = OffsetOption::from_policy(OffsetPolicy::Last); + let pb = option.to_protobuf(); + assert!(pb.offset_type.is_some()); + + if let Some(pb::offset_option::OffsetType::Policy(policy)) = pb.offset_type { + assert_eq!(policy, 0); + } else { + panic!("Expected Policy variant"); + } + } + + #[test] + fn test_offset_option_from_policy_min() { + let option = OffsetOption::from_policy(OffsetPolicy::Min); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::Policy(policy)) = pb.offset_type { + assert_eq!(policy, 1); + } else { + panic!("Expected Policy variant"); + } + } + + #[test] + fn test_offset_option_from_policy_max() { + let option = OffsetOption::from_policy(OffsetPolicy::Max); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::Policy(policy)) = pb.offset_type { + assert_eq!(policy, 2); + } else { + panic!("Expected Policy variant"); + } + } + + #[test] + fn test_offset_option_from_offset() { + let option = OffsetOption::from_offset(12345); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::Offset(offset)) = pb.offset_type { + assert_eq!(offset, 
12345); + } else { + panic!("Expected Offset variant"); + } + } + + #[test] + fn test_offset_option_from_offset_negative() { + let option = OffsetOption::from_offset(-100); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::Offset(offset)) = pb.offset_type { + assert_eq!(offset, -100); + } else { + panic!("Expected Offset variant"); + } + } + + #[test] + fn test_offset_option_from_tail_n() { + let option = OffsetOption::from_tail_n(100); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::TailN(n)) = pb.offset_type { + assert_eq!(n, 100); + } else { + panic!("Expected TailN variant"); + } + } + + #[test] + fn test_offset_option_from_timestamp() { + let timestamp = 1234567890000; + let option = OffsetOption::from_timestamp(timestamp); + let pb = option.to_protobuf(); + + if let Some(pb::offset_option::OffsetType::Timestamp(ts)) = pb.offset_type { + assert_eq!(ts, timestamp); + } else { + panic!("Expected Timestamp variant"); + } + } + + #[test] + fn test_offset_policy_conversion() { + assert_eq!(i32::from(OffsetPolicy::Last), 0); + assert_eq!(i32::from(OffsetPolicy::Min), 1); + assert_eq!(i32::from(OffsetPolicy::Max), 2); + + assert_eq!(OffsetPolicy::try_from(0).unwrap(), OffsetPolicy::Last); + assert_eq!(OffsetPolicy::try_from(1).unwrap(), OffsetPolicy::Min); + assert_eq!(OffsetPolicy::try_from(2).unwrap(), OffsetPolicy::Max); + assert!(OffsetPolicy::try_from(3).is_err()); + assert!(OffsetPolicy::try_from(-1).is_err()); + } + + #[test] + fn test_offset_policy_clone_and_copy() { + let policy1 = OffsetPolicy::Last; + let policy2 = policy1; // Copy + let policy3 = policy1.clone(); // Clone + + assert_eq!(policy1, policy2); + assert_eq!(policy1, policy3); + } + + #[test] + fn test_offset_policy_debug() { + let policy = OffsetPolicy::Min; + let debug_str = format!("{:?}", policy); + assert!(debug_str.contains("Min")); + } +} diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs index 
6d5e90805..eb460426e 100644 --- a/rust/src/model/transaction.rs +++ b/rust/src/model/transaction.rs @@ -169,6 +169,7 @@ mod tests { message_id: "".to_string(), transaction_id: "".to_string(), offset: 0, + recall_handle: "".to_string(), }), ); transaction.commit().await @@ -196,6 +197,7 @@ mod tests { message_id: "".to_string(), transaction_id: "".to_string(), offset: 0, + recall_handle: "".to_string(), }), ); transaction.rollback().await diff --git a/rust/src/pb/apache.rocketmq.v2.rs b/rust/src/pb/apache.rocketmq.v2.rs index 578895f91..60e7f7e04 100644 --- a/rust/src/pb/apache.rocketmq.v2.rs +++ b/rust/src/pb/apache.rocketmq.v2.rs @@ -208,6 +208,12 @@ pub struct SystemProperties { /// Information to identify whether this message is from dead letter queue. #[prost(message, optional, tag = "20")] pub dead_letter_queue: ::core::option::Option, + /// lite topic + #[prost(string, optional, tag = "21")] + pub lite_topic: ::core::option::Option<::prost::alloc::string::String>, + /// Priority of message, which is optional + #[prost(int32, optional, tag = "22")] + pub priority: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -360,6 +366,14 @@ pub struct Subscription { /// push consumer. 
#[prost(message, optional, tag = "5")] pub long_polling_timeout: ::core::option::Option<::prost_types::Duration>, + /// Only lite push consumer + /// client-side lite subscription quota limit + #[prost(int32, optional, tag = "6")] + pub lite_subscription_quota: ::core::option::Option, + /// Only lite push consumer + /// Maximum length limit for lite topic + #[prost(int32, optional, tag = "7")] + pub max_lite_topic_size: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -371,6 +385,66 @@ pub struct Metric { #[prost(message, optional, tag = "2")] pub endpoints: ::core::option::Option, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OffsetOption { + #[prost(oneof = "offset_option::OffsetType", tags = "1, 2, 3, 4")] + pub offset_type: ::core::option::Option, +} +/// Nested message and enum types in `OffsetOption`. +pub mod offset_option { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Policy { + Last = 0, + Min = 1, + Max = 2, + } + impl Policy { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Policy::Last => "LAST", + Policy::Min => "MIN", + Policy::Max => "MAX", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "LAST" => Some(Self::Last), + "MIN" => Some(Self::Min), + "MAX" => Some(Self::Max), + _ => None, + } + } + } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum OffsetType { + #[prost(enumeration = "Policy", tag = "1")] + Policy(i32), + #[prost(int64, tag = "2")] + Offset(i64), + #[prost(int64, tag = "3")] + TailN(i64), + #[prost(int64, tag = "4")] + Timestamp(i64), + } +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum TransactionResolution { @@ -537,6 +611,10 @@ pub enum MessageType { /// Messages that are transactional. Only committed messages are delivered to /// subscribers. Transaction = 4, + /// lite topic + Lite = 5, + /// Messages that lower prioritised ones may need to wait for higher priority messages to be processed first + Priority = 6, } impl MessageType { /// String value of the enum field names used in the ProtoBuf definition. @@ -550,6 +628,8 @@ impl MessageType { MessageType::Fifo => "FIFO", MessageType::Delay => "DELAY", MessageType::Transaction => "TRANSACTION", + MessageType::Lite => "LITE", + MessageType::Priority => "PRIORITY", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -560,6 +640,8 @@ impl MessageType { "FIFO" => Some(Self::Fifo), "DELAY" => Some(Self::Delay), "TRANSACTION" => Some(Self::Transaction), + "LITE" => Some(Self::Lite), + "PRIORITY" => Some(Self::Priority), _ => None, } } @@ -610,6 +692,8 @@ pub enum ClientType { PushConsumer = 2, SimpleConsumer = 3, PullConsumer = 4, + LitePushConsumer = 5, + LiteSimpleConsumer = 6, } impl ClientType { /// String value of the enum field names used in the ProtoBuf definition. 
@@ -623,6 +707,8 @@ impl ClientType { ClientType::PushConsumer => "PUSH_CONSUMER", ClientType::SimpleConsumer => "SIMPLE_CONSUMER", ClientType::PullConsumer => "PULL_CONSUMER", + ClientType::LitePushConsumer => "LITE_PUSH_CONSUMER", + ClientType::LiteSimpleConsumer => "LITE_SIMPLE_CONSUMER", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -633,6 +719,8 @@ impl ClientType { "PUSH_CONSUMER" => Some(Self::PushConsumer), "SIMPLE_CONSUMER" => Some(Self::SimpleConsumer), "PULL_CONSUMER" => Some(Self::PullConsumer), + "LITE_PUSH_CONSUMER" => Some(Self::LitePushConsumer), + "LITE_SIMPLE_CONSUMER" => Some(Self::LiteSimpleConsumer), _ => None, } } @@ -712,6 +800,10 @@ pub enum Code { ClientIdRequired = 40017, /// Polling time is illegal. IllegalPollingTime = 40018, + /// Offset is illegal. + IllegalOffset = 40019, + /// Format of lite topic is illegal. + IllegalLiteTopic = 40020, /// Generic code indicates that the client request lacks valid authentication /// credentials for the requested resource. Unauthorized = 40100, @@ -727,12 +819,16 @@ pub enum Code { TopicNotFound = 40402, /// Consumer group resource does not exist. ConsumerGroupNotFound = 40403, + /// Offset not found from server. + OffsetNotFound = 40404, /// Generic code representing client side timeout when connecting to, reading data from, or write data to server. RequestTimeout = 40800, /// Generic code represents that the request entity is larger than limits defined by server. PayloadTooLarge = 41300, /// Message body size exceeds the threshold. MessageBodyTooLarge = 41301, + /// Message body is empty. + MessageBodyEmpty = 41302, /// Generic code for use cases where pre-conditions are not met. /// For example, if a producer instance is used to publish messages without prior start() invocation, /// this error code will be raised. @@ -740,6 +836,9 @@ pub enum Code { /// Generic code indicates that too many requests are made in short period of duration. /// Requests are throttled. 
TooManyRequests = 42900, + /// LiteTopic related quota exceeded + LiteTopicQuotaExceeded = 42901, + LiteSubscriptionQuotaExceeded = 42902, /// Generic code for the case that the server is unwilling to process the request because its header fields are too large. /// The request may be resubmitted after reducing the size of the request header fields. RequestHeaderFieldsTooLarge = 43100, @@ -812,6 +911,8 @@ impl Code { Code::MessageCorrupted => "MESSAGE_CORRUPTED", Code::ClientIdRequired => "CLIENT_ID_REQUIRED", Code::IllegalPollingTime => "ILLEGAL_POLLING_TIME", + Code::IllegalOffset => "ILLEGAL_OFFSET", + Code::IllegalLiteTopic => "ILLEGAL_LITE_TOPIC", Code::Unauthorized => "UNAUTHORIZED", Code::PaymentRequired => "PAYMENT_REQUIRED", Code::Forbidden => "FORBIDDEN", @@ -819,11 +920,15 @@ impl Code { Code::MessageNotFound => "MESSAGE_NOT_FOUND", Code::TopicNotFound => "TOPIC_NOT_FOUND", Code::ConsumerGroupNotFound => "CONSUMER_GROUP_NOT_FOUND", + Code::OffsetNotFound => "OFFSET_NOT_FOUND", Code::RequestTimeout => "REQUEST_TIMEOUT", Code::PayloadTooLarge => "PAYLOAD_TOO_LARGE", Code::MessageBodyTooLarge => "MESSAGE_BODY_TOO_LARGE", + Code::MessageBodyEmpty => "MESSAGE_BODY_EMPTY", Code::PreconditionFailed => "PRECONDITION_FAILED", Code::TooManyRequests => "TOO_MANY_REQUESTS", + Code::LiteTopicQuotaExceeded => "LITE_TOPIC_QUOTA_EXCEEDED", + Code::LiteSubscriptionQuotaExceeded => "LITE_SUBSCRIPTION_QUOTA_EXCEEDED", Code::RequestHeaderFieldsTooLarge => "REQUEST_HEADER_FIELDS_TOO_LARGE", Code::MessagePropertiesTooLarge => "MESSAGE_PROPERTIES_TOO_LARGE", Code::InternalError => "INTERNAL_ERROR", @@ -866,6 +971,8 @@ impl Code { "MESSAGE_CORRUPTED" => Some(Self::MessageCorrupted), "CLIENT_ID_REQUIRED" => Some(Self::ClientIdRequired), "ILLEGAL_POLLING_TIME" => Some(Self::IllegalPollingTime), + "ILLEGAL_OFFSET" => Some(Self::IllegalOffset), + "ILLEGAL_LITE_TOPIC" => Some(Self::IllegalLiteTopic), "UNAUTHORIZED" => Some(Self::Unauthorized), "PAYMENT_REQUIRED" => 
Some(Self::PaymentRequired), "FORBIDDEN" => Some(Self::Forbidden), @@ -873,11 +980,17 @@ impl Code { "MESSAGE_NOT_FOUND" => Some(Self::MessageNotFound), "TOPIC_NOT_FOUND" => Some(Self::TopicNotFound), "CONSUMER_GROUP_NOT_FOUND" => Some(Self::ConsumerGroupNotFound), + "OFFSET_NOT_FOUND" => Some(Self::OffsetNotFound), "REQUEST_TIMEOUT" => Some(Self::RequestTimeout), "PAYLOAD_TOO_LARGE" => Some(Self::PayloadTooLarge), "MESSAGE_BODY_TOO_LARGE" => Some(Self::MessageBodyTooLarge), + "MESSAGE_BODY_EMPTY" => Some(Self::MessageBodyEmpty), "PRECONDITION_FAILED" => Some(Self::PreconditionFailed), "TOO_MANY_REQUESTS" => Some(Self::TooManyRequests), + "LITE_TOPIC_QUOTA_EXCEEDED" => Some(Self::LiteTopicQuotaExceeded), + "LITE_SUBSCRIPTION_QUOTA_EXCEEDED" => { + Some(Self::LiteSubscriptionQuotaExceeded) + } "REQUEST_HEADER_FIELDS_TOO_LARGE" => Some(Self::RequestHeaderFieldsTooLarge), "MESSAGE_PROPERTIES_TOO_LARGE" => Some(Self::MessagePropertiesTooLarge), "INTERNAL_ERROR" => Some(Self::InternalError), @@ -956,6 +1069,38 @@ impl Language { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum LiteSubscriptionAction { + PartialAdd = 0, + PartialRemove = 1, + CompleteAdd = 2, + CompleteRemove = 3, +} +impl LiteSubscriptionAction { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + LiteSubscriptionAction::PartialAdd => "PARTIAL_ADD", + LiteSubscriptionAction::PartialRemove => "PARTIAL_REMOVE", + LiteSubscriptionAction::CompleteAdd => "COMPLETE_ADD", + LiteSubscriptionAction::CompleteRemove => "COMPLETE_REMOVE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "PARTIAL_ADD" => Some(Self::PartialAdd), + "PARTIAL_REMOVE" => Some(Self::PartialRemove), + "COMPLETE_ADD" => Some(Self::CompleteAdd), + "COMPLETE_REMOVE" => Some(Self::CompleteRemove), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum QueryOffsetPolicy { /// Use this option if client wishes to playback all existing messages. Beginning = 0, @@ -1036,6 +1181,9 @@ pub struct SendResultEntry { pub transaction_id: ::prost::alloc::string::String, #[prost(int64, tag = "4")] pub offset: i64, + /// Unique handle to identify message to recall, support delay message for now. + #[prost(string, tag = "5")] + pub recall_handle: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1084,6 +1232,8 @@ pub struct ReceiveMessageRequest { pub auto_renew: bool, #[prost(message, optional, tag = "7")] pub long_polling_timeout: ::core::option::Option<::prost_types::Duration>, + #[prost(string, optional, tag = "8")] + pub attempt_id: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1112,6 +1262,8 @@ pub struct AckMessageEntry { pub message_id: ::prost::alloc::string::String, #[prost(string, tag = "2")] pub receipt_handle: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub lite_topic: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1161,6 +1313,8 @@ pub struct ForwardMessageToDeadLetterQueueRequest { pub delivery_attempt: i32, #[prost(int32, tag = "6")] pub max_delivery_attempts: i32, + #[prost(string, optional, tag = "7")] + pub lite_topic: ::core::option::Option<::prost::alloc::string::String>, } 
#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1212,6 +1366,12 @@ pub struct PrintThreadStackTraceCommand { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReconnectEndpointsCommand { + #[prost(string, tag = "1")] + pub nonce: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ThreadStackTrace { #[prost(string, tag = "1")] pub nonce: ::prost::alloc::string::String, @@ -1242,10 +1402,16 @@ pub struct RecoverOrphanedTransactionCommand { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct NotifyUnsubscribeLiteCommand { + #[prost(string, tag = "1")] + pub lite_topic: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct TelemetryCommand { #[prost(message, optional, tag = "1")] pub status: ::core::option::Option, - #[prost(oneof = "telemetry_command::Command", tags = "2, 3, 4, 5, 6, 7")] + #[prost(oneof = "telemetry_command::Command", tags = "2, 3, 4, 5, 6, 7, 8, 9")] pub command: ::core::option::Option, } /// Nested message and enum types in `TelemetryCommand`. @@ -1275,6 +1441,12 @@ pub mod telemetry_command { /// Request client to verify the consumption of the appointed message. #[prost(message, tag = "7")] VerifyMessageCommand(super::VerifyMessageCommand), + /// Request client to reconnect server use the latest endpoints. + #[prost(message, tag = "8")] + ReconnectEndpointsCommand(super::ReconnectEndpointsCommand), + /// Request client to unsubscribe lite topic. 
+ #[prost(message, tag = "9")] + NotifyUnsubscribeLiteCommand(super::NotifyUnsubscribeLiteCommand), } } #[allow(clippy::derive_partial_eq_without_eq)] @@ -1306,6 +1478,11 @@ pub struct ChangeInvisibleDurationRequest { /// For message tracing #[prost(string, tag = "5")] pub message_id: ::prost::alloc::string::String, + #[prost(string, optional, tag = "6")] + pub lite_topic: ::core::option::Option<::prost::alloc::string::String>, + /// If true, server will not increment the retry times for this message + #[prost(bool, optional, tag = "7")] + pub suspend: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1401,6 +1578,48 @@ pub struct QueryOffsetResponse { #[prost(int64, tag = "2")] pub offset: i64, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecallMessageRequest { + #[prost(message, optional, tag = "1")] + pub topic: ::core::option::Option, + /// Refer to SendResultEntry. 
+ #[prost(string, tag = "2")] + pub recall_handle: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecallMessageResponse { + #[prost(message, optional, tag = "1")] + pub status: ::core::option::Option, + #[prost(string, tag = "2")] + pub message_id: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SyncLiteSubscriptionRequest { + #[prost(enumeration = "LiteSubscriptionAction", tag = "1")] + pub action: i32, + /// bindTopic for lite push consumer + #[prost(message, optional, tag = "2")] + pub topic: ::core::option::Option, + /// consumer group + #[prost(message, optional, tag = "3")] + pub group: ::core::option::Option, + /// lite subscription set of lite topics + #[prost(string, repeated, tag = "4")] + pub lite_topic_set: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(int64, optional, tag = "5")] + pub version: ::core::option::Option, + #[prost(message, optional, tag = "6")] + pub offset_option: ::core::option::Option, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SyncLiteSubscriptionResponse { + #[prost(message, optional, tag = "1")] + pub status: ::core::option::Option, +} /// Generated client implementations. pub mod messaging_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -1779,6 +1998,8 @@ pub mod messaging_service_client { ); self.inner.server_streaming(req, path, codec).await } + /// Update the consumption progress of the designated queue of the + /// consumer group to the remote. 
pub async fn update_offset( &mut self, request: impl tonic::IntoRequest, @@ -1809,6 +2030,8 @@ pub mod messaging_service_client { ); self.inner.unary(req, path, codec).await } + /// Query the consumption progress of the designated queue of the + /// consumer group to the remote. pub async fn get_offset( &mut self, request: impl tonic::IntoRequest, @@ -1836,6 +2059,7 @@ pub mod messaging_service_client { ); self.inner.unary(req, path, codec).await } + /// Query the offset of the designated queue by the query offset policy. pub async fn query_offset( &mut self, request: impl tonic::IntoRequest, @@ -1994,6 +2218,70 @@ pub mod messaging_service_client { ); self.inner.unary(req, path, codec).await } + /// Recall a message, + /// for delay message, should recall before delivery time, like the rollback operation of transaction message, + /// for normal message, not supported for now. + pub async fn recall_message( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/apache.rocketmq.v2.MessagingService/RecallMessage", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "apache.rocketmq.v2.MessagingService", + "RecallMessage", + ), + ); + self.inner.unary(req, path, codec).await + } + /// Sync lite subscription info, lite push consumer only + pub async fn sync_lite_subscription( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + 
let path = http::uri::PathAndQuery::from_static( + "/apache.rocketmq.v2.MessagingService/SyncLiteSubscription", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "apache.rocketmq.v2.MessagingService", + "SyncLiteSubscription", + ), + ); + self.inner.unary(req, path, codec).await + } } } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/rust/src/producer.rs b/rust/src/producer.rs index 8bce38aa5..bcb3b7d16 100644 --- a/rust/src/producer.rs +++ b/rust/src/producer.rs @@ -38,7 +38,10 @@ use crate::model::transaction::{ use crate::pb; use crate::pb::settings::PubSub; use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, Settings}; -use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, TransactionSource}; +use crate::pb::{ + Encoding, EndTransactionRequest, RecallMessageRequest, Resource, SystemProperties, + TransactionSource, +}; use crate::session::RPCClient; use crate::util::{ build_endpoints_by_message_queue, build_producer_settings, handle_response_status, @@ -71,6 +74,7 @@ impl Producer { const OPERATION_SEND_MESSAGE: &'static str = "producer.send_message"; const OPERATION_SEND_TRANSACTION_MESSAGE: &'static str = "producer.send_transaction_message"; const OPERATION_END_TRANSACTION: &'static str = "producer.end_transaction"; + const OPERATION_RECALL_MESSAGE: &'static str = "producer.recall_message"; /// Create a new producer instance /// /// # Arguments @@ -313,6 +317,36 @@ impl Producer { .take_delivery_timestamp() .map(|seconds| Timestamp { seconds, nanos: 0 }); + let priority = message.take_priority(); + + let lite_topic = message.take_lite_topic(); + + // Validate lite topic constraints + if lite_topic.is_some() { + // Lite topic cannot be used with message_group, delivery_timestamp, or priority + if message_group.is_some() { + return Err(ClientError::new( + ErrorKind::InvalidMessage, + "lite_topic and message_group cannot be set at the same time", + 
Self::OPERATION_SEND_MESSAGE, + )); + } + if delivery_timestamp.is_some() { + return Err(ClientError::new( + ErrorKind::InvalidMessage, + "lite_topic and delivery_timestamp cannot be set at the same time", + Self::OPERATION_SEND_MESSAGE, + )); + } + if priority.is_some() { + return Err(ClientError::new( + ErrorKind::InvalidMessage, + "lite_topic and priority cannot be set at the same time", + Self::OPERATION_SEND_MESSAGE, + )); + } + } + if message.transaction_enabled() { message_group = None; delivery_timestamp = None; @@ -335,6 +369,8 @@ impl Producer { message_id: message.take_message_id(), message_group, delivery_timestamp, + priority, + lite_topic, message_type: message.get_message_type() as i32, born_host: HOST_NAME.clone(), born_timestamp: born_timestamp.clone(), @@ -459,6 +495,98 @@ impl Producer { } self.client.shutdown().await } + + /// Recall a scheduled/delayed message before it is delivered. + /// + /// This operation requires server support and only works for delay/timed messages. + /// The recall_handle can be obtained from SendReceipt after sending a delay message. + /// + /// # Arguments + /// + /// * `topic` - the topic of the message to recall + /// * `recall_handle` - the handle returned when sending the delay message + /// + /// # Returns + /// + /// Returns the message_id of the recalled message if successful. 
+ /// + /// # Example + /// + /// ```ignore + /// use rocketmq::model::message::MessageBuilder; + /// use std::time::{SystemTime, Duration}; + /// + /// // Create and send a delay message + /// let delay_message = MessageBuilder::delay_message_builder( + /// "topic", + /// b"message body".to_vec(), + /// (SystemTime::now() + Duration::from_secs(60)) + /// .duration_since(SystemTime::UNIX_EPOCH) + /// .unwrap() + /// .as_secs() as i64, + /// ) + /// .build() + /// .unwrap(); + /// + /// let send_receipt = producer.send(delay_message).await?; + /// if let Some(recall_handle) = send_receipt.recall_handle() { + /// let recalled_message_id = producer.recall_message("topic", recall_handle).await?; + /// println!("Recalled message: {}", recalled_message_id); + /// } + /// ``` + pub async fn recall_message( + &self, + topic: impl Into, + recall_handle: impl Into, + ) -> Result { + let topic_name = topic.into(); + let recall_handle_str = recall_handle.into(); + + let route = self.client.topic_route(&topic_name, true).await?; + let endpoints = route + .queue + .first() + .ok_or_else(|| { + ClientError::new( + ErrorKind::NoBrokerAvailable, + "no message queue available", + Self::OPERATION_RECALL_MESSAGE, + ) + })? + .broker + .as_ref() + .ok_or_else(|| { + ClientError::new( + ErrorKind::NoBrokerAvailable, + "no broker available", + Self::OPERATION_RECALL_MESSAGE, + ) + })? 
+ .endpoints + .as_ref() + .ok_or_else(|| { + ClientError::new( + ErrorKind::NoBrokerAvailable, + "no endpoints available", + Self::OPERATION_RECALL_MESSAGE, + ) + }); + + let endpoints = crate::model::common::Endpoints::from_pb_endpoints(endpoints?.clone()); + + let request = RecallMessageRequest { + topic: Some(Resource { + name: topic_name, + resource_namespace: self.get_resource_namespace(), + }), + recall_handle: recall_handle_str, + }; + + let response = self.client.recall_message(&endpoints, request).await?; + handle_response_status(response.status, Self::OPERATION_RECALL_MESSAGE)?; + + Ok(response.message_id) + } } #[cfg(test)] @@ -617,6 +745,8 @@ mod tests { delivery_timestamp: None, transaction_enabled: false, message_type: MessageType::TRANSACTION, + priority: None, + lite_topic: None, }]; let result = producer.transform_messages_to_protobuf(messages).await; assert!(result.is_err()); diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs index 8672a4a3f..31d9ea38c 100644 --- a/rust/src/push_consumer.rs +++ b/rust/src/push_consumer.rs @@ -100,6 +100,34 @@ impl PushConsumer { }) } + /// Create a new PushConsumer with an existing client (for LitePushConsumer) + /// The client should already be configured with the correct ClientType + pub(crate) fn new_with_client( + client: Client, + option: PushConsumerOption, + message_listener: MessageListener, + ) -> Result { + if option.consumer_group().is_empty() { + return Err(ClientError::new( + ErrorKind::Config, + "consumer group is required.", + OPERATION_NEW_PUSH_CONSUMER, + )); + } + Ok(Self { + client, + message_listener: Arc::new(message_listener), + option: Arc::new(RwLock::new(option)), + shutdown_token: None, + task_tracker: None, + }) + } + + /// Check if the PushConsumer is started + pub(crate) fn check_started(&self, operation: &'static str) -> Result<(), ClientError> { + self.client.check_started(operation) + } + pub async fn start(&mut self) -> Result<(), ClientError> { let 
(telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16); self.client.start(telemetry_command_tx).await?; @@ -181,8 +209,8 @@ impl PushConsumer { response.assignments, retry_policy_inner.clone(), ).await; - if result.is_err() { - error!("process assignments failed: {:?}", result.unwrap_err()); + if let Err(e) = result { + error!("process assignments failed: {:?}", e); } } else { error!("query assignment failed, no status in response."); @@ -207,6 +235,120 @@ impl PushConsumer { Ok(()) } + /// Start with external telemetry channel (for LitePushConsumer) + pub async fn start_with_telemetry( + &mut self, + _telemetry_command_tx: mpsc::Sender, + ) -> Result<(), ClientError> { + // Extract the command channel from TelemetryCommand + let (command_tx, mut command_rx) = mpsc::channel(16); + self.client.start(command_tx).await?; + let option = Arc::clone(&self.option); + let mut rpc_client = self.client.get_session().await?; + let route_manager = self.client.get_route_manager(); + let topics; + { + topics = option + .read() + .subscription_expressions() + .keys() + .cloned() + .collect(); + } + route_manager + .sync_topic_routes(&mut rpc_client, topics) + .await?; + + let mut actor_table: HashMap = HashMap::new(); + let message_listener = Arc::clone(&self.message_listener); + let retry_policy: Arc>> = Arc::new(Mutex::new(None)); + + let shutdown_token = CancellationToken::new(); + self.shutdown_token = Some(shutdown_token.clone()); + let task_tracker = TaskTracker::new(); + self.task_tracker = Some(task_tracker.clone()); + + task_tracker.spawn(async move { + let mut scan_assignments_timer = + tokio::time::interval(std::time::Duration::from_secs(30)); + loop { + select! 
{ + command = command_rx.recv() => { + if let Some(command) = command { + let remote_backoff_policy = BackOffRetryPolicy::try_from(command); + if let Ok(remote_backoff_policy) = remote_backoff_policy { + retry_policy.lock().replace(remote_backoff_policy); + } + } + } + _ = scan_assignments_timer.tick() => { + let option_retry_policy; + { + option_retry_policy = retry_policy.lock().clone(); + } + if option_retry_policy.is_none() { + warn!("retry policy is not set. skip scanning."); + continue; + } + let retry_policy_inner = option_retry_policy.unwrap(); + let consumer_option; + { + consumer_option = option.read().clone(); + } + let subscription_table = consumer_option.subscription_expressions(); + // query endpoints from topic route + for topic in subscription_table.keys() { + if let Some(endpoints) = route_manager.pick_endpoints(topic.as_str()) { + let request = QueryAssignmentRequest { + topic: Some(Resource { + name: topic.to_string(), + resource_namespace: consumer_option.namespace().to_string(), + }), + group: Some(Resource { + name: consumer_option.consumer_group().to_string(), + resource_namespace: consumer_option.namespace().to_string(), + }), + endpoints: Some(endpoints.into_inner()), + }; + let result = rpc_client.query_assignment(request).await; + if let Ok(response) = result { + if handle_response_status(response.status, OPERATION_START_PUSH_CONSUMER).is_ok() { + let result = Self::process_assignments( + &rpc_client, + &consumer_option, + Arc::clone(&message_listener), + &mut actor_table, + response.assignments, + retry_policy_inner.clone(), + ).await; + if let Err(e) = result { + error!("process assignments failed: {:?}", e); + } + } else { + error!("query assignment failed, no status in response."); + } + } else { + error!("query assignment failed: {:?}", result.unwrap_err()); + } + } + } + } + _ = shutdown_token.cancelled() => { + let entries = actor_table.drain(); + info!("shutdown {:?} actors", entries.len()); + for (_, actor) in entries { + let _ = 
actor.shutdown().await; + } + break; + } + } + } + }); + + info!("PushConsumer started with external telemetry"); + Ok(()) + } + async fn process_assignments( rpc_client: &Session, option: &PushConsumerOption, @@ -278,6 +420,19 @@ impl PushConsumer { Ok(()) } + /// Shutdown without consuming self (for LitePushConsumer) + pub async fn shutdown_ref(&mut self) -> Result<(), ClientError> { + if let Some(shutdown_token) = self.shutdown_token.take() { + shutdown_token.cancel(); + } + if let Some(task_tracker) = self.task_tracker.take() { + task_tracker.close(); + task_tracker.wait().await; + } + self.client.shutdown_ref().await?; + Ok(()) + } + async fn receive_messages( rpc_client: &mut T, message_queue: &MessageQueue, @@ -303,6 +458,7 @@ impl PushConsumer { long_polling_timeout: Some( prost_types::Duration::try_from(*option.long_polling_timeout()).unwrap(), ), + attempt_id: None, }; let responses = rpc_client.receive_message(request).await?; let mut messages: Vec = Vec::with_capacity(option.batch_size() as usize); @@ -314,8 +470,7 @@ impl PushConsumer { let mut _delivery_timestamp: Option = None; for response in responses { - if response.content.is_some() { - let content = response.content.unwrap(); + if let Some(content) = response.content { match content { Content::Status(response_status) => { // Store the status for later processing @@ -492,10 +647,12 @@ impl MessageQueueActor { let message_queue = self.message_queue.clone(); let option = self.option.clone(); let retry_policy = self.retry_policy.clone(); + // TODO: Pass is_lite_consumer from Client when LitePushConsumer is fully integrated let mut ack_processor = AckEntryProcessor::new( self.rpc_client.shadow_session(), self.option.get_consumer_group_resource(), self.message_queue.topic.to_owned(), + false, // Default to false for now, will be set by LitePushConsumer ); ack_processor.start().await?; let shutdown_token = shutdown_token.clone(); @@ -623,39 +780,123 @@ impl FifoConsumerWorker { ) -> Result<(), 
ClientError> { let messages = PushConsumer::receive_messages(&mut self.rpc_client, message_queue, option).await?; + + // If FIFO consume accelerator is enabled, consume messages in parallel by messageGroup + if option.enable_fifo_consume_accelerator() && !messages.is_empty() { + self.consume_with_accelerator(messages, ack_processor, retry_policy) + .await; + } else { + // Use sequential consumption for standard FIFO behavior + for message in messages { + self.consume_message_sequentially(message, ack_processor, retry_policy) + .await; + } + } + + Ok(()) + } + + /// Consume messages with accelerator: messages with the same messageGroup are consumed sequentially, + /// but messages with different messageGroups are consumed in parallel. + async fn consume_with_accelerator( + &self, + messages: Vec, + ack_processor: &mut AckEntryProcessor, + retry_policy: &BackOffRetryPolicy, + ) { + let message_count = messages.len(); + + // Group messages by message_group + let mut grouped_messages: HashMap, Vec> = HashMap::new(); for message in messages { - let mut delivery_attempt = message.delivery_attempt(); - let max_delivery_attempts = retry_policy.get_max_attempts(); - loop { - let consume_result = (self.message_listener)(&message); - match consume_result { - ConsumeResult::SUCCESS => { - ack_processor.ack_message(message).await; - break; - } - ConsumeResult::FAILURE => { - delivery_attempt += 1; - if delivery_attempt > max_delivery_attempts { - ack_processor - .forward_to_deadletter_queue( - message, - delivery_attempt, - max_delivery_attempts, - ) - .await; - break; - } else { - tokio::time::sleep( - retry_policy.get_next_attempt_delay(delivery_attempt), + let group = message.message_group().map(|s| s.to_string()); + grouped_messages.entry(group).or_default().push(message); + } + + info!( + "FIFO consume accelerator enabled, message_count={}, group_count={}", + message_count, + grouped_messages.len() + ); + + // Spawn a task for each message group to consume messages in 
parallel + let mut handles = vec![]; + for (_group, group_messages) in grouped_messages { + let message_listener = Arc::clone(&self.message_listener); + let mut ack_processor_shadow = ack_processor.shadow_self(); + let retry_policy = retry_policy.clone(); + + let handle = tokio::spawn(async move { + // Within the same group, consume messages sequentially + for message in group_messages { + Self::consume_message_sequentially_static( + message, + &message_listener, + &mut ack_processor_shadow, + &retry_policy, + ) + .await; + } + }); + handles.push(handle); + } + + // Wait for all group tasks to complete + for handle in handles { + let _ = handle.await; + } + } + + /// Consume a single message sequentially with retries (instance method) + async fn consume_message_sequentially( + &self, + message: MessageView, + ack_processor: &mut AckEntryProcessor, + retry_policy: &BackOffRetryPolicy, + ) { + Self::consume_message_sequentially_static( + message, + &self.message_listener, + ack_processor, + retry_policy, + ) + .await; + } + + /// Consume a single message sequentially with retries (static method for use in spawned tasks) + async fn consume_message_sequentially_static( + message: MessageView, + message_listener: &Arc, + ack_processor: &mut AckEntryProcessor, + retry_policy: &BackOffRetryPolicy, + ) { + let mut delivery_attempt = message.delivery_attempt(); + let max_delivery_attempts = retry_policy.get_max_attempts(); + loop { + let consume_result = (message_listener)(&message); + match consume_result { + ConsumeResult::SUCCESS => { + ack_processor.ack_message(message).await; + break; + } + ConsumeResult::FAILURE => { + delivery_attempt += 1; + if delivery_attempt > max_delivery_attempts { + ack_processor + .forward_to_deadletter_queue( + message, + delivery_attempt, + max_delivery_attempts, ) .await; - } + break; + } else { + tokio::time::sleep(retry_policy.get_next_attempt_delay(delivery_attempt)) + .await; } } } } - - Ok(()) } } @@ -663,17 +904,24 @@ struct 
AckEntryProcessor { rpc_client: Session, consumer_group: Resource, topic: Resource, + is_lite_consumer: bool, // Flag to indicate if this is a lite consumer ack_entry_sender: Option>, shutdown_token: Option, task_tracker: Option, } impl AckEntryProcessor { - fn new(rpc_client: Session, consumer_group: Resource, topic: Resource) -> Self { + fn new( + rpc_client: Session, + consumer_group: Resource, + topic: Resource, + is_lite_consumer: bool, + ) -> Self { Self { rpc_client, consumer_group, topic, + is_lite_consumer, ack_entry_sender: None, shutdown_token: None, task_tracker: None, @@ -685,6 +933,7 @@ impl AckEntryProcessor { rpc_client: self.rpc_client.shadow_session(), consumer_group: self.consumer_group.clone(), topic: self.topic.clone(), + is_lite_consumer: self.is_lite_consumer, ack_entry_sender: None, shutdown_token: None, task_tracker: None, @@ -765,6 +1014,11 @@ impl AckEntryProcessor { delivery_attempt: i32, max_delivery_attempts: i32, ) -> Result<(), ClientError> { + // Directly get lite_topic from message if it exists + // This solves the duplicate consumption problem by ensuring ForwardToDeadLetterQueue requests + // always include lite_topic when the message has it, regardless of client type flag + let lite_topic = message.lite_topic().map(|s| s.to_string()); + let request = ForwardMessageToDeadLetterQueueRequest { group: Some(self.consumer_group.clone()), topic: Some(self.topic.clone()), @@ -772,6 +1026,7 @@ impl AckEntryProcessor { message_id: message.message_id().to_string(), delivery_attempt, max_delivery_attempts, + lite_topic, }; let response = self.rpc_client.forward_to_deadletter_queue(request).await?; handle_response_status(response.status, OPERATION_FORWARD_TO_DEADLETTER_QUEUE) @@ -782,6 +1037,11 @@ impl AckEntryProcessor { ack_entry: &MessageView, invisible_duration: Duration, ) -> Result<(), ClientError> { + // Directly get lite_topic from message if it exists + // This solves the duplicate consumption problem by ensuring 
ChangeInvisibleDuration requests + // always include lite_topic when the message has it, regardless of client type flag + let lite_topic = ack_entry.lite_topic().map(|s| s.to_string()); + let request = ChangeInvisibleDurationRequest { group: Some(self.consumer_group.clone()), topic: Some(self.topic.clone()), @@ -796,6 +1056,8 @@ impl AckEntryProcessor { ) }, )?), + lite_topic, + suspend: None, }; let response = self.rpc_client.change_invisible_duration(request).await?; handle_response_status(response.status, OPERATION_CHANGE_INVISIBLE_DURATION) @@ -893,12 +1155,18 @@ impl AckEntryProcessor { } async fn ack_message_inner(&mut self, ack_entry: &MessageView) -> Result<(), ClientError> { + // Directly get lite_topic from message if it exists + // This solves the duplicate consumption problem by ensuring ACK requests + // always include lite_topic when the message has it, regardless of client type flag + let lite_topic = ack_entry.lite_topic().map(|s| s.to_string()); + let request = AckMessageRequest { group: Some(self.consumer_group.clone()), topic: Some(self.topic.clone()), entries: vec![pb::AckMessageEntry { message_id: ack_entry.message_id().to_string(), receipt_handle: ack_entry.receipt_handle().to_string(), + lite_topic, }], }; let response = self.rpc_client.ack_message(request).await?; @@ -1368,7 +1636,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; assert!(result.is_ok()); @@ -1417,7 +1685,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; 
assert!(result.is_ok()); @@ -1463,7 +1731,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; assert!(result.is_ok()); @@ -1518,7 +1786,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; assert!(result.is_ok()); @@ -1564,7 +1832,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; assert!(result.is_ok()); @@ -1618,7 +1886,7 @@ mod tests { name: "test_topic".to_string(), resource_namespace: "".to_string(), }; - let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(rpc_client, consumer_group, topic, false); let result = ack_processor.start().await; assert!(result.is_ok()); @@ -1679,7 +1947,7 @@ mod tests { entries: vec![], }) }); - let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic, false); let is_start_ok = ack_processor.start().await; assert!(is_start_ok.is_ok()); let result = ConsumerWorker::Standard(consumer_worker) @@ -1738,7 +2006,7 @@ mod tests { receipt_handle: "".to_string(), }) }); - let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(session, 
consumer_group, topic, false); let is_start_ok = ack_processor.start().await; assert!(is_start_ok.is_ok()); let result = ConsumerWorker::Standard(consumer_worker) @@ -1791,7 +2059,7 @@ mod tests { entries: vec![], }) }); - let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic, false); let result = ConsumerWorker::Fifo(consumer_worker) .receive_messages( &new_message_queue(), @@ -1852,7 +2120,7 @@ mod tests { }), }) }); - let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic); + let mut ack_processor = AckEntryProcessor::new(session, consumer_group, topic, false); let result = ConsumerWorker::Fifo(consumer_worker) .receive_messages( &new_message_queue(), diff --git a/rust/src/session.rs b/rust/src/session.rs index f9a081548..55e9d4a15 100644 --- a/rust/src/session.rs +++ b/rust/src/session.rs @@ -36,8 +36,9 @@ use crate::pb::{ ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse, HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest, NotifyClientTerminationResponse, QueryAssignmentRequest, QueryAssignmentResponse, - QueryRouteRequest, QueryRouteResponse, ReceiveMessageRequest, ReceiveMessageResponse, - SendMessageRequest, SendMessageResponse, TelemetryCommand, + QueryRouteRequest, QueryRouteResponse, RecallMessageRequest, RecallMessageResponse, + ReceiveMessageRequest, ReceiveMessageResponse, SendMessageRequest, SendMessageResponse, + TelemetryCommand, }; use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION}; use crate::{error::ClientError, pb::messaging_service_client::MessagingServiceClient}; @@ -49,6 +50,7 @@ const OPERATION_UPDATE_SETTINGS: &str = "session.update_settings"; const OPERATION_QUERY_ROUTE: &str = "rpc.query_route"; const OPERATION_HEARTBEAT: &str = "rpc.heartbeat"; const OPERATION_SEND_MESSAGE: &str = "rpc.send_message"; +const OPERATION_RECALL_MESSAGE: &str = 
"rpc.recall_message"; const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message"; const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message"; const OPERATION_CHANGE_INVISIBLE_DURATION: &str = "rpc.change_invisible_duration"; @@ -72,6 +74,10 @@ pub(crate) trait RPCClient { &mut self, request: SendMessageRequest, ) -> Result; + async fn recall_message( + &mut self, + request: RecallMessageRequest, + ) -> Result; async fn receive_message( &mut self, request: ReceiveMessageRequest, @@ -100,6 +106,10 @@ pub(crate) trait RPCClient { &mut self, request: ForwardMessageToDeadLetterQueueRequest, ) -> Result; + async fn sync_lite_subscription( + &mut self, + request: crate::pb::SyncLiteSubscriptionRequest, + ) -> Result; } #[derive(Debug)] @@ -464,6 +474,22 @@ impl RPCClient for Session { Ok(response.into_inner()) } + async fn recall_message( + &mut self, + request: RecallMessageRequest, + ) -> Result { + let request = self.sign(request); + let response = self.stub.recall_message(request).await.map_err(|e| { + ClientError::new( + ErrorKind::ClientInternal, + "send rpc recall_message failed", + OPERATION_RECALL_MESSAGE, + ) + .set_source(e) + })?; + Ok(response.into_inner()) + } + async fn receive_message( &mut self, request: ReceiveMessageRequest, @@ -613,6 +639,26 @@ impl RPCClient for Session { })?; Ok(response.into_inner()) } + + async fn sync_lite_subscription( + &mut self, + request: crate::pb::SyncLiteSubscriptionRequest, + ) -> Result { + let request = self.sign(request); + let response = self + .stub + .sync_lite_subscription(request) + .await + .map_err(|e| { + ClientError::new( + ErrorKind::ClientInternal, + "send rpc sync_lite_subscription failed", + "sync_lite_subscription", + ) + .set_source(e) + })?; + Ok(response.into_inner()) + } } mock! { @@ -680,6 +726,14 @@ mock! 
{ &mut self, request: ForwardMessageToDeadLetterQueueRequest, ) -> Result; + async fn recall_message( + &mut self, + request: RecallMessageRequest, + ) -> Result; + async fn sync_lite_subscription( + &mut self, + request: crate::pb::SyncLiteSubscriptionRequest, + ) -> Result; } } diff --git a/rust/src/util.rs b/rust/src/util.rs index e0312b13f..1355b11e7 100644 --- a/rust/src/util.rs +++ b/rust/src/util.rs @@ -142,6 +142,8 @@ pub(crate) fn build_simple_consumer_settings(option: &SimpleConsumerOption) -> T seconds: option.long_polling_timeout().as_secs() as i64, nanos: option.long_polling_timeout().subsec_nanos() as i32, }), + lite_subscription_quota: None, + max_lite_topic_size: None, })), user_agent: Some(Ua { language: SDK_LANGUAGE as i32, @@ -190,6 +192,8 @@ pub(crate) fn build_push_consumer_settings(option: &PushConsumerOption) -> Telem seconds: option.long_polling_timeout().as_secs() as i64, nanos: option.long_polling_timeout().subsec_nanos() as i32, }), + lite_subscription_quota: None, + max_lite_topic_size: None, })), user_agent: Some(Ua { language: SDK_LANGUAGE as i32, @@ -284,7 +288,9 @@ pub fn handle_receive_message_status( | Code::UnrecognizedClientType | Code::MessageCorrupted | Code::ClientIdRequired - | Code::IllegalPollingTime => { + | Code::IllegalPollingTime + | Code::IllegalOffset + | Code::IllegalLiteTopic => { Err( ClientError::new(ErrorKind::Config, "bad request", operation) .with_context("code", format!("{}", status.code)) @@ -306,19 +312,20 @@ pub fn handle_receive_message_status( Code::Forbidden => Err(ClientError::new(ErrorKind::Server, "forbidden", operation) .with_context("code", format!("{}", status.code)) .with_context("message", status.message.clone())), - Code::NotFound | Code::TopicNotFound | Code::ConsumerGroupNotFound => { - Err(ClientError::new(ErrorKind::Server, "not found", operation) + Code::NotFound + | Code::TopicNotFound + | Code::ConsumerGroupNotFound + | Code::OffsetNotFound => Err(ClientError::new(ErrorKind::Server, "not 
found", operation) + .with_context("code", format!("{}", status.code)) + .with_context("message", status.message.clone())), + Code::PayloadTooLarge | Code::MessageBodyTooLarge | Code::MessageBodyEmpty => Err( + ClientError::new(ErrorKind::Server, "payload too large", operation) .with_context("code", format!("{}", status.code)) - .with_context("message", status.message.clone())) - } - Code::PayloadTooLarge | Code::MessageBodyTooLarge => { - Err( - ClientError::new(ErrorKind::Server, "payload too large", operation) - .with_context("code", format!("{}", status.code)) - .with_context("message", status.message.clone()), - ) - } - Code::TooManyRequests => { + .with_context("message", status.message.clone()), + ), + Code::TooManyRequests + | Code::LiteTopicQuotaExceeded + | Code::LiteSubscriptionQuotaExceeded => { Err( ClientError::new(ErrorKind::Server, "too many requests", operation) .with_context("code", format!("{}", status.code)) diff --git a/rust/tests/lite_integration_test.rs b/rust/tests/lite_integration_test.rs new file mode 100644 index 000000000..3d379510f --- /dev/null +++ b/rust/tests/lite_integration_test.rs @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! 
Integration tests for Lite functionality. +//! +//! These tests verify that Lite modules compile and can be used correctly. +//! Note: Integration tests don't trigger automock, so they use real Client types. + +#[cfg(test)] +mod lite_tests { + use rocketmq::conf::{ClientOption, PushConsumerOption}; + use rocketmq::model::message::{Message, MessageBuilder}; + use rocketmq::model::offset_option::{OffsetOption, OffsetPolicy}; + use rocketmq::{LitePushConsumer, LitePushConsumerTrait}; + + #[test] + fn test_lite_message_builder() { + // Test that lite message builder works + let message = + MessageBuilder::lite_message_builder("parent_topic", vec![1, 2, 3], "lite_topic_001") + .build(); + + assert!(message.is_ok()); + let mut msg = message.unwrap(); + assert_eq!(msg.take_lite_topic(), Some("lite_topic_001".to_string())); + } + + #[test] + fn test_offset_option_creation() { + // Test OffsetOption creation + let _opt1 = OffsetOption::from_policy(OffsetPolicy::Last); + let _opt2 = OffsetOption::from_offset(12345); + let _opt3 = OffsetOption::from_tail_n(100); + let _opt4 = OffsetOption::from_timestamp(1234567890000); + } + + #[test] + fn test_lite_push_consumer_compilation() { + // This test verifies that LitePushConsumer can be instantiated + // (without actually connecting to a server) + + let mut client_option = ClientOption::default(); + client_option.set_access_url("http://localhost:8080"); + + let mut option = PushConsumerOption::default(); + option.set_consumer_group("test_group"); + + let message_listener = + Box::new(|_: &rocketmq::model::message::MessageView| rocketmq::ConsumeResult::SUCCESS); + + // This should compile without errors + let result = LitePushConsumer::new( + client_option, + option, + "parent_topic".to_string(), + message_listener, + ); + + // We expect an error because we're not connecting to a real server, + // but the important thing is that it compiles + assert!(result.is_err() || result.is_ok()); + } +}