Skip to content

Commit f1966d9

Browse files
authored
[Multi-stage] Support partition based colocated join (#10886)
1 parent 90dc3a3 commit f1966d9

File tree

18 files changed

+536
-303
lines changed

18 files changed

+536
-303
lines changed

pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@
4646
import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
4747
import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
4848
import org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionMetadataManager;
49-
import org.apache.pinot.broker.routing.segmentpartition.TablePartitionInfo;
5049
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
5150
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory;
5251
import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
@@ -63,6 +62,7 @@
6362
import org.apache.pinot.common.utils.helix.HelixHelper;
6463
import org.apache.pinot.core.routing.RoutingManager;
6564
import org.apache.pinot.core.routing.RoutingTable;
65+
import org.apache.pinot.core.routing.TablePartitionInfo;
6666
import org.apache.pinot.core.routing.TimeBoundaryInfo;
6767
import org.apache.pinot.core.transport.ServerInstance;
6868
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
@@ -651,8 +651,7 @@ public Map<String, ServerInstance> getEnabledServerInstanceMap() {
651651

652652
@Override
653653
public Map<String, ServerInstance> getEnabledServersForTableTenant(String tableNameWithType) {
654-
return _tableTenantServersMap.containsKey(tableNameWithType) ? _tableTenantServersMap.get(tableNameWithType)
655-
: new HashMap<String, ServerInstance>();
654+
return _tableTenantServersMap.getOrDefault(tableNameWithType, Collections.emptyMap());
656655
}
657656

658657
private String getIdealStatePath(String tableNameWithType) {
@@ -680,6 +679,7 @@ public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
680679
}
681680

682681
@Nullable
682+
@Override
683683
public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
684684
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
685685
if (routingEntry == null) {

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.helix.model.IdealState;
3131
import org.apache.helix.zookeeper.datamodel.ZNRecord;
3232
import org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
33+
import org.apache.pinot.core.routing.TablePartitionInfo;
34+
import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
3335
import org.apache.pinot.segment.spi.partition.PartitionFunction;
3436
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
3537
import org.slf4j.Logger;
@@ -120,7 +122,7 @@ private List<String> getOnlineServers(ExternalView externalView, String segment)
120122
}
121123

122124
private void computeTablePartitionInfo() {
123-
TablePartitionInfo.PartitionInfo[] partitionInfoMap = new TablePartitionInfo.PartitionInfo[_numPartitions];
125+
PartitionInfo[] partitionInfoMap = new PartitionInfo[_numPartitions];
124126
Set<String> segmentsWithInvalidPartition = new HashSet<>();
125127
for (Map.Entry<String, SegmentInfo> entry : _segmentInfoMap.entrySet()) {
126128
String segment = entry.getKey();
@@ -131,16 +133,16 @@ private void computeTablePartitionInfo() {
131133
segmentsWithInvalidPartition.add(segment);
132134
continue;
133135
}
134-
TablePartitionInfo.PartitionInfo partitionInfo = partitionInfoMap[partitionId];
136+
PartitionInfo partitionInfo = partitionInfoMap[partitionId];
135137
if (partitionInfo == null) {
136-
partitionInfo = new TablePartitionInfo.PartitionInfo();
137-
partitionInfo._segments = new ArrayList<>();
138-
partitionInfo._segments.add(segment);
139-
partitionInfo._fullyReplicatedServers = new HashSet<>(onlineServers);
138+
Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
139+
List<String> segments = new ArrayList<>();
140+
segments.add(segment);
141+
partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
140142
partitionInfoMap[partitionId] = partitionInfo;
141143
} else {
142-
partitionInfo._segments.add(segment);
143144
partitionInfo._fullyReplicatedServers.retainAll(onlineServers);
145+
partitionInfo._segments.add(segment);
144146
}
145147
}
146148
if (!segmentsWithInvalidPartition.isEmpty()) {

pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
3838
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
3939
import org.apache.pinot.controller.helix.ControllerTest;
40+
import org.apache.pinot.core.routing.TablePartitionInfo;
4041
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
4142
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
4243
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;

pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pinot.core.routing;
2020

2121
import java.util.Map;
22+
import javax.annotation.Nullable;
2223
import org.apache.pinot.common.request.BrokerRequest;
2324
import org.apache.pinot.core.transport.ServerInstance;
2425
import org.apache.pinot.spi.annotations.InterfaceAudience;
@@ -50,6 +51,7 @@ public interface RoutingManager {
5051
* @param brokerRequest the broker request constructed from a query.
5152
* @return the route table.
5253
*/
54+
@Nullable
5355
RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId);
5456

5557
/**
@@ -66,8 +68,15 @@ public interface RoutingManager {
6668
* @param offlineTableName offline table name
6769
* @return time boundary info.
6870
*/
71+
@Nullable
6972
TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName);
7073

74+
/**
75+
* Returns the {@link TablePartitionInfo} for a given table.
76+
*/
77+
@Nullable
78+
TablePartitionInfo getTablePartitionInfo(String tableNameWithType);
79+
7180
/**
7281
* Returns all enabled server instances for a given table's server tenant.
7382
*

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/TablePartitionInfo.java renamed to pinot-core/src/main/java/org/apache/pinot/core/routing/TablePartitionInfo.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.broker.routing.segmentpartition;
19+
package org.apache.pinot.core.routing;
2020

2121
import java.util.List;
2222
import java.util.Set;
@@ -65,7 +65,12 @@ public Set<String> getSegmentsWithInvalidPartition() {
6565
}
6666

6767
public static class PartitionInfo {
68-
List<String> _segments;
69-
Set<String> _fullyReplicatedServers;
68+
public final Set<String> _fullyReplicatedServers;
69+
public final List<String> _segments;
70+
71+
public PartitionInfo(Set<String> fullyReplicatedServers, List<String> segments) {
72+
_fullyReplicatedServers = fullyReplicatedServers;
73+
_segments = segments;
74+
}
7075
}
7176
}

pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ public void onMatch(RelOptRuleCall call) {
6565
RelNode rightExchange;
6666
JoinInfo joinInfo = join.analyzeCondition();
6767

68-
boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
69-
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
68+
boolean isColocatedJoin =
69+
PinotHintStrategyTable.containsHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
70+
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
7071
if (isColocatedJoin) {
7172
// join exchange are colocated, we should directly pass through via join key
7273
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.SINGLETON);
@@ -82,10 +83,9 @@ public void onMatch(RelOptRuleCall call) {
8283
}
8384

8485
RelNode newJoinNode =
85-
new LogicalJoin(join.getCluster(), join.getTraitSet(), leftExchange, rightExchange, join.getCondition(),
86-
join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
86+
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), leftExchange, rightExchange,
87+
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
8788
ImmutableList.copyOf(join.getSystemFieldList()));
88-
8989
call.transformTo(newJoinNode);
9090
}
9191
}

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.calcite.rel.core.JoinRelType;
3232
import org.apache.calcite.rel.core.SetOp;
3333
import org.apache.calcite.rel.core.SortExchange;
34+
import org.apache.calcite.rel.hint.PinotHintOptions;
35+
import org.apache.calcite.rel.hint.PinotHintStrategyTable;
3436
import org.apache.calcite.rel.logical.LogicalAggregate;
3537
import org.apache.calcite.rel.logical.LogicalFilter;
3638
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -178,12 +180,15 @@ private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId)
178180

179181
// Parse out all equality JOIN conditions
180182
JoinInfo joinInfo = node.analyzeCondition();
181-
FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.leftKeys);
182-
FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.rightKeys);
183+
JoinNode.JoinKeys joinKeys = new JoinNode.JoinKeys(new FieldSelectionKeySelector(joinInfo.leftKeys),
184+
new FieldSelectionKeySelector(joinInfo.rightKeys));
185+
List<RexExpression> joinClause =
186+
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
187+
boolean isColocatedJoin =
188+
PinotHintStrategyTable.containsHintOption(node.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
189+
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
183190
return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()),
184-
toDataSchema(node.getRight().getRowType()), joinType,
185-
new JoinNode.JoinKeys(leftFieldSelectionKeySelector, rightFieldSelectionKeySelector),
186-
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()));
191+
toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause, isColocatedJoin);
187192
}
188193

189194
private static DataSchema toDataSchema(RelDataType rowType) {

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java

Lines changed: 53 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
*/
1919
package org.apache.pinot.query.planner.physical;
2020

21+
import java.util.Collections;
2122
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
25+
import org.apache.calcite.rel.RelDistribution;
2426
import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
25-
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
2627
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
2728
import org.apache.pinot.query.planner.plannode.PlanNode;
2829
import org.apache.pinot.query.routing.MailboxMetadata;
@@ -35,57 +36,58 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
3536

3637
@Override
3738
public Void process(PlanNode node, DispatchablePlanContext context) {
38-
if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode) {
39-
int receiverStageId =
40-
isMailboxReceiveNode(node) ? node.getPlanFragmentId() : ((MailboxSendNode) node).getReceiverStageId();
41-
int senderStageId =
42-
isMailboxReceiveNode(node) ? ((MailboxReceiveNode) node).getSenderStageId() : node.getPlanFragmentId();
43-
DispatchablePlanMetadata receiverStagePlanMetadata =
44-
context.getDispatchablePlanMetadataMap().get(receiverStageId);
45-
DispatchablePlanMetadata senderStagePlanMetadata = context.getDispatchablePlanMetadataMap().get(senderStageId);
46-
receiverStagePlanMetadata.getServerInstanceToWorkerIdMap().entrySet().stream().forEach(receiverEntry -> {
47-
QueryServerInstance receiverServerInstance = receiverEntry.getKey();
48-
List<Integer> receiverWorkerIds = receiverEntry.getValue();
49-
for (int receiverWorkerId : receiverWorkerIds) {
50-
receiverStagePlanMetadata.getWorkerIdToMailBoxIdsMap().putIfAbsent(receiverWorkerId, new HashMap<>());
51-
senderStagePlanMetadata.getServerInstanceToWorkerIdMap().entrySet().stream().forEach(senderEntry -> {
52-
QueryServerInstance senderServerInstance = senderEntry.getKey();
53-
List<Integer> senderWorkerIds = senderEntry.getValue();
54-
for (int senderWorkerId : senderWorkerIds) {
55-
MailboxMetadata mailboxMetadata =
56-
isMailboxReceiveNode(node)
57-
? getMailboxMetadata(receiverStagePlanMetadata, senderStageId, receiverWorkerId)
58-
: getMailboxMetadata(senderStagePlanMetadata, receiverStageId, senderWorkerId);
59-
mailboxMetadata.getMailBoxIdList().add(
60-
MailboxIdUtils.toPlanMailboxId(senderStageId, senderWorkerId, receiverStageId, receiverWorkerId));
61-
VirtualServerAddress virtualServerAddress =
62-
isMailboxReceiveNode(node)
63-
? new VirtualServerAddress(senderServerInstance, senderWorkerId)
64-
: new VirtualServerAddress(receiverServerInstance, receiverWorkerId);
65-
mailboxMetadata.getVirtualAddressList().add(virtualServerAddress);
66-
}
67-
});
68-
}
69-
});
70-
}
71-
return null;
72-
}
39+
if (node instanceof MailboxSendNode) {
40+
MailboxSendNode sendNode = (MailboxSendNode) node;
41+
int senderFragmentId = sendNode.getPlanFragmentId();
42+
int receiverFragmentId = sendNode.getReceiverStageId();
43+
Map<Integer, DispatchablePlanMetadata> metadataMap = context.getDispatchablePlanMetadataMap();
44+
DispatchablePlanMetadata senderMetadata = metadataMap.get(senderFragmentId);
45+
DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverFragmentId);
46+
Map<QueryServerInstance, List<Integer>> senderWorkerIdsMap = senderMetadata.getServerInstanceToWorkerIdMap();
47+
Map<QueryServerInstance, List<Integer>> receiverWorkerIdsMap = receiverMetadata.getServerInstanceToWorkerIdMap();
48+
Map<Integer, Map<Integer, MailboxMetadata>> senderMailboxesMap = senderMetadata.getWorkerIdToMailBoxIdsMap();
49+
Map<Integer, Map<Integer, MailboxMetadata>> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailBoxIdsMap();
7350

74-
private static boolean isMailboxReceiveNode(PlanNode node) {
75-
return node instanceof MailboxReceiveNode;
76-
}
77-
78-
private MailboxMetadata getMailboxMetadata(DispatchablePlanMetadata dispatchablePlanMetadata, int planFragmentId,
79-
int workerId) {
80-
Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailBoxIdsMap =
81-
dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap();
82-
if (!workerIdToMailBoxIdsMap.containsKey(workerId)) {
83-
workerIdToMailBoxIdsMap.put(workerId, new HashMap<>());
84-
}
85-
Map<Integer, MailboxMetadata> planFragmentToMailboxMetadataMap = workerIdToMailBoxIdsMap.get(workerId);
86-
if (!planFragmentToMailboxMetadataMap.containsKey(planFragmentId)) {
87-
planFragmentToMailboxMetadataMap.put(planFragmentId, new MailboxMetadata());
51+
if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
52+
// For SINGLETON exchange type, send the data to the same instance (same worker id)
53+
senderWorkerIdsMap.forEach((serverInstance, workerIds) -> {
54+
for (int workerId : workerIds) {
55+
MailboxMetadata mailboxMetadata = new MailboxMetadata(Collections.singletonList(
56+
MailboxIdUtils.toPlanMailboxId(senderFragmentId, workerId, receiverFragmentId, workerId)),
57+
Collections.singletonList(new VirtualServerAddress(serverInstance, workerId)), Collections.emptyMap());
58+
senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
59+
receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
60+
}
61+
});
62+
} else {
63+
// For other exchange types, send the data to all the instances in the receiver fragment
64+
// TODO: Add support for more exchange types
65+
senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
66+
for (int senderWorkerId : senderWorkerIds) {
67+
Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
68+
senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>());
69+
receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {
70+
for (int receiverWorkerId : receiverWorkerIds) {
71+
Map<Integer, MailboxMetadata> receiverMailboxMetadataMap =
72+
receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>());
73+
String mailboxId = MailboxIdUtils.toPlanMailboxId(senderFragmentId, senderWorkerId, receiverFragmentId,
74+
receiverWorkerId);
75+
MailboxMetadata senderMailboxMetadata =
76+
senderMailboxMetadataMap.computeIfAbsent(receiverFragmentId, k -> new MailboxMetadata());
77+
senderMailboxMetadata.getMailBoxIdList().add(mailboxId);
78+
senderMailboxMetadata.getVirtualAddressList()
79+
.add(new VirtualServerAddress(receiverServerInstance, receiverWorkerId));
80+
MailboxMetadata receiverMailboxMetadata =
81+
receiverMailboxMetadataMap.computeIfAbsent(senderFragmentId, k -> new MailboxMetadata());
82+
receiverMailboxMetadata.getMailBoxIdList().add(mailboxId);
83+
receiverMailboxMetadata.getVirtualAddressList()
84+
.add(new VirtualServerAddress(senderServerInstance, senderWorkerId));
85+
}
86+
});
87+
}
88+
});
89+
}
8890
}
89-
return planFragmentToMailboxMetadataMap.get(planFragmentId);
91+
return null;
9092
}
9193
}

0 commit comments

Comments
 (0)