2525import java .util .Map ;
2626import java .util .concurrent .TimeUnit ;
2727import lombok .extern .slf4j .Slf4j ;
28+ import org .apache .pulsar .broker .PulsarService ;
2829import org .apache .pulsar .broker .resourcegroup .ResourceGroup .BytesAndMessagesCount ;
2930import org .apache .pulsar .broker .resourcegroup .ResourceGroup .ResourceGroupMonitoringClass ;
3031import org .apache .pulsar .broker .resourcegroup .ResourceGroupService .ResourceGroupUsageStatsType ;
3132import org .apache .pulsar .broker .service .BrokerService ;
32- import org .apache .pulsar .client .admin .PulsarAdminException ;
33+ import org .apache .pulsar .broker .service .SharedPulsarBaseTest ;
34+ import org .apache .pulsar .broker .service .SharedPulsarCluster ;
3335import org .apache .pulsar .client .api .Consumer ;
3436import org .apache .pulsar .client .api .Message ;
3537import org .apache .pulsar .client .api .MessageId ;
3638import org .apache .pulsar .client .api .Producer ;
37- import org .apache .pulsar .client .api .ProducerConsumerBase ;
3839import org .apache .pulsar .client .api .PulsarClientException ;
3940import org .apache .pulsar .client .api .SubscriptionType ;
4041import org .apache .pulsar .common .naming .NamespaceName ;
4142import org .apache .pulsar .common .naming .TopicName ;
42- import org .apache .pulsar .common .policies .data .ClusterData ;
4343import org .apache .pulsar .common .policies .data .TenantInfo ;
4444import org .apache .pulsar .common .policies .data .TenantInfoImpl ;
4545import org .apache .pulsar .common .policies .data .TopicStats ;
4646import org .apache .pulsar .common .policies .data .stats .TopicStatsImpl ;
4747import org .testng .Assert ;
48- import org .testng .annotations .AfterClass ;
49- import org .testng .annotations .BeforeClass ;
5048import org .testng .annotations .Test ;
5149
5250
5755// are verified on the RGs.
5856@ Slf4j
5957@ Test (groups = "flaky" )
60- public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
61- @ BeforeClass (alwaysRun = true )
62- @ Override
63- protected void setup () throws Exception {
64- super .internalSetup ();
58+ public class RGUsageMTAggrWaitForAllMsgsTest extends SharedPulsarBaseTest {
59+ @ org .testng .annotations .BeforeClass (alwaysRun = true )
60+ public void setupRG () throws Exception {
61+ PulsarService pulsar = SharedPulsarCluster .get ().getPulsarService ();
6562 this .prepareForOps ();
6663
6764 ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator () {
@@ -87,12 +84,6 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
8784 Thread .sleep (2000 );
8885 }
8986
90- @ AfterClass (alwaysRun = true )
91- @ Override
92- protected void cleanup () throws Exception {
93- super .internalCleanup ();
94- }
95-
9687 @ Test
9788 public void testMTProduceConsumeRGUsagePersistentTopicNamesSameTenant () throws Exception {
9889 testProduceConsumeUsageOnRG (persistentTopicNamesSameTenantAndNsRGs );
@@ -547,7 +538,7 @@ private void verifyRGProdConsStats(String[] topicStrings,
547538 int scaleFactor , boolean checkProduce ,
548539 boolean checkConsume ) throws Exception {
549540
550- BrokerService bs = pulsar .getBrokerService ();
541+ BrokerService bs = SharedPulsarCluster . get (). getPulsarService () .getBrokerService ();
551542 Map <String , TopicStatsImpl > topicStatsMap = bs .getTopicStats ();
552543
553544 log .debug ("verifyProdConsStats: topicStatsMap has {} entries" , topicStatsMap .size ());
@@ -779,7 +770,7 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
779770 new org .apache .pulsar .common .policies .data .ResourceGroup ();
780771 private ResourceGroupService rgservice ;
781772
782- private final String clusterName = "test" ;
773+ private final String clusterName = SharedPulsarCluster . CLUSTER_NAME ;
783774 private static final String BaseRGName = "rg-" ;
784775 private static final String BaseTestTopicName = "rgusage-topic-" ;
785776
@@ -837,8 +828,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
837828 long residualRecvdNumMessages ;
838829
839830 // Create the topics provided
840- private void createTopics (String [] topics ) {
841- BrokerService bs = this . pulsar .getBrokerService ();
831+ private void createTopics (String [] topics ) throws Exception {
832+ BrokerService bs = SharedPulsarCluster . get (). getPulsarService () .getBrokerService ();
842833 for (String topic : topics ) {
843834 if (!createdTopics .contains (topic )) {
844835 bs .getOrCreateTopic (topic );
@@ -848,8 +839,8 @@ private void createTopics(String[] topics) {
848839 }
849840
850841 // Destroy the topics provided
851- private void destroyTopics (String [] topics ) {
852- BrokerService bs = this . pulsar .getBrokerService ();
842+ private void destroyTopics (String [] topics ) throws Exception {
843+ BrokerService bs = SharedPulsarCluster . get (). getPulsarService () .getBrokerService ();
853844 for (String topic : topics ) {
854845 if (!createdTopics .contains (topic )) {
855846 bs .deleteTopic (topic , true );
@@ -873,10 +864,11 @@ private void destroyRGs() throws Exception {
873864 }
874865
875866 // Initial set up for transport manager and cluster creation.
876- private void prepareForOps () throws PulsarAdminException {
877- this .conf .setResourceUsageTransportPublishIntervalInSecs (PUBLISH_INTERVAL_SECS );
878- this .conf .setAllowAutoTopicCreation (true );
879- admin .clusters ().createCluster (clusterName , ClusterData .builder ().serviceUrl (brokerUrl .toString ()).build ());
867+ private void prepareForOps () throws Exception {
868+ SharedPulsarCluster .get ().getPulsarService ().getConfiguration ()
869+ .setResourceUsageTransportPublishIntervalInSecs (PUBLISH_INTERVAL_SECS );
870+ SharedPulsarCluster .get ().getPulsarService ().getConfiguration ().setAllowAutoTopicCreation (true );
871+ // Cluster already created by SharedPulsarCluster
880872 }
881873
882874 // Set up of RG/tenant/namespaces/topic names, and checking of the test parameters.
0 commit comments