Skip to content

Commit 0e88c94

Browse files
committed
[improve] [broker] Check max producers/consumers limitation first before other ops to save resources (#23074)
(cherry picked from commit 679a3d4)
1 parent 422ee7c commit 0e88c94

File tree

5 files changed

+181
-68
lines changed

5 files changed

+181
-68
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -486,8 +486,18 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) {
486486
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
487487
}
488488

489+
public boolean isProducersExceeded(String producerName) {
490+
String replicatorPrefix = brokerService.getPulsar().getConfig().getReplicatorPrefix() + ".";
491+
boolean isRemote = producerName.startsWith(replicatorPrefix);
492+
return isProducersExceeded(isRemote);
493+
}
494+
489495
protected boolean isProducersExceeded(Producer producer) {
490-
if (isSystemTopic() || producer.isRemote()) {
496+
return isProducersExceeded(producer.isRemote());
497+
}
498+
499+
protected boolean isProducersExceeded(boolean isRemote) {
500+
if (isSystemTopic() || isRemote) {
491501
return false;
492502
}
493503
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
@@ -536,7 +546,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
536546
return count;
537547
}
538548

539-
protected boolean isConsumersExceededOnTopic() {
549+
public boolean isConsumersExceededOnTopic() {
540550
if (isSystemTopic()) {
541551
return false;
542552
}
@@ -965,12 +975,6 @@ protected void checkTopicFenced() throws BrokerServiceException {
965975
}
966976

967977
protected CompletableFuture<Void> internalAddProducer(Producer producer) {
968-
if (isProducersExceeded(producer)) {
969-
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
970-
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
971-
"Topic '" + topic + "' reached max producers limit"));
972-
}
973-
974978
if (isSameAddressProducersExceeded(producer)) {
975979
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
976980
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,6 +1306,16 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
13061306
"Topic " + topicName + " does not exist"));
13071307
}
13081308
final Topic topic = optTopic.get();
1309+
// Check max consumer limitation to avoid unnecessary ops wasting resources. For example:
1310+
// the new consumer reached max producer limitation, but pulsar did schema check first,
1311+
// it would waste CPU.
1312+
if (((AbstractTopic) topic).isConsumersExceededOnTopic()) {
1313+
log.warn("[{}] Attempting to add consumer to topic which reached max"
1314+
+ " consumers limit", topic);
1315+
Throwable t =
1316+
new ConsumerBusyException("Topic reached max consumers limit");
1317+
return FutureUtil.failedFuture(t);
1318+
}
13091319
return service.isAllowAutoSubscriptionCreationAsync(topicName)
13101320
.thenCompose(isAllowedAutoSubscriptionCreation -> {
13111321
boolean rejectSubscriptionIfDoesNotExist = isDurable
@@ -1544,6 +1554,15 @@ protected void handleProducer(final CommandProducer cmdProducer) {
15441554
}
15451555

15461556
service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
1557+
// Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new
1558+
// producer reached max producer limitation, but pulsar did schema check first, it would waste CPU
1559+
if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
1560+
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
1561+
String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit";
1562+
Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg);
1563+
return CompletableFuture.failedFuture(t);
1564+
}
1565+
15471566
// Before creating producer, check if backlog quota exceeded
15481567
// on topic for size based limit and time based limit
15491568
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
2424
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
25+
import static org.mockito.ArgumentMatchers.any;
26+
import static org.mockito.Mockito.doAnswer;
2527
import static org.mockito.Mockito.spy;
2628
import static org.mockito.Mockito.times;
2729
import static org.mockito.Mockito.verify;
@@ -52,6 +54,7 @@
5254
import java.util.UUID;
5355
import java.util.concurrent.CompletableFuture;
5456
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.atomic.AtomicInteger;
5558
import javax.ws.rs.NotAcceptableException;
5659
import javax.ws.rs.core.Response.Status;
5760
import lombok.AllArgsConstructor;
@@ -70,6 +73,7 @@
7073
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
7174
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
7275
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
76+
import org.apache.pulsar.broker.service.AbstractTopic;
7377
import org.apache.pulsar.broker.service.Topic;
7478
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
7579
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -127,7 +131,13 @@
127131
import org.apache.pulsar.common.policies.data.TopicStats;
128132
import org.apache.pulsar.common.policies.data.TopicType;
129133
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
134+
import org.apache.pulsar.common.protocol.schema.SchemaData;
135+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
130136
import org.awaitility.Awaitility;
137+
import org.awaitility.reflect.WhiteboxImpl;
138+
import org.mockito.Mockito;
139+
import org.mockito.invocation.InvocationOnMock;
140+
import org.mockito.stubbing.Answer;
131141
import org.testng.Assert;
132142
import org.testng.annotations.AfterClass;
133143
import org.testng.annotations.AfterMethod;
@@ -2870,49 +2880,80 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
28702880
final String myNamespace = newUniqueName(defaultTenant + "/ns");
28712881
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
28722882
final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
2883+
admin.topics().createNonPartitionedTopic(topic);
2884+
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);
28732885
//the policy is set to 0, so there will be no restrictions
28742886
admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
28752887
Awaitility.await().until(()
28762888
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
2877-
List<Producer<byte[]>> producers = new ArrayList<>();
2889+
List<Producer<String>> producers = new ArrayList<>();
28782890
for (int i = 0; i < maxProducersPerTopic + 1; i++) {
2879-
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
2891+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
28802892
producers.add(producer);
28812893
}
2894+
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
28822895

28832896
admin.namespaces().removeMaxProducersPerTopic(myNamespace);
28842897
Awaitility.await().until(()
28852898
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
2899+
28862900
try {
28872901
@Cleanup
2888-
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
2902+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
28892903
fail("should fail");
28902904
} catch (PulsarClientException e) {
28912905
String expectMsg = "Topic '" + topic + "' reached max producers limit";
28922906
assertTrue(e.getMessage().contains(expectMsg));
2907+
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1);
28932908
}
28942909
//set the limit to 3
28952910
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
28962911
Awaitility.await().until(()
28972912
-> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
28982913
// should success
2899-
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
2914+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
29002915
producers.add(producer);
2916+
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
29012917
try {
29022918
@Cleanup
29032919
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).create();
29042920
fail("should fail");
29052921
} catch (PulsarClientException e) {
29062922
String expectMsg = "Topic '" + topic + "' reached max producers limit";
29072923
assertTrue(e.getMessage().contains(expectMsg));
2924+
assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2);
29082925
}
29092926

29102927
//clean up
2911-
for (Producer<byte[]> tempProducer : producers) {
2928+
for (Producer<String> tempProducer : producers) {
29122929
tempProducer.close();
29132930
}
29142931
}
29152932

2933+
private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
2934+
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
2935+
WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "topics");
2936+
AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get();
2937+
AbstractTopic spyTopic = Mockito.spy(topic);
2938+
AtomicInteger counter = new AtomicInteger();
2939+
doAnswer(new Answer() {
2940+
@Override
2941+
public Object answer(InvocationOnMock invocation) throws Throwable {
2942+
counter.incrementAndGet();
2943+
return invocation.callRealMethod();
2944+
}
2945+
}).when(spyTopic).addSchema(any(SchemaData.class));
2946+
doAnswer(new Answer() {
2947+
@Override
2948+
public Object answer(InvocationOnMock invocation) throws Throwable {
2949+
counter.incrementAndGet();
2950+
return invocation.callRealMethod();
2951+
}
2952+
}).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class));
2953+
topics.put(topicName, CompletableFuture.completedFuture(Optional.of(spyTopic)));
2954+
return counter;
2955+
}
2956+
29162957
@Test
29172958
public void testMaxConsumersPerTopicUnlimited() throws Exception {
29182959
restartClusterAfterTest();
@@ -2924,49 +2965,55 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception {
29242965
final String myNamespace = newUniqueName(defaultTenant + "/ns");
29252966
admin.namespaces().createNamespace(myNamespace, Set.of("test"));
29262967
final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited";
2968+
admin.topics().createNonPartitionedTopic(topic);
2969+
AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic);
29272970

29282971
assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
29292972
//the policy is set to 0, so there will be no restrictions
29302973
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
29312974
Awaitility.await().until(()
29322975
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
2933-
List<Consumer<byte[]>> consumers = new ArrayList<>();
2976+
List<Consumer<String>> consumers = new ArrayList<>();
29342977
for (int i = 0; i < maxConsumersPerTopic + 1; i++) {
2935-
Consumer<byte[]> consumer =
2936-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
2978+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
2979+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
29372980
consumers.add(consumer);
29382981
}
2982+
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
29392983

29402984
admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
29412985
Awaitility.await().until(()
29422986
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null);
29432987
try {
29442988
@Cleanup
2945-
Consumer<byte[]> subscribe =
2946-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
2989+
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
2990+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
29472991
fail("should fail");
29482992
} catch (PulsarClientException e) {
29492993
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
2994+
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2);
29502995
}
29512996
//set the limit to 3
29522997
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
29532998
Awaitility.await().until(()
29542999
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3);
29553000
// should success
2956-
Consumer<byte[]> consumer =
2957-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
3001+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
3002+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
29583003
consumers.add(consumer);
3004+
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
29593005
try {
29603006
@Cleanup
2961-
Consumer<byte[]> subscribe =
2962-
pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
3007+
Consumer<String> subscribe = pulsarClient.newConsumer(Schema.STRING)
3008+
.subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe();
29633009
fail("should fail");
29643010
} catch (PulsarClientException e) {
29653011
assertTrue(e.getMessage().contains("Topic reached max consumers limit"));
3012+
assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3);
29663013
}
29673014

29683015
//clean up
2969-
for (Consumer<byte[]> subConsumer : consumers) {
3016+
for (Consumer<String> subConsumer : consumers) {
29703017
subConsumer.close();
29713018
}
29723019
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -509,51 +509,6 @@ public void testProducerOverwrite() {
509509
topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3));
510510
}
511511

512-
private void testMaxProducers() {
513-
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
514-
topic.initialize().join();
515-
String role = "appid1";
516-
// 1. add producer1
517-
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
518-
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
519-
topic.addProducer(producer, new CompletableFuture<>());
520-
assertEquals(topic.getProducers().size(), 1);
521-
522-
// 2. add producer2
523-
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role,
524-
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
525-
topic.addProducer(producer2, new CompletableFuture<>());
526-
assertEquals(topic.getProducers().size(), 2);
527-
528-
// 3. add producer3 but reached maxProducersPerTopic
529-
try {
530-
Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role,
531-
false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true);
532-
topic.addProducer(producer3, new CompletableFuture<>()).join();
533-
fail("should have failed");
534-
} catch (Exception e) {
535-
assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
536-
}
537-
}
538-
539-
@Test
540-
public void testMaxProducersForBroker() {
541-
// set max clients
542-
pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
543-
testMaxProducers();
544-
}
545-
546-
@Test
547-
public void testMaxProducersForNamespace() throws Exception {
548-
// set max clients
549-
Policies policies = new Policies();
550-
policies.max_producers_per_topic = 2;
551-
pulsarTestContext.getPulsarResources().getNamespaceResources()
552-
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
553-
policies);
554-
testMaxProducers();
555-
}
556-
557512
private Producer getMockedProducerWithSpecificAddress(Topic topic, long producerId, InetAddress address) {
558513
final String producerNameBase = "producer";
559514
final String role = "appid1";

0 commit comments

Comments
 (0)