|
27 | 27 |
|
28 | 28 | import java.net.MalformedURLException; |
29 | 29 | import java.net.URI; |
| 30 | +import java.util.ArrayList; |
30 | 31 | import java.util.List; |
31 | 32 | import java.util.Set; |
32 | 33 | import java.util.concurrent.CompletableFuture; |
|
36 | 37 |
|
37 | 38 | import javax.servlet.ServletContext; |
38 | 39 | import javax.ws.rs.WebApplicationException; |
| 40 | +import javax.ws.rs.container.AsyncResponse; |
39 | 41 | import javax.ws.rs.core.Response; |
40 | 42 | import javax.ws.rs.core.Response.Status; |
41 | 43 | import javax.ws.rs.core.UriBuilder; |
|
46 | 48 | import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; |
47 | 49 | import org.apache.pulsar.broker.web.PulsarWebResource; |
48 | 50 | import org.apache.pulsar.broker.web.RestException; |
| 51 | +import org.apache.pulsar.common.api.proto.PulsarApi; |
49 | 52 | import org.apache.pulsar.common.naming.Constants; |
50 | 53 | import org.apache.pulsar.common.naming.NamespaceBundle; |
51 | 54 | import org.apache.pulsar.common.naming.NamespaceBundleFactory; |
@@ -255,35 +258,42 @@ protected List<String> getListOfNamespaces(String property) throws Exception { |
255 | 258 | return namespaces; |
256 | 259 | } |
257 | 260 |
|
258 | | - protected void tryCreatePartitionsAsync(int numPartitions) { |
| 261 | + protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) { |
259 | 262 | if (!topicName.isPersistent()) { |
260 | | - return; |
| 263 | + return CompletableFuture.completedFuture(null); |
261 | 264 | } |
| 265 | + List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions); |
262 | 266 | for (int i = 0; i < numPartitions; i++) { |
263 | | - tryCreatePartitionAsync(i); |
| 267 | + futures.add(tryCreatePartitionAsync(i, null)); |
264 | 268 | } |
| 269 | + return FutureUtil.waitForAll(futures); |
265 | 270 | } |
266 | 271 |
|
267 | | - private void tryCreatePartitionAsync(final int partition) { |
| 272 | + private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) { |
| 273 | + CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture; |
268 | 274 | zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], |
269 | 275 | (rc, s, o, s1) -> { |
270 | 276 | if (KeeperException.Code.OK.intValue() == rc) { |
271 | 277 | if (log.isDebugEnabled()) { |
272 | 278 | log.debug("[{}] Topic partition {} created.", clientAppId(), |
273 | 279 | topicName.getPartition(partition)); |
274 | 280 | } |
| 281 | + result.complete(null); |
275 | 282 | } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { |
276 | 283 | log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(), |
277 | 284 | topicName.getPartition(partition)); |
| 285 | + result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS)); |
278 | 286 | } else if (KeeperException.Code.BADVERSION.intValue() == rc) { |
279 | 287 | log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.", |
280 | 288 | clientAppId(), topicName.getPartition(partition)); |
281 | | - tryCreatePartitionAsync(partition); |
| 289 | + tryCreatePartitionAsync(partition, result); |
282 | 290 | } else { |
283 | 291 | log.error("[{}] Fail to create topic partition {}", clientAppId(), |
284 | 292 | topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc))); |
| 293 | + result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); |
285 | 294 | } |
286 | 295 | }); |
| 296 | + return result; |
287 | 297 | } |
288 | 298 |
|
289 | 299 | protected NamespaceName namespaceName; |
@@ -707,4 +717,98 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) { |
707 | 717 | partitionedTopics.sort(null); |
708 | 718 | return partitionedTopics; |
709 | 719 | } |
| 720 | + |
| 721 | + protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) { |
| 722 | + try { |
| 723 | + validateAdminAccessForTenant(topicName.getTenant()); |
| 724 | + } catch (Exception e) { |
| 725 | + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); |
| 726 | + resumeAsyncResponseExceptionally(asyncResponse, e); |
| 727 | + return; |
| 728 | + } |
| 729 | + if (numPartitions <= 0) { |
| 730 | + asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0")); |
| 731 | + return; |
| 732 | + } |
| 733 | + checkTopicExistsAsync(topicName).thenAccept(exists -> { |
| 734 | + if (exists) { |
| 735 | + log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); |
| 736 | + asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists")); |
| 737 | + } else { |
| 738 | + try { |
| 739 | + String path = ZkAdminPaths.partitionedTopicPath(topicName); |
| 740 | + byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); |
| 741 | + zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> { |
| 742 | + if (KeeperException.Code.OK.intValue() == rc) { |
| 743 | + globalZk().sync(path, (rc2, s2, ctx) -> { |
| 744 | + if (KeeperException.Code.OK.intValue() == rc2) { |
| 745 | + log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); |
| 746 | + tryCreatePartitionsAsync(numPartitions).thenAccept(v -> { |
| 747 | + log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName); |
| 748 | + asyncResponse.resume(Response.noContent().build()); |
| 749 | + }).exceptionally(e -> { |
| 750 | + log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName); |
| 751 | + // The partitioned topic is created but there are some partitions create failed |
| 752 | + asyncResponse.resume(new RestException(e)); |
| 753 | + return null; |
| 754 | + }); |
| 755 | + } else { |
| 756 | + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2))); |
| 757 | + asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2)))); |
| 758 | + } |
| 759 | + }, null); |
| 760 | + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { |
| 761 | + log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); |
| 762 | + asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists")); |
| 763 | + } else if (KeeperException.Code.BADVERSION.intValue() == rc) { |
| 764 | + log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(), |
| 765 | + topicName); |
| 766 | + asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); |
| 767 | + } else { |
| 768 | + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc))); |
| 769 | + asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc)))); |
| 770 | + } |
| 771 | + }); |
| 772 | + } catch (Exception e) { |
| 773 | + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); |
| 774 | + resumeAsyncResponseExceptionally(asyncResponse, e); |
| 775 | + } |
| 776 | + } |
| 777 | + }).exceptionally(ex -> { |
| 778 | + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex); |
| 779 | + resumeAsyncResponseExceptionally(asyncResponse, ex); |
| 780 | + return null; |
| 781 | + }); |
| 782 | + } |
| 783 | + |
| 784 | + /** |
| 785 | + * Check the exists topics contains the given topic. |
| 786 | + * Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions |
| 787 | + * and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare |
| 788 | + * to the partitioned name of this partition. |
| 789 | + * |
| 790 | + * @param topicName given topic name |
| 791 | + */ |
| 792 | + protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) { |
| 793 | + return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(), |
| 794 | + PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) |
| 795 | + .thenCompose(topics -> { |
| 796 | + boolean exists = false; |
| 797 | + for (String topic : topics) { |
| 798 | + if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) { |
| 799 | + exists = true; |
| 800 | + break; |
| 801 | + } |
| 802 | + } |
| 803 | + return CompletableFuture.completedFuture(exists); |
| 804 | + }); |
| 805 | + } |
| 806 | + |
| 807 | + protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { |
| 808 | + if (throwable instanceof WebApplicationException) { |
| 809 | + asyncResponse.resume(throwable); |
| 810 | + } else { |
| 811 | + asyncResponse.resume(new RestException(throwable)); |
| 812 | + } |
| 813 | + } |
710 | 814 | } |
0 commit comments