2929import java .util .concurrent .TimeUnit ;
3030import lombok .Cleanup ;
3131import org .apache .commons .lang3 .RandomUtils ;
32+ import org .apache .pulsar .broker .ServiceConfiguration ;
33+ import org .apache .pulsar .broker .service .SharedPulsarBaseTest ;
34+ import org .apache .pulsar .broker .service .SharedPulsarCluster ;
3235import org .apache .pulsar .client .admin .PulsarAdminException ;
3336import org .apache .pulsar .client .api .Consumer ;
3437import org .apache .pulsar .client .api .Message ;
3538import org .apache .pulsar .client .api .MessageId ;
3639import org .apache .pulsar .client .api .MessageIdAdv ;
3740import org .apache .pulsar .client .api .Producer ;
38- import org .apache .pulsar .client .api .ProducerConsumerBase ;
3941import org .apache .pulsar .client .api .SubscriptionType ;
4042import org .apache .pulsar .common .stats .AnalyzeSubscriptionBacklogResult ;
4143import org .apache .pulsar .common .util .FutureUtil ;
42- import org .testng .annotations .AfterMethod ;
43- import org .testng .annotations .BeforeMethod ;
4444import org .testng .annotations .Test ;
4545
4646@ Test (groups = "broker-admin" )
47- public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
47+ public class AnalyzeBacklogSubscriptionTest extends SharedPulsarBaseTest {
4848
49- @ BeforeMethod
50- @ Override
51- public void setup () throws Exception {
52- super .internalSetup ();
53- producerBaseSetup ();
49+ private ServiceConfiguration conf () throws Exception {
50+ return SharedPulsarCluster .get ().getPulsarService ().getConfiguration ();
5451 }
5552
56- @ Override
57- protected void doInitConf () throws Exception {
58- super .doInitConf ();
59- conf .setDispatcherMaxReadBatchSize (10 );
60- }
61-
62- @ AfterMethod (alwaysRun = true )
63- @ Override
64- public void cleanup () throws Exception {
65- super .internalCleanup ();
53+ @ org .testng .annotations .BeforeMethod (alwaysRun = true )
54+ public void setupDispatcherConfig () throws Exception {
55+ conf ().setDispatcherMaxReadBatchSize (10 );
56+ conf ().setSubscriptionBacklogScanMaxEntries (10000 );
6657 }
6758
6859 @ Test
@@ -80,7 +71,7 @@ private void simpleAnalyzeBacklogTest(boolean batching) throws Exception {
8071 int batchSize = batching ? 5 : 1 ;
8172 int numEntries = numMessages / batchSize ;
8273
83- String topic = "persistent://my-property/my-ns/my-topic-" + batching ;
74+ String topic = newTopicName () ;
8475 String subName = "sub-1" ;
8576 admin .topics ().createSubscription (topic , subName , MessageId .latest );
8677
@@ -181,7 +172,7 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
181172
182173 @ Test
183174 public void partitionedTopicNotAllowed () throws Exception {
184- String topic = "persistent://my-property/my-ns/my-partitioned-topic" ;
175+ String topic = newTopicName () ;
185176 String subName = "sub-1" ;
186177 admin .topics ().createPartitionedTopic (topic , 2 );
187178 admin .topics ().createSubscription (topic , subName , MessageId .latest );
@@ -201,9 +192,9 @@ public void partitionedTopicNotAllowed() throws Exception {
201192 @ Test
202193 public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop () throws Exception {
203194 int serverSubscriptionBacklogScanMaxEntries = 20 ;
204- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
195+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
205196
206- String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop" ;
197+ String topic = newTopicName () ;
207198 String subName = "sub-1" ;
208199 int numMessages = 10 ;
209200
@@ -217,9 +208,9 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Excep
217208 @ Test
218209 public void analyzeBacklogMaxEntriesExceedWithoutLoop () throws Exception {
219210 int serverSubscriptionBacklogScanMaxEntries = 20 ;
220- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
211+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
221212
222- String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop" ;
213+ String topic = newTopicName () ;
223214 String subName = "sub-1" ;
224215 int numMessages = 25 ;
225216
@@ -236,9 +227,9 @@ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
236227 @ Test
237228 public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop () throws Exception {
238229 int serverSubscriptionBacklogScanMaxEntries = 20 ;
239- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
230+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
240231
241- String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop" ;
232+ String topic = newTopicName () ;
242233 String subName = "sub-1" ;
243234 int numMessages = 45 ;
244235
@@ -253,9 +244,9 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exceptio
253244 @ Test
254245 public void analyzeBacklogMaxEntriesExceedWithLoop () throws Exception {
255246 int serverSubscriptionBacklogScanMaxEntries = 15 ;
256- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
247+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
257248
258- String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop" ;
249+ String topic = newTopicName () ;
259250 String subName = "sub-1" ;
260251 int numMessages = 55 ;
261252 int backlogScanMaxEntries = 40 ;
@@ -275,9 +266,9 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
275266 @ Test
276267 public void analyzeBacklogWithTopicUnload () throws Exception {
277268 int serverSubscriptionBacklogScanMaxEntries = 10 ;
278- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
269+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
279270
280- String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload" ;
271+ String topic = newTopicName () ;
281272 String subName = "sub-1" ;
282273 int numMessages = 35 ;
283274
@@ -304,9 +295,9 @@ public void analyzeBacklogWithTopicUnload() throws Exception {
304295 @ Test
305296 public void analyzeBacklogWithIndividualAck () throws Exception {
306297 int serverSubscriptionBacklogScanMaxEntries = 20 ;
307- conf .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
298+ conf () .setSubscriptionBacklogScanMaxEntries (serverSubscriptionBacklogScanMaxEntries );
308299
309- String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack" ;
300+ String topic = newTopicName () ;
310301 String subName = "sub-1" ;
311302 int messages = 55 ;
312303
0 commit comments