Skip to content

Commit 8cfaf48

Browse files
authored
Add streaming dispatcher. (#9056)
Related to #3804 ### Motivation Trying to streamline the dispatcher's read requests to manager ledger instead of micro batch. ### Modifications Created a StreamingEntryReader that can streamline read request to managed ledger. Created StreamingDispatcher interface with necessary method to interact with StreamingEntryReader. Created PersistentStreamingDispatcherSingleActive/MultipleConsumer that make use of StreamingEntryReader to read entries from managed ledger. Add config to use streaming dispatcher.
1 parent 9af8577 commit 8cfaf48

29 files changed

+1905
-237
lines changed

build/run_unit_group.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,17 @@ function broker_group_2() {
5656
-DtestForkCount=1 \
5757
-DtestReuseFork=true
5858

59+
$MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="**/*StreamingDispatcher*Test.java" \
60+
-DtestForkCount=1 \
61+
-DtestReuseFork=true
62+
5963
$MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="org/apache/pulsar/broker/zookeeper/**/*.java,
6064
org/apache/pulsar/broker/loadbalance/**/*.java,
6165
org/apache/pulsar/broker/service/**/*.java" \
6266
-Dexclude="**/ReplicatorTest.java,
6367
**/MessagePublishBufferThrottleTest.java,
6468
**/TopicOwnerTest.java,
69+
**/*StreamingDispatcher*Test.java,
6570
**/AntiAffinityNamespaceGroupTest.java"
6671
}
6772

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
/**
22+
* Contains callback that can be registered with {@link ManagedLedger} to wait for new entries to be available.
23+
*/
24+
public interface WaitingEntryCallBack {
25+
26+
/**
27+
* The callback {@link ManagedLedger} will trigger when new entries are available.
28+
*/
29+
void entriesAvailable();
30+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
111111
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
112112
import org.apache.bookkeeper.mledger.Position;
113+
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
113114
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
114115
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
115116
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -170,6 +171,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
170171
// Cursors that are waiting to be notified when new entries are persisted
171172
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
172173

174+
// Objects that are waiting to be notified when new entries are persisted
175+
final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;
176+
173177
// This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
174178
// uninitialized cursor future from the 1st request
175179
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
@@ -290,6 +294,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
290294
this.mbean = new ManagedLedgerMBeanImpl(this);
291295
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
292296
this.waitingCursors = Queues.newConcurrentLinkedQueue();
297+
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
293298
this.uninitializedCursors = Maps.newHashMap();
294299
this.clock = config.getClock();
295300

@@ -2109,6 +2114,21 @@ void notifyCursors() {
21092114
}
21102115
}
21112116

2117+
void notifyWaitingEntryCallBacks() {
2118+
while (true) {
2119+
final WaitingEntryCallBack cb = waitingEntryCallBacks.poll();
2120+
if (cb == null) {
2121+
break;
2122+
}
2123+
2124+
executor.execute(safeRun(cb::entriesAvailable));
2125+
}
2126+
}
2127+
2128+
public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
2129+
this.waitingEntryCallBacks.add(cb);
2130+
}
2131+
21122132
private void trimConsumedLedgersInBackground() {
21132133
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
21142134
}
@@ -3086,7 +3106,7 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
30863106
* the position to validate
30873107
* @return true if the position is valid, false otherwise
30883108
*/
3089-
boolean isValidPosition(PositionImpl position) {
3109+
public boolean isValidPosition(PositionImpl position) {
30903110
PositionImpl last = lastConfirmedEntry;
30913111
if (log.isDebugEnabled()) {
30923112
log.debug("IsValid position: {} -- last: {}", position, last);
@@ -3130,7 +3150,9 @@ public PositionImpl getNextValidPosition(final PositionImpl position) {
31303150
next = getNextValidPositionInternal(position);
31313151
} catch (NullPointerException e) {
31323152
next = lastConfirmedEntry.getNext();
3133-
log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
3153+
if (log.isDebugEnabled()) {
3154+
log.debug("[{}] Can't find next valid position, fall back to the next position of the last position.", name, e);
3155+
}
31343156
}
31353157
return next;
31363158
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ public void safeRun() {
207207
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
208208
ReferenceCountUtil.release(data);
209209
ml.notifyCursors();
210+
ml.notifyWaitingEntryCallBacks();
210211
this.recycle();
211212
} else {
212213
ReferenceCountUtil.release(data);
@@ -232,6 +233,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
232233
if (cb != null) {
233234
cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
234235
ml.notifyCursors();
236+
ml.notifyWaitingEntryCallBacks();
235237
this.recycle();
236238
}
237239
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
685685
)
686686
private boolean preciseDispatcherFlowControl = false;
687687

688+
@FieldContext(
689+
category = CATEGORY_SERVER,
690+
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
691+
"in subsequent release."
692+
)
693+
private boolean streamingDispatch = false;
694+
688695
@FieldContext(
689696
dynamic = true,
690697
category = CATEGORY_SERVER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public SubType getType() {
7272

7373
public abstract boolean isConsumerAvailable(Consumer consumer);
7474

75+
protected void cancelPendingRead() {}
76+
7577
/**
7678
* <pre>
7779
* Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:

0 commit comments

Comments
 (0)