Skip to content

Commit 7e5839c

Browse files
authored
[cleanup] Convert 30 test classes to SharedPulsarBaseTest (#25381)
1 parent 3bc834f commit 7e5839c

30 files changed

Lines changed: 455 additions & 1036 deletions

pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,45 +33,31 @@
3333
import lombok.extern.slf4j.Slf4j;
3434
import org.apache.bookkeeper.mledger.Position;
3535
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
36+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
37+
import org.apache.pulsar.broker.service.SharedPulsarCluster;
3638
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
3739
import org.apache.pulsar.client.api.Consumer;
3840
import org.apache.pulsar.client.api.Message;
3941
import org.apache.pulsar.client.api.MessageId;
4042
import org.apache.pulsar.client.api.Producer;
41-
import org.apache.pulsar.client.api.ProducerConsumerBase;
4243
import org.apache.pulsar.client.api.Schema;
4344
import org.apache.pulsar.client.api.SubscriptionType;
4445
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
4546
import org.apache.pulsar.client.impl.MessageIdImpl;
4647
import org.apache.pulsar.common.util.FutureUtil;
4748
import org.awaitility.Awaitility;
4849
import org.testng.Assert;
49-
import org.testng.annotations.AfterClass;
50-
import org.testng.annotations.BeforeClass;
50+
import org.testng.annotations.BeforeMethod;
5151
import org.testng.annotations.DataProvider;
5252
import org.testng.annotations.Test;
5353

5454
@Slf4j
5555
@Test(groups = "broker")
56-
public class LedgerLostAndSkipNonRecoverableTest extends ProducerConsumerBase {
56+
public class LedgerLostAndSkipNonRecoverableTest extends SharedPulsarBaseTest {
5757

58-
private static final String DEFAULT_NAMESPACE = "my-property/my-ns";
59-
60-
@BeforeClass
61-
@Override
62-
protected void setup() throws Exception {
63-
super.internalSetup();
64-
super.producerBaseSetup();
65-
}
66-
67-
@AfterClass
68-
@Override
69-
protected void cleanup() throws Exception {
70-
super.internalCleanup();
71-
}
72-
73-
protected void doInitConf() throws Exception {
74-
conf.setAutoSkipNonRecoverableData(true);
58+
@BeforeMethod(alwaysRun = true)
59+
public void enableAutoSkipNonRecoverable() throws Exception {
60+
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAutoSkipNonRecoverableData(true);
7561
}
7662

7763
@DataProvider(name = "batchEnabled")
@@ -84,9 +70,8 @@ public Object[][] batchEnabled(){
8470

8571
@Test(timeOut = 30000, dataProvider = "batchEnabled")
8672
public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean enabledBatch) throws Exception {
87-
String topicSimpleName = UUID.randomUUID().toString().replaceAll("-", "");
73+
String topicName = newTopicName();
8874
String subName = UUID.randomUUID().toString().replaceAll("-", "");
89-
String topicName = String.format("persistent://%s/%s", DEFAULT_NAMESPACE, topicSimpleName);
9075

9176
log.info("create topic and subscription.");
9277
Consumer sub = createConsumer(topicName, subName, enabledBatch);
@@ -120,8 +105,9 @@ public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean enable
120105
consumer.close();
121106

122107
log.info("Make lost ledger [{}].", individualPosition.getLedgerId());
123-
pulsar.getBrokerService().getTopic(topicName, false).get().get().close(false);
124-
pulsarTestContext.getMockBookKeeper().deleteLedger(individualPosition.getLedgerId());
108+
getTopic(topicName, false).get().get().close(false);
109+
PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
110+
pulsar.getBookKeeperClient().deleteLedger(individualPosition.getLedgerId());
125111

126112
log.info("send some messages.");
127113
sendManyMessages(topicName, 3, messageCountPerEntry);
@@ -133,12 +119,11 @@ public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean enable
133119

134120
// cleanup
135121
consumerAndReceivedMessages2.consumer.close();
136-
admin.topics().delete(topicName);
137122
}
138123

139124
private ManagedCursorImpl getCursor(String topicName, String subName) throws Exception {
140125
PersistentSubscription subscription =
141-
(PersistentSubscription) pulsar.getBrokerService().getTopic(topicName, false)
126+
(PersistentSubscription) getTopic(topicName, false)
142127
.get().get().getSubscription(subName);
143128
return (ManagedCursorImpl) subscription.getCursor();
144129
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@
3434
import java.util.concurrent.ExecutionException;
3535
import java.util.concurrent.TimeUnit;
3636
import lombok.Cleanup;
37-
import org.apache.pulsar.broker.BrokerTestUtil;
37+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
3838
import org.apache.pulsar.client.admin.PulsarAdminException;
3939
import org.apache.pulsar.client.api.Consumer;
4040
import org.apache.pulsar.client.api.Message;
4141
import org.apache.pulsar.client.api.MessageId;
4242
import org.apache.pulsar.client.api.Producer;
43-
import org.apache.pulsar.client.api.ProducerConsumerBase;
4443
import org.apache.pulsar.client.api.PulsarClient;
4544
import org.apache.pulsar.client.api.PulsarClientException;
4645
import org.apache.pulsar.client.api.Schema;
@@ -54,32 +53,17 @@
5453
import org.slf4j.Logger;
5554
import org.slf4j.LoggerFactory;
5655
import org.testng.Assert;
57-
import org.testng.annotations.AfterClass;
58-
import org.testng.annotations.BeforeClass;
5956
import org.testng.annotations.DataProvider;
6057
import org.testng.annotations.Test;
6158

6259
@Test(groups = "broker-admin")
63-
public class AdminTopicApiTest extends ProducerConsumerBase {
60+
public class AdminTopicApiTest extends SharedPulsarBaseTest {
6461
private static final Logger log = LoggerFactory.getLogger(AdminTopicApiTest.class);
6562

66-
@Override
67-
@BeforeClass(alwaysRun = true)
68-
protected void setup() throws Exception {
69-
super.internalSetup();
70-
super.producerBaseSetup();
71-
}
72-
73-
@Override
74-
@AfterClass(alwaysRun = true)
75-
protected void cleanup() throws Exception {
76-
super.internalCleanup();
77-
}
78-
7963
@Test
8064
public void testDeleteNonExistTopic() throws Exception {
8165
// Case 1: call delete for a partitioned topic.
82-
final String topic1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
66+
final String topic1 = newTopicName();
8367
admin.topics().createPartitionedTopic(topic1, 2);
8468
admin.schemas().createSchemaAsync(topic1, Schema.STRING.getSchemaInfo());
8569
Awaitility.await().untilAsserted(() -> {
@@ -97,8 +81,8 @@ public void testDeleteNonExistTopic() throws Exception {
9781
// cleanup.
9882
admin.topics().deletePartitionedTopic(topic1, false);
9983

100-
// Case 2: call delete-partitioned-topi for a non-partitioned topic.
101-
final String topic2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
84+
// Case 2: call delete-partitioned-topic for a non-partitioned topic.
85+
final String topic2 = newTopicName();
10286
admin.topics().createNonPartitionedTopic(topic2);
10387
admin.schemas().createSchemaAsync(topic2, Schema.STRING.getSchemaInfo());
10488
Awaitility.await().untilAsserted(() -> {
@@ -117,7 +101,7 @@ public void testDeleteNonExistTopic() throws Exception {
117101
admin.topics().delete(topic2, false);
118102

119103
// Case 3: delete topic does not exist.
120-
final String topic3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
104+
final String topic3 = newTopicName();
121105
try {
122106
admin.topics().delete(topic3);
123107
fail("expected a 404 error");
@@ -136,10 +120,10 @@ public void testDeleteNonExistTopic() throws Exception {
136120
public void testPeekMessages() throws Exception {
137121
@Cleanup
138122
PulsarClient newPulsarClient = PulsarClient.builder()
139-
.serviceUrl(lookupUrl.toString())
123+
.serviceUrl(getBrokerServiceUrl())
140124
.build();
141125

142-
final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
126+
final String topic = newTopicName();
143127

144128
@Cleanup
145129
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
@@ -179,18 +163,19 @@ public void testPeekMessages() throws Exception {
179163
@DataProvider
180164
public Object[] getStatsDataProvider() {
181165
return new Object[]{
182-
TopicDomain.persistent + "://my-property/my-ns/" + UUID.randomUUID(),
183-
TopicDomain.non_persistent + "://my-property/my-ns/" + UUID.randomUUID(),
166+
TopicDomain.persistent.value(),
167+
TopicDomain.non_persistent.value(),
184168
};
185169
}
186170

187171
@Test(dataProvider = "getStatsDataProvider")
188-
public void testGetStats(String topic) throws Exception {
172+
public void testGetStats(String domain) throws Exception {
173+
String topic = domain + "://" + getNamespace() + "/" + UUID.randomUUID();
189174
admin.topics().createNonPartitionedTopic(topic);
190175

191176
@Cleanup
192177
PulsarClient newPulsarClient = PulsarClient.builder()
193-
.serviceUrl(lookupUrl.toString())
178+
.serviceUrl(getBrokerServiceUrl())
194179
.build();
195180

196181
final String subscriptionName = "my-sub";

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,40 +29,31 @@
2929
import java.util.concurrent.TimeUnit;
3030
import lombok.Cleanup;
3131
import org.apache.commons.lang3.RandomUtils;
32+
import org.apache.pulsar.broker.ServiceConfiguration;
33+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
34+
import org.apache.pulsar.broker.service.SharedPulsarCluster;
3235
import org.apache.pulsar.client.admin.PulsarAdminException;
3336
import org.apache.pulsar.client.api.Consumer;
3437
import org.apache.pulsar.client.api.Message;
3538
import org.apache.pulsar.client.api.MessageId;
3639
import org.apache.pulsar.client.api.MessageIdAdv;
3740
import org.apache.pulsar.client.api.Producer;
38-
import org.apache.pulsar.client.api.ProducerConsumerBase;
3941
import org.apache.pulsar.client.api.SubscriptionType;
4042
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
4143
import org.apache.pulsar.common.util.FutureUtil;
42-
import org.testng.annotations.AfterMethod;
43-
import org.testng.annotations.BeforeMethod;
4444
import org.testng.annotations.Test;
4545

4646
@Test(groups = "broker-admin")
47-
public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
47+
public class AnalyzeBacklogSubscriptionTest extends SharedPulsarBaseTest {
4848

49-
@BeforeMethod
50-
@Override
51-
public void setup() throws Exception {
52-
super.internalSetup();
53-
producerBaseSetup();
49+
private ServiceConfiguration conf() throws Exception {
50+
return SharedPulsarCluster.get().getPulsarService().getConfiguration();
5451
}
5552

56-
@Override
57-
protected void doInitConf() throws Exception {
58-
super.doInitConf();
59-
conf.setDispatcherMaxReadBatchSize(10);
60-
}
61-
62-
@AfterMethod(alwaysRun = true)
63-
@Override
64-
public void cleanup() throws Exception {
65-
super.internalCleanup();
53+
@org.testng.annotations.BeforeMethod(alwaysRun = true)
54+
public void setupDispatcherConfig() throws Exception {
55+
conf().setDispatcherMaxReadBatchSize(10);
56+
conf().setSubscriptionBacklogScanMaxEntries(10000);
6657
}
6758

6859
@Test
@@ -80,7 +71,7 @@ private void simpleAnalyzeBacklogTest(boolean batching) throws Exception {
8071
int batchSize = batching ? 5 : 1;
8172
int numEntries = numMessages / batchSize;
8273

83-
String topic = "persistent://my-property/my-ns/my-topic-" + batching;
74+
String topic = newTopicName();
8475
String subName = "sub-1";
8576
admin.topics().createSubscription(topic, subName, MessageId.latest);
8677

@@ -181,7 +172,7 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
181172

182173
@Test
183174
public void partitionedTopicNotAllowed() throws Exception {
184-
String topic = "persistent://my-property/my-ns/my-partitioned-topic";
175+
String topic = newTopicName();
185176
String subName = "sub-1";
186177
admin.topics().createPartitionedTopic(topic, 2);
187178
admin.topics().createSubscription(topic, subName, MessageId.latest);
@@ -201,9 +192,9 @@ public void partitionedTopicNotAllowed() throws Exception {
201192
@Test
202193
public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception {
203194
int serverSubscriptionBacklogScanMaxEntries = 20;
204-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
195+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
205196

206-
String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
197+
String topic = newTopicName();
207198
String subName = "sub-1";
208199
int numMessages = 10;
209200

@@ -217,9 +208,9 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Excep
217208
@Test
218209
public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
219210
int serverSubscriptionBacklogScanMaxEntries = 20;
220-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
211+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
221212

222-
String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
213+
String topic = newTopicName();
223214
String subName = "sub-1";
224215
int numMessages = 25;
225216

@@ -236,9 +227,9 @@ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
236227
@Test
237228
public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception {
238229
int serverSubscriptionBacklogScanMaxEntries = 20;
239-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
230+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
240231

241-
String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
232+
String topic = newTopicName();
242233
String subName = "sub-1";
243234
int numMessages = 45;
244235

@@ -253,9 +244,9 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exceptio
253244
@Test
254245
public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
255246
int serverSubscriptionBacklogScanMaxEntries = 15;
256-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
247+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
257248

258-
String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
249+
String topic = newTopicName();
259250
String subName = "sub-1";
260251
int numMessages = 55;
261252
int backlogScanMaxEntries = 40;
@@ -275,9 +266,9 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
275266
@Test
276267
public void analyzeBacklogWithTopicUnload() throws Exception {
277268
int serverSubscriptionBacklogScanMaxEntries = 10;
278-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
269+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
279270

280-
String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
271+
String topic = newTopicName();
281272
String subName = "sub-1";
282273
int numMessages = 35;
283274

@@ -304,9 +295,9 @@ public void analyzeBacklogWithTopicUnload() throws Exception {
304295
@Test
305296
public void analyzeBacklogWithIndividualAck() throws Exception {
306297
int serverSubscriptionBacklogScanMaxEntries = 20;
307-
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
298+
conf().setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
308299

309-
String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
300+
String topic = newTopicName();
310301
String subName = "sub-1";
311302
int messages = 55;
312303

0 commit comments

Comments
 (0)