Skip to content

Consumer seek does not work on Exclusive consumers #413

@mikeam565

Description

@mikeam565

let inner_consumer: InnerConsumer<T, Exe> = match &mut self.inner {
InnerConsumer::Single(c) => {
c.seek(message_id, timestamp).await?;
let topic = c.topic().to_string();
let addr = client.lookup_topic(&topic).await?;
let config = c.config().clone();
InnerConsumer::Single(TopicConsumer::new(client, topic, addr, config).await?)
}
InnerConsumer::Multi(c) => {
c.seek(consumer_ids, message_id, timestamp).await?;
let topics = c.topics();
let config = c.config().clone();
//currently, pulsar only supports seek for non partitioned topics
let addrs =
try_join_all(topics.into_iter().map(|topic| client.lookup_topic(topic)))
.await?;
let topic_addr_pair = c.topics.iter().cloned().zip(addrs.iter().cloned());
let consumers = try_join_all(topic_addr_pair.map(|(topic, addr)| {
TopicConsumer::new(client.clone(), topic, addr, config.clone())
}))
.await?;
let consumers: BTreeMap<_, _> = consumers
.into_iter()
.map(|c| (c.topic(), Box::pin(c)))
.collect();
let topics: VecDeque<String> = consumers.keys().cloned().collect();
let existing_topics = topics.clone();
let topic_refresh = Duration::from_secs(30);
let refresh = Box::pin(client.executor.interval(topic_refresh).map(drop));
let namespace = c.namespace.clone();
let config = c.config().clone();
let topic_regex = c.topic_regex.clone();
InnerConsumer::Multi(MultiTopicConsumer {
namespace,
topic_regex,
pulsar: client,
consumers,
topics,
existing_topics,
new_consumers: None,
refresh,
config,
disc_last_message_received: None,
disc_messages_received: 0,
})
}
};
self.inner = inner_consumer;
Ok(())

Calling seek on an Exclusive consumer fails: the broker answers with ConsumerBusy ("Exclusive consumer is already connected") errors. This is expected, because .seek() creates new consumers and registers them with the broker before the old ones, which still hold the exclusive subscription, are dropped (line 244).

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions