Skip to content

Commit 19ccfd5

Browse files
Fix create partitioned topic with a substring of an existing topic name. (#6478)
Fixes #6468 Fix create a partitioned topic with a substring of an existing topic name. And make create partitioned topic async.
1 parent f2ec1b4 commit 19ccfd5

File tree

10 files changed

+227
-175
lines changed

10 files changed

+227
-175
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.net.MalformedURLException;
2929
import java.net.URI;
30+
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Set;
3233
import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@
3637

3738
import javax.servlet.ServletContext;
3839
import javax.ws.rs.WebApplicationException;
40+
import javax.ws.rs.container.AsyncResponse;
3941
import javax.ws.rs.core.Response;
4042
import javax.ws.rs.core.Response.Status;
4143
import javax.ws.rs.core.UriBuilder;
@@ -46,6 +48,7 @@
4648
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
4749
import org.apache.pulsar.broker.web.PulsarWebResource;
4850
import org.apache.pulsar.broker.web.RestException;
51+
import org.apache.pulsar.common.api.proto.PulsarApi;
4952
import org.apache.pulsar.common.naming.Constants;
5053
import org.apache.pulsar.common.naming.NamespaceBundle;
5154
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -255,35 +258,42 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
255258
return namespaces;
256259
}
257260

258-
protected void tryCreatePartitionsAsync(int numPartitions) {
261+
protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
259262
if (!topicName.isPersistent()) {
260-
return;
263+
return CompletableFuture.completedFuture(null);
261264
}
265+
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
262266
for (int i = 0; i < numPartitions; i++) {
263-
tryCreatePartitionAsync(i);
267+
futures.add(tryCreatePartitionAsync(i, null));
264268
}
269+
return FutureUtil.waitForAll(futures);
265270
}
266271

267-
private void tryCreatePartitionAsync(final int partition) {
272+
private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
273+
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
268274
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
269275
(rc, s, o, s1) -> {
270276
if (KeeperException.Code.OK.intValue() == rc) {
271277
if (log.isDebugEnabled()) {
272278
log.debug("[{}] Topic partition {} created.", clientAppId(),
273279
topicName.getPartition(partition));
274280
}
281+
result.complete(null);
275282
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
276283
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
277284
topicName.getPartition(partition));
285+
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
278286
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
279287
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
280288
clientAppId(), topicName.getPartition(partition));
281-
tryCreatePartitionAsync(partition);
289+
tryCreatePartitionAsync(partition, result);
282290
} else {
283291
log.error("[{}] Fail to create topic partition {}", clientAppId(),
284292
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
293+
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
285294
}
286295
});
296+
return result;
287297
}
288298

289299
protected NamespaceName namespaceName;
@@ -707,4 +717,98 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
707717
partitionedTopics.sort(null);
708718
return partitionedTopics;
709719
}
720+
721+
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
722+
try {
723+
validateAdminAccessForTenant(topicName.getTenant());
724+
} catch (Exception e) {
725+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
726+
resumeAsyncResponseExceptionally(asyncResponse, e);
727+
return;
728+
}
729+
if (numPartitions <= 0) {
730+
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
731+
return;
732+
}
733+
checkTopicExistsAsync(topicName).thenAccept(exists -> {
734+
if (exists) {
735+
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
736+
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
737+
} else {
738+
try {
739+
String path = ZkAdminPaths.partitionedTopicPath(topicName);
740+
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
741+
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
742+
if (KeeperException.Code.OK.intValue() == rc) {
743+
globalZk().sync(path, (rc2, s2, ctx) -> {
744+
if (KeeperException.Code.OK.intValue() == rc2) {
745+
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
746+
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
747+
log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
748+
asyncResponse.resume(Response.noContent().build());
749+
}).exceptionally(e -> {
750+
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
751+
// The partitioned topic is created but there are some partitions create failed
752+
asyncResponse.resume(new RestException(e));
753+
return null;
754+
});
755+
} else {
756+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
757+
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
758+
}
759+
}, null);
760+
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
761+
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
762+
asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
763+
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
764+
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
765+
topicName);
766+
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
767+
} else {
768+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
769+
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
770+
}
771+
});
772+
} catch (Exception e) {
773+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
774+
resumeAsyncResponseExceptionally(asyncResponse, e);
775+
}
776+
}
777+
}).exceptionally(ex -> {
778+
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
779+
resumeAsyncResponseExceptionally(asyncResponse, ex);
780+
return null;
781+
});
782+
}
783+
784+
/**
785+
* Check the exists topics contains the given topic.
786+
* Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions
787+
* and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare
788+
* to the partitioned name of this partition.
789+
*
790+
* @param topicName given topic name
791+
*/
792+
protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
793+
return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
794+
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
795+
.thenCompose(topics -> {
796+
boolean exists = false;
797+
for (String topic : topics) {
798+
if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
799+
exists = true;
800+
break;
801+
}
802+
}
803+
return CompletableFuture.completedFuture(exists);
804+
});
805+
}
806+
807+
protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
808+
if (throwable instanceof WebApplicationException) {
809+
asyncResponse.resume(throwable);
810+
} else {
811+
asyncResponse.resume(new RestException(throwable));
812+
}
813+
}
710814
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 17 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import static com.google.common.base.Preconditions.checkNotNull;
2222
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
23-
import org.apache.pulsar.common.api.proto.PulsarApi;
23+
2424
import static org.apache.pulsar.common.util.Codec.decode;
2525

2626
import com.github.zafarkhaja.semver.Version;
@@ -390,46 +390,6 @@ protected void internalRevokePermissionsOnTopic(String role) {
390390
revokePermissions(topicName.toString(), role);
391391
}
392392

393-
protected void internalCreatePartitionedTopic(int numPartitions) {
394-
validateAdminAccessForTenant(topicName.getTenant());
395-
if (numPartitions <= 0) {
396-
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
397-
}
398-
validatePartitionTopicName(topicName.getLocalName());
399-
try {
400-
boolean topicExist = pulsar().getNamespaceService()
401-
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
402-
.join()
403-
.contains(topicName.toString());
404-
if (topicExist) {
405-
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
406-
throw new RestException(Status.CONFLICT, "This topic already exists");
407-
}
408-
} catch (Exception e) {
409-
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
410-
throw new RestException(e);
411-
}
412-
try {
413-
String path = ZkAdminPaths.partitionedTopicPath(topicName);
414-
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
415-
zkCreateOptimistic(path, data);
416-
tryCreatePartitionsAsync(numPartitions);
417-
// Sync data to all quorums and the observers
418-
zkSync(path);
419-
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
420-
} catch (KeeperException.NodeExistsException e) {
421-
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
422-
throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
423-
} catch (KeeperException.BadVersionException e) {
424-
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
425-
topicName);
426-
throw new RestException(Status.CONFLICT, "Concurrent modification");
427-
} catch (Exception e) {
428-
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
429-
throw new RestException(e);
430-
}
431-
}
432-
433393
protected void internalCreateNonPartitionedTopic(boolean authoritative) {
434394
validateAdminAccessForTenant(topicName.getTenant());
435395
validateNonPartitionTopicName(topicName.getLocalName());
@@ -540,11 +500,22 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
540500
}
541501
}
542502

543-
protected void internalCreateMissedPartitions() {
544-
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
545-
if (metadata != null) {
546-
tryCreatePartitionsAsync(metadata.partitions);
547-
}
503+
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
504+
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
505+
if (metadata != null) {
506+
tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
507+
asyncResponse.resume(Response.noContent().build());
508+
}).exceptionally(e -> {
509+
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
510+
resumeAsyncResponseExceptionally(asyncResponse, e);
511+
return null;
512+
});
513+
}
514+
}).exceptionally(e -> {
515+
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
516+
resumeAsyncResponseExceptionally(asyncResponse, e);
517+
return null;
518+
});
548519
}
549520

550521
private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -2071,40 +2042,6 @@ private void validatePartitionTopicUpdate(String topicName, int numberOfPartitio
20712042
}
20722043
}
20732044

2074-
/**
2075-
* Validate partitioned topic name.
2076-
* Validation will fail and throw RestException if
2077-
* 1) There's already a partitioned topic with same topic name and have some of its partition created.
2078-
* 2) There's already non partition topic with same name and contains partition suffix "-partition-"
2079-
* followed by numeric value. In this case internal created partition of partitioned topic could override
2080-
* the existing non partition topic.
2081-
*
2082-
* @param topicName
2083-
*/
2084-
private void validatePartitionTopicName(String topicName) {
2085-
List<String> existingTopicList = internalGetList();
2086-
String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
2087-
for (String existingTopicName : existingTopicList) {
2088-
if (existingTopicName.contains(prefix)) {
2089-
try {
2090-
Long.parseLong(existingTopicName.substring(
2091-
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
2092-
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
2093-
log.warn("[{}] Already have topic {} which contains partition " +
2094-
"suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
2095-
+ "could cause conflict.", clientAppId(), existingTopicName, topicName);
2096-
throw new RestException(Status.PRECONDITION_FAILED,
2097-
"Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
2098-
"and end with numeric value, Creation of partitioned topic " + topicName +
2099-
" could cause conflict.");
2100-
} catch (NumberFormatException e) {
2101-
// Do nothing, if value after partition suffix is not pure numeric value,
2102-
// as it can't conflict with internal created partitioned topic's name.
2103-
}
2104-
}
2105-
}
2106-
}
2107-
21082045
/**
21092046
* Validate non partition topic name,
21102047
* Validation will fail and throw RestException if

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -124,41 +124,15 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
124124
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
125125
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
126126
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
127-
public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
127+
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
128128
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
129129
int numPartitions) {
130-
validateTopicName(property, cluster, namespace, encodedTopic);
131-
validateAdminAccessForTenant(topicName.getTenant());
132-
if (numPartitions <= 0) {
133-
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
134-
}
135130
try {
136-
boolean topicExist = pulsar().getNamespaceService()
137-
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
138-
.join()
139-
.contains(topicName.toString());
140-
if (topicExist) {
141-
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
142-
throw new RestException(Status.CONFLICT, "This topic already exists");
143-
}
144-
} catch (Exception e) {
145-
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
146-
throw new RestException(e);
147-
}
148-
try {
149-
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
150-
topicName.getEncodedLocalName());
151-
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
152-
zkCreateOptimistic(path, data);
153-
// Sync data to all quorums and the observers
154-
zkSync(path);
155-
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
156-
} catch (KeeperException.NodeExistsException e) {
157-
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
158-
throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
131+
validateTopicName(property, cluster, namespace, encodedTopic);
132+
internalCreatePartitionedTopic(asyncResponse, numPartitions);
159133
} catch (Exception e) {
160134
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
161-
throw new RestException(e);
135+
resumeAsyncResponseExceptionally(asyncResponse, e);
162136
}
163137
}
164138

0 commit comments

Comments
 (0)