Skip to content

Commit d35e6c1

Browse files
authored
[Issue 5904]Support unload all partitions of a partitioned topic (#6187)
Fixes #5904 ### Motivation Pulsar supports unload a non-partitioned-topic or a partition of a partitioned topic. If there has a partitioned topic with too many partitions, users need to get all partition and unload them one by one. We need to support unload all partition of a partitioned topic.
1 parent 47b944b commit d35e6c1

File tree

7 files changed

+146
-44
lines changed

7 files changed

+146
-44
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,32 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
559559
return pulsar().getConfigurationCache().failureDomainListCache();
560560
}
561561

562+
protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
563+
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
564+
validateClusterOwnership(topicName.getCluster());
565+
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
566+
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
567+
// producer/consumer
568+
validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
569+
570+
try {
571+
checkConnect(topicName);
572+
} catch (WebApplicationException e) {
573+
validateAdminAccessForTenant(topicName.getTenant());
574+
} catch (Exception e) {
575+
// unknown error marked as internal server error
576+
log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
577+
clientAppId(), e.getMessage(), e);
578+
return FutureUtil.failedFuture(e);
579+
}
580+
581+
if (checkAllowAutoCreation) {
582+
return pulsar().getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
583+
} else {
584+
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
585+
}
586+
}
587+
562588
protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
563589
boolean authoritative, boolean checkAllowAutoCreation) {
564590
validateClusterOwnership(topicName.getCluster());

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -655,12 +655,63 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
655655
});
656656
}
657657

658-
protected void internalUnloadTopic(boolean authoritative) {
658+
protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
659659
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
660660
if (topicName.isGlobal()) {
661661
validateGlobalNamespaceOwnership(namespaceName);
662662
}
663-
unloadTopic(topicName, authoritative);
663+
664+
getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
665+
if (meta.partitions > 0) {
666+
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
667+
668+
for (int i = 0; i < meta.partitions; i++) {
669+
TopicName topicNamePartition = topicName.getPartition(i);
670+
try {
671+
futures.add(pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
672+
} catch (Exception e) {
673+
log.error("[{}] Failed to unload topic {}", clientAppId(), topicNamePartition, e);
674+
asyncResponse.resume(new RestException(e));
675+
return;
676+
}
677+
}
678+
679+
FutureUtil.waitForAll(futures).handle((result, exception) -> {
680+
if (exception != null) {
681+
Throwable th = exception.getCause();
682+
if (th instanceof NotFoundException) {
683+
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
684+
} else {
685+
log.error("[{}] Failed to unload topic {}", clientAppId(), topicName, exception);
686+
asyncResponse.resume(new RestException(exception));
687+
}
688+
return null;
689+
}
690+
691+
asyncResponse.resume(Response.noContent().build());
692+
return null;
693+
});
694+
} else {
695+
validateAdminAccessForTenant(topicName.getTenant());
696+
validateTopicOwnership(topicName, authoritative);
697+
698+
Topic topic = getTopicReference(topicName);
699+
topic.close(false).whenComplete((r, ex) -> {
700+
if (ex != null) {
701+
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex.getMessage(), ex);
702+
asyncResponse.resume(new RestException(ex));
703+
704+
} else {
705+
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
706+
asyncResponse.resume(Response.noContent().build());
707+
}
708+
});
709+
}
710+
}).exceptionally(t -> {
711+
Throwable th = t.getCause();
712+
asyncResponse.resume(new RestException(th));
713+
return null;
714+
});
664715
}
665716

666717
protected void internalDeleteTopic(boolean authoritative, boolean force) {
@@ -1893,22 +1944,6 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
18931944
return result;
18941945
}
18951946

1896-
protected void unloadTopic(TopicName topicName, boolean authoritative) {
1897-
validateSuperUserAccess();
1898-
validateTopicOwnership(topicName, authoritative);
1899-
try {
1900-
Topic topic = getTopicReference(topicName);
1901-
topic.close(false).get();
1902-
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
1903-
} catch (NullPointerException e) {
1904-
log.error("[{}] topic {} not found", clientAppId(), topicName);
1905-
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
1906-
} catch (Exception e) {
1907-
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e);
1908-
throw new RestException(e);
1909-
}
1910-
}
1911-
19121947
// as described at : (PR: #836) CPP-client old client lib should not be allowed to connect on partitioned-topic.
19131948
// So, all requests from old-cpp-client (< v1.21) must be rejected.
19141949
// Pulsar client-java lib always passes user-agent as X-Java-$version.

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,16 +169,18 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
169169
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
170170
@ApiResponse(code = 403, message = "Don't have admin permission"),
171171
@ApiResponse(code = 404, message = "Topic does not exist") })
172-
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
173-
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
172+
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
173+
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
174+
@PathParam("topic") @Encoded String encodedTopic,
174175
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
175-
validateTopicName(property, cluster, namespace, encodedTopic);
176-
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
177-
178-
if (topicName.isGlobal()) {
179-
validateGlobalNamespaceOwnership(namespaceName);
176+
try {
177+
validateTopicName(property, cluster, namespace, encodedTopic);
178+
internalUnloadTopic(asyncResponse, authoritative);
179+
} catch (WebApplicationException wae) {
180+
asyncResponse.resume(wae);
181+
} catch (Exception e) {
182+
asyncResponse.resume(new RestException(e));
180183
}
181-
unloadTopic(topicName, authoritative);
182184
}
183185

184186
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,12 @@ public void deletePartitionedTopic(@Suspended final AsyncResponse asyncResponse,
225225
@ApiOperation(hidden = true, value = "Unload a topic")
226226
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
227227
@ApiResponse(code = 404, message = "Topic does not exist") })
228-
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
229-
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
228+
public void unloadTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
229+
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
230+
@PathParam("topic") @Encoded String encodedTopic,
230231
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
231232
validateTopicName(property, cluster, namespace, encodedTopic);
232-
internalUnloadTopic(authoritative);
233+
internalUnloadTopic(asyncResponse, authoritative);
233234
}
234235

235236
@DELETE

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ public void createPartitionedTopic(
219219
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
220220
})
221221
public void unloadTopic(
222+
@Suspended final AsyncResponse asyncResponse,
222223
@ApiParam(value = "Specify the tenant", required = true)
223224
@PathParam("tenant") String tenant,
224225
@ApiParam(value = "Specify the namespace", required = true)
@@ -227,12 +228,14 @@ public void unloadTopic(
227228
@PathParam("topic") @Encoded String encodedTopic,
228229
@ApiParam(value = "Is authentication required to perform this operation")
229230
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
230-
validateTopicName(tenant, namespace, encodedTopic);
231-
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
232-
if (topicName.isGlobal()) {
233-
validateGlobalNamespaceOwnership(namespaceName);
231+
try {
232+
validateTopicName(tenant, namespace, encodedTopic);
233+
internalUnloadTopic(asyncResponse, authoritative);
234+
} catch (WebApplicationException wae) {
235+
asyncResponse.resume(wae);
236+
} catch (Exception e) {
237+
asyncResponse.resume(new RestException(e));
234238
}
235-
unloadTopic(topicName, authoritative);
236239
}
237240

238241
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ public void deletePartitionedTopic(
373373
@ApiResponse(code = 500, message = "Internal server error"),
374374
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
375375
public void unloadTopic(
376+
@Suspended final AsyncResponse asyncResponse,
376377
@ApiParam(value = "Specify the tenant", required = true)
377378
@PathParam("tenant") String tenant,
378379
@ApiParam(value = "Specify the namespace", required = true)
@@ -381,8 +382,14 @@ public void unloadTopic(
381382
@PathParam("topic") @Encoded String encodedTopic,
382383
@ApiParam(value = "Is authentication required to perform this operation")
383384
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
384-
validateTopicName(tenant, namespace, encodedTopic);
385-
internalUnloadTopic(authoritative);
385+
try {
386+
validateTopicName(tenant, namespace, encodedTopic);
387+
internalUnloadTopic(asyncResponse, authoritative);
388+
} catch (WebApplicationException wae) {
389+
asyncResponse.resume(wae);
390+
} catch (Exception e) {
391+
asyncResponse.resume(new RestException(e));
392+
}
386393
}
387394

388395
@DELETE

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -276,18 +276,46 @@ public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix
276276
@Test
277277
public void testUnloadTopic() {
278278
final String topicName = "standard-topic-to-be-unload";
279+
final String partitionTopicName = "partition-topic-to-be-unload";
280+
281+
// 1) not exist topic
282+
AsyncResponse response = mock(AsyncResponse.class);
283+
persistentTopics.unloadTopic(response, testTenant, testNamespace, "topic-not-exist", true);
284+
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
285+
verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
286+
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
287+
288+
// 2) create non partitioned topic and unload
289+
response = mock(AsyncResponse.class);
279290
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
280-
persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true);
291+
persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
292+
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
293+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
294+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
295+
296+
// 3) create partitioned topic and unload
297+
response = mock(AsyncResponse.class);
298+
persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
299+
persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
300+
responseCaptor = ArgumentCaptor.forClass(Response.class);
301+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
302+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
303+
304+
// 4) delete partitioned topic
305+
response = mock(AsyncResponse.class);
306+
persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, partitionTopicName, true, true);
307+
responseCaptor = ArgumentCaptor.forClass(Response.class);
308+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
309+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
281310
}
282311

283-
@Test(expectedExceptions = RestException.class)
312+
@Test
284313
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
285-
try {
286-
persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true);
287-
} catch (RestException e) {
288-
Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
289-
throw e;
290-
}
314+
AsyncResponse response = mock(AsyncResponse.class);
315+
persistentTopics.unloadTopic(response, testTenant, testNamespace,"non-existent-topic", true);
316+
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
317+
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
318+
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
291319
}
292320

293321
@Test

0 commit comments

Comments
 (0)