Skip to content

Conversation

@suiyuzeng
Copy link
Contributor

Master Issue: #13854

Motivation

See #13854

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • no-need-doc

Comment on lines 4578 to 4579
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,boolean authoritative,boolean enabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,boolean authoritative,boolean enabled) {
private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName, boolean authoritative, boolean enabled) {

String subName,boolean authoritative,boolean enabled) {
// Redirect the request to the appropriate broker if this broker is not the owner of the topic
return validateTopicOwnershipAsync(topicName, authoritative).thenRun(() -> {
Topic topic = getTopicReference(topicName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be async.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

// 2.Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
CompletableFuture<Void> future =
validateTopicOperationAsync(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
.thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.thenCompose(__-> validateGlobalNamespaceOwnershipAsync(namespaceName));
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));

validateTopicOwnership(topicName, authoritative);

private CompletableFuture<Void> internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,boolean authoritative,boolean enabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String subName,boolean authoritative,boolean enabled) {
String subName, boolean authoritative, boolean enabled) {

}
} catch (Exception e) {
}).exceptionally(e -> {
resumeAsyncResponseExceptionally(asyncResponse, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Maybe we need e.getCause here.
  • We need log some information when get exception.

@suiyuzeng suiyuzeng force-pushed the sync_call_issue_internalSetReplicatedSubscriptionStatus branch from 408b069 to e4eecb2 Compare January 21, 2022 13:29
return null;
});
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need future.thenCompose here. This should be executed after previous validation is finished.

} else {
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
enabled);
internalSetReplicatedSubscriptionStatusForNonPartitionedTopicAsync(asyncResponse, subName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add "Async", let's keep it consistent with other methods.

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 21, 2022
@suiyuzeng suiyuzeng force-pushed the sync_call_issue_internalSetReplicatedSubscriptionStatus branch 2 times, most recently from e4a9897 to d5f4da8 Compare January 25, 2022 03:50
@suiyuzeng suiyuzeng force-pushed the sync_call_issue_internalSetReplicatedSubscriptionStatus branch from d5f4da8 to fbd70aa Compare January 25, 2022 05:25
@Jason918
Copy link
Contributor

/pulsarbot run-failure-checks

1 similar comment
@suiyuzeng
Copy link
Contributor Author

/pulsarbot run-failure-checks

@Jason918
Copy link
Contributor

@liudezhi2098 @mattisonchao PTAL

@codelipenghui codelipenghui merged commit 7ea2448 into apache:master Jan 28, 2022
Nicklee007 pushed a commit to Nicklee007/pulsar that referenced this pull request Apr 20, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants