@@ -241,37 +241,6 @@ void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerC
241241 }
242242}
243243
244- void ClientImpl::handleCreateProducer (Result result, const LookupDataResultPtr& partitionMetadata,
245- const TopicNamePtr& topicName, const ProducerConfiguration& conf,
246- const CreateProducerCallback& callback) {
247- if (!result) {
248- ProducerImplBasePtr producer;
249-
250- auto interceptors = std::make_shared<ProducerInterceptors>(conf.getInterceptors ());
251-
252- try {
253- if (partitionMetadata->getPartitions () > 0 ) {
254- producer = std::make_shared<PartitionedProducerImpl>(
255- shared_from_this (), topicName, partitionMetadata->getPartitions (), conf, interceptors);
256- } else {
257- producer = std::make_shared<ProducerImpl>(shared_from_this (), *topicName, conf, interceptors);
258- }
259- } catch (const std::runtime_error& e) {
260- LOG_ERROR (" Failed to create producer: " << e.what ());
261- callback (ResultConnectError, {});
262- return ;
263- }
264- producer->getProducerCreatedFuture ().addListener (
265- std::bind (&ClientImpl::handleProducerCreated, shared_from_this (), std::placeholders::_1,
266- std::placeholders::_2, callback, producer));
267- producer->start ();
268- } else {
269- LOG_ERROR (" Error Checking/Getting Partition Metadata while creating producer on "
270- << topicName->toString () << " -- " << result);
271- callback (result, Producer ());
272- }
273- }
274-
275244void ClientImpl::handleCreateProducerV2 (Result result, const LookupDataResultPtr& partitionMetadata,
276245 const TopicNamePtr& topicName, const ProducerConfiguration& conf,
277246 const CreateProducerCallbackV2& callback) {
@@ -401,44 +370,6 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC
401370 });
402371}
403372
404- void ClientImpl::handleReaderMetadataLookup (Result result, const LookupDataResultPtr& partitionMetadata,
405- const TopicNamePtr& topicName, const MessageId& startMessageId,
406- const ReaderConfiguration& conf, const ReaderCallback& callback) {
407- if (result != ResultOk) {
408- LOG_ERROR (" Error Checking/Getting Partition Metadata while creating readeron "
409- << topicName->toString () << " -- " << result);
410- callback (result, Reader ());
411- return ;
412- }
413-
414- ReaderImplPtr reader;
415- try {
416- reader.reset (new ReaderImpl (shared_from_this (), topicName->toString (),
417- partitionMetadata->getPartitions (), conf,
418- getListenerExecutorProvider ()->get (), callback));
419- } catch (const std::runtime_error& e) {
420- LOG_ERROR (" Failed to create reader: " << e.what ());
421- callback (ResultConnectError, {});
422- return ;
423- }
424- ConsumerImplBasePtr consumer = reader->getConsumer ();
425- auto self = shared_from_this ();
426- reader->start (startMessageId, [this , self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
427- auto consumer = weakConsumerPtr.lock ();
428- if (consumer) {
429- auto address = consumer.get ();
430- auto existingConsumer = consumers_.putIfAbsent (address, consumer);
431- if (existingConsumer) {
432- consumer = existingConsumer.value ().lock ();
433- LOG_ERROR (" Unexpected existing consumer at the same address: "
434- << address << " , consumer: " << (consumer ? consumer->getName () : " (null)" ));
435- }
436- } else {
437- LOG_ERROR (" Unexpected case: the consumer is somehow expired" );
438- }
439- });
440- }
441-
442373void ClientImpl::handleReaderMetadataLookupV2 (Result result, const LookupDataResultPtr& partitionMetadata,
443374 const TopicNamePtr& topicName, const MessageId& startMessageId,
444375 const ReaderConfiguration& conf,
@@ -661,50 +592,6 @@ void ClientImpl::subscribeAsyncV2(const std::string& topic, const std::string& s
661592 std::placeholders::_2, topicName, subscriptionName, conf, callback));
662593}
663594
664- void ClientImpl::handleSubscribe (Result result, const LookupDataResultPtr& partitionMetadata,
665- const TopicNamePtr& topicName, const std::string& subscriptionName,
666- ConsumerConfiguration conf, const SubscribeCallback& callback) {
667- if (result == ResultOk) {
668- // generate random name if not supplied by the customer.
669- if (conf.getConsumerName ().empty ()) {
670- conf.setConsumerName (generateRandomName ());
671- }
672- ConsumerImplBasePtr consumer;
673- auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors ());
674-
675- try {
676- if (partitionMetadata->getPartitions () > 0 ) {
677- if (conf.getReceiverQueueSize () == 0 ) {
678- LOG_ERROR (" Can't use partitioned topic if the queue size is 0." );
679- callback (ResultInvalidConfiguration, Consumer ());
680- return ;
681- }
682- consumer = std::make_shared<MultiTopicsConsumerImpl>(shared_from_this (), topicName,
683- partitionMetadata->getPartitions (),
684- subscriptionName, conf, interceptors);
685- } else {
686- auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this (), topicName->toString (),
687- subscriptionName, conf,
688- topicName->isPersistent (), interceptors);
689- consumerImpl->setPartitionIndex (topicName->getPartitionIndex ());
690- consumer = consumerImpl;
691- }
692- } catch (const std::runtime_error& e) {
693- LOG_ERROR (" Failed to create consumer: " << e.what ());
694- callback (ResultConnectError, {});
695- return ;
696- }
697- consumer->getConsumerCreatedFuture ().addListener (
698- std::bind (&ClientImpl::handleConsumerCreated, shared_from_this (), std::placeholders::_1,
699- std::placeholders::_2, callback, consumer));
700- consumer->start ();
701- } else {
702- LOG_ERROR (" Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString ()
703- << " -- " << result);
704- callback (result, Consumer ());
705- }
706- }
707-
708595void ClientImpl::handleSubscribeV2 (Result result, const LookupDataResultPtr& partitionMetadata,
709596 const TopicNamePtr& topicName, const std::string& subscriptionName,
710597 ConsumerConfiguration conf, const SubscribeCallbackV2& callback) {
0 commit comments