Skip to content

Redelivery of messages by negative ack happened more early on C++. #6461

@k2la

Description

@k2la

On pulsar-client-cpp, when NegativeAckRedeliveryDelayMs was 10000, redelivery of messages by negative ack happened a few times in 10 seconds.

Expected behavior

my-message-0: Mon Mar  2 15:41:56 2020
my-message-1: Mon Mar  2 15:41:56 2020
my-message-2: Mon Mar  2 15:41:56 2020
my-message-0: Mon Mar  2 15:42:06 2020
my-message-1: Mon Mar  2 15:42:06 2020
my-message-2: Mon Mar  2 15:42:06 2020
my-message-0: Mon Mar  2 15:42:16 2020
my-message-1: Mon Mar  2 15:42:16 2020
my-message-2: Mon Mar  2 15:42:16 2020

Actual behavior

my-message-0: Mon Mar  2 15:41:56 2020
my-message-1: Mon Mar  2 15:41:56 2020
my-message-2: Mon Mar  2 15:41:56 2020
my-message-0: Mon Mar  2 15:41:59 2020
my-message-1: Mon Mar  2 15:41:59 2020
my-message-2: Mon Mar  2 15:41:59 2020
my-message-0: Mon Mar  2 15:42:03 2020
my-message-1: Mon Mar  2 15:42:03 2020
my-message-2: Mon Mar  2 15:42:03 2020

Steps to reproduce

Execute the following code:

#include <iostream>
#include <pulsar/Client.h>
#include <pulsar/Authentication.h>
#include <boost/property_tree/ini_parser.hpp>
#include <time.h>

using namespace std;
using namespace pulsar;
using namespace boost::property_tree;

void messageListener(Consumer consumer, const Message &msg) {
  time_t t = time(NULL);
  printf("%s: %s", msg.getDataAsString().c_str(), ctime(&t));
  // consumer.acknowledge(msg);
  consumer.negativeAcknowledge(msg);
}

int main() {
  Client client("pulsar://<broker url>");

  ConsumerConfiguration conf = ConsumerConfiguration();
  // conf.setUnAckedMessagesTimeoutMs(10000);
  conf.setNegativeAckRedeliveryDelayMs(10000);
  conf.setMessageListener(messageListener);

  Consumer consumer;
  Result result = client.subscribe("persistent://<tenant>/<ns>/<topic>", "sub1", conf, consumer);
  if (result != ResultOk) {
      cout << "error" << "\n";
      return -1;
  }

  if (result != ResultOk) {
    cerr << "Failed to create consumer: " << result << endl;
    return -1;
  }

  int n;
  cin >> n;

  client.close();
}

System configuration

Pulsar Broker version: 2.5.0
Pulsar Client C++ version: master, 2.4.2
MacOS: 10.15.2
cmake: 3.16.3
GNU Make: 3.81

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions