|
let inner_consumer: InnerConsumer<T, Exe> = match &mut self.inner { |
|
InnerConsumer::Single(c) => { |
|
c.seek(message_id, timestamp).await?; |
|
let topic = c.topic().to_string(); |
|
let addr = client.lookup_topic(&topic).await?; |
|
let config = c.config().clone(); |
|
InnerConsumer::Single(TopicConsumer::new(client, topic, addr, config).await?) |
|
} |
|
InnerConsumer::Multi(c) => { |
|
c.seek(consumer_ids, message_id, timestamp).await?; |
|
let topics = c.topics(); |
|
let config = c.config().clone(); |
|
|
|
//currently, pulsar only supports seek for non partitioned topics |
|
let addrs = |
|
try_join_all(topics.into_iter().map(|topic| client.lookup_topic(topic))) |
|
.await?; |
|
|
|
let topic_addr_pair = c.topics.iter().cloned().zip(addrs.iter().cloned()); |
|
|
|
let consumers = try_join_all(topic_addr_pair.map(|(topic, addr)| { |
|
TopicConsumer::new(client.clone(), topic, addr, config.clone()) |
|
})) |
|
.await?; |
|
|
|
let consumers: BTreeMap<_, _> = consumers |
|
.into_iter() |
|
.map(|c| (c.topic(), Box::pin(c))) |
|
.collect(); |
|
let topics: VecDeque<String> = consumers.keys().cloned().collect(); |
|
let existing_topics = topics.clone(); |
|
let topic_refresh = Duration::from_secs(30); |
|
let refresh = Box::pin(client.executor.interval(topic_refresh).map(drop)); |
|
let namespace = c.namespace.clone(); |
|
let config = c.config().clone(); |
|
let topic_regex = c.topic_regex.clone(); |
|
InnerConsumer::Multi(MultiTopicConsumer { |
|
namespace, |
|
topic_regex, |
|
pulsar: client, |
|
consumers, |
|
topics, |
|
existing_topics, |
|
new_consumers: None, |
|
refresh, |
|
config, |
|
disc_last_message_received: None, |
|
disc_messages_received: 0, |
|
}) |
|
} |
|
}; |
|
|
|
self.inner = inner_consumer; |
|
Ok(()) |
pulsar-rs/src/consumer/mod.rs
Lines 192 to 245 in cf67345
Using `seek` with an Exclusive consumer will throw `answered ConsumerBusy ("Exclusive consumer is already connected")` errors. This is intuitive because, when calling `.seek()`, new consumers are created and registered with the broker before the old ones are dropped (line 244).