Skip to content

Conversation

@VGalaxies
Copy link
Contributor

…s reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (apache#15322)

### Description
When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.


### Code:

```
Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
        consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
        consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
        consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
        consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
        try (SubscriptionPullConsumer pullConsumer = new SubscriptionPullConsumer(consumerConfig)) {
            pullConsumer.open();
            pullConsumer.subscribe("my_topic");
            while (true) {
                List<SubscriptionMessage> messages = pullConsumer.poll(10000);
                for (final SubscriptionMessage message : messages) {
                    final short messageType = message.getMessageType();
                    if (SubscriptionMessageType.isValidatedMessageType(messageType)) {
                        for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
                            while (dataSet.hasNext()) {
                                final RowRecord record = dataSet.next();
                                System.out.println(record);
                            }
                        }
                    }
                }
            }
        }

```

### Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: Cluster has no available subscription providers to connect with initial endpoints [TEndPoint(ip:localhost, port:0)]

	at org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
	at org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
	at org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)


### Reason
Since the value property of Properties cannot be set to null, and the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, final Properties properties) constructor sets default values, the condition if (Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) constructor will always evaluate to true.

##### Key changed/added classes 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@VGalaxies VGalaxies changed the title [To dev/1.3] [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) [To dev/1.3] Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) Apr 17, 2025
@SteveYurongSu SteveYurongSu self-assigned this Apr 18, 2025
@SteveYurongSu SteveYurongSu merged commit 62e1da1 into apache:dev/1.3 Apr 21, 2025
25 checks passed
@VGalaxies VGalaxies deleted the cp-15322 branch April 23, 2025 10:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants