Skip to content

Commit 86ef0a0

Browse files
liangyepianzhoulhotari
authored andcommitted
[fix][broker] fix wrong method name checkTopicExists. (#24293)
The current method `checkTopicExists` is an asynchronous method but follows a synchronous naming convention (lacking the `Async` suffix). This naming inconsistency can mislead developers into assuming it's a blocking operation, potentially causing misuse in client code. Since this method is `public`, we cannot remove it directly without breaking backward compatibility. 1. **Introduce a new asynchronous method**: - Added `checkTopicExistsAsync()` with the correct asynchronous naming convention. - Internally delegates to the original `checkTopicExists()` method to retain existing logic. 2. **Deprecate the original method**: - Marked `checkTopicExists()` as `@Deprecated` with a note directing users to the new `checkTopicExistsAsync()`. 3. **Refactor internal usages**: - Updated all internal calls to use `checkTopicExistsAsync()` instead of the deprecated method. 4. **Documentation updates**: - Added Javadoc to `checkTopicExists()` clarifying its deprecated status and replacement. This approach maintains backward compatibility while aligning method names with their asynchronous behavior. (cherry picked from commit af24849)
1 parent e3ed8fc commit 86ef0a0

File tree

10 files changed

+28
-15
lines changed

10 files changed

+28
-15
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,7 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
983983
}
984984

985985
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
986-
return pulsar().getNamespaceService().checkTopicExists(topicName)
986+
return pulsar().getNamespaceService().checkTopicExistsAsync(topicName)
987987
.thenAccept(info -> {
988988
boolean exists = info.isExists();
989989
info.recycle();

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
740740
}
741741

742742
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
743-
return pulsar().getNamespaceService().checkTopicExists(topicName)
743+
return pulsar().getNamespaceService().checkTopicExistsAsync(topicName)
744744
.thenAccept(info -> {
745745
boolean exists = info.isExists();
746746
info.recycle();
@@ -5412,7 +5412,7 @@ protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics
54125412
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
54135413
"Only persistent topic can be set as shadow topic"));
54145414
}
5415-
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
5415+
futures.add(pulsar().getNamespaceService().checkTopicExistsAsync(shadowTopicName)
54165416
.thenAccept(info -> {
54175417
boolean exists = info.isExists();
54185418
info.recycle();

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName
7777
return CompletableFuture.completedFuture(true);
7878
}
7979
// Case-2: Persistent topic.
80-
return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
80+
return pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenCompose(info -> {
8181
boolean exists = info.isExists();
8282
info.recycle();
8383
if (exists) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1409,8 +1409,21 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
14091409
}
14101410

14111411
/***
1412-
* Check topic exists( partitioned or non-partitioned ).
1412+
* Checks whether the topic exists( partitioned or non-partitioned ).
14131413
*/
1414+
public CompletableFuture<TopicExistsInfo> checkTopicExistsAsync(TopicName topic) {
1415+
return checkTopicExists(topic);
1416+
}
1417+
1418+
/**
1419+
* Checks whether the topic exists( partitioned or non-partitioned ).
1420+
*
1421+
* @deprecated This method uses a misleading synchronous name for an asynchronous operation.
1422+
* Use {@link #checkTopicExistsAsync(TopicName topic)} instead.
1423+
*
1424+
* @see #checkTopicExistsAsync(TopicName topic)
1425+
*/
1426+
@Deprecated
14141427
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
14151428
return pulsar.getBrokerService()
14161429
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3229,7 +3229,7 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
32293229
if (pulsar.getNamespaceService() == null) {
32303230
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
32313231
}
3232-
return pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo -> {
3232+
return pulsar.getNamespaceService().checkTopicExistsAsync(topicName).thenComposeAsync(topicExistsInfo -> {
32333233
final boolean topicExists = topicExistsInfo.isExists();
32343234
final TopicType topicType = topicExistsInfo.getTopicType();
32353235
final Integer partitions = topicExistsInfo.getPartitions();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
638638
&& brokerAllowAutoCreate;
639639
if (!autoCreateIfNotExist) {
640640
NamespaceService namespaceService = getBrokerService().getPulsar().getNamespaceService();
641-
namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> {
641+
namespaceService.checkTopicExistsAsync(topicName).thenAccept(topicExistsInfo -> {
642642
lookupSemaphore.release();
643643
if (!topicExistsInfo.isExists()) {
644644
writeAndFlush(Commands.newPartitionMetadataResponse(

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public void testLookUpWithException() throws Exception {
363363
CompletableFuture existFuture = new CompletableFuture();
364364
existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
365365
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
366-
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
366+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
367367
CompletableFuture existBooleanFuture = new CompletableFuture();
368368
existBooleanFuture.complete(false);
369369
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
@@ -388,7 +388,7 @@ public void testLookUpTopicNotExist() throws Exception {
388388
existFuture.complete(TopicExistsInfo.newTopicNotExists());
389389
CompletableFuture existBooleanFuture = new CompletableFuture();
390390
existBooleanFuture.complete(false);
391-
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
391+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
392392
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
393393
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
394394
AsyncResponse asyncResponse = mock(AsyncResponse.class);

pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testLookupTopicNotExist() throws Exception {
152152
NamespaceService namespaceService = pulsar.getNamespaceService();
153153
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
154154
future.complete(TopicExistsInfo.newTopicNotExists());
155-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
155+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
156156
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
157157
booleanFuture.complete(false);
158158
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -266,7 +266,7 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception {
266266
NamespaceService namespaceService = pulsar.getNamespaceService();
267267
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
268268
future.complete(TopicExistsInfo.newTopicNotExists());
269-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
269+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
270270
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
271271
booleanFuture.complete(false);
272272
doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -303,7 +303,7 @@ public void topicNotFound() throws Exception {
303303
NamespaceService namespaceService = pulsar.getNamespaceService();
304304
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
305305
future.complete(TopicExistsInfo.newTopicNotExists());
306-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
306+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
307307

308308
// Get the current semaphore first
309309
Integer state1 = pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public void testRemoveCluster() throws Exception {
220220
assertFalse(tps.containsKey(topicP1));
221221
assertFalse(tps.containsKey(topicChangeEvents));
222222
assertFalse(pulsar1.getNamespaceService()
223-
.checkTopicExists(TopicName.get(topicChangeEvents))
223+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
224224
.get(5, TimeUnit.SECONDS).isExists());
225225
// Verify: schema will be removed in local cluster, and remote cluster will not.
226226
List<CompletableFuture<StoredSchema>> schemaList13

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,10 @@ public void testRemoveCluster() throws Exception {
211211
Map<String, CompletableFuture<Optional<Topic>>> tps = pulsar1.getBrokerService().getTopics();
212212
assertFalse(tps.containsKey(topic));
213213
assertFalse(tps.containsKey(topicChangeEvents));
214-
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
214+
assertFalse(pulsar1.getNamespaceService().checkTopicExistsAsync(TopicName.get(topic))
215215
.get(5, TimeUnit.SECONDS).isExists());
216216
assertFalse(pulsar1.getNamespaceService()
217-
.checkTopicExists(TopicName.get(topicChangeEvents))
217+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
218218
.get(5, TimeUnit.SECONDS).isExists());
219219
});
220220

0 commit comments

Comments
 (0)