|
25 | 25 | import java.time.temporal.ChronoUnit; |
26 | 26 | import java.util.concurrent.CountDownLatch; |
27 | 27 | import java.util.concurrent.TimeUnit; |
| 28 | +import java.util.concurrent.atomic.AtomicBoolean; |
28 | 29 | import java.util.concurrent.atomic.AtomicInteger; |
29 | 30 | import java.util.concurrent.atomic.AtomicLong; |
30 | 31 |
|
@@ -1529,4 +1530,98 @@ public void testFinishEmptyStream() throws Exception { |
1529 | 1530 | } |
1530 | 1531 | }); |
1531 | 1532 | } |
| 1533 | + |
| 1534 | + @Test |
| 1535 | + public void testReconnectOverOrdered() throws Exception { |
| 1536 | + // ------------------------------------------------------------ |
| 1537 | + // The idea here is... |
| 1538 | + // 1. connect with an ordered consumer and start consuming |
| 1539 | + // 2. stop the server then restart it causing a disconnect, |
| 1540 | + // but reconnect before the idle heartbeat alarm kicks in |
| 1541 | + // 3. stop the server but wait a little before restarting |
| 1542 | + // so the alarm goes off but still disconnected |
| 1543 | + // to make sure the consumer continues after that condition |
| 1544 | + // ------------------------------------------------------------ |
| 1545 | + int port = NatsTestServer.nextPort(); |
| 1546 | + ListenerForTesting lft = new ListenerForTesting(); |
| 1547 | + Options options = new Options.Builder() |
| 1548 | + .connectionListener(lft) |
| 1549 | + .errorListener(lft) |
| 1550 | + .server(NatsTestServer.getNatsLocalhostUri(port)).build(); |
| 1551 | + NatsConnection nc; |
| 1552 | + |
| 1553 | + String stream = stream(); |
| 1554 | + String subject = subject(); |
| 1555 | + |
| 1556 | + AtomicBoolean allInOrder = new AtomicBoolean(true); |
| 1557 | + AtomicInteger atomicCount = new AtomicInteger(); |
| 1558 | + AtomicLong nextExpectedSequence = new AtomicLong(0); |
| 1559 | + |
| 1560 | + MessageHandler handler = msg -> { |
| 1561 | + if (msg.metaData().streamSequence() != nextExpectedSequence.incrementAndGet()) { |
| 1562 | + allInOrder.set(false); |
| 1563 | + } |
| 1564 | + msg.ack(); |
| 1565 | + atomicCount.incrementAndGet(); |
| 1566 | + sleep(50); // simulate some work and to slow the endless consume |
| 1567 | + }; |
| 1568 | + |
| 1569 | + StreamContext streamContext; |
| 1570 | + OrderedConsumerContext orderedConsumerContext; |
| 1571 | + MessageConsumer consumer; |
| 1572 | + |
| 1573 | + try (NatsTestServer ts = new NatsTestServer(port, false, true)) { |
| 1574 | + nc = (NatsConnection) standardConnection(options); |
| 1575 | + StreamConfiguration sc = StreamConfiguration.builder() |
| 1576 | + .name(stream) |
| 1577 | + .storageType(StorageType.File) // file since we are killing the server and bringing it back up. |
| 1578 | + .subjects(subject).build(); |
| 1579 | + nc.jetStreamManagement().addStream(sc); |
| 1580 | + |
| 1581 | + jsPublish(nc, subject, 10000); |
| 1582 | + |
| 1583 | + ConsumeOptions consumeOptions = ConsumeOptions.builder() |
| 1584 | + .batchSize(100) // small batch size means more round trips |
| 1585 | + .expiresIn(2000) // idle heartbeat is half of this, alarm time is 3 * ihb |
| 1586 | + .build(); |
| 1587 | + |
| 1588 | + OrderedConsumerConfiguration ocConfig = new OrderedConsumerConfiguration() |
| 1589 | + .filterSubjects(subject); |
| 1590 | + streamContext = nc.getStreamContext(stream); |
| 1591 | + orderedConsumerContext = streamContext.createOrderedConsumer(ocConfig); |
| 1592 | + //noinspection resource |
| 1593 | + consumer = orderedConsumerContext.consume(consumeOptions, handler); |
| 1594 | + |
| 1595 | + sleep(500); // time enough to get some messages |
| 1596 | + } |
| 1597 | + |
| 1598 | + assertTrue(allInOrder.get()); |
| 1599 | + int count1 = atomicCount.get(); |
| 1600 | + assertTrue(count1 > 0); |
| 1601 | + assertEquals(count1, nextExpectedSequence.get()); |
| 1602 | + |
| 1603 | + // reconnect and get some more messages |
| 1604 | + try (NatsTestServer ignored = new NatsTestServer(port, false, true)) { |
| 1605 | + standardConnectionWait(nc); |
| 1606 | + sleep(5000); // long enough to get messages and for the hb alarm to have tripped |
| 1607 | + } |
| 1608 | + |
| 1609 | + assertTrue(allInOrder.get()); |
| 1610 | + int count2 = atomicCount.get(); |
| 1611 | + assertTrue(count2 > count1); |
| 1612 | + assertEquals(count2, nextExpectedSequence.get()); |
| 1613 | + |
| 1614 | + sleep(4000); // enough delay before reconnect to trip hb alarm again |
| 1615 | + try (NatsTestServer ignored = new NatsTestServer(port, false, true)) { |
| 1616 | + standardConnectionWait(nc); |
| 1617 | + sleep(4000); // long enough to get messages and for the hb alarm to have tripped |
| 1618 | + |
| 1619 | + nc.jetStreamManagement().deleteStream(stream); // it was a file stream clean it up |
| 1620 | + } |
| 1621 | + |
| 1622 | + assertTrue(allInOrder.get()); |
| 1623 | + int count3 = atomicCount.get(); |
| 1624 | + assertTrue(count3 > count2); |
| 1625 | + assertEquals(count3, nextExpectedSequence.get()); |
| 1626 | + } |
1532 | 1627 | } |
0 commit comments