@@ -560,145 +560,154 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author
560560
561561 protected void internalDeletePartitionedTopic (AsyncResponse asyncResponse , boolean authoritative ,
562562 boolean force , boolean deleteSchema ) {
563- CompletableFuture <PartitionedTopicMetadata > partitionedTopicMetadataCompletableFuture =
564- validateNamespaceOperationAsync (topicName .getNamespaceObject (), NamespaceOperation .DELETE_TOPIC )
565- .thenApply (__ -> validateTopicOwnershipAsync (topicName , authoritative ))
566- .thenCompose (__ -> pulsar ().getBrokerService ().fetchPartitionedTopicMetadataAsync (topicName ));
567- final CompletableFuture <Void > future = new CompletableFuture <>();
568- partitionedTopicMetadataCompletableFuture .thenAccept (partitionMeta -> {
569- final int numPartitions = partitionMeta .partitions ;
570- if (numPartitions > 0 ) {
571- final AtomicInteger count = new AtomicInteger (numPartitions );
572- if (deleteSchema ) {
573- count .incrementAndGet ();
574- pulsar ().getBrokerService ().deleteSchemaStorage (topicName .getPartition (0 ).toString ())
575- .whenComplete ((r , ex ) -> {
563+ validateNamespaceOperationAsync (topicName .getNamespaceObject (), NamespaceOperation .DELETE_TOPIC )
564+ .thenAccept (__ -> validateTopicOwnershipAsync (topicName , authoritative ))
565+ .thenAccept (__ -> {
566+ final CompletableFuture <Void > future = new CompletableFuture <>();
567+ pulsar ().getBrokerService ().fetchPartitionedTopicMetadataAsync (topicName )
568+ .thenAccept (partitionMeta -> {
569+ final int numPartitions = partitionMeta .partitions ;
570+ if (numPartitions > 0 ) {
571+ final AtomicInteger count = new AtomicInteger (numPartitions );
572+ if (deleteSchema ) {
573+ count .incrementAndGet ();
574+ pulsar ().getBrokerService ().deleteSchemaStorage (topicName .getPartition (0 ).toString ())
575+ .whenComplete ((r , ex ) -> {
576+ if (ex != null ) {
577+ log .warn ("Failed to delete schema storage of topic: {}" , topicName );
578+ }
579+ if (count .decrementAndGet () == 0 ) {
580+ future .complete (null );
581+ }
582+ });
583+ }
584+ // delete authentication policies of the partitioned topic
585+ CompletableFuture <Void > deleteAuthFuture = new CompletableFuture <>();
586+ pulsar ().getPulsarResources ().getNamespaceResources ()
587+ .setPoliciesAsync (topicName .getNamespaceObject (), p -> {
588+ for (int i = 0 ; i < numPartitions ; i ++) {
589+ p .auth_policies .getTopicAuthentication ()
590+ .remove (topicName .getPartition (i ).toString ());
591+ }
592+ p .auth_policies .getTopicAuthentication ().remove (topicName .toString ());
593+ return p ;
594+ }).thenAccept (v -> {
595+ log .info ("Successfully delete authentication policies" +
596+ " for partitioned topic {}" , topicName );
597+ deleteAuthFuture .complete (null );
598+ }).exceptionally (ex -> {
599+ if (ex .getCause () instanceof MetadataStoreException .NotFoundException ) {
600+ log .warn ("Namespace policies of {} not found" ,
601+ topicName .getNamespaceObject ());
602+ deleteAuthFuture .complete (null );
603+ } else {
604+ log .error ("Failed to delete authentication policies " +
605+ "for partitioned topic {}" , topicName , ex );
606+ deleteAuthFuture .completeExceptionally (ex );
607+ }
608+ return null ;
609+ });
610+
611+ deleteAuthFuture .whenComplete ((r , ex ) -> {
576612 if (ex != null ) {
577- log .warn ("Failed to delete schema storage of topic: {}" , topicName );
613+ future .completeExceptionally (ex );
614+ return ;
578615 }
579- if (count .decrementAndGet () == 0 ) {
580- future .complete (null );
616+ for (int i = 0 ; i < numPartitions ; i ++) {
617+ TopicName topicNamePartition = topicName .getPartition (i );
618+ try {
619+ pulsar ().getAdminClient ().topics ()
620+ .deleteAsync (topicNamePartition .toString (), force )
621+ .whenComplete ((r1 , ex1 ) -> {
622+ if (ex1 != null ) {
623+ if (ex1 instanceof NotFoundException ) {
624+ // if the sub-topic is not found, the client might not have called
625+ // create producer or it might have been deleted earlier,
626+ //so we ignore the 404 error.
627+ // For all other exception,
628+ //we fail the delete partition method even if a single
629+ // partition is failed to be deleted
630+ if (log .isDebugEnabled ()) {
631+ log .debug ("[{}] Partition not found: {}" , clientAppId (),
632+ topicNamePartition );
633+ }
634+ } else {
635+ log .error ("[{}] Failed to delete partition {}" ,
636+ clientAppId (), topicNamePartition , ex1 );
637+ future .completeExceptionally (ex1 );
638+ return ;
639+ }
640+ } else {
641+ log .info ("[{}] Deleted partition {}" , clientAppId (),
642+ topicNamePartition );
643+ }
644+ if (count .decrementAndGet () == 0 ) {
645+ future .complete (null );
646+ }
647+ });
648+ } catch (Exception e ) {
649+ log .error ("[{}] Failed to delete partition {}" , clientAppId (),
650+ topicNamePartition , e );
651+ future .completeExceptionally (e );
652+ }
581653 }
582654 });
583- }
584- // delete authentication policies of the partitioned topic
585- CompletableFuture <Void > deleteAuthFuture = new CompletableFuture <>();
586- pulsar ().getPulsarResources ().getNamespaceResources ()
587- .setPoliciesAsync (topicName .getNamespaceObject (), p -> {
588- for (int i = 0 ; i < numPartitions ; i ++) {
589- p .auth_policies .getTopicAuthentication ().remove (topicName .getPartition (i ).toString ());
590- }
591- p .auth_policies .getTopicAuthentication ().remove (topicName .toString ());
592- return p ;
593- }).thenAccept (v -> {
594- log .info ("Successfully delete authentication policies for partitioned topic {}" , topicName );
595- deleteAuthFuture .complete (null );
596- }).exceptionally (ex -> {
597- if (ex .getCause () instanceof MetadataStoreException .NotFoundException ) {
598- log .warn ("Namespace policies of {} not found" , topicName .getNamespaceObject ());
599- deleteAuthFuture .complete (null );
655+ } else {
656+ future .complete (null );
657+ }
658+ }).exceptionally (ex -> {
659+ future .completeExceptionally (ex );
660+ return null ;
661+ });
662+
663+ future .whenComplete ((r , ex ) -> {
664+ if (ex != null ) {
665+ if (ex instanceof PreconditionFailedException ) {
666+ asyncResponse .resume (
667+ new RestException (Status .PRECONDITION_FAILED ,
668+ "Topic has active producers/subscriptions" ));
669+ return ;
670+ } else if (ex instanceof PulsarAdminException ) {
671+ asyncResponse .resume (new RestException ((PulsarAdminException ) ex ));
672+ return ;
673+ } else if (ex instanceof WebApplicationException ) {
674+ asyncResponse .resume (ex );
675+ return ;
600676 } else {
601- log .error ("Failed to delete authentication policies for partitioned topic {}" ,
602- topicName , ex );
603- deleteAuthFuture .completeExceptionally (ex );
677+ asyncResponse .resume (new RestException (ex ));
678+ return ;
604679 }
605- return null ;
606- });
607-
608- deleteAuthFuture .whenComplete ((r , ex ) -> {
609- if (ex != null ) {
610- future .completeExceptionally (ex );
611- return ;
612- }
613- for (int i = 0 ; i < numPartitions ; i ++) {
614- TopicName topicNamePartition = topicName .getPartition (i );
680+ }
681+ // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
615682 try {
616- pulsar ().getAdminClient ().topics ()
617- .deleteAsync (topicNamePartition .toString (), force )
618- .whenComplete ((r1 , ex1 ) -> {
619- if (ex1 != null ) {
620- if (ex1 instanceof NotFoundException ) {
621- // if the sub-topic is not found, the client might not have called
622- // create producer or it might have been deleted earlier,
623- //so we ignore the 404 error.
624- // For all other exception,
625- //we fail the delete partition method even if a single
626- // partition is failed to be deleted
627- if (log .isDebugEnabled ()) {
628- log .debug ("[{}] Partition not found: {}" , clientAppId (),
629- topicNamePartition );
630- }
631- } else {
632- log .error ("[{}] Failed to delete partition {}" , clientAppId (),
633- topicNamePartition , ex1 );
634- future .completeExceptionally (ex1 );
635- return ;
636- }
683+ namespaceResources ().getPartitionedTopicResources ()
684+ .deletePartitionedTopicAsync (topicName ).thenAccept (r2 -> {
685+ log .info ("[{}] Deleted partitioned topic {}" , clientAppId (), topicName );
686+ asyncResponse .resume (Response .noContent ().build ());
687+ }).exceptionally (ex1 -> {
688+ log .error ("[{}] Failed to delete partitioned topic {}" , clientAppId (),
689+ topicName , ex1 .getCause ());
690+ if (ex1 .getCause () instanceof MetadataStoreException .NotFoundException ) {
691+ asyncResponse .resume (new RestException (
692+ new RestException (Status .NOT_FOUND ,
693+ "Partitioned topic does not exist" )));
694+ } else if (ex1
695+ .getCause ()
696+ instanceof MetadataStoreException .BadVersionException ) {
697+ asyncResponse .resume (
698+ new RestException (new RestException (Status .CONFLICT ,
699+ "Concurrent modification" )));
637700 } else {
638- log .info ("[{}] Deleted partition {}" , clientAppId (), topicNamePartition );
639- }
640- if (count .decrementAndGet () == 0 ) {
641- future .complete (null );
701+ asyncResponse .resume (new RestException ((ex1 .getCause ())));
642702 }
703+ return null ;
643704 });
644- } catch (Exception e ) {
645- log .error ("[{}] Failed to delete partition {}" , clientAppId (), topicNamePartition , e );
646- future . completeExceptionally ( e );
705+ } catch (Exception e1 ) {
706+ log .error ("[{}] Failed to delete partitioned topic {}" , clientAppId (), topicName , e1 );
707+ asyncResponse . resume ( new RestException ( e1 ) );
647708 }
648- }
649- });
650- } else {
651- future .complete (null );
652- }
653- }).exceptionally (ex -> {
654- future .completeExceptionally (ex );
655- return null ;
656- });
657-
658- future .whenComplete ((r , ex ) -> {
659- if (ex != null ) {
660- if (ex instanceof PreconditionFailedException ) {
661- asyncResponse .resume (
662- new RestException (Status .PRECONDITION_FAILED , "Topic has active producers/subscriptions" ));
663- return ;
664- } else if (ex instanceof PulsarAdminException ) {
665- asyncResponse .resume (new RestException ((PulsarAdminException ) ex ));
666- return ;
667- } else if (ex instanceof WebApplicationException ) {
668- asyncResponse .resume (ex );
669- return ;
670- } else {
671- asyncResponse .resume (new RestException (ex ));
672- return ;
673- }
674- }
675- // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
676- try {
677- namespaceResources ().getPartitionedTopicResources ()
678- .deletePartitionedTopicAsync (topicName ).thenAccept (r2 -> {
679- log .info ("[{}] Deleted partitioned topic {}" , clientAppId (), topicName );
680- asyncResponse .resume (Response .noContent ().build ());
681- }).exceptionally (ex1 -> {
682- log .error ("[{}] Failed to delete partitioned topic {}" , clientAppId (), topicName , ex1 .getCause ());
683- if (ex1 .getCause ()
684- instanceof org .apache .pulsar .metadata .api .MetadataStoreException .NotFoundException ) {
685- asyncResponse .resume (new RestException (
686- new RestException (Status .NOT_FOUND , "Partitioned topic does not exist" )));
687- } else if (ex1
688- .getCause ()
689- instanceof org .apache .pulsar .metadata .api .MetadataStoreException .BadVersionException ) {
690- asyncResponse .resume (
691- new RestException (new RestException (Status .CONFLICT , "Concurrent modification" )));
692- } else {
693- asyncResponse .resume (new RestException ((ex1 .getCause ())));
694- }
695- return null ;
696- });
697- } catch (Exception e1 ) {
698- log .error ("[{}] Failed to delete partitioned topic {}" , clientAppId (), topicName , e1 );
699- asyncResponse .resume (new RestException (e1 ));
700- }
701- }).exceptionally (ex ->{
709+ });
710+ }).exceptionally (ex ->{
702711 Throwable cause = ex .getCause ();
703712 if (cause instanceof WebApplicationException && ((WebApplicationException ) cause ).getResponse ().getStatus ()
704713 == Status .TEMPORARY_REDIRECT .getStatusCode ()) {
0 commit comments