2222import static org .apache .commons .lang3 .StringUtils .isBlank ;
2323import static org .apache .pulsar .broker .BrokerTestUtil .newUniqueName ;
2424import static org .apache .pulsar .broker .resources .LoadBalanceResources .BUNDLE_DATA_BASE_PATH ;
25+ import static org .mockito .ArgumentMatchers .any ;
26+ import static org .mockito .Mockito .doAnswer ;
2527import static org .mockito .Mockito .spy ;
2628import static org .mockito .Mockito .times ;
2729import static org .mockito .Mockito .verify ;
5254import java .util .UUID ;
5355import java .util .concurrent .CompletableFuture ;
5456import java .util .concurrent .TimeUnit ;
57+ import java .util .concurrent .atomic .AtomicInteger ;
5558import javax .ws .rs .NotAcceptableException ;
5659import javax .ws .rs .core .Response .Status ;
5760import lombok .AllArgsConstructor ;
7073import org .apache .pulsar .broker .loadbalance .impl .ModularLoadManagerImpl ;
7174import org .apache .pulsar .broker .loadbalance .impl .ModularLoadManagerWrapper ;
7275import org .apache .pulsar .broker .loadbalance .impl .SimpleLoadManagerImpl ;
76+ import org .apache .pulsar .broker .service .AbstractTopic ;
7377import org .apache .pulsar .broker .service .Topic ;
7478import org .apache .pulsar .broker .service .persistent .PersistentSubscription ;
7579import org .apache .pulsar .broker .service .persistent .PersistentTopic ;
127131import org .apache .pulsar .common .policies .data .TopicStats ;
128132import org .apache .pulsar .common .policies .data .TopicType ;
129133import org .apache .pulsar .common .policies .data .impl .BacklogQuotaImpl ;
134+ import org .apache .pulsar .common .protocol .schema .SchemaData ;
135+ import org .apache .pulsar .common .util .collections .ConcurrentOpenHashMap ;
130136import org .awaitility .Awaitility ;
137+ import org .awaitility .reflect .WhiteboxImpl ;
138+ import org .mockito .Mockito ;
139+ import org .mockito .invocation .InvocationOnMock ;
140+ import org .mockito .stubbing .Answer ;
131141import org .testng .Assert ;
132142import org .testng .annotations .AfterClass ;
133143import org .testng .annotations .AfterMethod ;
@@ -2870,49 +2880,80 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
28702880 final String myNamespace = newUniqueName (defaultTenant + "/ns" );
28712881 admin .namespaces ().createNamespace (myNamespace , Set .of ("test" ));
28722882 final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited" ;
2883+ admin .topics ().createNonPartitionedTopic (topic );
2884+ AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic (topic );
28732885 //the policy is set to 0, so there will be no restrictions
28742886 admin .namespaces ().setMaxProducersPerTopic (myNamespace , 0 );
28752887 Awaitility .await ().until (()
28762888 -> admin .namespaces ().getMaxProducersPerTopic (myNamespace ) == 0 );
2877- List <Producer <byte [] >> producers = new ArrayList <>();
2889+ List <Producer <String >> producers = new ArrayList <>();
28782890 for (int i = 0 ; i < maxProducersPerTopic + 1 ; i ++) {
2879- Producer <byte [] > producer = pulsarClient .newProducer ().topic (topic ).create ();
2891+ Producer <String > producer = pulsarClient .newProducer (Schema . STRING ).topic (topic ).create ();
28802892 producers .add (producer );
28812893 }
2894+ assertEquals (schemaOpsCounter .get (), maxProducersPerTopic + 1 );
28822895
28832896 admin .namespaces ().removeMaxProducersPerTopic (myNamespace );
28842897 Awaitility .await ().until (()
28852898 -> admin .namespaces ().getMaxProducersPerTopic (myNamespace ) == null );
2899+
28862900 try {
28872901 @ Cleanup
2888- Producer <byte [] > producer = pulsarClient .newProducer ().topic (topic ).create ();
2902+ Producer <String > producer = pulsarClient .newProducer (Schema . STRING ).topic (topic ).create ();
28892903 fail ("should fail" );
28902904 } catch (PulsarClientException e ) {
28912905 String expectMsg = "Topic '" + topic + "' reached max producers limit" ;
28922906 assertTrue (e .getMessage ().contains (expectMsg ));
2907+ assertEquals (schemaOpsCounter .get (), maxProducersPerTopic + 1 );
28932908 }
28942909 //set the limit to 3
28952910 admin .namespaces ().setMaxProducersPerTopic (myNamespace , 3 );
28962911 Awaitility .await ().until (()
28972912 -> admin .namespaces ().getMaxProducersPerTopic (myNamespace ) == 3 );
28982913 // should success
2899- Producer <byte [] > producer = pulsarClient .newProducer ().topic (topic ).create ();
2914+ Producer <String > producer = pulsarClient .newProducer (Schema . STRING ).topic (topic ).create ();
29002915 producers .add (producer );
2916+ assertEquals (schemaOpsCounter .get (), maxProducersPerTopic + 2 );
29012917 try {
29022918 @ Cleanup
29032919 Producer <byte []> producer1 = pulsarClient .newProducer ().topic (topic ).create ();
29042920 fail ("should fail" );
29052921 } catch (PulsarClientException e ) {
29062922 String expectMsg = "Topic '" + topic + "' reached max producers limit" ;
29072923 assertTrue (e .getMessage ().contains (expectMsg ));
2924+ assertEquals (schemaOpsCounter .get (), maxProducersPerTopic + 2 );
29082925 }
29092926
29102927 //clean up
2911- for (Producer <byte [] > tempProducer : producers ) {
2928+ for (Producer <String > tempProducer : producers ) {
29122929 tempProducer .close ();
29132930 }
29142931 }
29152932
2933+ private AtomicInteger injectSchemaCheckCounterForTopic (String topicName ) {
2934+ ConcurrentOpenHashMap <String , CompletableFuture <Optional <Topic >>> topics =
2935+ WhiteboxImpl .getInternalState (pulsar .getBrokerService (), "topics" );
2936+ AbstractTopic topic = (AbstractTopic ) topics .get (topicName ).join ().get ();
2937+ AbstractTopic spyTopic = Mockito .spy (topic );
2938+ AtomicInteger counter = new AtomicInteger ();
2939+ doAnswer (new Answer () {
2940+ @ Override
2941+ public Object answer (InvocationOnMock invocation ) throws Throwable {
2942+ counter .incrementAndGet ();
2943+ return invocation .callRealMethod ();
2944+ }
2945+ }).when (spyTopic ).addSchema (any (SchemaData .class ));
2946+ doAnswer (new Answer () {
2947+ @ Override
2948+ public Object answer (InvocationOnMock invocation ) throws Throwable {
2949+ counter .incrementAndGet ();
2950+ return invocation .callRealMethod ();
2951+ }
2952+ }).when (spyTopic ).addSchemaIfIdleOrCheckCompatible (any (SchemaData .class ));
2953+ topics .put (topicName , CompletableFuture .completedFuture (Optional .of (spyTopic )));
2954+ return counter ;
2955+ }
2956+
29162957 @ Test
29172958 public void testMaxConsumersPerTopicUnlimited () throws Exception {
29182959 restartClusterAfterTest ();
@@ -2924,49 +2965,55 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
29242965 final String myNamespace = newUniqueName (defaultTenant + "/ns" );
29252966 admin .namespaces ().createNamespace (myNamespace , Set .of ("test" ));
29262967 final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited" ;
2968+ admin .topics ().createNonPartitionedTopic (topic );
2969+ AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic (topic );
29272970
29282971 assertNull (admin .namespaces ().getMaxConsumersPerTopic (myNamespace ));
29292972 //the policy is set to 0, so there will be no restrictions
29302973 admin .namespaces ().setMaxConsumersPerTopic (myNamespace , 0 );
29312974 Awaitility .await ().until (()
29322975 -> admin .namespaces ().getMaxConsumersPerTopic (myNamespace ) == 0 );
2933- List <Consumer <byte [] >> consumers = new ArrayList <>();
2976+ List <Consumer <String >> consumers = new ArrayList <>();
29342977 for (int i = 0 ; i < maxConsumersPerTopic + 1 ; i ++) {
2935- Consumer <byte [] > consumer =
2936- pulsarClient . newConsumer () .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
2978+ Consumer <String > consumer = pulsarClient . newConsumer ( Schema . STRING )
2979+ .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
29372980 consumers .add (consumer );
29382981 }
2982+ assertEquals (schemaOpsCounter .get (), maxConsumersPerTopic + 2 );
29392983
29402984 admin .namespaces ().removeMaxConsumersPerTopic (myNamespace );
29412985 Awaitility .await ().until (()
29422986 -> admin .namespaces ().getMaxConsumersPerTopic (myNamespace ) == null );
29432987 try {
29442988 @ Cleanup
2945- Consumer <byte [] > subscribe =
2946- pulsarClient . newConsumer () .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
2989+ Consumer <String > subscribe = pulsarClient . newConsumer ( Schema . STRING )
2990+ .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
29472991 fail ("should fail" );
29482992 } catch (PulsarClientException e ) {
29492993 assertTrue (e .getMessage ().contains ("Topic reached max consumers limit" ));
2994+ assertEquals (schemaOpsCounter .get (), maxConsumersPerTopic + 2 );
29502995 }
29512996 //set the limit to 3
29522997 admin .namespaces ().setMaxConsumersPerTopic (myNamespace , 3 );
29532998 Awaitility .await ().until (()
29542999 -> admin .namespaces ().getMaxConsumersPerTopic (myNamespace ) == 3 );
29553000 // should success
2956- Consumer <byte [] > consumer =
2957- pulsarClient . newConsumer () .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
3001+ Consumer <String > consumer = pulsarClient . newConsumer ( Schema . STRING )
3002+ .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
29583003 consumers .add (consumer );
3004+ assertEquals (schemaOpsCounter .get (), maxConsumersPerTopic + 3 );
29593005 try {
29603006 @ Cleanup
2961- Consumer <byte [] > subscribe =
2962- pulsarClient . newConsumer () .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
3007+ Consumer <String > subscribe = pulsarClient . newConsumer ( Schema . STRING )
3008+ .subscriptionName (UUID .randomUUID ().toString ()).topic (topic ).subscribe ();
29633009 fail ("should fail" );
29643010 } catch (PulsarClientException e ) {
29653011 assertTrue (e .getMessage ().contains ("Topic reached max consumers limit" ));
3012+ assertEquals (schemaOpsCounter .get (), maxConsumersPerTopic + 3 );
29663013 }
29673014
29683015 //clean up
2969- for (Consumer <byte [] > subConsumer : consumers ) {
3016+ for (Consumer <String > subConsumer : consumers ) {
29703017 subConsumer .close ();
29713018 }
29723019 }
0 commit comments