@ankitsultana
Contributor

@ankitsultana ankitsultana commented Feb 23, 2023

Checklist:

  • Clean-up the PR, run a self-review.
  • Add cancellation for the leaf stage MailboxSendOperator.
  • Rebase on master
  • Add UTs
  • Update design doc.

Tested internally at Uber with a reasonably large cluster and large joins, and this fixes all the reliability issues for us.

@walterddr
Contributor

Great work cleaning up the stability issues, thank you @ankitsultana. Please comment when this PR is ready for final review. I will take a look and comment as we go as well.

@ankitsultana ankitsultana changed the title from "[wip] [Do Not Review] [multistage] Fix Leaks in Mailbox" to "[Draft] [multistage] Fix Leaks in Mailbox" on Mar 3, 2023
@codecov-commenter

codecov-commenter commented Mar 3, 2023

Codecov Report

Merging #10322 (35b3bad) into master (6bd8a7d) will increase coverage by 0.06%.
The diff coverage is 80.08%.

@@             Coverage Diff              @@
##             master   #10322      +/-   ##
============================================
+ Coverage     70.25%   70.32%   +0.06%     
- Complexity     6023     6074      +51     
============================================
  Files          2049     2050       +1     
  Lines        111068   111187     +119     
  Branches      16898    16908      +10     
============================================
+ Hits          78031    78189     +158     
+ Misses        27560    27522      -38     
+ Partials       5477     5476       -1     
Flag Coverage Δ
integration1 24.49% <0.00%> (-0.01%) ⬇️
integration2 24.32% <0.00%> (-0.07%) ⬇️
unittests1 67.94% <80.08%> (+0.09%) ⬆️
unittests2 13.90% <0.00%> (-0.01%) ⬇️

Impacted Files Coverage Δ
...y/runtime/operator/exchange/BroadcastExchange.java 100.00% <ø> (ø)
.../query/runtime/operator/exchange/HashExchange.java 100.00% <ø> (ø)
...uery/runtime/operator/exchange/RandomExchange.java 100.00% <ø> (ø)
...y/runtime/operator/exchange/SingletonExchange.java 100.00% <ø> (ø)
.../mailbox/channel/MailboxContentStreamObserver.java 74.13% <62.16%> (+7.94%) ⬆️
...he/pinot/query/mailbox/InMemoryMailboxService.java 80.00% <65.00%> (-20.00%) ⬇️
...va/org/apache/pinot/query/runtime/QueryRunner.java 81.81% <72.72%> (-2.50%) ⬇️
...apache/pinot/query/mailbox/GrpcMailboxService.java 80.95% <73.07%> (-14.89%) ⬇️
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 81.57% <73.68%> (+0.93%) ⬆️
.../pinot/query/mailbox/InMemoryReceivingMailbox.java 85.00% <75.00%> (+11.66%) ⬆️
... and 13 more

... and 37 files with indirect coverage changes

_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_timeoutMs = timeoutMs;
Contributor Author

self-review: We don't need to store timeoutMs anymore. We can pass deadline directly to MailboxReceiveOperator.
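
A minimal sketch of the deadline idea, assuming a hypothetical `QueryDeadline` helper (the class and its methods are illustrative and not part of this PR): the relative timeout is converted into an absolute deadline once, and each operator derives its remaining budget from that deadline instead of storing `timeoutMs`.

```java
// Hypothetical helper for illustration; the name and methods are assumptions, not Pinot APIs.
final class QueryDeadline {
  private final long _deadlineMs;

  private QueryDeadline(long deadlineMs) {
    _deadlineMs = deadlineMs;
  }

  // Convert a relative timeout into an absolute deadline exactly once.
  static QueryDeadline fromTimeout(long nowMs, long timeoutMs) {
    return new QueryDeadline(nowMs + timeoutMs);
  }

  // Remaining budget at the given time, never negative.
  long remainingMs(long nowMs) {
    return Math.max(0L, _deadlineMs - nowMs);
  }

  // True once the given time has reached or passed the deadline.
  boolean isExpired(long nowMs) {
    return nowMs >= _deadlineMs;
  }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real operator would most likely read the clock itself.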

* </ol>
* @param mailboxId
*/
void releaseReceivingMailbox(MailboxIdentifier mailboxId);
Contributor

I think releaseReceivingMailbox should only invalidate the mailbox cached inside the MailboxService.

MailboxOperator will have:

  1. info regarding the actual mailbox
  2. the mailbox service that caches the mailboxID -> mailbox object mapping entry

We need to make the ownership relation a bit clearer. To me, with the current model, cancel/close needs to be idempotent and state needs to be kept, even though each is done only once per mailbox. Each call into these state transitions will have to be guarded by locks, which is not ideal.
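
For illustration only, here is one way the idempotency requirement described above could be met without explicit locks, using a compare-and-set flag. `IdempotentMailbox` and its members are hypothetical names, not classes from this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mailbox for illustration; not the PR's ReceivingMailbox implementation.
final class IdempotentMailbox {
  private final AtomicBoolean _closed = new AtomicBoolean(false);
  private final AtomicInteger _releaseCount = new AtomicInteger();

  // Safe to call from several threads and more than once:
  // only the caller that wins the compare-and-set releases resources.
  void cancel() {
    if (_closed.compareAndSet(false, true)) {
      releaseResources();
    }
  }

  private void releaseResources() {
    // Stand-in for closing the underlying stream and clearing buffers.
    _releaseCount.incrementAndGet();
  }

  // Exposed only so the sketch can show that release ran once.
  int releaseCount() {
    return _releaseCount.get();
  }
}
```

This still keeps per-mailbox state, which is the cost the comment points out; a clearer ownership model would reduce how many callers can reach `cancel()` in the first place.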

Contributor

@walterddr walterddr Mar 6, 2023

IMO.

  1. MailboxService ALWAYS owns the MailboxContentStreamObserver (and thus the GRPCReceivingMailbox).
    • Throughout the interaction with the GRPC layer, both the mailbox gRPC stub (GRPCMailboxServer.open) and the MailboxReceiveOperator access MailboxService.getReceivingMailbox, so we cannot transfer ownership of the receiving mailbox to the mailbox operator (e.g. by deleting it from the cache).
  2. MailboxReceiveOperator DOES NOT OWN the observer, so it should ONLY call methods that interact with the mailbox via MailboxService. This means:
    • It should not acquire any receiving mailbox object; rather, it should only use the APIs to get contents out of the receiving mailbox.

Contributor

On the contrary, on the sending side, MailboxService NEVER owns the GRPCSendingMailbox, because it is always constructed and initialized by the MailboxSendObserver. The mailbox send observer therefore holds the reference to the StatusStreamObserver and is responsible for calling close / cancel on it.

Contributor

The relationship looks like this:

 * ----------------------------------------------------------------------------
 * (Operator Layer)
 *            MailboxSendOperator ---------> MailboxReceiveOperator
 *                   |                                  |
 *                   |                                  |
 * ------------------|----------------------------------|----------------------
 *                   |                                  |
 * (MailboxService)  |                                  |  ( WAIT ON INIT )
 *                  \_/                                \_/                   
 *           SendingMailbox                      ReceivingMailbox
 * ------------------|---------------------------------/^\---------------------
 * (Physical Layer)  |                                  |
 * (e.g. GRPC)       |                                  |  ( INITIALIZE )
 *                   |                                  |
 *                  \_/                                 |
 *           StreamObserver -------------------> StreamObserver
 * ----------------------------------------------------------------------------

The only place we require MailboxService to own the closable object is ReceivingMailbox.

This means:

  1. We need to make sure that MailboxReceiveOperator gets the mailbox but does not store it,
    • possibly by changing the getReceivingMailbox API to TransferrableBlock receiveMail(MailboxID).
  2. SendingMailbox can be changed into a util-based initialization and doesn't need to go through MailboxService (but for simplicity we can still keep the current way).
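
A rough sketch of the ownership model proposed above, under loud assumptions: `OwningMailboxService` is a hypothetical class, `String` stands in for both the mailbox identifier and the block type, and an in-memory queue stands in for the real receiving mailbox. The point is only that the operator calls `receiveMail` and never holds a reference to the mailbox itself.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical service for illustration; String stands in for the mailbox id and the block type.
final class OwningMailboxService {
  private final ConcurrentMap<String, Queue<String>> _receivingMailboxes = new ConcurrentHashMap<>();

  // Called by the physical layer (e.g. a stream observer) when content arrives.
  void deliver(String mailboxId, String block) {
    _receivingMailboxes.computeIfAbsent(mailboxId, id -> new ConcurrentLinkedQueue<>()).add(block);
  }

  // Called by the receive operator: returns the next block, or null if none is available yet.
  // The operator never obtains a reference to the mailbox itself.
  String receiveMail(String mailboxId) {
    Queue<String> mailbox = _receivingMailboxes.get(mailboxId);
    return mailbox == null ? null : mailbox.poll();
  }

  // Only invalidates the cached entry; calling it repeatedly is harmless.
  void releaseReceivingMailbox(String mailboxId) {
    _receivingMailboxes.remove(mailboxId);
  }
}
```

Because the service is the only holder of the mailbox, releasing it is a single map removal and needs no coordination with the operator.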

Comment on lines +108 to +110
// TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way
// to check if the stream is already closed.
LOGGER.info("Tried to cancel receiving mailbox", e);
Contributor

Let's search for a better way to deal with the already-closed issue.
I think I read an article regarding close/half-close conditions, but we can follow up later.

Contributor Author

This is not related to gRPC but our business logic. There can be concurrent cancellations/terminations of the stream so this can always happen.

@Override
public String getMailboxId() {
return _mailboxId;
public boolean isClosed() {
Contributor

I don't think this API is needed.

A mailbox should only expose send, complete, cancel, and close. Whether it is closed should be an internal state of the mailbox.

public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
private static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
private static final long DEFAULT_QUEUE_POLL_TIMEOUT_MS = 120_000;
Contributor

What's the reason for this, and how is this determined?
Since the ReceiveOperator controls moving on to the next mailbox if the current one doesn't have data, shouldn't we avoid any wait here and simply return null so that the ReceiveOperator can decide what to do?

Contributor Author

This is the timeout for the queue offer and not poll (it should be renamed). The receive operator will return null if there's no data in the receivingBuffer.

This should ideally be tied to the query deadline but we don't have it in MailboxContentStreamObserver right now. At present I kept it to a reasonably large number. We can consider either increasing or decreasing this.
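
The distinction between the two sides can be sketched as follows, assuming a hypothetical `BoundedReceivingBuffer` (names and types are illustrative, not the PR's code): the producer waits up to a timeout for space in a bounded queue, while the consumer polls without blocking and gets null when nothing is buffered.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffer for illustration; names and sizes are assumptions, not the PR's code.
final class BoundedReceivingBuffer {
  private final BlockingQueue<String> _buffer;

  BoundedReceivingBuffer(int maxPending) {
    _buffer = new ArrayBlockingQueue<>(maxPending);
  }

  // Producer side: wait up to offerTimeoutMs for space; false means the content was not accepted.
  boolean offer(String content, long offerTimeoutMs) throws InterruptedException {
    return _buffer.offer(content, offerTimeoutMs, TimeUnit.MILLISECONDS);
  }

  // Consumer side: never blocks; null lets the caller move on to another mailbox.
  String poll() {
    return _buffer.poll();
  }
}
```

If the query deadline were available at this layer, the remaining time until the deadline could be passed as `offerTimeoutMs` instead of a fixed constant.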

@ankitsultana ankitsultana marked this pull request as ready for review March 8, 2023 21:28
@ankitsultana ankitsultana changed the title from "[Draft] [multistage] Fix Leaks in Mailbox" to "[multistage] Fix Leaks in Mailbox" on Mar 8, 2023
Contributor

@walterddr walterddr left a comment

lgtm.

@walterddr walterddr merged commit 5b289fc into apache:master Mar 10, 2023