-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Issue 13854][broker] Fix call sync method in async rest api for internalSetReplicatedSubscriptionStatus #13887
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 13854][broker] Fix call sync method in async rest api for internalSetReplicatedSubscriptionStatus #13887
Conversation
| private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | ||
| String subName,boolean authoritative,boolean enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | |
| String subName,boolean authoritative,boolean enabled) { | |
| private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | |
| String subName, boolean authoritative, boolean enabled) { |
| String subName,boolean authoritative,boolean enabled) { | ||
| // Redirect the request to the appropriate broker if this broker is not the owner of the topic | ||
| return validateTopicOwnershipAsync(topicName, authoritative).thenRun(() -> { | ||
| Topic topic = getTopicReference(topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method can be async.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
| // 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters | ||
| CompletableFuture<Void> future = | ||
| validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName) | ||
| .thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| .thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName)); | |
| .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); |
| validateTopicOwnership(topicName, authoritative); | ||
|
|
||
| private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse, | ||
| String subName,boolean authoritative,boolean enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| String subName,boolean authoritative,boolean enabled) { | |
| String subName, boolean authoritative, boolean enabled) { |
| } | ||
| } catch (Exception e) { | ||
| }).exceptionally(e -> { | ||
| resumeAsyncResponseExceptionally(asyncResponse, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Maybe we need
e.getCausehere. - We need log some information when get exception.
408b069 to
e4eecb2
Compare
| return null; | ||
| }); | ||
| } else { | ||
| getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need future.thenCompose here. This should be executed after previous validation is finished.
| } else { | ||
| internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, | ||
| enabled); | ||
| internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(asyncResponse, subName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to add "Async", let's keep it consistent with other methods.
e4a9897 to
d5f4da8
Compare
d5f4da8 to
fbd70aa
Compare
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
…rnalSetReplicatedSubscriptionStatus (apache#13887)
Master Issue: #13854
Motivation
See #13854
Verifying this change
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation
no-need-doc