Skip to content

Commit 29a6836

Browse files
authored
Subscription: tree/table model isolation for topic/consumer/subscription entities (#15484)
1 parent eda4249 commit 29a6836

File tree

23 files changed

+399
-87
lines changed

23 files changed

+399
-87
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.subscription.it.local.tablemodel;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
25+
import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
26+
import org.apache.iotdb.session.subscription.ISubscriptionTreeSession;
27+
import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
28+
import org.apache.iotdb.session.subscription.SubscriptionTreeSessionBuilder;
29+
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
30+
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTreePullConsumer;
31+
import org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
32+
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumerBuilder;
33+
import org.apache.iotdb.subscription.it.local.AbstractSubscriptionLocalIT;
34+
35+
import org.junit.Assert;
36+
import org.junit.Before;
37+
import org.junit.Test;
38+
import org.junit.experimental.categories.Category;
39+
import org.junit.runner.RunWith;
40+
41+
import static org.junit.Assert.fail;
42+
43+
@RunWith(IoTDBTestRunner.class)
44+
@Category({LocalStandaloneIT.class})
45+
public class IoTDBSubscriptionIsolationIT extends AbstractSubscriptionLocalIT {
46+
47+
@Override
48+
@Before
49+
public void setUp() throws Exception {
50+
super.setUp();
51+
}
52+
53+
@Test
54+
public void testTopicIsolation() throws Exception {
55+
final String treeTopicName = "treeTopic";
56+
final String tableTopicName = "tableTopic";
57+
58+
final String host = EnvFactory.getEnv().getIP();
59+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
60+
61+
// create tree topic
62+
try (final ISubscriptionTreeSession session =
63+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
64+
session.open();
65+
session.createTopic(treeTopicName);
66+
}
67+
68+
// create table topic
69+
try (final ISubscriptionTableSession session =
70+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
71+
session.createTopic(tableTopicName);
72+
}
73+
74+
// show topic on tree session
75+
try (final ISubscriptionTreeSession session =
76+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
77+
session.open();
78+
Assert.assertEquals(1, session.getTopics().size());
79+
Assert.assertTrue(session.getTopic(treeTopicName).isPresent());
80+
Assert.assertFalse(session.getTopic(tableTopicName).isPresent());
81+
}
82+
83+
// show topic on table session
84+
try (final ISubscriptionTableSession session =
85+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
86+
Assert.assertEquals(1, session.getTopics().size());
87+
Assert.assertTrue(session.getTopic(tableTopicName).isPresent());
88+
Assert.assertFalse(session.getTopic(treeTopicName).isPresent());
89+
}
90+
91+
// drop table topic on tree session
92+
try (final ISubscriptionTreeSession session =
93+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
94+
session.open();
95+
try {
96+
session.dropTopic(tableTopicName);
97+
fail();
98+
} catch (final Exception ignored) {
99+
}
100+
}
101+
102+
// drop tree topic on table session
103+
try (final ISubscriptionTableSession session =
104+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
105+
try {
106+
session.dropTopic(treeTopicName);
107+
fail();
108+
} catch (final Exception ignored) {
109+
}
110+
}
111+
112+
// drop tree topic on tree session
113+
try (final ISubscriptionTreeSession session =
114+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
115+
session.open();
116+
session.dropTopic(treeTopicName);
117+
}
118+
119+
// drop table topic on table session
120+
try (final ISubscriptionTableSession session =
121+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
122+
session.dropTopic(tableTopicName);
123+
}
124+
}
125+
126+
@Test
127+
public void testSubscriptionIsolation() throws Exception {
128+
final String treeTopicName = "treeTopic";
129+
final String tableTopicName = "tableTopic";
130+
131+
final String host = EnvFactory.getEnv().getIP();
132+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
133+
134+
// create tree topic
135+
try (final ISubscriptionTreeSession session =
136+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
137+
session.open();
138+
session.createTopic(treeTopicName);
139+
}
140+
141+
// create table topic
142+
try (final ISubscriptionTableSession session =
143+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
144+
session.createTopic(tableTopicName);
145+
}
146+
147+
// subscribe table topic on tree consumer
148+
try (final ISubscriptionTreePullConsumer consumer =
149+
new SubscriptionTreePullConsumerBuilder().host(host).port(port).build()) {
150+
consumer.open();
151+
try {
152+
consumer.subscribe(tableTopicName);
153+
fail();
154+
} catch (final Exception ignored) {
155+
}
156+
}
157+
158+
// subscribe tree topic on table consumer
159+
try (final ISubscriptionTablePullConsumer consumer =
160+
new SubscriptionTablePullConsumerBuilder().host(host).port(port).build()) {
161+
consumer.open();
162+
try {
163+
consumer.subscribe(treeTopicName);
164+
fail();
165+
} catch (final Exception ignored) {
166+
}
167+
}
168+
169+
// subscribe tree topic on tree consumer
170+
final ISubscriptionTreePullConsumer treeConsumer =
171+
new SubscriptionTreePullConsumerBuilder().host(host).port(port).build();
172+
treeConsumer.open();
173+
treeConsumer.subscribe(treeTopicName);
174+
175+
// subscribe table topic on table consumer
176+
final ISubscriptionTablePullConsumer tableConsumer =
177+
new SubscriptionTablePullConsumerBuilder().host(host).port(port).build();
178+
tableConsumer.open();
179+
tableConsumer.subscribe(tableTopicName);
180+
181+
// show subscription on tree session
182+
try (final ISubscriptionTreeSession session =
183+
new SubscriptionTreeSessionBuilder().host(host).port(port).build()) {
184+
session.open();
185+
Assert.assertEquals(1, session.getSubscriptions().size());
186+
Assert.assertEquals(1, session.getSubscriptions(treeTopicName).size());
187+
Assert.assertEquals(0, session.getSubscriptions(tableTopicName).size());
188+
}
189+
190+
// show subscription on table session
191+
try (final ISubscriptionTableSession session =
192+
new SubscriptionTableSessionBuilder().host(host).port(port).build()) {
193+
Assert.assertEquals(1, session.getSubscriptions().size());
194+
Assert.assertEquals(1, session.getSubscriptions(tableTopicName).size());
195+
Assert.assertEquals(0, session.getSubscriptions(treeTopicName).size());
196+
}
197+
198+
// unsubscribe table topic on tree consumer
199+
try {
200+
treeConsumer.unsubscribe(tableTopicName);
201+
fail();
202+
} catch (final Exception ignored) {
203+
204+
}
205+
206+
// unsubscribe tree topic on table consumer
207+
try {
208+
tableConsumer.unsubscribe(treeTopicName);
209+
fail();
210+
} catch (final Exception ignored) {
211+
212+
}
213+
214+
// close consumers
215+
treeConsumer.close();
216+
tableConsumer.close();
217+
}
218+
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ public String getPassword() {
7676
return getString(ConsumerConstant.PASSWORD_KEY);
7777
}
7878

79+
public String getSqlDialect() {
80+
return getString(ConsumerConstant.SQL_DIALECT_KEY);
81+
}
82+
7983
public void setConsumerId(final String consumerId) {
8084
attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
8185
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public class ConsumerConstant {
2525

2626
/////////////////////////////// common ///////////////////////////////
2727

28+
// TODO: hide from the client
29+
public static final String SQL_DIALECT_KEY = "sql-dialect";
30+
2831
public static final String HOST_KEY = "host";
2932
public static final String PORT_KEY = "port";
3033
public static final String NODE_URLS_KEY = "node-urls";

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep
163163
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
164164
consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
165165
consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
166+
consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, session.getSqlDialect());
166167

167168
final PipeSubscribeHandshakeResp resp =
168169
handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.nio.ByteBuffer;
3232
import java.util.ArrayList;
3333
import java.util.List;
34+
import java.util.Objects;
35+
import java.util.stream.Collectors;
3436

3537
public class SubscriptionTableResp implements DataSet {
3638
private final TSStatus status;
@@ -46,19 +48,18 @@ public SubscriptionTableResp(
4648
this.allConsumerGroupMeta = allConsumerGroupMeta;
4749
}
4850

49-
public SubscriptionTableResp filter(String topicName) {
50-
if (topicName == null) {
51-
return this;
52-
} else {
53-
final List<SubscriptionMeta> filteredSubscriptionMeta = new ArrayList<>();
54-
for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
55-
if (subscriptionMeta.getTopicName().equals(topicName)) {
56-
filteredSubscriptionMeta.add(subscriptionMeta);
57-
break;
58-
}
59-
}
60-
return new SubscriptionTableResp(status, filteredSubscriptionMeta, allConsumerGroupMeta);
61-
}
51+
public SubscriptionTableResp filter(String topicName, boolean isTableModel) {
52+
return new SubscriptionTableResp(
53+
status,
54+
allSubscriptionMeta.stream()
55+
.filter(
56+
subscriptionMeta ->
57+
(Objects.isNull(topicName)
58+
|| Objects.equals(
59+
subscriptionMeta.getTopicMeta().getTopicName(), topicName))
60+
&& subscriptionMeta.getTopicMeta().visibleUnder(isTableModel))
61+
.collect(Collectors.toList()),
62+
allConsumerGroupMeta);
6263
}
6364

6465
public TShowSubscriptionResp convertToTShowSubscriptionResp() {
@@ -67,7 +68,7 @@ public TShowSubscriptionResp convertToTShowSubscriptionResp() {
6768
for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
6869
showSubscriptionInfoList.add(
6970
new TShowSubscriptionInfo(
70-
subscriptionMeta.getTopicName(),
71+
subscriptionMeta.getTopicMeta().getTopicName(),
7172
subscriptionMeta.getConsumerGroupId(),
7273
subscriptionMeta.getConsumerIds()));
7374
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/TopicTableResp.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.nio.ByteBuffer;
3131
import java.util.ArrayList;
3232
import java.util.List;
33+
import java.util.Objects;
34+
import java.util.stream.Collectors;
3335

3436
public class TopicTableResp implements DataSet {
3537
private final TSStatus status;
@@ -40,19 +42,16 @@ public TopicTableResp(TSStatus status, List<TopicMeta> allTopicMeta) {
4042
this.allTopicMeta = allTopicMeta;
4143
}
4244

43-
public TopicTableResp filter(String topicName) {
44-
if (topicName == null) {
45-
return this;
46-
} else {
47-
final List<TopicMeta> filteredTopicMeta = new ArrayList<>();
48-
for (TopicMeta topicMeta : allTopicMeta) {
49-
if (topicMeta.getTopicName().equals(topicName)) {
50-
filteredTopicMeta.add(topicMeta);
51-
break;
52-
}
53-
}
54-
return new TopicTableResp(status, filteredTopicMeta);
55-
}
45+
public TopicTableResp filter(String topicName, boolean isTableModel) {
46+
return new TopicTableResp(
47+
status,
48+
allTopicMeta.stream()
49+
.filter(
50+
topicMeta ->
51+
(Objects.isNull(topicName)
52+
|| Objects.equals(topicMeta.getTopicName(), topicName))
53+
&& topicMeta.visibleUnder(isTableModel))
54+
.collect(Collectors.toList()));
5655
}
5756

5857
public TShowTopicResp convertToTShowTopicResp() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -153,28 +153,23 @@ public TSStatus createTopic(TCreateTopicReq req) {
153153

154154
public TSStatus dropTopic(TDropTopicReq req) {
155155
final String topicName = req.getTopicName();
156-
final boolean isTopicExistedBeforeDrop = subscriptionInfo.isTopicExisted(topicName);
157-
final TSStatus status = configManager.getProcedureManager().dropTopic(topicName);
158-
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
159-
LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, status);
160-
}
161-
162-
// If the `IF EXISTS` condition is not set and the topic does not exist before the drop
163-
// operation, return an error status indicating that the topic does not exist.
164-
final boolean isIfExistedConditionSet =
156+
final boolean isSetIfExistsCondition =
165157
req.isSetIfExistsCondition() && req.isIfExistsCondition();
166-
return isTopicExistedBeforeDrop || isIfExistedConditionSet
167-
? status
168-
: RpcUtils.getStatus(
169-
TSStatusCode.TOPIC_NOT_EXIST_ERROR,
170-
String.format(
171-
"Failed to drop topic %s. Failures: %s does not exist.", topicName, topicName));
158+
if (!subscriptionInfo.isTopicExisted(topicName, req.isTableModel)) {
159+
return isSetIfExistsCondition
160+
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
161+
: RpcUtils.getStatus(
162+
TSStatusCode.TOPIC_NOT_EXIST_ERROR,
163+
String.format(
164+
"Failed to drop topic %s. Failures: %s does not exist.", topicName, topicName));
165+
}
166+
return configManager.getProcedureManager().dropTopic(topicName);
172167
}
173168

174169
public TShowTopicResp showTopic(TShowTopicReq req) {
175170
try {
176171
return ((TopicTableResp) configManager.getConsensusManager().read(new ShowTopicPlan()))
177-
.filter(req.getTopicName())
172+
.filter(req.topicName, req.isTableModel)
178173
.convertToTShowTopicResp();
179174
} catch (Exception e) {
180175
LOGGER.warn("Failed to show topic info.", e);
@@ -252,7 +247,7 @@ public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
252247
try {
253248
return ((SubscriptionTableResp)
254249
configManager.getConsensusManager().read(new ShowSubscriptionPlan()))
255-
.filter(req.getTopicName())
250+
.filter(req.getTopicName(), req.isTableModel)
256251
.convertToTShowSubscriptionResp();
257252
} catch (Exception e) {
258253
LOGGER.warn("Failed to show subscription info.", e);

0 commit comments

Comments
 (0)