Skip to content

Commit b21f728

Browse files
authored
[broker][admin]Add api for update topic properties (apache#17238)
1 parent 40d2ae3 commit b21f728

File tree

7 files changed

+222
-0
lines changed

7 files changed

+222
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
6363
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
6464
import org.apache.bookkeeper.mledger.impl.PositionImpl;
65+
import org.apache.commons.collections4.MapUtils;
6566
import org.apache.commons.lang3.StringUtils;
6667
import org.apache.pulsar.broker.PulsarServerException;
6768
import org.apache.pulsar.broker.PulsarService;
@@ -600,6 +601,60 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
600601
});
601602
}
602603

604+
protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
605+
Map<String, String> properties) {
606+
if (properties == null || properties.isEmpty()) {
607+
log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
608+
return CompletableFuture.completedFuture(null);
609+
}
610+
return validateTopicOwnershipAsync(topicName, authoritative)
611+
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
612+
.thenCompose(__ -> {
613+
if (topicName.isPartitioned()) {
614+
return internalUpdateNonPartitionedTopicProperties(properties);
615+
} else {
616+
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
617+
.thenCompose(metadata -> {
618+
if (metadata.partitions == 0) {
619+
return internalUpdateNonPartitionedTopicProperties(properties);
620+
}
621+
return namespaceResources()
622+
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
623+
p -> new PartitionedTopicMetadata(p.partitions,
624+
MapUtils.putAll(p.properties, properties.entrySet().toArray())));
625+
});
626+
}
627+
}).thenAccept(__ ->
628+
log.info("[{}] [{}] update properties success with properties {}",
629+
clientAppId(), topicName, properties));
630+
}
631+
632+
private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
633+
CompletableFuture<Void> future = new CompletableFuture<>();
634+
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
635+
.thenAccept(opt -> {
636+
if (!opt.isPresent()) {
637+
throw new RestException(Status.NOT_FOUND,
638+
getTopicNotFoundErrorMessage(topicName.toString()));
639+
}
640+
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
641+
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {
642+
643+
@Override
644+
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
645+
managedLedger.getConfig().getProperties().putAll(properties);
646+
future.complete(null);
647+
}
648+
649+
@Override
650+
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
651+
future.completeExceptionally(exception);
652+
}
653+
}, null);
654+
});
655+
return future;
656+
}
657+
603658
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
604659
return pulsar().getNamespaceService().checkTopicExists(topicName)
605660
.thenAccept(exist -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,42 @@ public void getProperties(
966966
});
967967
}
968968

969+
@PUT
970+
@Path("/{tenant}/{namespace}/{topic}/properties")
971+
@ApiOperation(value = "Update the properties on the given topic.")
972+
@ApiResponses(value = {
973+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
974+
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
975+
+ "subscriber is not authorized to access this operation"),
976+
@ApiResponse(code = 403, message = "Don't have admin permission"),
977+
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
978+
@ApiResponse(code = 405, message = "Method Not Allowed"),
979+
@ApiResponse(code = 500, message = "Internal server error"),
980+
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
981+
})
982+
public void updateProperties(
983+
@Suspended final AsyncResponse asyncResponse,
984+
@ApiParam(value = "Specify the tenant", required = true)
985+
@PathParam("tenant") String tenant,
986+
@ApiParam(value = "Specify the namespace", required = true)
987+
@PathParam("namespace") String namespace,
988+
@ApiParam(value = "Specify topic name", required = true)
989+
@PathParam("topic") @Encoded String encodedTopic,
990+
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
991+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
992+
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
993+
validatePersistentTopicName(tenant, namespace, encodedTopic);
994+
internalUpdatePropertiesAsync(authoritative, properties)
995+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
996+
.exceptionally(ex -> {
997+
if (!isRedirectException(ex)) {
998+
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
999+
}
1000+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1001+
return null;
1002+
});
1003+
}
1004+
9691005
@DELETE
9701006
@Path("/{tenant}/{namespace}/{topic}/partitions")
9711007
@ApiOperation(value = "Delete a partitioned topic.",

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,76 @@ public void testCreateAndGetTopicProperties() throws Exception {
887887
Assert.assertEquals(properties22.get("key2"), "value2");
888888
}
889889

890+
@Test
891+
public void testUpdatePartitionedTopicProperties() throws Exception {
892+
final String namespace = "prop-xyz/ns2";
893+
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
894+
admin.namespaces().createNamespace(namespace, 20);
895+
896+
// create partitioned topic with properties
897+
Map<String, String> topicProperties = new HashMap<>();
898+
topicProperties.put("key1", "value1");
899+
admin.topics().createPartitionedTopic(topicName, 2, topicProperties);
900+
Map<String, String> properties = admin.topics().getProperties(topicName);
901+
Assert.assertNotNull(properties);
902+
Assert.assertEquals(properties.get("key1"), "value1");
903+
904+
// update with new key, old properties should keep
905+
topicProperties = new HashMap<>();
906+
topicProperties.put("key2", "value2");
907+
admin.topics().updateProperties(topicName, topicProperties);
908+
properties = admin.topics().getProperties(topicName);
909+
Assert.assertNotNull(properties);
910+
Assert.assertEquals(properties.size(), 2);
911+
Assert.assertEquals(properties.get("key1"), "value1");
912+
Assert.assertEquals(properties.get("key2"), "value2");
913+
914+
// override old values
915+
topicProperties = new HashMap<>();
916+
topicProperties.put("key1", "value11");
917+
admin.topics().updateProperties(topicName, topicProperties);
918+
properties = admin.topics().getProperties(topicName);
919+
Assert.assertNotNull(properties);
920+
Assert.assertEquals(properties.size(), 2);
921+
Assert.assertEquals(properties.get("key1"), "value11");
922+
Assert.assertEquals(properties.get("key2"), "value2");
923+
}
924+
925+
@Test
926+
public void testUpdateNonPartitionedTopicProperties() throws Exception {
927+
final String namespace = "prop-xyz/ns2";
928+
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
929+
admin.namespaces().createNamespace(namespace, 20);
930+
931+
// create non-partitioned topic with properties
932+
Map<String, String> topicProperties = new HashMap<>();
933+
topicProperties.put("key1", "value1");
934+
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
935+
Map<String, String> properties = admin.topics().getProperties(topicName);
936+
Assert.assertNotNull(properties);
937+
Assert.assertEquals(properties.get("key1"), "value1");
938+
939+
// update with new key, old properties should keep
940+
topicProperties = new HashMap<>();
941+
topicProperties.put("key2", "value2");
942+
admin.topics().updateProperties(topicName, topicProperties);
943+
properties = admin.topics().getProperties(topicName);
944+
Assert.assertNotNull(properties);
945+
Assert.assertEquals(properties.size(), 2);
946+
Assert.assertEquals(properties.get("key1"), "value1");
947+
Assert.assertEquals(properties.get("key2"), "value2");
948+
949+
// override old values
950+
topicProperties = new HashMap<>();
951+
topicProperties.put("key1", "value11");
952+
admin.topics().updateProperties(topicName, topicProperties);
953+
properties = admin.topics().getProperties(topicName);
954+
Assert.assertNotNull(properties);
955+
Assert.assertEquals(properties.size(), 2);
956+
Assert.assertEquals(properties.get("key1"), "value11");
957+
Assert.assertEquals(properties.get("key2"), "value2");
958+
}
959+
890960
@Test
891961
public void testNonPersistentTopics() throws Exception {
892962
final String namespace = "prop-xyz/ns2";

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
736736
*/
737737
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);
738738

739+
/**
740+
* Update Topic Properties on a topic.
741+
* The new properties will override the existing values, old properties in the topic will be keep if not override.
742+
* @param topic
743+
* @param properties
744+
* @throws PulsarAdminException
745+
*/
746+
void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;
747+
748+
/**
749+
* Update Topic Properties on a topic.
750+
* The new properties will override the existing values, old properties in the topic will be keep if not override.
751+
* @param topic
752+
* @param properties
753+
* @return
754+
*/
755+
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);
756+
739757
/**
740758
* Delete a partitioned topic and its schemas.
741759
* <p/>

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,21 @@ public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
456456
return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
457457
}
458458

459+
@Override
460+
public void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException {
461+
sync(() -> updatePropertiesAsync(topic, properties));
462+
}
463+
464+
@Override
465+
public CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties) {
466+
TopicName tn = validateTopic(topic);
467+
WebTarget path = topicPath(tn, "properties");
468+
if (properties == null) {
469+
properties = new HashMap<>();
470+
}
471+
return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON));
472+
}
473+
459474
@Override
460475
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
461476
deletePartitionedTopic(topic, false);

pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,13 @@ public void topics() throws Exception {
15011501
cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
15021502
verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());
15031503

1504+
cmdTopics = new CmdTopics(() -> admin);
1505+
cmdTopics.run(split("update-properties persistent://myprop/clust/ns1/ds1 --property a=b -p x=y,z"));
1506+
props = new HashMap<>();
1507+
props.put("a", "b");
1508+
props.put("x", "y,z");
1509+
verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props);
1510+
15041511
cmdTopics = new CmdTopics(() -> admin);
15051512
cmdTopics.run(split("get-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1"));
15061513
verify(mockTopics).getSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1");

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
122122
jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd());
123123
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
124124
jcommander.addCommand("get-properties", new GetPropertiesCmd());
125+
jcommander.addCommand("update-properties", new UpdateProperties());
125126

126127
jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
127128
jcommander.addCommand("peek-messages", new PeekMessages());
@@ -624,6 +625,26 @@ void run() throws Exception {
624625
}
625626
}
626627

628+
@Parameters(commandDescription = "Update the properties of on a topic")
629+
private class UpdateProperties extends CliCommand {
630+
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
631+
private java.util.List<String> params;
632+
633+
@Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)",
634+
required = false, splitter = NoSplitter.class)
635+
private java.util.List<String> properties;
636+
637+
@Override
638+
void run() throws Exception {
639+
String topic = validateTopicName(params);
640+
Map<String, String> map = parseListKeyValueMap(properties);
641+
if (map == null) {
642+
map = Collections.emptyMap();
643+
}
644+
getTopics().updateProperties(topic, map);
645+
}
646+
}
647+
627648
@Parameters(commandDescription = "Delete a partitioned topic. "
628649
+ "It will also delete all the partitions of the topic if it exists."
629650
+ "And the application is not able to connect to the topic(delete then re-create with same name) again "

0 commit comments

Comments
 (0)