Skip to content

Commit f13af48

Browse files
merlimatsijie
authored andcommitted
Allow to configure ack-timeout tick time (#4760)
### Motivation After the changes in #3118, there has a been a sharp increase of memory utilization for the UnackedMessageTracker due to the time buckets being created. This is especially true when the acktimeout is set to a larger value (eg: 1h) where 3600 time-buckets are being created. This lead to use 20MB per partition even when no message is tracked. Allowing to configure the tick time so that application can tune it based on needs. Additionally, fixed the logic that keeps creating hash maps and throwing them away at each tick time iteration, since that creates a lot of garbage and doesn't take care of the fact that the hash maps are expanding based on the required capacity (so next time they are already of the "right" size). On a final note: the current default of 1sec seems very wasteful. Something like 10s should be more appropriate as default.
1 parent 5cff169 commit f13af48

File tree

5 files changed

+51
-17
lines changed

5 files changed

+51
-17
lines changed

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
170170

171171
/**
172172
* Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
173-
* 10 seconds.
173+
* 1 second.
174174
* <p>
175175
* By default, the acknowledge timeout is disabled and that means that messages delivered to a
176176
* consumer will not be re-delivered unless the consumer crashes.
@@ -187,6 +187,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
187187
*/
188188
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
189189

190+
/**
191+
* Define the granularity of the ack-timeout redelivery.
192+
* <p>
193+
* By default, the tick time is set to 1 second. Using an higher tick time will
194+
* reduce the memory overhead to track messages when the ack-timeout is set to
195+
* bigger values (eg: 1hour).
196+
*
197+
* @param tickTime
198+
* the min precision for the ack timeout messages tracker
199+
* @param timeUnit
200+
* unit in which the timeout is provided.
201+
* @return the consumer builder instance
202+
*/
203+
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
204+
190205
/**
191206
* Set the delay to wait before re-delivering messages that have failed to be process.
192207
* <p>
@@ -386,7 +401,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
386401
* C5 1 1
387402
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
388403
* </pre>
389-
*
404+
*
390405
* <b>Failover subscription</b>
391406
* Broker selects active consumer for a failover-subscription based on consumer's priority-level and lexicographical sorting of a consumer name.
392407
* eg:
@@ -395,15 +410,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
395410
* Consumer PriorityLevel Name
396411
* C1 0 aaa
397412
* C2 0 bbb
398-
*
413+
*
399414
* 2. Active consumer = C2 : Consumer with highest priority
400415
* Consumer PriorityLevel Name
401416
* C1 1 aaa
402417
* C2 0 bbb
403-
*
418+
*
404419
* Partitioned-topics:
405420
* Broker evenly assigns partitioned topics to highest priority consumers.
406-
*
421+
*
407422
* </pre>
408423
*
409424
* @param priorityLevel the priority of this consumer

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
5858
private List<ConsumerInterceptor<T>> interceptorList;
5959

6060
private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
61+
private static long MIN_TICK_TIME_MILLIS = 100;
6162
private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;
6263

6364

@@ -156,6 +157,14 @@ public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
156157
return this;
157158
}
158159

160+
@Override
161+
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) {
162+
checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
163+
"Ack timeout tick time should be greater than " + MIN_TICK_TIME_MILLIS + " ms");
164+
conf.setTickDurationMillis(timeUnit.toMillis(tickTime));
165+
return this;
166+
}
167+
159168
@Override
160169
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
161170
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0");

pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import com.google.common.base.Preconditions;
2222
import io.netty.util.Timeout;
2323
import io.netty.util.TimerTask;
24+
import io.netty.util.concurrent.FastThreadLocal;
25+
2426
import org.apache.pulsar.client.api.MessageId;
2527
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

2931
import java.io.Closeable;
32+
import java.util.ArrayDeque;
3033
import java.util.HashSet;
3134
import java.util.Iterator;
3235
import java.util.LinkedList;
@@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable {
4043
private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
4144

4245
protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
43-
protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;
46+
protected final ArrayDeque<ConcurrentOpenHashSet<MessageId>> timePartitions;
4447

4548
protected final Lock readLock;
4649
protected final Lock writeLock;
@@ -94,6 +97,13 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
9497
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
9598
}
9699

100+
private static final FastThreadLocal<HashSet<MessageId>> TL_MESSAGE_IDS_SET = new FastThreadLocal<HashSet<MessageId>>() {
101+
@Override
102+
protected HashSet<MessageId> initialValue() throws Exception {
103+
return new HashSet<>();
104+
}
105+
};
106+
97107
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
98108
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
99109
this.ackTimeoutMillis = ackTimeoutMillis;
@@ -102,20 +112,21 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
102112
this.readLock = readWriteLock.readLock();
103113
this.writeLock = readWriteLock.writeLock();
104114
this.messageIdPartitionMap = new ConcurrentHashMap<>();
105-
this.timePartitions = new LinkedList<>();
115+
this.timePartitions = new ArrayDeque<>();
106116

107117
int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
108118
for (int i = 0; i < blankPartitions + 1; i++) {
109-
timePartitions.add(new ConcurrentOpenHashSet<>());
119+
timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
110120
}
111121

112122
timeout = client.timer().newTimeout(new TimerTask() {
113123
@Override
114124
public void run(Timeout t) throws Exception {
115-
Set<MessageId> messageIds = new HashSet<>();
125+
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
126+
messageIds.clear();
127+
116128
writeLock.lock();
117129
try {
118-
timePartitions.addLast(new ConcurrentOpenHashSet<>());
119130
ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
120131
if (!headPartition.isEmpty()) {
121132
log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
@@ -124,6 +135,9 @@ public void run(Timeout t) throws Exception {
124135
messageIdPartitionMap.remove(messageId);
125136
});
126137
}
138+
139+
headPartition.clear();
140+
timePartitions.addLast(headPartition);
127141
} finally {
128142
writeLock.unlock();
129143
}
@@ -140,11 +154,7 @@ public void clear() {
140154
writeLock.lock();
141155
try {
142156
messageIdPartitionMap.clear();
143-
timePartitions.clear();
144-
int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
145-
for (int i = 0; i < blankPartitions + 1; i++) {
146-
timePartitions.add(new ConcurrentOpenHashSet<>());
147-
}
157+
timePartitions.forEach(tp -> tp.clear());
148158
} finally {
149159
writeLock.unlock();
150160
}

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf
8686
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
8787
.setClientTcpNoDelay(false)
8888
.setUseV2WireProtocol(true)
89-
.setStickyReadsEnabled(true)
89+
.setStickyReadsEnabled(false)
9090
.setReadEntryTimeout(60);
9191
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
9292
}

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
128128
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
129129
.setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
130130
.setClientTcpNoDelay(false)
131-
.setStickyReadsEnabled(true)
131+
.setStickyReadsEnabled(false)
132132
.setUseV2WireProtocol(true);
133133
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
134134
}

0 commit comments

Comments
 (0)