Skip to content

Commit db25438

Browse files
authored
[Broker] Check allowAutoSubscriptionCreation when creating init sub (#14458)
Master Issue: #13408 ### Motivation In #13355, we added support for creating initial subscription when creating the producer. But the broker didn't check if the subscription can be created automatically. The initial subscription will be created even if the `allowAutoSubscriptionCreation` is disabled.
1 parent 5235ff9 commit db25438

File tree

6 files changed

+46
-2
lines changed

6 files changed

+46
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,20 @@ protected void handleProducer(final CommandProducer cmdProducer) {
12731273
if (!Strings.isNullOrEmpty(initialSubscriptionName)
12741274
&& topic.isPersistent()
12751275
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
1276+
if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
1277+
String msg =
1278+
"Could not create the initial subscription due to the auto subscription "
1279+
+ "creation is not allowed.";
1280+
if (producerFuture.completeExceptionally(
1281+
new BrokerServiceException.NotAllowedException(msg))) {
1282+
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
1283+
remoteAddress, msg, initialSubscriptionName, topicName);
1284+
commandSender.sendErrorResponse(requestId,
1285+
ServerError.NotAllowedError, msg);
1286+
}
1287+
producers.remove(producerId, producerFuture);
1288+
return;
1289+
}
12761290
createInitSubFuture =
12771291
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
12781292
false);

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import static org.testng.Assert.fail;
2122
import org.apache.pulsar.client.admin.PulsarAdminException;
2223
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
2324
import org.apache.pulsar.client.impl.ProducerImpl;
@@ -167,4 +168,27 @@ public void testCreateInitialSubscriptionWhenExisting() throws PulsarClientExcep
167168

168169
Assert.assertTrue(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
169170
}
171+
172+
@Test
173+
public void testInitialSubscriptionCreationWithAutoCreationDisable()
174+
throws PulsarAdminException, PulsarClientException {
175+
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
176+
177+
final TopicName topic =
178+
TopicName.get("persistent", "public", "default",
179+
"testInitialSubscriptionCreationWithAutoCreationDisable");
180+
final String initialSubscriptionName = "init-sub";
181+
admin.topics().createNonPartitionedTopic(topic.toString());
182+
try {
183+
Producer<byte[]> producer = ((ProducerBuilderImpl<byte[]>) pulsarClient.newProducer())
184+
.initialSubscriptionName(initialSubscriptionName)
185+
.topic(topic.toString())
186+
.create();
187+
fail("Should not pass");
188+
} catch (PulsarClientException.NotAllowedException exception) {
189+
// ok
190+
}
191+
192+
Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
193+
}
170194
}

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class DeadLetterPolicy {
5252
/**
5353
* Name of the initial subscription name of the dead letter topic.
5454
* If this field is not set, the initial subscription for the dead letter topic will not be created.
55+
* If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer will fail
56+
* to be created.
5557
*/
5658
private String initialSubscriptionName;
5759
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,9 @@ public ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartP
328328
/**
329329
* Use this config to automatically create an initial subscription when creating the topic.
330330
* If this field is not set, the initial subscription will not be created.
331-
* This method is limited to internal use
331+
* If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the producer will fail to
332+
* be created.
333+
* This method is limited to internal use. This method will only be used when the consumer creates the dlq producer.
332334
*
333335
* @param initialSubscriptionName Name of the initial subscription of the topic.
334336
* @return the producer builder implementation instance

pulsar-common/src/main/proto/PulsarApi.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,8 @@ message CommandProducer {
497497

498498
// Name of the initial subscription of the topic.
499499
// If this field is not set, the initial subscription will not be created.
500+
// If this field is set but the broker's `allowAutoSubscriptionCreation`
501+
// is disabled, the producer will fail to be created.
500502
optional string initial_subscription_name = 13;
501503
}
502504

site2/docs/concepts-messaging.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
317317
318318
```
319319
320-
By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the `initialSubscriptionName` parameter.
320+
By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the `initialSubscriptionName` parameter. If this parameter is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer will fail to be created.
321321
322322
```java
323323
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)

0 commit comments

Comments
 (0)