Skip to content

Commit fbb0aaf

Browse files
committed
Use thenAccept instead of new CompletableFuture
1 parent 19f533e commit fbb0aaf

File tree

1 file changed

+137
-128
lines changed

1 file changed

+137
-128
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 137 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -560,145 +560,154 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author
560560

561561
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
562562
boolean force, boolean deleteSchema) {
563-
CompletableFuture<PartitionedTopicMetadata> partitionedTopicMetadataCompletableFuture =
564-
validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
565-
.thenApply(__ -> validateTopicOwnershipAsync(topicName, authoritative))
566-
.thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName));
567-
final CompletableFuture<Void> future = new CompletableFuture<>();
568-
partitionedTopicMetadataCompletableFuture.thenAccept(partitionMeta -> {
569-
final int numPartitions = partitionMeta.partitions;
570-
if (numPartitions > 0) {
571-
final AtomicInteger count = new AtomicInteger(numPartitions);
572-
if (deleteSchema) {
573-
count.incrementAndGet();
574-
pulsar().getBrokerService().deleteSchemaStorage(topicName.getPartition(0).toString())
575-
.whenComplete((r, ex) -> {
563+
validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
564+
.thenAccept(__ -> validateTopicOwnershipAsync(topicName, authoritative))
565+
.thenAccept(__ -> {
566+
final CompletableFuture<Void> future = new CompletableFuture<>();
567+
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
568+
.thenAccept(partitionMeta -> {
569+
final int numPartitions = partitionMeta.partitions;
570+
if (numPartitions > 0) {
571+
final AtomicInteger count = new AtomicInteger(numPartitions);
572+
if (deleteSchema) {
573+
count.incrementAndGet();
574+
pulsar().getBrokerService().deleteSchemaStorage(topicName.getPartition(0).toString())
575+
.whenComplete((r, ex) -> {
576+
if (ex != null) {
577+
log.warn("Failed to delete schema storage of topic: {}", topicName);
578+
}
579+
if (count.decrementAndGet() == 0) {
580+
future.complete(null);
581+
}
582+
});
583+
}
584+
// delete authentication policies of the partitioned topic
585+
CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
586+
pulsar().getPulsarResources().getNamespaceResources()
587+
.setPoliciesAsync(topicName.getNamespaceObject(), p -> {
588+
for (int i = 0; i < numPartitions; i++) {
589+
p.auth_policies.getTopicAuthentication()
590+
.remove(topicName.getPartition(i).toString());
591+
}
592+
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
593+
return p;
594+
}).thenAccept(v -> {
595+
log.info("Successfully delete authentication policies" +
596+
" for partitioned topic {}", topicName);
597+
deleteAuthFuture.complete(null);
598+
}).exceptionally(ex -> {
599+
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
600+
log.warn("Namespace policies of {} not found",
601+
topicName.getNamespaceObject());
602+
deleteAuthFuture.complete(null);
603+
} else {
604+
log.error("Failed to delete authentication policies " +
605+
"for partitioned topic {}", topicName, ex);
606+
deleteAuthFuture.completeExceptionally(ex);
607+
}
608+
return null;
609+
});
610+
611+
deleteAuthFuture.whenComplete((r, ex) -> {
576612
if (ex != null) {
577-
log.warn("Failed to delete schema storage of topic: {}", topicName);
613+
future.completeExceptionally(ex);
614+
return;
578615
}
579-
if (count.decrementAndGet() == 0) {
580-
future.complete(null);
616+
for (int i = 0; i < numPartitions; i++) {
617+
TopicName topicNamePartition = topicName.getPartition(i);
618+
try {
619+
pulsar().getAdminClient().topics()
620+
.deleteAsync(topicNamePartition.toString(), force)
621+
.whenComplete((r1, ex1) -> {
622+
if (ex1 != null) {
623+
if (ex1 instanceof NotFoundException) {
624+
// if the sub-topic is not found, the client might not have called
625+
// create producer or it might have been deleted earlier,
626+
//so we ignore the 404 error.
627+
// For all other exception,
628+
//we fail the delete partition method even if a single
629+
// partition is failed to be deleted
630+
if (log.isDebugEnabled()) {
631+
log.debug("[{}] Partition not found: {}", clientAppId(),
632+
topicNamePartition);
633+
}
634+
} else {
635+
log.error("[{}] Failed to delete partition {}",
636+
clientAppId(), topicNamePartition, ex1);
637+
future.completeExceptionally(ex1);
638+
return;
639+
}
640+
} else {
641+
log.info("[{}] Deleted partition {}", clientAppId(),
642+
topicNamePartition);
643+
}
644+
if (count.decrementAndGet() == 0) {
645+
future.complete(null);
646+
}
647+
});
648+
} catch (Exception e) {
649+
log.error("[{}] Failed to delete partition {}", clientAppId(),
650+
topicNamePartition, e);
651+
future.completeExceptionally(e);
652+
}
581653
}
582654
});
583-
}
584-
// delete authentication policies of the partitioned topic
585-
CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
586-
pulsar().getPulsarResources().getNamespaceResources()
587-
.setPoliciesAsync(topicName.getNamespaceObject(), p -> {
588-
for (int i = 0; i < numPartitions; i++) {
589-
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
590-
}
591-
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
592-
return p;
593-
}).thenAccept(v -> {
594-
log.info("Successfully delete authentication policies for partitioned topic {}", topicName);
595-
deleteAuthFuture.complete(null);
596-
}).exceptionally(ex -> {
597-
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
598-
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
599-
deleteAuthFuture.complete(null);
655+
} else {
656+
future.complete(null);
657+
}
658+
}).exceptionally(ex -> {
659+
future.completeExceptionally(ex);
660+
return null;
661+
});
662+
663+
future.whenComplete((r, ex) -> {
664+
if (ex != null) {
665+
if (ex instanceof PreconditionFailedException) {
666+
asyncResponse.resume(
667+
new RestException(Status.PRECONDITION_FAILED,
668+
"Topic has active producers/subscriptions"));
669+
return;
670+
} else if (ex instanceof PulsarAdminException) {
671+
asyncResponse.resume(new RestException((PulsarAdminException) ex));
672+
return;
673+
} else if (ex instanceof WebApplicationException) {
674+
asyncResponse.resume(ex);
675+
return;
600676
} else {
601-
log.error("Failed to delete authentication policies for partitioned topic {}",
602-
topicName, ex);
603-
deleteAuthFuture.completeExceptionally(ex);
677+
asyncResponse.resume(new RestException(ex));
678+
return;
604679
}
605-
return null;
606-
});
607-
608-
deleteAuthFuture.whenComplete((r, ex) -> {
609-
if (ex != null) {
610-
future.completeExceptionally(ex);
611-
return;
612-
}
613-
for (int i = 0; i < numPartitions; i++) {
614-
TopicName topicNamePartition = topicName.getPartition(i);
680+
}
681+
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
615682
try {
616-
pulsar().getAdminClient().topics()
617-
.deleteAsync(topicNamePartition.toString(), force)
618-
.whenComplete((r1, ex1) -> {
619-
if (ex1 != null) {
620-
if (ex1 instanceof NotFoundException) {
621-
// if the sub-topic is not found, the client might not have called
622-
// create producer or it might have been deleted earlier,
623-
//so we ignore the 404 error.
624-
// For all other exception,
625-
//we fail the delete partition method even if a single
626-
// partition is failed to be deleted
627-
if (log.isDebugEnabled()) {
628-
log.debug("[{}] Partition not found: {}", clientAppId(),
629-
topicNamePartition);
630-
}
631-
} else {
632-
log.error("[{}] Failed to delete partition {}", clientAppId(),
633-
topicNamePartition, ex1);
634-
future.completeExceptionally(ex1);
635-
return;
636-
}
683+
namespaceResources().getPartitionedTopicResources()
684+
.deletePartitionedTopicAsync(topicName).thenAccept(r2 -> {
685+
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
686+
asyncResponse.resume(Response.noContent().build());
687+
}).exceptionally(ex1 -> {
688+
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(),
689+
topicName, ex1.getCause());
690+
if (ex1.getCause() instanceof MetadataStoreException.NotFoundException) {
691+
asyncResponse.resume(new RestException(
692+
new RestException(Status.NOT_FOUND,
693+
"Partitioned topic does not exist")));
694+
} else if (ex1
695+
.getCause()
696+
instanceof MetadataStoreException.BadVersionException) {
697+
asyncResponse.resume(
698+
new RestException(new RestException(Status.CONFLICT,
699+
"Concurrent modification")));
637700
} else {
638-
log.info("[{}] Deleted partition {}", clientAppId(), topicNamePartition);
639-
}
640-
if (count.decrementAndGet() == 0) {
641-
future.complete(null);
701+
asyncResponse.resume(new RestException((ex1.getCause())));
642702
}
703+
return null;
643704
});
644-
} catch (Exception e) {
645-
log.error("[{}] Failed to delete partition {}", clientAppId(), topicNamePartition, e);
646-
future.completeExceptionally(e);
705+
} catch (Exception e1) {
706+
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e1);
707+
asyncResponse.resume(new RestException(e1));
647708
}
648-
}
649-
});
650-
} else {
651-
future.complete(null);
652-
}
653-
}).exceptionally(ex -> {
654-
future.completeExceptionally(ex);
655-
return null;
656-
});
657-
658-
future.whenComplete((r, ex) -> {
659-
if (ex != null) {
660-
if (ex instanceof PreconditionFailedException) {
661-
asyncResponse.resume(
662-
new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"));
663-
return;
664-
} else if (ex instanceof PulsarAdminException) {
665-
asyncResponse.resume(new RestException((PulsarAdminException) ex));
666-
return;
667-
} else if (ex instanceof WebApplicationException) {
668-
asyncResponse.resume(ex);
669-
return;
670-
} else {
671-
asyncResponse.resume(new RestException(ex));
672-
return;
673-
}
674-
}
675-
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
676-
try {
677-
namespaceResources().getPartitionedTopicResources()
678-
.deletePartitionedTopicAsync(topicName).thenAccept(r2 -> {
679-
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
680-
asyncResponse.resume(Response.noContent().build());
681-
}).exceptionally(ex1 -> {
682-
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, ex1.getCause());
683-
if (ex1.getCause()
684-
instanceof org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException) {
685-
asyncResponse.resume(new RestException(
686-
new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")));
687-
} else if (ex1
688-
.getCause()
689-
instanceof org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException) {
690-
asyncResponse.resume(
691-
new RestException(new RestException(Status.CONFLICT, "Concurrent modification")));
692-
} else {
693-
asyncResponse.resume(new RestException((ex1.getCause())));
694-
}
695-
return null;
696-
});
697-
} catch (Exception e1) {
698-
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e1);
699-
asyncResponse.resume(new RestException(e1));
700-
}
701-
}).exceptionally(ex ->{
709+
});
710+
}).exceptionally(ex ->{
702711
Throwable cause = ex.getCause();
703712
if (cause instanceof WebApplicationException && ((WebApplicationException) cause).getResponse().getStatus()
704713
== Status.TEMPORARY_REDIRECT.getStatusCode()) {

0 commit comments

Comments
 (0)