1818 */
1919package org .apache .pulsar .broker .admin ;
2020
21+ import static org .mockito .Mockito .doReturn ;
2122import io .jsonwebtoken .Jwts ;
23+ import java .util .UUID ;
24+ import java .util .concurrent .atomic .AtomicBoolean ;
25+ import java .util .function .Consumer ;
2226import org .apache .commons .lang3 .reflect .FieldUtils ;
27+ import org .apache .pulsar .broker .BrokerTestUtil ;
2328import org .apache .pulsar .broker .authorization .AuthorizationService ;
2429import org .apache .pulsar .client .admin .PulsarAdmin ;
2530import org .apache .pulsar .common .policies .data .NamespaceOperation ;
2631import org .apache .pulsar .common .policies .data .TopicOperation ;
2732import org .apache .pulsar .security .MockedPulsarStandalone ;
2833import org .mockito .Mockito ;
34+ import org .mockito .invocation .InvocationOnMock ;
2935import org .testng .Assert ;
3036import org .testng .annotations .AfterMethod ;
3137import org .testng .annotations .BeforeMethod ;
32- import java .util .UUID ;
33- import java .util .concurrent .atomic .AtomicBoolean ;
34- import static org .mockito .Mockito .doReturn ;
3538
36- public class AuthZTest extends MockedPulsarStandalone {
39+ public abstract class AuthZTest extends MockedPulsarStandalone {
3740
3841 protected PulsarAdmin superUserAdmin ;
3942
@@ -47,6 +50,9 @@ public class AuthZTest extends MockedPulsarStandalone {
4750 protected static final String TENANT_ADMIN_TOKEN = Jwts .builder ()
4851 .claim ("sub" , TENANT_ADMIN_SUBJECT ).signWith (SECRET_KEY ).compact ();
4952
53+ private volatile Consumer <InvocationOnMock > allowTopicOperationAsyncHandler ;
54+ private volatile Consumer <InvocationOnMock > allowNamespaceOperationAsyncHandler ;
55+
5056 @ Override
5157 public void close () throws Exception {
5258 if (superUserAdmin != null ) {
@@ -65,48 +71,62 @@ public void close() throws Exception {
6571 @ BeforeMethod (alwaysRun = true )
6672 public void before () throws IllegalAccessException {
6773 orignalAuthorizationService = getPulsarService ().getBrokerService ().getAuthorizationService ();
68- authorizationService = Mockito . spy (orignalAuthorizationService );
74+ authorizationService = BrokerTestUtil . spyWithoutRecordingInvocations (orignalAuthorizationService );
6975 FieldUtils .writeField (getPulsarService ().getBrokerService (), "authorizationService" ,
7076 authorizationService , true );
77+ Mockito .doAnswer (invocationOnMock -> {
78+ Consumer <InvocationOnMock > localAllowTopicOperationAsyncHandler =
79+ allowTopicOperationAsyncHandler ;
80+ if (localAllowTopicOperationAsyncHandler != null ) {
81+ localAllowTopicOperationAsyncHandler .accept (invocationOnMock );
82+ }
83+ return invocationOnMock .callRealMethod ();
84+ }).when (authorizationService ).allowTopicOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
85+ Mockito .any (), Mockito .any ());
86+ doReturn (true )
87+ .when (authorizationService ).isValidOriginalPrincipal (Mockito .any (), Mockito .any (), Mockito .any ());
88+ Mockito .doAnswer (invocationOnMock -> {
89+ Consumer <InvocationOnMock > localAllowNamespaceOperationAsyncHandler =
90+ allowNamespaceOperationAsyncHandler ;
91+ if (localAllowNamespaceOperationAsyncHandler != null ) {
92+ localAllowNamespaceOperationAsyncHandler .accept (invocationOnMock );
93+ }
94+ return invocationOnMock .callRealMethod ();
95+ }).when (authorizationService ).allowNamespaceOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
96+ Mockito .any (), Mockito .any ());
7197 }
7298
7399 @ AfterMethod (alwaysRun = true )
74100 public void after () throws IllegalAccessException {
75101 FieldUtils .writeField (getPulsarService ().getBrokerService (), "authorizationService" ,
76102 orignalAuthorizationService , true );
103+ allowNamespaceOperationAsyncHandler = null ;
104+ allowTopicOperationAsyncHandler = null ;
77105 }
78106
79107 protected AtomicBoolean setAuthorizationTopicOperationChecker (String role , Object operation ) {
80108 AtomicBoolean execFlag = new AtomicBoolean (false );
81109 if (operation instanceof TopicOperation ) {
82- Mockito . doAnswer ( invocationOnMock -> {
110+ allowTopicOperationAsyncHandler = invocationOnMock -> {
83111 String role_ = invocationOnMock .getArgument (2 );
84112 if (role .equals (role_ )) {
85113 TopicOperation operation_ = invocationOnMock .getArgument (1 );
86114 Assert .assertEquals (operation_ , operation );
87115 }
88116 execFlag .set (true );
89- return invocationOnMock .callRealMethod ();
90- }).when (authorizationService ).allowTopicOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
91- Mockito .any (), Mockito .any ());
117+ };
92118 } else if (operation instanceof NamespaceOperation ) {
93- doReturn (true )
94- .when (authorizationService ).isValidOriginalPrincipal (Mockito .any (), Mockito .any (), Mockito .any ());
95- Mockito .doAnswer (invocationOnMock -> {
119+ allowNamespaceOperationAsyncHandler = invocationOnMock -> {
96120 String role_ = invocationOnMock .getArgument (2 );
97121 if (role .equals (role_ )) {
98122 TopicOperation operation_ = invocationOnMock .getArgument (1 );
99123 Assert .assertEquals (operation_ , operation );
100124 }
101125 execFlag .set (true );
102- return invocationOnMock .callRealMethod ();
103- }).when (authorizationService ).allowNamespaceOperationAsync (Mockito .any (), Mockito .any (), Mockito .any (),
104- Mockito .any (), Mockito .any ());
126+ };
105127 } else {
106128 throw new IllegalArgumentException ("" );
107129 }
108-
109-
110130 return execFlag ;
111131 }
112132
0 commit comments