Skip to content

Commit f862961

Browse files
authored
[Reader] Should set either start message id or start message from roll back duration. (#6392)
Currently, when constructing a reader, users can set both start message id and start time. This is strange and the behavior should be forbidden.
1 parent 336e971 commit f862961

File tree

3 files changed

+34
-3
lines changed

3 files changed

+34
-3
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public void testReaderWithTimeLong() throws Exception {
242242

243243
// (3) Create reader and set position 1 hour back so, it should only read messages which are 2 hours old which
244244
// published on step 2
245-
Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
245+
Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
246246
.startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
247247

248248
List<MessageId> receivedMessageIds = Lists.newArrayList();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,15 @@ public CompletableFuture<Reader<T>> createAsync() {
8282
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
8383
}
8484

85-
if (conf.getStartMessageId() == null) {
85+
if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 ||
86+
conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
8687
return FutureUtil
87-
.failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
88+
.failedFuture(new IllegalArgumentException(
89+
"Start message id or start message from roll back must be specified but they cannot be specified at the same time"));
90+
}
91+
92+
if (conf.getStartMessageFromRollbackDurationInSec() > 0) {
93+
conf.setStartMessageId(MessageId.earliest);
8894
}
8995

9096
return client.createReaderAsync(conf, schema);

pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@
2626
import java.lang.reflect.Field;
2727
import java.util.HashMap;
2828
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
30+
2931
import org.apache.pulsar.client.api.MessageId;
3032
import org.apache.pulsar.client.api.PulsarClient;
33+
import org.apache.pulsar.client.api.PulsarClientException;
34+
import org.apache.pulsar.client.api.Reader;
3135
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
3236
import org.testng.annotations.Test;
3337

@@ -96,5 +100,26 @@ public void readerBuilderLoadConfTest() throws Exception {
96100
assertTrue(obj instanceof ReaderConfigurationData);
97101
assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName);
98102
assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId);
103+
client.close();
104+
}
105+
106+
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
107+
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
108+
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
109+
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
110+
// no-op
111+
} finally {
112+
client.close();
113+
}
114+
}
115+
116+
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
117+
public void shouldSetOneStartOpt() throws Exception {
118+
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
119+
try (Reader reader = client.newReader().topic("abc").create()) {
120+
// no-op
121+
} finally {
122+
client.close();
123+
}
99124
}
100125
}

0 commit comments

Comments
 (0)