Skip to content

Commit 7f54522

Browse files
authored
Subscription: support drop subscription from session & intro allTopicMessagesHaveBeenConsumed for pull consumer (#15486)
- Introducing a new DropSubscriptionTask and corresponding updates across executor interfaces, cluster executors, and managers. - Updating the SQL parser to recognize the DROP SUBSCRIPTION statement. - Enhancing client session APIs and internal data models to support subscription deletion.
1 parent 632d87e commit 7f54522

File tree

50 files changed

+828
-112
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+828
-112
lines changed

example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ private static void dataSubscription3() throws Exception {
272272
.buildPushConsumer()) {
273273
consumer3.open();
274274
consumer3.subscribe(TOPIC_3);
275-
while (!consumer3.allSnapshotTopicMessagesHaveBeenConsumed()) {
275+
while (!consumer3.allTopicMessagesHaveBeenConsumed()) {
276276
LockSupport.parkNanos(SLEEP_NS); // wait some time
277277
}
278278
}
@@ -314,7 +314,7 @@ private static void dataSubscription4() throws Exception {
314314
.buildPullConsumer()) {
315315
consumer4.open();
316316
consumer4.subscribe(TOPIC_4);
317-
while (!consumer4.allSnapshotTopicMessagesHaveBeenConsumed()) {
317+
while (!consumer4.allTopicMessagesHaveBeenConsumed()) {
318318
for (final SubscriptionMessage message : consumer4.poll(POLL_TIMEOUT_MS)) {
319319
final SubscriptionTsFileHandler handler = message.getTsFileHandler();
320320
handler.moveFile(

integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/treemodel/IoTDBSubscriptionTopicIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ public void testSnapshotModeWithEmptyData() throws Exception {
860860
consumer.open();
861861
consumer.subscribe(topicName);
862862

863-
while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
863+
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
864864
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
865865
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); // poll and ignore
866866
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919

2020
package org.apache.iotdb.subscription.it.local;
2121

22+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
23+
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
24+
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
2225
import org.apache.iotdb.isession.ISession;
2326
import org.apache.iotdb.it.env.EnvFactory;
2427
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2528
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
29+
import org.apache.iotdb.rpc.RpcUtils;
2630
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
2731
import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
2832
import org.apache.iotdb.session.subscription.consumer.AckStrategy;
2933
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
3034
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
3135
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
3236
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
37+
import org.apache.iotdb.session.subscription.model.Subscription;
3338
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
3439
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
3540
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
@@ -51,6 +56,7 @@
5156
import java.util.Collections;
5257
import java.util.List;
5358
import java.util.Properties;
59+
import java.util.Set;
5460
import java.util.concurrent.atomic.AtomicBoolean;
5561
import java.util.concurrent.atomic.AtomicInteger;
5662
import java.util.concurrent.atomic.AtomicLong;
@@ -621,4 +627,95 @@ public void testMissingConsumerId() {
621627
fail(e.getMessage());
622628
}
623629
}
630+
631+
@Test
632+
public void testDropSubscriptionBySession() throws Exception {
633+
// Insert some historical data
634+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
635+
for (int i = 0; i < 100; ++i) {
636+
session.executeNonQueryStatement(
637+
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
638+
}
639+
session.executeNonQueryStatement("flush");
640+
} catch (final Exception e) {
641+
e.printStackTrace();
642+
fail(e.getMessage());
643+
}
644+
645+
// Create topic
646+
final String topicName = "topic8";
647+
final String host = EnvFactory.getEnv().getIP();
648+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
649+
try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) {
650+
session.open();
651+
session.createTopic(topicName);
652+
} catch (final Exception e) {
653+
e.printStackTrace();
654+
fail(e.getMessage());
655+
}
656+
657+
// Subscription
658+
final Thread thread =
659+
new Thread(
660+
() -> {
661+
try (final SubscriptionTreePullConsumer consumer =
662+
new SubscriptionTreePullConsumer.Builder()
663+
.host(host)
664+
.port(port)
665+
.consumerId("c1")
666+
.consumerGroupId("cg1")
667+
.autoCommit(true)
668+
.buildPullConsumer()) {
669+
consumer.open();
670+
consumer.subscribe(topicName);
671+
672+
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
673+
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
674+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); // poll and ignore
675+
}
676+
} catch (final Exception e) {
677+
e.printStackTrace();
678+
// Avoid failure
679+
} finally {
680+
LOGGER.info("consumer exiting...");
681+
}
682+
},
683+
String.format("%s - consumer", testName.getDisplayName()));
684+
thread.start();
685+
686+
// Drop Subscription
687+
LockSupport.parkNanos(5_000_000_000L); // wait some time
688+
try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) {
689+
session.open();
690+
final Set<Subscription> subscriptions = session.getSubscriptions(topicName);
691+
Assert.assertEquals(1, subscriptions.size());
692+
session.dropSubscription(subscriptions.iterator().next().getSubscriptionId());
693+
} catch (final Exception e) {
694+
e.printStackTrace();
695+
fail(e.getMessage());
696+
}
697+
698+
try {
699+
// Keep retrying if there are execution failures
700+
AWAIT.untilAsserted(
701+
() -> {
702+
// Check empty subscription
703+
try (final SyncConfigNodeIServiceClient client =
704+
(SyncConfigNodeIServiceClient)
705+
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
706+
final TShowSubscriptionResp showSubscriptionResp =
707+
client.showSubscription(new TShowSubscriptionReq());
708+
Assert.assertEquals(
709+
RpcUtils.SUCCESS_STATUS.getCode(), showSubscriptionResp.status.getCode());
710+
Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
711+
Assert.assertEquals(0, showSubscriptionResp.subscriptionInfoList.size());
712+
}
713+
});
714+
} catch (final Exception e) {
715+
e.printStackTrace();
716+
fail(e.getMessage());
717+
} finally {
718+
thread.join();
719+
}
720+
}
624721
}

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class ExportTsFile {
4747
public static void main(String[] args) throws Exception {
4848
Logger logger =
4949
(Logger) LoggerFactory.getLogger("org.apache.iotdb.session.subscription.consumer.base");
50-
logger.setLevel(Level.ERROR);
50+
logger.setLevel(Level.WARN);
5151
Options options = OptionsUtil.createSubscriptionTsFileOptions();
5252
parseParams(args, options);
5353
if (StringUtils.isEmpty(commonParam.getPath())) {

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void consumerPoll(ExecutorService executor, String topicName) {
148148
@Override
149149
public void run() {
150150
final String consumerGroupId = consumer.getConsumerGroupId();
151-
while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
151+
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
152152
try {
153153
for (final SubscriptionMessage message :
154154
consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void consumerPoll(ExecutorService executor, String topicName) {
144144
new Runnable() {
145145
@Override
146146
public void run() {
147-
while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
147+
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
148148
try {
149149
for (final SubscriptionMessage message :
150150
consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,20 @@ protected Set<Subscription> getSubscriptions(final String topicName)
177177
}
178178
}
179179

180+
protected void dropSubscription(final String subscriptionId)
181+
throws IoTDBConnectionException, StatementExecutionException {
182+
IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the parse result
183+
final String sql = String.format("DROP SUBSCRIPTION %s", subscriptionId);
184+
session.executeNonQueryStatement(sql);
185+
}
186+
187+
protected void dropSubscriptionIfExists(final String subscriptionId)
188+
throws IoTDBConnectionException, StatementExecutionException {
189+
IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the parse result
190+
final String sql = String.format("DROP SUBSCRIPTION IF EXISTS %s", subscriptionId);
191+
session.executeNonQueryStatement(sql);
192+
}
193+
180194
/////////////////////////////// utility ///////////////////////////////
181195

182196
private Set<Topic> convertDataSetToTopics(final SessionDataSet dataSet)
@@ -202,7 +216,7 @@ private Set<Subscription> convertDataSetToSubscriptions(final SessionDataSet dat
202216
while (dataSet.hasNext()) {
203217
final RowRecord record = dataSet.next();
204218
final List<Field> fields = record.getFields();
205-
if (fields.size() != 3) {
219+
if (fields.size() != 4) {
206220
throw new SubscriptionException(
207221
String.format(
208222
"Unexpected fields %s was obtained during SHOW SUBSCRIPTION...",
@@ -212,7 +226,8 @@ private Set<Subscription> convertDataSetToSubscriptions(final SessionDataSet dat
212226
new Subscription(
213227
fields.get(0).getStringValue(),
214228
fields.get(1).getStringValue(),
215-
fields.get(2).getStringValue()));
229+
fields.get(2).getStringValue(),
230+
fields.get(3).getStringValue()));
216231
}
217232
return subscriptions;
218233
}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTableSession.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,26 @@ Optional<Topic> getTopic(final String topicName)
174174
*/
175175
Set<Subscription> getSubscriptions(final String topicName)
176176
throws IoTDBConnectionException, StatementExecutionException;
177+
178+
/**
179+
* Removes the subscription identified by the given subscription ID.
180+
*
181+
* @param subscriptionId The unique identifier of the subscription to be removed.
182+
* @throws IoTDBConnectionException If there is an issue with the connection to IoTDB.
183+
* @throws StatementExecutionException If there is an issue executing the SQL statement.
184+
*/
185+
void dropSubscription(final String subscriptionId)
186+
throws IoTDBConnectionException, StatementExecutionException;
187+
188+
/**
189+
* Removes the subscription identified by the given subscription ID if it exists.
190+
*
191+
* <p>If the subscription does not exist, this method will not throw an exception.
192+
*
193+
* @param subscriptionId The unique identifier of the subscription to be removed.
194+
* @throws IoTDBConnectionException If there is an issue with the connection to IoTDB.
195+
* @throws StatementExecutionException If there is an issue executing the SQL statement.
196+
*/
197+
void dropSubscriptionIfExists(final String subscriptionId)
198+
throws IoTDBConnectionException, StatementExecutionException;
177199
}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ISubscriptionTreeSession.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,26 @@ Optional<Topic> getTopic(final String topicName)
174174
*/
175175
Set<Subscription> getSubscriptions(final String topicName)
176176
throws IoTDBConnectionException, StatementExecutionException;
177+
178+
/**
179+
* Removes the subscription identified by the given subscription ID.
180+
*
181+
* @param subscriptionId The unique identifier of the subscription to be removed.
182+
* @throws IoTDBConnectionException If there is an issue with the connection to IoTDB.
183+
* @throws StatementExecutionException If there is an issue executing the SQL statement.
184+
*/
185+
void dropSubscription(final String subscriptionId)
186+
throws IoTDBConnectionException, StatementExecutionException;
187+
188+
/**
189+
* Removes the subscription identified by the given subscription ID if it exists.
190+
*
191+
* <p>If the subscription does not exist, this method will not throw an exception.
192+
*
193+
* @param subscriptionId The unique identifier of the subscription to be removed.
194+
* @throws IoTDBConnectionException If there is an issue with the connection to IoTDB.
195+
* @throws StatementExecutionException If there is an issue executing the SQL statement.
196+
*/
197+
void dropSubscriptionIfExists(final String subscriptionId)
198+
throws IoTDBConnectionException, StatementExecutionException;
177199
}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSession.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,16 @@ public Set<Subscription> getSubscriptions(final String topicName)
110110
throws IoTDBConnectionException, StatementExecutionException {
111111
return super.getSubscriptions(topicName);
112112
}
113+
114+
@Override
115+
public void dropSubscription(final String subscriptionId)
116+
throws IoTDBConnectionException, StatementExecutionException {
117+
super.dropSubscription(subscriptionId);
118+
}
119+
120+
@Override
121+
public void dropSubscriptionIfExists(final String subscriptionId)
122+
throws IoTDBConnectionException, StatementExecutionException {
123+
super.dropSubscriptionIfExists(subscriptionId);
124+
}
113125
}

0 commit comments

Comments
 (0)