[multistage] Fix Leaks in Mailbox #10322
Great work cleaning up the stability issues, thank you @ankitsultana. Please comment when this PR is ready for final review. I will take a look and comment as we go as well.
400256d to aa6e0b8
Codecov Report
@@             Coverage Diff              @@
##             master   #10322      +/-   ##
============================================
+ Coverage     70.25%   70.32%   +0.06%
- Complexity     6023     6074      +51
============================================
  Files          2049     2050       +1
  Lines        111068   111187     +119
  Branches      16898    16908      +10
============================================
+ Hits          78031    78189     +158
+ Misses        27560    27522      -38
+ Partials       5477     5476       -1
...ry-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_timeoutMs = timeoutMs;
self-review: We don't need to store timeoutMs anymore. We can pass deadline directly to MailboxReceiveOperator.
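For illustration only, a minimal sketch of the "pass a deadline instead of a timeout" idea. `QueryDeadline` is a hypothetical helper and not part of Pinot; it assumes the caller supplies a monotonic clock reading such as `System.nanoTime()`. The absolute deadline is computed once, so downstream components only ever ask how much time is left.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Pinot class: wraps an absolute deadline so that
// downstream components do not need to carry a relative timeout around.
final class QueryDeadline {
  private final long _deadlineNanos;

  private QueryDeadline(long deadlineNanos) {
    _deadlineNanos = deadlineNanos;
  }

  // Computes the absolute deadline once, at the point where the timeout is known.
  static QueryDeadline fromTimeoutMs(long nowNanos, long timeoutMs) {
    return new QueryDeadline(nowNanos + TimeUnit.MILLISECONDS.toNanos(timeoutMs));
  }

  // Remaining time in milliseconds, clamped at zero once the deadline has passed.
  long remainingMs(long nowNanos) {
    return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(_deadlineNanos - nowNanos));
  }

  // True once the supplied clock reading has reached the deadline.
  boolean isExpired(long nowNanos) {
    return _deadlineNanos - nowNanos <= 0;
  }
}
```

A receive operator built this way would take the deadline in its constructor and call `remainingMs(System.nanoTime())` wherever it needs to bound a wait.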
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 * </ol>
 * @param mailboxId
 */
void releaseReceivingMailbox(MailboxIdentifier mailboxId);
I think releaseReceivingMailbox should only invalidate the mailbox cached inside the MailboxService.
MailboxOperator will have
- info regarding the actual mailbox
- the mailbox service that caches the mailboxID -> mailbox object mapping entry
We need to make the ownership relation a bit clearer. To me, with the current model, cancel/close needs to be idempotent and state needs to be kept, even though each is only done once per mailbox. Each call into these state transitions will have to be guarded by locks, which is not ideal.
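To make the idempotency concern concrete, here is a minimal sketch of a close/cancel pair where only the first transition takes effect. It uses a compare-and-set on an atomic state rather than a lock, which is one possible alternative and not what this PR implements. `IdempotentMailbox` and its members are hypothetical names, not Pinot classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a mailbox with a terminal state; not a Pinot class.
final class IdempotentMailbox {
  enum State { OPEN, CLOSED, CANCELLED }

  private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
  // Counts how many times resources were actually released (should never exceed 1).
  private final AtomicInteger _releaseCount = new AtomicInteger();

  // Returns true only for the single caller that performs the transition.
  boolean close() {
    return transition(State.CLOSED);
  }

  // Same contract as close(): repeated or concurrent calls are harmless no-ops.
  boolean cancel() {
    return transition(State.CANCELLED);
  }

  private boolean transition(State target) {
    // Only one caller can move the state out of OPEN, so the release runs once.
    if (_state.compareAndSet(State.OPEN, target)) {
      _releaseCount.incrementAndGet();
      return true;
    }
    return false;
  }

  State state() {
    return _state.get();
  }

  int releaseCount() {
    return _releaseCount.get();
  }
}
```

Whichever call wins decides the terminal state; every later call observes a non-OPEN state and returns without side effects.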
IMO:
- MailboxService ALWAYS owns the MailboxContentStreamObserver (and thus the GRPCReceivingMailbox)
  - Throughout the interaction of the GRPC layer, both the mailbox grpc stub (GRPCMailboxServer.open) and the MailboxReceiveOperator will access MailboxService.getReceivingMailbox and thus we cannot transfer the ownership of the receiving mailbox to the mailbox operator (e.g. delete it from the cache)
- MailboxReceiveOperator DOES NOT OWN the observer. thus it should ONLY call method to interact with mailbox via mailboxService. This means:
  - it should not be acquiring any Receiving mailbox object, rather it should only use the APIs to get contents out of the receiving mailbox.
On the contrary, on the sending side, MailboxService NEVER owns the GRPCSendingMailbox, because it is always constructed and initialized by the MailboxSendObserver. The mailbox send observer therefore holds the reference to the StatusStreamObserver and is responsible for calling close / cancel on it.
the relationship looks like this
* ----------------------------------------------------------------------------
* (Operator Layer)
* MailboxSendOperator ---------> MailboxReceiveOperator
* | |
* | |
* ------------------|----------------------------------|----------------------
* | |
* (MailboxService) | | ( WAIT ON INIT )
* \_/ \_/
* SendingMailbox ReceivingMailbox
* ------------------|---------------------------------/^\---------------------
* (Physical Layer) | |
* (e.g. GRPC) | | ( INITIALIZE )
* | |
* \_/ |
* StreamObserver -------------------> StreamObserver
* ----------------------------------------------------------------------------
and the only place we require MailboxService to own the closable object is ReceivingMailbox.
This means
- we need to make sure that MailboxReceiveOperator gets the mailbox but does not store it
  - possibly changing the getReceivingMailbox API to TransferrableBlock receiveMail(MailboxID)
- SendingMailbox can be changed into a util-based initialization and doesn't need to go through MailboxService (but for simplicity we can still keep the current way)
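A rough sketch of what the proposed ownership model could look like, under heavy simplification: `SketchMailboxService` is a hypothetical class, `String` stands in for both the mailbox identifier and the block type, and `deliver` is an assumed entry point for the physical layer. The point is only that the service keeps the id -> mailbox mapping and the operator never holds a mailbox reference.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified service; not the Pinot MailboxService interface.
// String stands in for the mailbox identifier and for the block type.
final class SketchMailboxService {
  // The service is the sole owner of the receiving-side buffers.
  private final ConcurrentMap<String, Queue<String>> _receivingMailboxes = new ConcurrentHashMap<>();

  // Assumed entry point for the physical layer (e.g. a stream observer) when data arrives.
  void deliver(String mailboxId, String block) {
    _receivingMailboxes.computeIfAbsent(mailboxId, id -> new ConcurrentLinkedQueue<>()).add(block);
  }

  // Called by the receive operator; returns null when the mailbox is unknown or empty.
  String receiveMail(String mailboxId) {
    Queue<String> mailbox = _receivingMailboxes.get(mailboxId);
    return mailbox == null ? null : mailbox.poll();
  }

  // Only invalidates the cached entry; callers never held the mailbox itself.
  void releaseReceivingMailbox(String mailboxId) {
    _receivingMailboxes.remove(mailboxId);
  }
}
```

Because the operator only passes an id, releasing a mailbox cannot leave a dangling reference on the operator side.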
// TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way
// to check if the stream is already closed.
LOGGER.info("Tried to cancel receiving mailbox", e);
Let's search for a better way to deal with the already-closed issue.
I think I read an article regarding close/half-close conditions, but we can follow up later.
This is not related to gRPC but our business logic. There can be concurrent cancellations/terminations of the stream so this can always happen.
@Override
public String getMailboxId() {
  return _mailboxId;
public boolean isClosed() {
I don't think this API is needed.
A mailbox should only expose send, complete, cancel, and close. Whether it is closed should be an internal state of the mailbox.
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
...ntime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
  private static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
  private static final long DEFAULT_QUEUE_POLL_TIMEOUT_MS = 120_000;
What's the reason for this, and how is it determined?
Since the ReceiveOperator controls moving on to the next mailbox if the current one doesn't have data, shouldn't we avoid any wait here and simply return null so that the ReceiveOperator can decide what to do?
This is the timeout for the queue offer and not poll (it should be renamed). The receive operator will return null if there's no data in the receivingBuffer.
This should ideally be tied to the query deadline but we don't have it in MailboxContentStreamObserver right now. At present I kept it to a reasonably large number. We can consider either increasing or decreasing this.
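As a sketch of what tying the offer timeout to the query deadline might look like: the class below is hypothetical (not the Pinot observer), uses `String` in place of the real content type, and assumes the deadline is available as epoch milliseconds. The producer blocks for space only until the deadline, while the consumer side never blocks and returns null when the buffer is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffer, not a Pinot class. String stands in for the content type.
final class BoundedReceivingBuffer {
  private final BlockingQueue<String> _buffer;
  private final long _deadlineMs; // assumed: absolute query deadline in epoch millis

  BoundedReceivingBuffer(int capacity, long deadlineMs) {
    _buffer = new ArrayBlockingQueue<>(capacity);
    _deadlineMs = deadlineMs;
  }

  // Producer side: waits for free space, but never past the query deadline.
  boolean offer(String block) {
    long remainingMs = _deadlineMs - System.currentTimeMillis();
    if (remainingMs <= 0) {
      return false; // deadline already passed, do not wait at all
    }
    try {
      return _buffer.offer(block, remainingMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }

  // Consumer side: non-blocking, returns null when no data is buffered.
  String poll() {
    return _buffer.poll();
  }
}
```

With this shape, a fixed constant such as 120_000 ms would no longer be needed, at the cost of having to plumb the deadline into the observer.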
...y-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
...ry-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
...ry-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
51e6e84 to 35b3bad
walterddr left a comment
lgtm.
Checklist:
Tested internally at Uber with a reasonably large cluster and large joins, and this fixes all the reliability issues for us.