1818 */
1919package org .apache .pinot .query .planner .physical ;
2020
21+ import java .util .Collections ;
2122import java .util .HashMap ;
2223import java .util .List ;
2324import java .util .Map ;
25+ import org .apache .calcite .rel .RelDistribution ;
2426import org .apache .pinot .query .planner .plannode .DefaultPostOrderTraversalVisitor ;
25- import org .apache .pinot .query .planner .plannode .MailboxReceiveNode ;
2627import org .apache .pinot .query .planner .plannode .MailboxSendNode ;
2728import org .apache .pinot .query .planner .plannode .PlanNode ;
2829import org .apache .pinot .query .routing .MailboxMetadata ;
@@ -35,57 +36,58 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
3536
3637 @ Override
3738 public Void process (PlanNode node , DispatchablePlanContext context ) {
38- if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode ) {
39- int receiverStageId =
40- isMailboxReceiveNode (node ) ? node .getPlanFragmentId () : ((MailboxSendNode ) node ).getReceiverStageId ();
41- int senderStageId =
42- isMailboxReceiveNode (node ) ? ((MailboxReceiveNode ) node ).getSenderStageId () : node .getPlanFragmentId ();
43- DispatchablePlanMetadata receiverStagePlanMetadata =
44- context .getDispatchablePlanMetadataMap ().get (receiverStageId );
45- DispatchablePlanMetadata senderStagePlanMetadata = context .getDispatchablePlanMetadataMap ().get (senderStageId );
46- receiverStagePlanMetadata .getServerInstanceToWorkerIdMap ().entrySet ().stream ().forEach (receiverEntry -> {
47- QueryServerInstance receiverServerInstance = receiverEntry .getKey ();
48- List <Integer > receiverWorkerIds = receiverEntry .getValue ();
49- for (int receiverWorkerId : receiverWorkerIds ) {
50- receiverStagePlanMetadata .getWorkerIdToMailBoxIdsMap ().putIfAbsent (receiverWorkerId , new HashMap <>());
51- senderStagePlanMetadata .getServerInstanceToWorkerIdMap ().entrySet ().stream ().forEach (senderEntry -> {
52- QueryServerInstance senderServerInstance = senderEntry .getKey ();
53- List <Integer > senderWorkerIds = senderEntry .getValue ();
54- for (int senderWorkerId : senderWorkerIds ) {
55- MailboxMetadata mailboxMetadata =
56- isMailboxReceiveNode (node )
57- ? getMailboxMetadata (receiverStagePlanMetadata , senderStageId , receiverWorkerId )
58- : getMailboxMetadata (senderStagePlanMetadata , receiverStageId , senderWorkerId );
59- mailboxMetadata .getMailBoxIdList ().add (
60- MailboxIdUtils .toPlanMailboxId (senderStageId , senderWorkerId , receiverStageId , receiverWorkerId ));
61- VirtualServerAddress virtualServerAddress =
62- isMailboxReceiveNode (node )
63- ? new VirtualServerAddress (senderServerInstance , senderWorkerId )
64- : new VirtualServerAddress (receiverServerInstance , receiverWorkerId );
65- mailboxMetadata .getVirtualAddressList ().add (virtualServerAddress );
66- }
67- });
68- }
69- });
70- }
71- return null ;
72- }
39+ if (node instanceof MailboxSendNode ) {
40+ MailboxSendNode sendNode = (MailboxSendNode ) node ;
41+ int senderFragmentId = sendNode .getPlanFragmentId ();
42+ int receiverFragmentId = sendNode .getReceiverStageId ();
43+ Map <Integer , DispatchablePlanMetadata > metadataMap = context .getDispatchablePlanMetadataMap ();
44+ DispatchablePlanMetadata senderMetadata = metadataMap .get (senderFragmentId );
45+ DispatchablePlanMetadata receiverMetadata = metadataMap .get (receiverFragmentId );
46+ Map <QueryServerInstance , List <Integer >> senderWorkerIdsMap = senderMetadata .getServerInstanceToWorkerIdMap ();
47+ Map <QueryServerInstance , List <Integer >> receiverWorkerIdsMap = receiverMetadata .getServerInstanceToWorkerIdMap ();
48+ Map <Integer , Map <Integer , MailboxMetadata >> senderMailboxesMap = senderMetadata .getWorkerIdToMailBoxIdsMap ();
49+ Map <Integer , Map <Integer , MailboxMetadata >> receiverMailboxesMap = receiverMetadata .getWorkerIdToMailBoxIdsMap ();
7350
74- private static boolean isMailboxReceiveNode (PlanNode node ) {
75- return node instanceof MailboxReceiveNode ;
76- }
77-
78- private MailboxMetadata getMailboxMetadata (DispatchablePlanMetadata dispatchablePlanMetadata , int planFragmentId ,
79- int workerId ) {
80- Map <Integer , Map <Integer , MailboxMetadata >> workerIdToMailBoxIdsMap =
81- dispatchablePlanMetadata .getWorkerIdToMailBoxIdsMap ();
82- if (!workerIdToMailBoxIdsMap .containsKey (workerId )) {
83- workerIdToMailBoxIdsMap .put (workerId , new HashMap <>());
84- }
85- Map <Integer , MailboxMetadata > planFragmentToMailboxMetadataMap = workerIdToMailBoxIdsMap .get (workerId );
86- if (!planFragmentToMailboxMetadataMap .containsKey (planFragmentId )) {
87- planFragmentToMailboxMetadataMap .put (planFragmentId , new MailboxMetadata ());
51+ if (sendNode .getDistributionType () == RelDistribution .Type .SINGLETON ) {
52+ // For SINGLETON exchange type, send the data to the same instance (same worker id)
53+ senderWorkerIdsMap .forEach ((serverInstance , workerIds ) -> {
54+ for (int workerId : workerIds ) {
55+ MailboxMetadata mailboxMetadata = new MailboxMetadata (Collections .singletonList (
56+ MailboxIdUtils .toPlanMailboxId (senderFragmentId , workerId , receiverFragmentId , workerId )),
57+ Collections .singletonList (new VirtualServerAddress (serverInstance , workerId )), Collections .emptyMap ());
58+ senderMailboxesMap .computeIfAbsent (workerId , k -> new HashMap <>()).put (receiverFragmentId , mailboxMetadata );
59+ receiverMailboxesMap .computeIfAbsent (workerId , k -> new HashMap <>()).put (senderFragmentId , mailboxMetadata );
60+ }
61+ });
62+ } else {
63+ // For other exchange types, send the data to all the instances in the receiver fragment
64+ // TODO: Add support for more exchange types
65+ senderWorkerIdsMap .forEach ((senderServerInstance , senderWorkerIds ) -> {
66+ for (int senderWorkerId : senderWorkerIds ) {
67+ Map <Integer , MailboxMetadata > senderMailboxMetadataMap =
68+ senderMailboxesMap .computeIfAbsent (senderWorkerId , k -> new HashMap <>());
69+ receiverWorkerIdsMap .forEach ((receiverServerInstance , receiverWorkerIds ) -> {
70+ for (int receiverWorkerId : receiverWorkerIds ) {
71+ Map <Integer , MailboxMetadata > receiverMailboxMetadataMap =
72+ receiverMailboxesMap .computeIfAbsent (receiverWorkerId , k -> new HashMap <>());
73+ String mailboxId = MailboxIdUtils .toPlanMailboxId (senderFragmentId , senderWorkerId , receiverFragmentId ,
74+ receiverWorkerId );
75+ MailboxMetadata senderMailboxMetadata =
76+ senderMailboxMetadataMap .computeIfAbsent (receiverFragmentId , k -> new MailboxMetadata ());
77+ senderMailboxMetadata .getMailBoxIdList ().add (mailboxId );
78+ senderMailboxMetadata .getVirtualAddressList ()
79+ .add (new VirtualServerAddress (receiverServerInstance , receiverWorkerId ));
80+ MailboxMetadata receiverMailboxMetadata =
81+ receiverMailboxMetadataMap .computeIfAbsent (senderFragmentId , k -> new MailboxMetadata ());
82+ receiverMailboxMetadata .getMailBoxIdList ().add (mailboxId );
83+ receiverMailboxMetadata .getVirtualAddressList ()
84+ .add (new VirtualServerAddress (senderServerInstance , senderWorkerId ));
85+ }
86+ });
87+ }
88+ });
89+ }
8890 }
89- return planFragmentToMailboxMetadataMap . get ( planFragmentId ) ;
91+ return null ;
9092 }
9193}
0 commit comments