@@ -790,30 +790,56 @@ protected void handleCommandScalableTopicLookup(
790790 return ;
791791 }
792792
793- // Create a DagWatchSession that will send the initial layout and watch for changes
794- var session = new org .apache .pulsar .broker .service .scalable .DagWatchSession (
795- sessionId , topicName , this , resources , service );
796- dagWatchSessions .put (sessionId , session );
797-
798- session .start ()
799- .thenAcceptAsync (session ::pushUpdate , ctx .executor ())
800- .exceptionally (ex -> {
801- Throwable cause = ex .getCause () != null ? ex .getCause () : ex ;
802- log .warn ().attr ("topic" , topicName ).exception (cause )
803- .log ("ScalableTopicLookup failed" );
804- dagWatchSessions .remove (sessionId );
805- session .close ();
806- ctx .executor ().execute (() ->
793+ isTopicOperationAllowed (topicName , TopicOperation .LOOKUP , authenticationData , originalAuthData )
794+ .thenAccept (isAuthorized -> {
795+ if (!isAuthorized ) {
796+ final String msg = "Client is not authorized to ScalableTopicLookup" ;
797+ log .warn ()
798+ .attr ("principal" , getPrincipal ())
799+ .attr ("topic" , topicName )
800+ .log (msg );
807801 ctx .writeAndFlush (Commands .newScalableTopicError (sessionId ,
808- ServerError .TopicNotFound , cause .getMessage ()))
809- );
802+ ServerError .AuthorizationError , msg ));
803+ return ;
804+ }
805+ // Create a DagWatchSession that will send the initial layout and watch for changes
806+ var session = new org .apache .pulsar .broker .service .scalable .DagWatchSession (
807+ sessionId , topicName , this , resources , service );
808+ dagWatchSessions .put (sessionId , session );
809+
810+ session .start ()
811+ .thenAcceptAsync (session ::pushUpdate , ctx .executor ())
812+ .exceptionally (ex -> {
813+ Throwable cause = ex .getCause () != null ? ex .getCause () : ex ;
814+ log .warn ().attr ("topic" , topicName ).exception (cause )
815+ .log ("ScalableTopicLookup failed" );
816+ dagWatchSessions .remove (sessionId );
817+ session .close ();
818+ ctx .executor ().execute (() ->
819+ ctx .writeAndFlush (Commands .newScalableTopicError (sessionId ,
820+ ServerError .TopicNotFound , cause .getMessage ()))
821+ );
822+ return null ;
823+ });
824+ })
825+ .exceptionally (ex -> {
826+ logAuthException (remoteAddress , "scalable-topic-lookup" , getPrincipal (),
827+ Optional .of (topicName ), ex );
828+ ctx .writeAndFlush (Commands .newScalableTopicError (sessionId ,
829+ ServerError .AuthorizationError ,
830+ "Exception occurred while trying to authorize ScalableTopicLookup" ));
810831 return null ;
811832 });
812833 }
813834
814835 @ Override
815836 protected void handleCommandScalableTopicClose (
816837 CommandScalableTopicClose commandScalableTopicClose ) {
838+ // No per-call authorization: the session is keyed in this connection's
839+ // dagWatchSessions map (per-ServerCnx), authentication is enforced at connect,
840+ // and the originating ScalableTopicLookup was authorized when the session was
841+ // created. A close for an unknown sessionId is an idempotent no-op. Same
842+ // pattern as handleCloseProducer / handleCloseConsumer.
817843 checkArgument (state == State .Connected );
818844
819845 final long sessionId = commandScalableTopicClose .getSessionId ();
@@ -880,23 +906,46 @@ protected void handleCommandScalableTopicSubscribe(
880906 return ;
881907 }
882908
883- scalableTopicService .registerConsumer (topicName , subscription , consumerName , consumerId , this )
884- .whenCompleteAsync ((assignment , ex ) -> {
885- if (ex != null ) {
886- Throwable cause = ex .getCause () != null ? ex .getCause () : ex ;
887- log .warn ().attr ("topic" , topicName ).attr ("subscription" , subscription )
888- .attr ("consumerName" , consumerName ).exception (cause )
889- .log ("ScalableTopicSubscribe failed" );
909+ isTopicOperationAllowed (topicName , subscription , TopicOperation .CONSUME )
910+ .thenAccept (isAuthorized -> {
911+ if (!isAuthorized ) {
912+ final String msg = "Client is not authorized to ScalableTopicSubscribe" ;
913+ log .warn ()
914+ .attr ("principal" , getPrincipal ())
915+ .attr ("topic" , topicName )
916+ .attr ("subscription" , subscription )
917+ .log (msg );
890918 getCommandSender ().sendScalableTopicSubscribeError (requestId ,
891- ServerError .UnknownError , cause . getMessage () );
919+ ServerError .AuthorizationError , msg );
892920 return ;
893921 }
894- // Record the registration so we can call onConsumerDisconnect on channelInactive.
895- scalableConsumerRegistrations .put (consumerId ,
896- new ScalableConsumerRegistrationRef (topicName , subscription , consumerName ));
897- getCommandSender ().sendScalableTopicSubscribeResponse (requestId ,
898- org .apache .pulsar .broker .service .scalable .ConsumerSession .toProto (assignment ));
899- }, ctx .executor ());
922+ scalableTopicService .registerConsumer (topicName , subscription , consumerName ,
923+ consumerId , this )
924+ .whenCompleteAsync ((assignment , ex ) -> {
925+ if (ex != null ) {
926+ Throwable cause = ex .getCause () != null ? ex .getCause () : ex ;
927+ log .warn ().attr ("topic" , topicName ).attr ("subscription" , subscription )
928+ .attr ("consumerName" , consumerName ).exception (cause )
929+ .log ("ScalableTopicSubscribe failed" );
930+ getCommandSender ().sendScalableTopicSubscribeError (requestId ,
931+ ServerError .UnknownError , cause .getMessage ());
932+ return ;
933+ }
934+ // Record the registration so we can call onConsumerDisconnect on channelInactive.
935+ scalableConsumerRegistrations .put (consumerId ,
936+ new ScalableConsumerRegistrationRef (topicName , subscription , consumerName ));
937+ getCommandSender ().sendScalableTopicSubscribeResponse (requestId ,
938+ org .apache .pulsar .broker .service .scalable .ConsumerSession .toProto (assignment ));
939+ }, ctx .executor ());
940+ })
941+ .exceptionally (ex -> {
942+ logAuthException (remoteAddress , "scalable-topic-subscribe" , getPrincipal (),
943+ Optional .of (topicName ), ex );
944+ getCommandSender ().sendScalableTopicSubscribeError (requestId ,
945+ ServerError .AuthorizationError ,
946+ "Exception occurred while trying to authorize ScalableTopicSubscribe" );
947+ return null ;
948+ });
900949 }
901950
902951 @ Override
0 commit comments