Skip to content

Commit 0163ce8

Browse files
authored
[cleanup] Convert 12 test classes to SharedPulsarBaseTest (#25392)
1 parent 08d89a2 commit 0163ce8

14 files changed

Lines changed: 479 additions & 694 deletions

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java

Lines changed: 97 additions & 116 deletions
Large diffs are not rendered by default.

pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,26 @@
2525
import java.util.Map;
2626
import java.util.concurrent.TimeUnit;
2727
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.broker.PulsarService;
2829
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
2930
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
3031
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
3132
import org.apache.pulsar.broker.service.BrokerService;
32-
import org.apache.pulsar.client.admin.PulsarAdminException;
33+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
34+
import org.apache.pulsar.broker.service.SharedPulsarCluster;
3335
import org.apache.pulsar.client.api.Consumer;
3436
import org.apache.pulsar.client.api.Message;
3537
import org.apache.pulsar.client.api.MessageId;
3638
import org.apache.pulsar.client.api.Producer;
37-
import org.apache.pulsar.client.api.ProducerConsumerBase;
3839
import org.apache.pulsar.client.api.PulsarClientException;
3940
import org.apache.pulsar.client.api.SubscriptionType;
4041
import org.apache.pulsar.common.naming.NamespaceName;
4142
import org.apache.pulsar.common.naming.TopicName;
42-
import org.apache.pulsar.common.policies.data.ClusterData;
4343
import org.apache.pulsar.common.policies.data.TenantInfo;
4444
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
4545
import org.apache.pulsar.common.policies.data.TopicStats;
4646
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
4747
import org.testng.Assert;
48-
import org.testng.annotations.AfterClass;
49-
import org.testng.annotations.BeforeClass;
5048
import org.testng.annotations.Test;
5149

5250

@@ -57,11 +55,10 @@
5755
// are verified on the RGs.
5856
@Slf4j
5957
@Test(groups = "flaky")
60-
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
61-
@BeforeClass(alwaysRun = true)
62-
@Override
63-
protected void setup() throws Exception {
64-
super.internalSetup();
58+
public class RGUsageMTAggrWaitForAllMsgsTest extends SharedPulsarBaseTest {
59+
@org.testng.annotations.BeforeClass(alwaysRun = true)
60+
public void setupRG() throws Exception {
61+
PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
6562
this.prepareForOps();
6663

6764
ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
@@ -87,12 +84,6 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
8784
Thread.sleep(2000);
8885
}
8986

90-
@AfterClass(alwaysRun = true)
91-
@Override
92-
protected void cleanup() throws Exception {
93-
super.internalCleanup();
94-
}
95-
9687
@Test
9788
public void testMTProduceConsumeRGUsagePersistentTopicNamesSameTenant() throws Exception {
9889
testProduceConsumeUsageOnRG(persistentTopicNamesSameTenantAndNsRGs);
@@ -547,7 +538,7 @@ private void verifyRGProdConsStats(String[] topicStrings,
547538
int scaleFactor, boolean checkProduce,
548539
boolean checkConsume) throws Exception {
549540

550-
BrokerService bs = pulsar.getBrokerService();
541+
BrokerService bs = SharedPulsarCluster.get().getPulsarService().getBrokerService();
551542
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
552543

553544
log.debug("verifyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());
@@ -779,7 +770,7 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
779770
new org.apache.pulsar.common.policies.data.ResourceGroup();
780771
private ResourceGroupService rgservice;
781772

782-
private final String clusterName = "test";
773+
private final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
783774
private static final String BaseRGName = "rg-";
784775
private static final String BaseTestTopicName = "rgusage-topic-";
785776

@@ -837,8 +828,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
837828
long residualRecvdNumMessages;
838829

839830
// Create the topics provided
840-
private void createTopics(String[] topics) {
841-
BrokerService bs = this.pulsar.getBrokerService();
831+
private void createTopics(String[] topics) throws Exception {
832+
BrokerService bs = SharedPulsarCluster.get().getPulsarService().getBrokerService();
842833
for (String topic : topics) {
843834
if (!createdTopics.contains(topic)) {
844835
bs.getOrCreateTopic(topic);
@@ -848,8 +839,8 @@ private void createTopics(String[] topics) {
848839
}
849840

850841
// Destroy the topics provided
851-
private void destroyTopics(String[] topics) {
852-
BrokerService bs = this.pulsar.getBrokerService();
842+
private void destroyTopics(String[] topics) throws Exception {
843+
BrokerService bs = SharedPulsarCluster.get().getPulsarService().getBrokerService();
853844
for (String topic : topics) {
854845
if (!createdTopics.contains(topic)) {
855846
bs.deleteTopic(topic, true);
@@ -873,10 +864,11 @@ private void destroyRGs() throws Exception {
873864
}
874865

875866
// Initial set up for transport manager and cluster creation.
876-
private void prepareForOps() throws PulsarAdminException {
877-
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
878-
this.conf.setAllowAutoTopicCreation(true);
879-
admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
867+
private void prepareForOps() throws Exception {
868+
SharedPulsarCluster.get().getPulsarService().getConfiguration()
869+
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
870+
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
871+
// Cluster already created by SharedPulsarCluster
880872
}
881873

882874
// Set up of RG/tenant/namespaces/topic names, and checking of the test parameters.

pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,34 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.TimeUnit;
2525
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.pulsar.broker.PulsarService;
2627
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
2728
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
2829
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
2930
import org.apache.pulsar.broker.service.BrokerService;
31+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
32+
import org.apache.pulsar.broker.service.SharedPulsarCluster;
3033
import org.apache.pulsar.broker.service.Topic;
3134
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3235
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
33-
import org.apache.pulsar.client.admin.PulsarAdminException;
3436
import org.apache.pulsar.client.api.Consumer;
3537
import org.apache.pulsar.client.api.Message;
3638
import org.apache.pulsar.client.api.Producer;
37-
import org.apache.pulsar.client.api.ProducerConsumerBase;
3839
import org.apache.pulsar.client.api.PulsarClientException;
3940
import org.apache.pulsar.client.api.SubscriptionType;
4041
import org.apache.pulsar.common.naming.NamespaceName;
4142
import org.apache.pulsar.common.naming.TopicName;
42-
import org.apache.pulsar.common.policies.data.ClusterData;
4343
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
4444
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
4545
import org.awaitility.Awaitility;
4646
import org.testng.Assert;
47-
import org.testng.annotations.AfterClass;
48-
import org.testng.annotations.BeforeClass;
4947
import org.testng.annotations.Test;
5048

5149
@Slf4j
52-
public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
53-
@BeforeClass
54-
@Override
55-
protected void setup() throws Exception {
56-
super.internalSetup();
50+
public class ResourceGroupUsageAggregationTest extends SharedPulsarBaseTest {
51+
@org.testng.annotations.BeforeClass(alwaysRun = true)
52+
public void setupRG() throws Exception {
53+
PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
5754
this.prepareData();
5855

5956
ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
@@ -74,12 +71,6 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
7471
this.rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, transportMgr, dummyQuotaCalc);
7572
}
7673

77-
@AfterClass(alwaysRun = true)
78-
@Override
79-
protected void cleanup() throws Exception {
80-
super.internalCleanup();
81-
}
82-
8374
@Test
8475
public void testProduceConsumeUsageOnRG() throws Exception {
8576
testProduceConsumeUsageOnRG(produceConsumePersistentTopic);
@@ -183,7 +174,8 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
183174

184175
consumer.close();
185176
// cleanup the topic data.
186-
CompletableFuture<Optional<Topic>> topicFuture = pulsar.getBrokerService().getTopics().remove(topicString);
177+
CompletableFuture<Optional<Topic>> topicFuture =
178+
SharedPulsarCluster.get().getPulsarService().getBrokerService().getTopics().remove(topicString);
187179
if (topicFuture != null) {
188180
Optional<Topic> optTopic = topicFuture.join();
189181
if (optTopic.isPresent()) {
@@ -210,8 +202,8 @@ private void verifyStats(String topicString, String rgName,
210202
int sentNumBytes, int sentNumMsgs,
211203
int recvdNumBytes, int recvdNumMsgs,
212204
boolean checkProduce, boolean checkConsume)
213-
throws InterruptedException, PulsarAdminException {
214-
BrokerService bs = pulsar.getBrokerService();
205+
throws Exception {
206+
BrokerService bs = SharedPulsarCluster.get().getPulsarService().getBrokerService();
215207
Awaitility.await().untilAsserted(() -> {
216208
TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
217209
Assert.assertNotNull(topicStats);
@@ -272,15 +264,15 @@ private void verifyStats(String topicString, String rgName,
272264
private static final int PUBLISH_INTERVAL_SECS = 300;
273265

274266
// Initial set up for transport manager and producer/consumer clusters/tenants/namespaces/topics.
275-
private void prepareData() throws PulsarAdminException {
276-
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
267+
private void prepareData() throws Exception {
268+
SharedPulsarCluster.get().getPulsarService().getConfiguration()
269+
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
277270

278-
this.conf.setAllowAutoTopicCreation(true);
271+
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
279272

280-
final String clusterName = "test";
281-
admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
282-
admin.tenants().createTenant(tenantName,
283-
new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName)));
273+
final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
274+
admin.tenants().createTenant(tenantName,
275+
new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName)));
284276
admin.namespaces().createNamespace(tenantAndNsName);
285277
admin.namespaces().setNamespaceReplicationClusters(tenantAndNsName, Sets.newHashSet(clusterName), false);
286278
}

0 commit comments

Comments
 (0)