Skip to content

Commit 9e5bf3b

Browse files
committed
Adding unit test
1 parent 8dead53 commit 9e5bf3b

1 file changed

Lines changed: 95 additions & 0 deletions

File tree

src/test/java/io/nats/client/impl/SimplificationTests.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.time.temporal.ChronoUnit;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicInteger;
2930
import java.util.concurrent.atomic.AtomicLong;
3031

@@ -1529,4 +1530,98 @@ public void testFinishEmptyStream() throws Exception {
15291530
}
15301531
});
15311532
}
1533+
1534+
@Test
1535+
public void testReconnectOverOrdered() throws Exception {
1536+
// ------------------------------------------------------------
1537+
// The idea here is...
1538+
// 1. connect with an ordered consumer and start consuming
1539+
// 2. stop the server then restart it causing a disconnect,
1540+
// but reconnect before the idle heartbeat alarm kicks in
1541+
// 3. stop the server but wait a little before restarting
1542+
// so the alarm goes off but still disconnected
1543+
// to make sure the consumer continues after that condition
1544+
// ------------------------------------------------------------
1545+
int port = NatsTestServer.nextPort();
1546+
ListenerForTesting lft = new ListenerForTesting();
1547+
Options options = new Options.Builder()
1548+
.connectionListener(lft)
1549+
.errorListener(lft)
1550+
.server(NatsTestServer.getNatsLocalhostUri(port)).build();
1551+
NatsConnection nc;
1552+
1553+
String stream = stream();
1554+
String subject = subject();
1555+
1556+
AtomicBoolean allInOrder = new AtomicBoolean(true);
1557+
AtomicInteger atomicCount = new AtomicInteger();
1558+
AtomicLong nextExpectedSequence = new AtomicLong(0);
1559+
1560+
MessageHandler handler = msg -> {
1561+
if (msg.metaData().streamSequence() != nextExpectedSequence.incrementAndGet()) {
1562+
allInOrder.set(false);
1563+
}
1564+
msg.ack();
1565+
atomicCount.incrementAndGet();
1566+
sleep(50); // simulate some work and to slow the endless consume
1567+
};
1568+
1569+
StreamContext streamContext;
1570+
OrderedConsumerContext orderedConsumerContext;
1571+
MessageConsumer consumer;
1572+
1573+
try (NatsTestServer ts = new NatsTestServer(port, false, true)) {
1574+
nc = (NatsConnection) standardConnection(options);
1575+
StreamConfiguration sc = StreamConfiguration.builder()
1576+
.name(stream)
1577+
.storageType(StorageType.File) // file since we are killing the server and bringing it back up.
1578+
.subjects(subject).build();
1579+
nc.jetStreamManagement().addStream(sc);
1580+
1581+
jsPublish(nc, subject, 10000);
1582+
1583+
ConsumeOptions consumeOptions = ConsumeOptions.builder()
1584+
.batchSize(100) // small batch size means more round trips
1585+
.expiresIn(2000) // idle heartbeat is half of this, alarm time is 3 * ihb
1586+
.build();
1587+
1588+
OrderedConsumerConfiguration ocConfig = new OrderedConsumerConfiguration()
1589+
.filterSubjects(subject);
1590+
streamContext = nc.getStreamContext(stream);
1591+
orderedConsumerContext = streamContext.createOrderedConsumer(ocConfig);
1592+
//noinspection resource
1593+
consumer = orderedConsumerContext.consume(consumeOptions, handler);
1594+
1595+
sleep(500); // time enough to get some messages
1596+
}
1597+
1598+
assertTrue(allInOrder.get());
1599+
int count1 = atomicCount.get();
1600+
assertTrue(count1 > 0);
1601+
assertEquals(count1, nextExpectedSequence.get());
1602+
1603+
// reconnect and get some more messages
1604+
try (NatsTestServer ignored = new NatsTestServer(port, false, true)) {
1605+
standardConnectionWait(nc);
1606+
sleep(5000); // long enough to get messages and for the hb alarm to have tripped
1607+
}
1608+
1609+
assertTrue(allInOrder.get());
1610+
int count2 = atomicCount.get();
1611+
assertTrue(count2 > count1);
1612+
assertEquals(count2, nextExpectedSequence.get());
1613+
1614+
sleep(4000); // enough delay before reconnect to trip hb alarm again
1615+
try (NatsTestServer ignored = new NatsTestServer(port, false, true)) {
1616+
standardConnectionWait(nc);
1617+
sleep(4000); // long enough to get messages and for the hb alarm to have tripped
1618+
1619+
nc.jetStreamManagement().deleteStream(stream); // it was a file stream clean it up
1620+
}
1621+
1622+
assertTrue(allInOrder.get());
1623+
int count3 = atomicCount.get();
1624+
assertTrue(count3 > count2);
1625+
assertEquals(count3, nextExpectedSequence.get());
1626+
}
15321627
}

0 commit comments

Comments
 (0)