Skip to content

Commit e23c7d0

Browse files
committed
Address comments and fix tests
1 parent c530539 commit e23c7d0

File tree

10 files changed

+60
-58
lines changed

10 files changed

+60
-58
lines changed

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashSet;
2626
import java.util.List;
2727
import java.util.Map;
28+
import javax.annotation.Nullable;
2829
import org.apache.pinot.core.routing.TimeBoundaryInfo;
2930
import org.apache.pinot.query.routing.MailboxMetadata;
3031
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -41,7 +42,9 @@
4142
* </ul>
4243
*/
4344
public class DispatchablePlanMetadata implements Serializable {
45+
// These 2 fields are extracted from TableScanNode
4446
private final List<String> _scannedTables;
47+
private Map<String, String> _tableOptions;
4548

4649
// used for assigning server/worker nodes.
4750
private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
@@ -86,6 +89,15 @@ public void addScannedTable(String tableName) {
8689
_scannedTables.add(tableName);
8790
}
8891

92+
@Nullable
93+
public Map<String, String> getTableOptions() {
94+
return _tableOptions;
95+
}
96+
97+
public void setTableOptions(Map<String, String> tableOptions) {
98+
_tableOptions = tableOptions;
99+
}
100+
89101
// -----------------------------------------------
90102
// attached physical plan context.
91103
// -----------------------------------------------

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.query.planner.physical;
2020

21+
import org.apache.calcite.rel.hint.PinotHintOptions;
2122
import org.apache.pinot.query.planner.plannode.AggregateNode;
2223
import org.apache.pinot.query.planner.plannode.ExchangeNode;
2324
import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -127,6 +128,7 @@ public Void visitSort(SortNode node, DispatchablePlanContext context) {
127128
public Void visitTableScan(TableScanNode node, DispatchablePlanContext context) {
128129
DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
129130
dispatchablePlanMetadata.addScannedTable(node.getTableName());
131+
dispatchablePlanMetadata.setTableOptions(node.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS));
130132
return null;
131133
}
132134

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
6262
} else if (senderMetadata.isPartitionedTableScan()) {
6363
// For partitioned table scan, send the data to the worker with the same worker id (not necessarily the same
6464
// instance)
65+
// TODO: Support further splitting the single partition into multiple workers
6566
senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
6667
for (int workerId : senderWorkerIds) {
6768
receiverWorkerIdsMap.forEach((receiverServerInstance, receiverWorkerIds) -> {

pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java

Lines changed: 13 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.pinot.query.planner.PlanFragment;
4141
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
4242
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
43-
import org.apache.pinot.query.planner.plannode.PlanNode;
4443
import org.apache.pinot.query.planner.plannode.TableScanNode;
4544
import org.apache.pinot.spi.config.table.TableType;
4645
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
@@ -102,53 +101,29 @@ private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlan
102101
assignWorkersToNonRootFragment(child, context);
103102
}
104103

105-
TableScanNode tableScanNode = findTableScanNode(fragment.getFragmentRoot());
106-
Preconditions.checkState(tableScanNode != null, "Failed to find table scan node under leaf fragment");
107-
String tableName = tableScanNode.getTableName();
108-
109-
// Extract partitionKey and numPartitions from hint if provided
110-
Map<String, String> tableHintOptions =
111-
tableScanNode.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS);
104+
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
105+
Map<String, String> tableOptions = metadata.getTableOptions();
112106
String partitionKey = null;
113107
int numPartitions = 0;
114-
if (tableHintOptions != null) {
115-
partitionKey = tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
116-
String partitionSize = tableHintOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
108+
if (tableOptions != null) {
109+
partitionKey = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
110+
String partitionSize = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
117111
if (partitionSize != null) {
118112
numPartitions = Integer.parseInt(partitionSize);
119113
}
120114
}
121-
122115
if (partitionKey == null) {
123-
assignWorkersToNonPartitionedLeafFragment(fragment, context, tableName);
116+
assignWorkersToNonPartitionedLeafFragment(metadata, context);
124117
} else {
125118
Preconditions.checkState(numPartitions > 0, "'%s' must be provided for partition key: %s",
126119
PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
127-
assignWorkersToPartitionedLeafFragment(fragment, context, tableName, partitionKey, numPartitions);
128-
}
129-
}
130-
131-
@Nullable
132-
private TableScanNode findTableScanNode(PlanNode planNode) {
133-
if (planNode instanceof TableScanNode) {
134-
return (TableScanNode) planNode;
120+
assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, numPartitions);
135121
}
136-
List<PlanNode> children = planNode.getInputs();
137-
if (children.isEmpty()) {
138-
return null;
139-
}
140-
for (PlanNode child : children) {
141-
TableScanNode tableScanNode = findTableScanNode(child);
142-
if (tableScanNode != null) {
143-
return tableScanNode;
144-
}
145-
}
146-
return null;
147122
}
148123

149-
private void assignWorkersToNonPartitionedLeafFragment(PlanFragment fragment, DispatchablePlanContext context,
150-
String tableName) {
151-
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
124+
private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata metadata,
125+
DispatchablePlanContext context) {
126+
String tableName = metadata.getScannedTables().get(0);
152127
Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, context.getRequestId());
153128
Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find routing entries for table: %s", tableName);
154129

@@ -234,9 +209,9 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
234209
CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
235210
}
236211

237-
private void assignWorkersToPartitionedLeafFragment(PlanFragment fragment, DispatchablePlanContext context,
238-
String tableName, String partitionKey, int numPartitions) {
239-
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
212+
private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
213+
DispatchablePlanContext context, String partitionKey, int numPartitions) {
214+
String tableName = metadata.getScannedTables().get(0);
240215
ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, partitionKey, numPartitions);
241216

242217
// Pick one server per partition

pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,16 @@ public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, i
192192
String tableNameWithType = entry.getKey();
193193
String partitionColumn = entry.getValue().getLeft();
194194
List<List<String>> partitionIdToSegmentsMap = entry.getValue().getRight();
195+
int numPartitions = partitionIdToSegmentsMap.size();
195196
String hostname1 = MockRoutingManagerFactory.toHostname(port1);
196197
String hostname2 = MockRoutingManagerFactory.toHostname(port2);
197-
PartitionInfo[] partitionIdToInfoMap = new PartitionInfo[4];
198-
for (int i = 0; i < 4; i++) {
199-
String hostname = i < 2 ? hostname1 : hostname2;
198+
PartitionInfo[] partitionIdToInfoMap = new PartitionInfo[numPartitions];
199+
for (int i = 0; i < numPartitions; i++) {
200+
String hostname = i < (numPartitions / 2) ? hostname1 : hostname2;
200201
partitionIdToInfoMap[i] = new PartitionInfo(Collections.singleton(hostname), partitionIdToSegmentsMap.get(i));
201202
}
202203
TablePartitionInfo tablePartitionInfo =
203-
new TablePartitionInfo(tableNameWithType, partitionColumn, "hashCode", 4, partitionIdToInfoMap,
204+
new TablePartitionInfo(tableNameWithType, partitionColumn, "hashCode", numPartitions, partitionIdToInfoMap,
204205
Collections.emptySet());
205206
partitionInfoMap.put(tableNameWithType, tablePartitionInfo);
206207
}

pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
7373
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
7474
private static final Random RANDOM = new Random(42);
7575
private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
76+
private static final int NUM_PARTITIONS = 4;
7677

7778
private final Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
7879

@@ -116,13 +117,13 @@ public void setUp()
116117
if (partitionColumns != null && partitionColumns.size() == 1) {
117118
partitionColumn = partitionColumns.get(0);
118119
partitionIdToSegmentsMap = new ArrayList<>();
119-
for (int i = 0; i < 4; i++) {
120+
for (int i = 0; i < NUM_PARTITIONS; i++) {
120121
partitionIdToSegmentsMap.add(new ArrayList<>());
121122
}
122123
}
123124

124125
List<List<GenericRow>> partitionIdToRowsMap = new ArrayList<>();
125-
for (int i = 0; i < 4; i++) {
126+
for (int i = 0; i < NUM_PARTITIONS; i++) {
126127
partitionIdToRowsMap.add(new ArrayList<>());
127128
}
128129

@@ -133,13 +134,13 @@ public void setUp()
133134
} else {
134135
int partitionId;
135136
if (partitionColumns == null) {
136-
partitionId = RANDOM.nextInt(4);
137+
partitionId = RANDOM.nextInt(NUM_PARTITIONS);
137138
} else {
138139
int hashCode = 0;
139140
for (String field : partitionColumns) {
140141
hashCode += row.getValue(field).hashCode();
141142
}
142-
partitionId = (hashCode & Integer.MAX_VALUE) % 4;
143+
partitionId = (hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS;
143144
}
144145
partitionIdToRowsMap.get(partitionId).add(row);
145146
}
@@ -218,8 +219,8 @@ public void setUp()
218219
private void addSegments(MockInstanceDataManagerFactory factory1, MockInstanceDataManagerFactory factory2,
219220
String offlineTableName, boolean allowEmptySegment, List<List<GenericRow>> partitionIdToRowsMap,
220221
@Nullable List<List<String>> partitionIdToSegmentsMap) {
221-
for (int i = 0; i < 4; i++) {
222-
MockInstanceDataManagerFactory factory = i < 2 ? factory1 : factory2;
222+
for (int i = 0; i < NUM_PARTITIONS; i++) {
223+
MockInstanceDataManagerFactory factory = i < (NUM_PARTITIONS / 2) ? factory1 : factory2;
223224
List<GenericRow> rows = partitionIdToRowsMap.get(i);
224225
if (allowEmptySegment || !rows.isEmpty()) {
225226
String segmentName = factory.addSegment(offlineTableName, rows);

pinot-query-runtime/src/test/resources/queries/Aggregates.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,13 +654,13 @@
654654
"description": "nested aggregation",
655655
"sql": "SELECT min(max(int_col)) FROM {tbl}",
656656
"comments": ".*Aggregate expressions cannot be nested.",
657-
"expectedException": ".*Error composing query plan for.*"
657+
"expectedException": "Error composing query plan for.*"
658658
},
659659
{
660660
"psql": "4.2.7",
661661
"description": "nested aggregation",
662662
"sql": "SELECT (SELECT max(min(int_col)) FROM {tbl}) from {tbl};",
663-
"expectedException": ".*Error composing query plan for.*"
663+
"expectedException": "Error composing query plan for.*"
664664
},
665665
{
666666
"psql": "4.2.7",

pinot-query-runtime/src/test/resources/queries/CountDistinct.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@
117117
"outputs": [["b", 6], ["a", 6]]
118118
},
119119
{
120-
"comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[6]]",
120+
"comments": "table isn't actually partitioned by val, thus all segments can produce duplicate results, thus [[8]]",
121121
"sql": "SELECT SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1}",
122-
"outputs": [[6]]
122+
"outputs": [[8]]
123123
},
124124
{
125-
"comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 4], [a, 4]]",
125+
"comments": "table isn't actually partitioned by val, thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
126126
"sql": "SELECT groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
127-
"outputs": [["b", 4], ["a", 4]]
127+
"outputs": [["b", 5], ["a", 4]]
128128
},
129129
{
130130
"sql": "SELECT l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
@@ -135,9 +135,9 @@
135135
"outputs": [["b", 6], ["a", 6]]
136136
},
137137
{
138-
"comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 4], [a, 4]]",
138+
"comments": "table aren't actually partitioned by val thus all segments can produce duplicate results, thus [[b, 5], [a, 4]]",
139139
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY groupingCol",
140-
"outputs": [["b", 4], ["a", 4]]
140+
"outputs": [["b", 5], ["a", 4]]
141141
},
142142
{
143143
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON l.groupingCol = r.groupingCol GROUP BY l.groupingCol",

pinot-query-runtime/src/test/resources/queries/QueryHints.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@
5151
}
5252
},
5353
"queries": [
54+
{
55+
"description": "Wrong partition key",
56+
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num",
57+
"expectedException": "Error composing query plan for.*"
58+
},
59+
{
60+
"description": "Wrong partition size",
61+
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num",
62+
"expectedException": "Error composing query plan for.*"
63+
},
5464
{
5565
"description": "Group by partition column",
5666
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"

pinot-query-runtime/src/test/resources/queries/SelectHaving.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444
{
4545
"comment": "Plan failed. Expression 'a' is not being grouped.",
4646
"sql":"SELECT a FROM {test_having} HAVING min(a) < max(a);",
47-
"expectedException": ".*Error composing query plan.*"
47+
"expectedException": "Error composing query plan for.*"
4848
},
4949
{
5050
"comment": "Plan failed. Expression 'a' is not being grouped.",
5151
"sql":"SELECT 1 AS one FROM {test_having} HAVING a > 1;",
52-
"expectedException": ".*Error composing query plan.*"
52+
"expectedException": "Error composing query plan for.*"
5353
},
5454
{
5555
"sql":"SELECT 1 AS one FROM {test_having} HAVING 1 > 2;"

0 commit comments

Comments
 (0)