@@ -239,6 +239,18 @@ public ServerCnx createServerCnxSpy() {
239239 getPulsarService ());
240240 }
241241
242+ private enum WithMockZooKeeperOrTestZKServer {
243+ MOCKZOOKEEPER , MOCKZOOKEEPER_SEPARATE_GLOBAL , TEST_ZK_SERVER , TEST_ZK_SERVER_SEPARATE_GLOBAL ;
244+
245+ boolean isMockZooKeeper () {
246+ return this == MOCKZOOKEEPER || this == MOCKZOOKEEPER_SEPARATE_GLOBAL ;
247+ }
248+
249+ boolean isTestZKServer () {
250+ return this == TEST_ZK_SERVER || this == TEST_ZK_SERVER_SEPARATE_GLOBAL ;
251+ }
252+ }
253+
242254 /**
243255 * A builder for a PulsarTestContext.
244256 *
@@ -255,6 +267,7 @@ public static class Builder {
255267 protected boolean configOverrideCalled = false ;
256268 protected Function <BrokerService , BrokerService > brokerServiceCustomizer = Function .identity ();
257269 protected PulsarTestContext otherContextToClose ;
270+ protected WithMockZooKeeperOrTestZKServer withMockZooKeeperOrTestZKServer ;
258271
259272 /**
260273 * Initialize the ServiceConfiguration with default values.
@@ -411,11 +424,13 @@ public Builder pulsarServiceCustomizer(
411424 public Builder reuseMockBookkeeperAndMetadataStores (PulsarTestContext otherContext ) {
412425 bookKeeperClient (otherContext .getBookKeeperClient ());
413426 if (otherContext .getMockZooKeeper () != null ) {
427+ withMockZooKeeperOrTestZKServer = null ;
414428 mockZooKeeper (otherContext .getMockZooKeeper ());
415429 if (otherContext .getMockZooKeeperGlobal () != null ) {
416430 mockZooKeeperGlobal (otherContext .getMockZooKeeperGlobal ());
417431 }
418432 } else if (otherContext .getTestZKServer () != null ) {
433+ withMockZooKeeperOrTestZKServer = null ;
419434 testZKServer (otherContext .getTestZKServer ());
420435 if (otherContext .getTestZKServerGlobal () != null ) {
421436 testZKServerGlobal (otherContext .getTestZKServerGlobal ());
@@ -475,31 +490,14 @@ public Builder withMockZookeeper() {
475490 * @return the builder
476491 */
477492 public Builder withMockZookeeper (boolean useSeparateGlobalZk ) {
478- try {
479- mockZooKeeper (createMockZooKeeper ());
480- if (useSeparateGlobalZk ) {
481- mockZooKeeperGlobal (createMockZooKeeper ());
482- }
483- } catch (Exception e ) {
484- throw new RuntimeException (e );
493+ if (useSeparateGlobalZk ) {
494+ withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer .MOCKZOOKEEPER_SEPARATE_GLOBAL ;
495+ } else {
496+ withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer .MOCKZOOKEEPER ;
485497 }
486498 return this ;
487499 }
488500
489- private MockZooKeeper createMockZooKeeper () throws Exception {
490- MockZooKeeper zk = MockZooKeeper .newInstance ();
491- initializeZookeeper (zk );
492- registerCloseable (zk ::shutdown );
493- return zk ;
494- }
495-
496- private static void initializeZookeeper (ZooKeeper zk ) throws KeeperException , InterruptedException {
497- ZkUtils .createFullPathOptimistic (zk , "/ledgers/available/192.168.1.1:" + 5000 ,
498- "" .getBytes (StandardCharsets .UTF_8 ), ZooDefs .Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT );
499-
500- zk .create ("/ledgers/LAYOUT" , "1\n flat:1" .getBytes (StandardCharsets .UTF_8 ), ZooDefs .Ids .OPEN_ACL_UNSAFE ,
501- CreateMode .PERSISTENT );
502- }
503501
504502 /**
505503 * Configure this PulsarTestContext to use a test ZooKeeper instance which is
@@ -518,27 +516,14 @@ public Builder withTestZookeeper() {
518516 * @return the builder
519517 */
520518 public Builder withTestZookeeper (boolean useSeparateGlobalZk ) {
521- try {
522- testZKServer (createTestZookeeper ());
523- if (useSeparateGlobalZk ) {
524- testZKServerGlobal (createTestZookeeper ());
525- }
526- } catch (Exception e ) {
527- throw new RuntimeException (e );
519+ if (useSeparateGlobalZk ) {
520+ withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer .TEST_ZK_SERVER_SEPARATE_GLOBAL ;
521+ } else {
522+ withMockZooKeeperOrTestZKServer = WithMockZooKeeperOrTestZKServer .TEST_ZK_SERVER ;
528523 }
529524 return this ;
530525 }
531526
532- private TestZKServer createTestZookeeper () throws Exception {
533- TestZKServer testZKServer = new TestZKServer ();
534- try (ZooKeeper zkc = new ZooKeeper (testZKServer .getConnectionString (), 5000 , event -> {
535- })) {
536- initializeZookeeper (zkc );
537- }
538- registerCloseable (testZKServer );
539- return testZKServer ;
540- }
541-
542527 /**
543528 * Applicable only when PulsarTestContext is not startable. This will configure mocks
544529 * for PulsarTestResources and related classes.
@@ -626,6 +611,7 @@ public final PulsarTestContext build() {
626611 if (configOverrideCustomizer != null ) {
627612 configOverrideCustomizer .accept (super .config );
628613 }
614+ createWithMockZooKeeperOrTestZKServerInstances ();
629615 if (super .managedLedgerStorage != null && !MockUtil .isMock (super .managedLedgerStorage )) {
630616 super .managedLedgerStorage = spyConfig .getManagedLedgerStorage ().spy (super .managedLedgerStorage );
631617 }
@@ -650,6 +636,73 @@ public final PulsarTestContext build() {
650636 return super .build ();
651637 }
652638
639+ void createWithMockZooKeeperOrTestZKServerInstances () {
640+ if (withMockZooKeeperOrTestZKServer == null ) {
641+ return ;
642+ }
643+ int sessionTimeout = (int ) super .config .getMetadataStoreSessionTimeoutMillis ();
644+ try {
645+ if (withMockZooKeeperOrTestZKServer .isMockZooKeeper ()) {
646+ if (super .mockZooKeeper == null ) {
647+ mockZooKeeper (createMockZooKeeper (sessionTimeout ));
648+ } else {
649+ log .warn ("Skipping creating mockZooKeeper, already set" );
650+ }
651+ if (withMockZooKeeperOrTestZKServer
652+ == WithMockZooKeeperOrTestZKServer .MOCKZOOKEEPER_SEPARATE_GLOBAL ) {
653+ if (super .mockZooKeeperGlobal == null ) {
654+ mockZooKeeperGlobal (createMockZooKeeper (sessionTimeout ));
655+ } else {
656+ log .warn ("Skipping creating mockZooKeeperGlobal, already set" );
657+ }
658+ }
659+ } else if (withMockZooKeeperOrTestZKServer .isTestZKServer ()) {
660+ if (super .testZKServer == null ) {
661+ testZKServer (createTestZookeeper (sessionTimeout ));
662+ } else {
663+ log .warn ("Skipping creating testZKServer, already set" );
664+ }
665+ if (withMockZooKeeperOrTestZKServer
666+ == WithMockZooKeeperOrTestZKServer .TEST_ZK_SERVER_SEPARATE_GLOBAL ) {
667+ if (super .testZKServerGlobal == null ) {
668+ testZKServerGlobal (createTestZookeeper (sessionTimeout ));
669+ } else {
670+ log .warn ("Skipping creating testZKServerGlobal, already set" );
671+ }
672+ }
673+ }
674+ } catch (Exception e ) {
675+ throw new RuntimeException (e );
676+ }
677+ }
678+
679+ private MockZooKeeper createMockZooKeeper (int sessionTimeout ) throws Exception {
680+ MockZooKeeper zk = MockZooKeeper .newInstance ();
681+ zk .setSessionTimeout (sessionTimeout );
682+ initializeZookeeper (zk );
683+ registerCloseable (zk ::shutdown );
684+ return zk ;
685+ }
686+
687+ // this might not be required at all, but it's kept here as an example
688+ private static void initializeZookeeper (ZooKeeper zk ) throws KeeperException , InterruptedException {
689+ ZkUtils .createFullPathOptimistic (zk , "/ledgers/available/192.168.1.1:" + 5000 ,
690+ "" .getBytes (StandardCharsets .UTF_8 ), ZooDefs .Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT );
691+
692+ zk .create ("/ledgers/LAYOUT" , "1\n flat:1" .getBytes (StandardCharsets .UTF_8 ), ZooDefs .Ids .OPEN_ACL_UNSAFE ,
693+ CreateMode .PERSISTENT );
694+ }
695+
696+ private TestZKServer createTestZookeeper (int sessionTimeout ) throws Exception {
697+ TestZKServer testZKServer = new TestZKServer ();
698+ try (ZooKeeper zkc = new ZooKeeper (testZKServer .getConnectionString (), sessionTimeout , event -> {
699+ })) {
700+ initializeZookeeper (zkc );
701+ }
702+ registerCloseable (testZKServer );
703+ return testZKServer ;
704+ }
705+
653706 protected void handlePreallocatePorts (ServiceConfiguration config ) {
654707 if (super .preallocatePorts ) {
655708 config .getBrokerServicePort ().ifPresent (portNumber -> {
@@ -714,28 +767,30 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
714767 if (super .localMetadataStore == null || super .configurationMetadataStore == null ) {
715768 if (super .mockZooKeeper != null ) {
716769 MetadataStoreExtended mockZookeeperMetadataStore =
717- createMockZookeeperMetadataStore (super .mockZooKeeper , MetadataStoreConfig .METADATA_STORE );
770+ createMockZookeeperMetadataStore (super .mockZooKeeper , super .config ,
771+ MetadataStoreConfig .METADATA_STORE );
718772 if (super .localMetadataStore == null ) {
719773 localMetadataStore (mockZookeeperMetadataStore );
720774 }
721775 if (super .configurationMetadataStore == null ) {
722776 if (super .mockZooKeeperGlobal != null ) {
723777 configurationMetadataStore (createMockZookeeperMetadataStore (super .mockZooKeeperGlobal ,
724- MetadataStoreConfig .CONFIGURATION_METADATA_STORE ));
778+ super . config , MetadataStoreConfig .CONFIGURATION_METADATA_STORE ));
725779 } else {
726780 configurationMetadataStore (mockZookeeperMetadataStore );
727781 }
728782 }
729783 } else if (super .testZKServer != null ) {
730784 MetadataStoreExtended testZookeeperMetadataStore =
731- createTestZookeeperMetadataStore (super .testZKServer , MetadataStoreConfig .METADATA_STORE );
785+ createTestZookeeperMetadataStore (super .testZKServer , super .config ,
786+ MetadataStoreConfig .METADATA_STORE );
732787 if (super .localMetadataStore == null ) {
733788 localMetadataStore (testZookeeperMetadataStore );
734789 }
735790 if (super .configurationMetadataStore == null ) {
736791 if (super .testZKServerGlobal != null ) {
737792 configurationMetadataStore (createTestZookeeperMetadataStore (super .testZKServerGlobal ,
738- MetadataStoreConfig .CONFIGURATION_METADATA_STORE ));
793+ super . config , MetadataStoreConfig .CONFIGURATION_METADATA_STORE ));
739794 } else {
740795 configurationMetadataStore (testZookeeperMetadataStore );
741796 }
@@ -765,16 +820,30 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
765820 }
766821 }
767822
823+ private MetadataStoreConfig createMetadataStoreConfig (ServiceConfiguration config , String metadataStoreName ) {
824+ return MetadataStoreConfig .builder ()
825+ .sessionTimeoutMillis ((int ) config .getMetadataStoreSessionTimeoutMillis ())
826+ .allowReadOnlyOperations (config .isMetadataStoreAllowReadOnlyOperations ())
827+ .batchingEnabled (config .isMetadataStoreBatchingEnabled ())
828+ .batchingMaxDelayMillis (config .getMetadataStoreBatchingMaxDelayMillis ())
829+ .batchingMaxOperations (config .getMetadataStoreBatchingMaxOperations ())
830+ .batchingMaxSizeKb (config .getMetadataStoreBatchingMaxSizeKb ())
831+ .metadataStoreName (metadataStoreName )
832+ .build ();
833+ }
834+
768835 private MetadataStoreExtended createMockZookeeperMetadataStore (MockZooKeeper mockZooKeeper ,
836+ ServiceConfiguration config ,
769837 String metadataStoreName ) {
770838 // provide a unique session id for each instance
771839 MockZooKeeperSession mockZooKeeperSession = MockZooKeeperSession .newInstance (mockZooKeeper , false );
840+ mockZooKeeperSession .setSessionTimeout ((int ) config .getMetadataStoreSessionTimeoutMillis ());
772841 registerCloseable (() -> {
773842 mockZooKeeperSession .close ();
774843 resetSpyOrMock (mockZooKeeperSession );
775844 });
776- ZKMetadataStore zkMetadataStore = new ZKMetadataStore ( mockZooKeeperSession ,
777- MetadataStoreConfig . builder (). metadataStoreName ( metadataStoreName ). build ( ));
845+ ZKMetadataStore zkMetadataStore =
846+ new ZKMetadataStore ( mockZooKeeperSession , createMetadataStoreConfig ( config , metadataStoreName ));
778847 registerCloseable (() -> {
779848 zkMetadataStore .close ();
780849 resetSpyOrMock (zkMetadataStore );
@@ -786,9 +855,10 @@ private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper moc
786855
787856 @ SneakyThrows
788857 private MetadataStoreExtended createTestZookeeperMetadataStore (TestZKServer zkServer ,
858+ ServiceConfiguration config ,
789859 String metadataStoreName ) {
790860 MetadataStoreExtended store = MetadataStoreExtended .create ("zk:" + zkServer .getConnectionString (),
791- MetadataStoreConfig . builder (). metadataStoreName ( metadataStoreName ). build ( ));
861+ createMetadataStoreConfig ( config , metadataStoreName ));
792862 registerCloseable (store );
793863 MetadataStoreExtended nonClosingProxy =
794864 NonClosingProxyHandler .createNonClosingProxy (store , MetadataStoreExtended .class );
0 commit comments