-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. #15322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.
code:
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
pullConsumer.open();
pullConsumer.subscribe("my_topic");
while (true) {
List<SubscriptionMessage> messages = pullConsumer.poll(10000);
for (final SubscriptionMessage message : messages) {
final short messageType = message.getMessageType();
if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final RowRecord record = dataSet.next();
System.out.println(record);
}
}
}
}
}
}
Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]
at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, this is your first pull request in IoTDB project. Thanks for your contribution! IoTDB will be better because of you.
|
good catch! might consider adding a test case for this scenario in |
…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (apache#15322) ### Description When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. ### Code: ``` Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667")); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("my_topic"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } ``` ### Error: org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)] at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123) at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260) at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112) at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55) ### Reason Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true. ##### Key changed/added classes iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
…ta, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) (#15358) * [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) ### Description When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. ### Code: ``` Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667")); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("my_topic"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } ``` ### Error: org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)] at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123) at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260) at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112) at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55) ### Reason Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true. ##### Key changed/added classes iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java * fixup --------- Co-authored-by: lixiaobao <[email protected]>
…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) ### Description When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. ### Code: ``` Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1"); consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1"); consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667")); consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root"); consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root"); try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) { pullConsumer.open(); pullConsumer.subscribe("my_topic"); while (true) { List<SubscriptionMessage> messages = pullConsumer.poll(10000); for (final SubscriptionMessage message : messages) { final short messageType = message.getMessageType(); if (SubscriptionMessageType.isValidatedMessageType(messageType)) { for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { while (dataSet.hasNext()) { final RowRecord record = dataSet.next(); System.out.println(record); } } } } } } ``` ### Error: org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)] at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123) at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260) at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112) at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38) at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55) ### Reason Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true. ##### Key changed/added classes iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java (cherry picked from commit 858c8b8)
Description
When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.
Code:
Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]
Reason
Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true.
Key changed/added classes
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java