Skip to content

Commit 0bbb945

Browse files
author
Yao Liu
authored
[multistage] [bugfix] Fix sending mailbox object leak (#10190)
* cancel * clean up * error format * all type support and fix * fix sending mailbox leak * fix lint * address comments * address comments
1 parent ce0af82 commit 0bbb945

File tree

15 files changed

+190
-268
lines changed

15 files changed

+190
-268
lines changed

pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919
package org.apache.pinot.query.mailbox;
2020

2121
import io.grpc.ManagedChannel;
22+
import io.grpc.stub.StreamObserver;
2223
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.CountDownLatch;
2325
import java.util.function.Consumer;
26+
import org.apache.pinot.common.proto.Mailbox;
27+
import org.apache.pinot.common.proto.PinotMailboxGrpc;
2428
import org.apache.pinot.query.mailbox.channel.ChannelManager;
29+
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
2530
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
2631
import org.apache.pinot.spi.env.PinotConfiguration;
2732

@@ -52,8 +57,6 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
5257
// maintaining a list of registered mailboxes.
5358
private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
5459
new ConcurrentHashMap<>();
55-
private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
56-
new ConcurrentHashMap<>();
5760
private final Consumer<MailboxIdentifier> _gotMailCallback;
5861

5962
public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig,
@@ -89,16 +92,22 @@ public int getMailboxPort() {
8992
* @param mailboxId the id of the mailbox.
9093
*/
9194
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
92-
return _sendingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcSendingMailbox(mId, this));
95+
ManagedChannel channel = getChannel(mailboxId.toString());
96+
PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
97+
CountDownLatch latch = new CountDownLatch(1);
98+
StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
99+
stub.open(new MailboxStatusStreamObserver(latch));
100+
GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), mailboxContentStreamObserver, latch);
101+
return mailbox;
93102
}
94103

95104
/**
96105
* Register a mailbox, mailbox needs to be registered before use.
97106
* @param mailboxId the id of the mailbox.
98107
*/
99108
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
100-
return _receivingMailboxMap.computeIfAbsent(
101-
mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
109+
return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
110+
(mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
102111
}
103112

104113
public ManagedChannel getChannel(String mailboxId) {

pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.pinot.query.mailbox;
2020

2121
import com.google.protobuf.ByteString;
22-
import io.grpc.ManagedChannel;
2322
import io.grpc.stub.StreamObserver;
2423
import java.io.IOException;
2524
import java.util.concurrent.CountDownLatch;
@@ -30,49 +29,35 @@
3029
import org.apache.pinot.common.datablock.MetadataBlock;
3130
import org.apache.pinot.common.proto.Mailbox;
3231
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
33-
import org.apache.pinot.common.proto.PinotMailboxGrpc;
3432
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
35-
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
3633
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
3734

3835

3936
/**
4037
* GRPC implementation of the {@link SendingMailbox}.
4138
*/
4239
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
43-
private final GrpcMailboxService _mailboxService;
4440
private final String _mailboxId;
4541
private final AtomicBoolean _initialized = new AtomicBoolean(false);
4642
private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
4743

48-
private CountDownLatch _finishLatch = new CountDownLatch(1);
44+
private final CountDownLatch _finishLatch;
45+
private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
4946

50-
private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
51-
52-
public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
53-
_mailboxService = mailboxService;
47+
public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
48+
CountDownLatch latch) {
5449
_mailboxId = mailboxId;
50+
_mailboxContentStreamObserver = mailboxContentStreamObserver;
51+
_finishLatch = latch;
5552
_initialized.set(false);
5653
}
5754

58-
public void init()
59-
throws UnsupportedOperationException {
60-
ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
61-
PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
62-
_mailboxContentStreamObserver = stub.open(new MailboxStatusStreamObserver(_finishLatch));
63-
// TODO: Replace init call with metadata.
64-
// send a begin-of-stream message.
65-
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
66-
.putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
67-
_initialized.set(true);
68-
}
69-
7055
@Override
7156
public void send(TransferableBlock block)
7257
throws UnsupportedOperationException {
7358
if (!_initialized.get()) {
7459
// initialization is special
75-
init();
60+
open();
7661
}
7762
MailboxContent data = toMailboxContent(block.getDataBlock());
7863
_mailboxContentStreamObserver.onNext(data);
@@ -84,6 +69,15 @@ public void complete() {
8469
_mailboxContentStreamObserver.onCompleted();
8570
}
8671

72+
@Override
73+
public void open() {
74+
// TODO: Get rid of init call.
75+
// send a begin-of-stream message.
76+
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
77+
.putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
78+
_initialized.set(true);
79+
}
80+
8781
@Override
8882
public String getMailboxId() {
8983
return _mailboxId;

pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
3232
private final int _mailboxPort;
3333
private final Consumer<MailboxIdentifier> _receivedMailContentCallback;
3434

35-
private final ConcurrentHashMap<String, InMemoryMailboxState> _mailboxStateMap = new ConcurrentHashMap<>();
35+
private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox = new ConcurrentHashMap<>();
36+
private final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new ConcurrentHashMap<>();
3637

3738
public InMemoryMailboxService(String hostname, int mailboxPort,
3839
Consumer<MailboxIdentifier> receivedMailContentCallback) {
@@ -41,6 +42,10 @@ public InMemoryMailboxService(String hostname, int mailboxPort,
4142
_receivedMailContentCallback = receivedMailContentCallback;
4243
}
4344

45+
public Consumer<MailboxIdentifier> getReceivedMailContentCallback() {
46+
return _receivedMailContentCallback;
47+
}
48+
4449
@Override
4550
public void start() {
4651
}
@@ -62,24 +67,6 @@ public int getMailboxPort() {
6267
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
6368
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
6469
String mId = mailboxId.toString();
65-
return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._sendingMailbox;
66-
}
67-
68-
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
69-
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
70-
String mId = mailboxId.toString();
71-
return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._receivingMailbox;
72-
}
73-
74-
InMemoryMailboxState newMailboxState(String mailboxId) {
75-
BlockingQueue<TransferableBlock> queue = createDefaultChannel();
76-
return new InMemoryMailboxState(
77-
new InMemorySendingMailbox(mailboxId, queue, _receivedMailContentCallback),
78-
new InMemoryReceivingMailbox(mailboxId, queue),
79-
queue);
80-
}
81-
82-
private BlockingQueue<TransferableBlock> createDefaultChannel() {
8370
// for now, we use an unbounded blocking queue as the means of communication between
8471
// in memory mailboxes - the reason for this is that unless we implement flow control,
8572
// blocks will sit in memory either way (blocking the sender from sending doesn't prevent
@@ -88,19 +75,14 @@ private BlockingQueue<TransferableBlock> createDefaultChannel() {
8875
// threads (most importantly, the receiving thread) from running - which can cause unnecessary
8976
// failure situations
9077
// TODO: when we implement flow control, we should swap this out with a bounded abstraction
91-
return new LinkedBlockingQueue<>();
78+
return new InMemorySendingMailbox(mailboxId.toString(),
79+
_mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()), getReceivedMailContentCallback());
9280
}
9381

94-
static class InMemoryMailboxState {
95-
ReceivingMailbox<TransferableBlock> _receivingMailbox;
96-
SendingMailbox<TransferableBlock> _sendingMailbox;
97-
BlockingQueue<TransferableBlock> _queue;
98-
99-
InMemoryMailboxState(SendingMailbox<TransferableBlock> sendingMailbox,
100-
ReceivingMailbox<TransferableBlock> receivingMailbox, BlockingQueue<TransferableBlock> queue) {
101-
_receivingMailbox = receivingMailbox;
102-
_sendingMailbox = sendingMailbox;
103-
_queue = queue;
104-
}
82+
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
83+
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
84+
String mId = mailboxId.toString();
85+
BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>());
86+
return _receivingMailbox.computeIfAbsent(mId, id -> new InMemoryReceivingMailbox(mId, mailboxQueue));
10587
}
10688
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,23 @@
2525

2626

2727
public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
28-
private final BlockingQueue<TransferableBlock> _queue;
2928
private final Consumer<MailboxIdentifier> _gotMailCallback;
3029
private final String _mailboxId;
3130

31+
// TODO: changed to 2-way communication channel.
32+
private BlockingQueue<TransferableBlock> _queue;
33+
3234
public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue,
3335
Consumer<MailboxIdentifier> gotMailCallback) {
3436
_mailboxId = mailboxId;
3537
_queue = queue;
3638
_gotMailCallback = gotMailCallback;
3739
}
3840

41+
@Override
42+
public void open() {
43+
}
44+
3945
@Override
4046
public String getMailboxId() {
4147
return _mailboxId;

pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
*/
3131
public interface SendingMailbox<T> {
3232

33+
void open();
34+
3335
/**
3436
* get the unique identifier for the mailbox.
3537
*

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.query.runtime.operator.exchange;
2020

21+
import java.util.ArrayList;
2122
import java.util.Iterator;
2223
import java.util.List;
2324
import org.apache.calcite.rel.RelDistribution;
@@ -38,23 +39,25 @@ public abstract class BlockExchange {
3839
// TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
3940
// TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
4041
private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
41-
42-
private final MailboxService<TransferableBlock> _mailbox;
43-
private final List<MailboxIdentifier> _destinations;
42+
private final List<SendingMailbox<TransferableBlock>> _sendingMailboxes;
4443
private final BlockSplitter _splitter;
4544

4645
public static BlockExchange getExchange(MailboxService<TransferableBlock> mailboxService,
47-
List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType,
48-
KeySelector<Object[], Object[]> selector, BlockSplitter splitter) {
46+
List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> selector,
47+
BlockSplitter splitter) {
48+
List<SendingMailbox<TransferableBlock>> sendingMailboxes = new ArrayList<>();
49+
for (MailboxIdentifier mid : destinations) {
50+
sendingMailboxes.add(mailboxService.getSendingMailbox(mid));
51+
}
4952
switch (exchangeType) {
5053
case SINGLETON:
51-
return new SingletonExchange(mailboxService, destinations, splitter);
54+
return new SingletonExchange(sendingMailboxes, splitter);
5255
case HASH_DISTRIBUTED:
53-
return new HashExchange(mailboxService, destinations, selector, splitter);
56+
return new HashExchange(sendingMailboxes, selector, splitter);
5457
case RANDOM_DISTRIBUTED:
55-
return new RandomExchange(mailboxService, destinations, splitter);
58+
return new RandomExchange(sendingMailboxes, splitter);
5659
case BROADCAST_DISTRIBUTED:
57-
return new BroadcastExchange(mailboxService, destinations, splitter);
60+
return new BroadcastExchange(sendingMailboxes, splitter);
5861
case ROUND_ROBIN_DISTRIBUTED:
5962
case RANGE_DISTRIBUTED:
6063
case ANY:
@@ -63,29 +66,20 @@ public static BlockExchange getExchange(MailboxService<TransferableBlock> mailbo
6366
}
6467
}
6568

66-
protected BlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
67-
BlockSplitter splitter) {
68-
_mailbox = mailbox;
69-
_destinations = destinations;
69+
protected BlockExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
70+
_sendingMailboxes = sendingMailboxes;
7071
_splitter = splitter;
7172
}
7273

7374
public void send(TransferableBlock block) {
7475
if (block.isEndOfStreamBlock()) {
75-
_destinations.forEach(destination -> sendBlock(destination, block));
76+
_sendingMailboxes.forEach(destination -> sendBlock(destination, block));
7677
return;
7778
}
78-
79-
Iterator<RoutedBlock> routedBlocks = route(_destinations, block);
80-
while (routedBlocks.hasNext()) {
81-
RoutedBlock next = routedBlocks.next();
82-
sendBlock(next._destination, next._block);
83-
}
79+
route(_sendingMailboxes, block);
8480
}
8581

86-
private void sendBlock(MailboxIdentifier mailboxId, TransferableBlock block) {
87-
SendingMailbox<TransferableBlock> sendingMailbox = _mailbox.getSendingMailbox(mailboxId);
88-
82+
protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, TransferableBlock block) {
8983
if (block.isEndOfStreamBlock()) {
9084
sendingMailbox.send(block);
9185
sendingMailbox.complete();
@@ -94,21 +88,10 @@ private void sendBlock(MailboxIdentifier mailboxId, TransferableBlock block) {
9488

9589
DataBlock.Type type = block.getType();
9690
Iterator<TransferableBlock> splits = _splitter.split(block, type, MAX_MAILBOX_CONTENT_SIZE_BYTES);
97-
9891
while (splits.hasNext()) {
9992
sendingMailbox.send(splits.next());
10093
}
10194
}
10295

103-
protected abstract Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block);
104-
105-
protected static class RoutedBlock {
106-
final MailboxIdentifier _destination;
107-
final TransferableBlock _block;
108-
109-
protected RoutedBlock(MailboxIdentifier destination, TransferableBlock block) {
110-
_destination = destination;
111-
_block = block;
112-
}
113-
}
96+
protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
11497
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
*/
1919
package org.apache.pinot.query.runtime.operator.exchange;
2020

21-
import java.util.Iterator;
2221
import java.util.List;
23-
import org.apache.pinot.query.mailbox.MailboxIdentifier;
24-
import org.apache.pinot.query.mailbox.MailboxService;
22+
import org.apache.pinot.query.mailbox.SendingMailbox;
2523
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
2624
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
2725

@@ -31,13 +29,14 @@
3129
*/
3230
class BroadcastExchange extends BlockExchange {
3331

34-
protected BroadcastExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
35-
BlockSplitter splitter) {
36-
super(mailbox, destinations, splitter);
32+
protected BroadcastExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
33+
super(sendingMailboxes, splitter);
3734
}
3835

3936
@Override
40-
protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
41-
return destinations.stream().map(dest -> new RoutedBlock(dest, block)).iterator();
37+
protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
38+
for (SendingMailbox mailbox : destinations) {
39+
sendBlock(mailbox, block);
40+
}
4241
}
4342
}

0 commit comments

Comments
 (0)