Skip to content

Commit 3b57116

Browse files
authored
Serialize V2 Plan using Protobufs instead of reflection. (#13221)
* Prototype with TableScanNode serialized using a protobuf message. * Serialize Mailbox Send and Receive. * Serialize SetOpNode. * Serialize Exchange and Sort * Add serialization support for all nodes * Compiles * Add license header * Allocate a hashmap within nodehints. * Fix trailing whitespace * Handle nulls correctly in direction keys and Literal Expressions * Create new context for every plan node. * Improve structure of visitors. * Fix long line. * Only support specific types of literals. * Do not assume data type and underlying Java type matches. * Support object literal. * Add javadocs to a couple of files. * Set distinct flag * Address review comments. * Undo style change. * Remove StageNodeSerDeUtils.java * Do not prematurely serialize Stagenode. * Revert "Do not prematurely serialize Stagenode." This reverts commit 3969f63. * Make visitor a nested class. * Remove use of visit prefix. * Fix name
1 parent 9302f18 commit 3b57116

File tree

27 files changed

+1487
-570
lines changed

27 files changed

+1487
-570
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
syntax = "proto3";
21+
22+
package org.apache.pinot.common.proto;
23+
24+
// Data types that a column or an expression value can take.
// Referenced by Literal.dataType, FunctionCall.dataType and
// StageNode.columnDataTypes.
// NOTE(review): INT is number 0, the proto3 default, so an unset
// ColumnDataType field cannot be told apart from INT; UNKNOWN is 19.
enum ColumnDataType {
25+
INT = 0;
26+
LONG = 1;
27+
FLOAT = 2;
28+
DOUBLE = 3;
29+
BIG_DECIMAL = 4;
30+
BOOLEAN = 5;
31+
TIMESTAMP = 6;
32+
STRING = 7;
33+
JSON = 8;
34+
BYTES = 9;
35+
OBJECT = 10;
36+
INT_ARRAY = 11;
37+
LONG_ARRAY = 12;
38+
FLOAT_ARRAY = 13;
39+
DOUBLE_ARRAY = 14;
40+
BOOLEAN_ARRAY = 15;
41+
TIMESTAMP_ARRAY = 16;
42+
STRING_ARRAY = 17;
43+
BYTES_ARRAY = 18;
44+
UNKNOWN = 19;
45+
}
46+
47+
// Expression variant that refers to an input by integer index.
// One of the three alternatives of RexExpression.expression.
message InputRef {
48+
// NOTE(review): presumably the zero-based position of the referenced
// column in the input row -- confirm against the serializer.
int32 index = 1;
49+
}
50+
51+
// Expression variant carrying a constant value.
// Holds the declared data type, an explicit null flag, and at most one
// payload field selected through the literalField oneof.
message Literal {
52+
ColumnDataType dataType = 1;
53+
// Explicit null marker.
// NOTE(review): presumably literalField is left unset when this is true --
// confirm against the serializer.
bool isValueNull = 2;
54+
// Value payload; its field numbers start at 101, apart from the metadata
// fields above.
oneof literalField {
55+
bool boolField = 101;
56+
int32 intField = 102;
57+
int64 longField = 103;
58+
float floatField = 104;
59+
double doubleField = 105;
60+
string stringField = 106;
61+
bytes bytesField = 107;
62+
// NOTE(review): presumably an opaque encoding for values that have no
// dedicated slot above (e.g. BIG_DECIMAL or OBJECT) -- confirm.
bytes serializedField = 108;
63+
}
64+
}
65+
66+
// Expression variant for a function invocation: the function identity, its
// result data type, its operand expressions, and a distinct flag.
message FunctionCall {
67+
// NOTE(review): stored as a raw int32 rather than an enum; presumably an
// ordinal of a SQL-kind enum on the Java side -- confirm that the mapping
// is stable between sender and receiver versions.
int32 sqlKind = 1;
68+
ColumnDataType dataType = 2;
69+
string functionName = 3;
70+
// Operands are RexExpression values, so expressions nest recursively.
repeated RexExpression functionOperands = 4;
71+
bool isDistinct = 5;
72+
}
73+
74+
// A single expression: at most one of an input reference, a literal, or a
// function call. The type is recursive through
// FunctionCall.functionOperands.
message RexExpression {
75+
oneof expression {
76+
InputRef inputRef = 1;
77+
Literal literal = 2;
78+
FunctionCall functionCall = 3;
79+
}
80+
}

pinot-common/src/main/proto/plan.proto

Lines changed: 201 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,64 +18,219 @@
1818
//
1919

2020
syntax = "proto3";
21+
import "expressions.proto";
2122

2223
package org.apache.pinot.common.proto;
2324

2425
message StageNode {
2526
int32 stageId = 1;
26-
string nodeName = 2;
2727
repeated StageNode inputs = 3;
2828
repeated string columnNames = 4;
29-
repeated string columnDataTypes = 5;
30-
ObjectField objectField = 6;
31-
}
32-
33-
// MemberVariableField defines the serialized format of the member variables of a class object.
34-
// MemberVariableField can be one of
35-
// 1. literal
36-
// 2. list
37-
// 3. map
38-
// 4. complex class object
39-
message MemberVariableField {
40-
oneof member_variable_field {
41-
LiteralField literalField = 1;
42-
ListField listField = 2;
43-
MapField mapField = 3;
44-
ObjectField objectField = 4;
29+
repeated ColumnDataType columnDataTypes = 5;
30+
oneof nodeType {
31+
TableScanNode tableScanNode = 102;
32+
MailboxReceiveNode receiveNode = 103;
33+
MailboxSendNode sendNode = 104;
34+
SetOpNode setNode = 105;
35+
ExchangeNode exchangeNode = 106;
36+
SortNode sortNode = 107;
37+
AggregateNode aggregateNode = 108;
38+
JoinNode joinNode = 109;
39+
LiteralValueNode literalValueNode = 110;
40+
ProjectNode projectNode = 111;
41+
ValueNode valueNode = 112;
42+
WindowNode windowNode = 113;
43+
FilterNode filterNode = 114;
4544
}
4645
}
4746

48-
// ObjectField defines the serialized format of a complex class object.
49-
// it contains:
50-
// 1. its fully-qualified clazz name;
51-
// 2. its MemberVariableField map.
52-
message ObjectField {
53-
string objectClassName = 1;
54-
map<string, MemberVariableField> memberVariables = 2;
55-
}
56-
57-
// LiteralField defines the serialized format of a literal field.
58-
message LiteralField {
59-
oneof literal_field {
60-
bool boolField = 1;
61-
int32 intField = 2;
62-
int64 longField = 3;
63-
float floatField = 4;
64-
double doubleField = 5;
65-
string stringField = 6;
66-
bytes bytesField = 7;
67-
bytes bigDecimalField = 8;
68-
}
47+
// Wrapper around a string-to-string map.
// A protobuf map value cannot itself be a map, so NodeHint.hintOptions uses
// this message as its value type to obtain a two-level map.
message StrStrMap {
48+
map<string, string> options = 1;
49+
}
50+
51+
// Hints attached to a plan node, stored as a two-level string map.
// Used by TableScanNode.nodeHint, AggregateNode.nodeHint and
// JoinNode.joinHints.
message NodeHint {
52+
// NOTE(review): presumably keyed by hint name, each mapping option name to
// option value -- confirm against the serializer.
map<string, StrStrMap> hintOptions = 1;
53+
}
54+
55+
// Plan node payload for a table scan: optional hints, the table name, and
// the list of column names to scan.
message TableScanNode {
56+
NodeHint nodeHint = 1;
57+
string tableName = 2;
58+
repeated string tableScanColumns = 3;
59+
}
60+
61+
// Exchange type carried by MailboxReceiveNode, MailboxSendNode and
// ExchangeNode.
// NOTE(review): STREAMING is number 0, the proto3 default, so an unset
// field reads back as STREAMING.
enum PinotRelExchangeType {
62+
STREAMING = 0;
63+
SUB_PLAN = 1;
64+
PIPELINE_BREAKER = 2;
65+
}
66+
67+
// Data distribution type carried by MailboxReceiveNode, MailboxSendNode and
// ExchangeNode.
// NOTE(review): value names presumably mirror the planner's
// RelDistribution.Type constants -- confirm the two stay in sync.
// SINGLETON is number 0, the proto3 default for an unset field.
enum RelDistributionType {
68+
SINGLETON = 0;
69+
HASH_DISTRIBUTED = 1;
70+
RANGE_DISTRIBUTED = 2;
71+
RANDOM_DISTRIBUTED = 3;
72+
ROUND_ROBIN_DISTRIBUTED = 4;
73+
BROADCAST_DISTRIBUTED = 5;
74+
ANY = 6;
75+
}
76+
77+
// Sort direction of a collation key.
// Used by DirectionList, RelFieldCollation.direction and
// WindowNode.orderSetDirection.
// ASCENDING is number 0, the proto3 default for an unset field.
enum Direction {
78+
ASCENDING = 0;
79+
STRICTLY_ASCENDING = 1;
80+
DESCENDING = 2;
81+
STRICTLY_DESCENDING = 3;
82+
CLUSTERED = 4;
83+
}
84+
85+
// Placement of null values relative to non-null values in a sort.
// Used by NullDirectionList, RelFieldCollation.nullDirection and
// WindowNode.orderSetNullDirection.
// NOTE(review): FIRST, not UNSPECIFIED, is number 0, so an unset field
// reads back as FIRST.
enum NullDirection {
86+
FIRST = 0;
87+
LAST = 1;
88+
UNSPECIFIED = 2;
89+
}
90+
91+
// Wrapper message around a repeated Direction.
// A message-typed field has explicit presence while a bare repeated field
// does not.
// NOTE(review): presumably wrapped so an absent list can be told apart
// from an empty one -- confirm.
message DirectionList {
92+
repeated Direction item = 1;
93+
}
94+
95+
// Wrapper message around a repeated NullDirection.
// NOTE(review): presumably wrapped so an absent list can be told apart
// from an empty one -- confirm.
message NullDirectionList {
96+
repeated NullDirection item = 1;
97+
}
98+
99+
// Wrapper message around a repeated RexExpression.
// Lets a list of expressions be used as a single field, and be repeated
// itself (see ValueNode.rows, a list of expression lists).
message RexExpressionList {
100+
repeated RexExpression item = 1;
101+
}
102+
103+
// Wrapper message around a repeated int32 of distribution keys.
// Used by MailboxReceiveNode.distributionKeys and
// MailboxSendNode.distributionKeys.
// NOTE(review): the integers are presumably column indices -- confirm.
message DistributionKeyList {
104+
repeated int32 item = 1;
105+
}
106+
107+
// Plan node payload for the receiving end of a mailbox exchange.
// NOTE(review): exchangeType and distributionType use field numbers 2 and
// 3 here but 3 and 2 in MailboxSendNode; the two messages are not
// wire-interchangeable.
message MailboxReceiveNode {
108+
int32 senderStageId = 1;
109+
PinotRelExchangeType exchangeType = 2;
110+
RelDistributionType distributionType = 3;
111+
DistributionKeyList distributionKeys = 4;
112+
// collationKeys, collationDirections and collationNullDirections are
// parallel lists.
// NOTE(review): presumably zipped by position -- confirm.
repeated int32 collationKeys = 5;
113+
DirectionList collationDirections = 6;
114+
NullDirectionList collationNullDirections = 7;
115+
bool sortOnSender = 8;
116+
bool sortOnReceiver = 9;
117+
// Embeds a full StageNode, so this message is recursive with StageNode.
StageNode sender = 10;
118+
}
119+
120+
// Plan node payload for the sending end of a mailbox exchange.
// NOTE(review): unlike MailboxReceiveNode, this message has no
// collationNullDirections or sortOnReceiver field, and distributionType /
// exchangeType are numbered 2 / 3 (the reverse of MailboxReceiveNode).
message MailboxSendNode {
121+
int32 receiverStageId = 1;
122+
RelDistributionType distributionType = 2;
123+
PinotRelExchangeType exchangeType = 3;
124+
DistributionKeyList distributionKeys = 4;
125+
repeated int32 collationKeys = 5;
126+
DirectionList collationDirections = 6;
127+
bool sortOnSender = 7;
128+
bool prePartitioned = 8;
129+
}
130+
131+
// Kind of set operation carried by SetOpNode.setOpType.
// UNION is number 0, the proto3 default for an unset field.
enum SetOpType {
132+
UNION = 0;
133+
INTERSECT = 1;
134+
MINUS = 2;
135+
}
136+
137+
// Plan node payload for a set operation.
message SetOpNode {
138+
SetOpType setOpType = 1;
139+
// NOTE(review): presumably the SQL ALL modifier (keep duplicates) --
// confirm.
bool all = 2;
140+
}
141+
142+
// One collation entry: a field index together with its sort direction and
// null placement. Used by ExchangeNode.collations.
message RelFieldCollation {
143+
int32 fieldIndex = 1;
144+
Direction direction = 2;
145+
NullDirection nullDirection = 3;
146+
}
147+
148+
// Plan node payload for an exchange.
// Collation is expressed as a list of RelFieldCollation messages here,
// whereas the mailbox nodes use separate parallel key/direction lists.
message ExchangeNode {
149+
PinotRelExchangeType exchangeType = 1;
150+
RelDistributionType distributionType = 2;
151+
repeated int32 keys = 3;
152+
bool isSortOnSender = 4;
153+
bool isSortOnReceiver = 5;
154+
bool isPrePartitioned = 6;
155+
repeated RelFieldCollation collations = 7;
156+
repeated string tableNames = 8;
157+
}
158+
159+
// Plan node payload for a sort.
// collationKeys, collationDirections and collationNullDirections are
// parallel lists.
// NOTE(review): presumably zipped by position -- confirm.
message SortNode {
160+
RexExpressionList collationKeys = 1;
161+
DirectionList collationDirections = 2;
162+
NullDirectionList collationNullDirections = 3;
163+
// NOTE(review): fetch and offset are plain int32 with implicit presence,
// so 0 and "unset" are indistinguishable on the wire; presumably they are
// the LIMIT and OFFSET row counts -- confirm.
int32 fetch = 4;
164+
int32 offset = 5;
165+
}
166+
167+
// Aggregation type carried by AggregateNode.aggType.
// DIRECT is number 0, the proto3 default for an unset field.
enum AggType {
168+
DIRECT = 0;
169+
LEAF = 1;
170+
INTERMEDIATE = 2;
171+
FINAL = 3;
172+
}
173+
174+
// Plan node payload for an aggregation: hints, the aggregate call
// expressions, filter argument indices, the group-by expressions, and the
// aggregation type.
message AggregateNode {
175+
NodeHint nodeHint = 1;
176+
RexExpressionList aggCalls = 2;
177+
// NOTE(review): presumably one entry per aggCalls item -- confirm.
repeated int32 filterArgIndices = 3;
178+
RexExpressionList groupSet = 4;
179+
AggType aggType = 5;
69180
}
70181

71-
// ListField defines the serialized format of a list field.
72-
// The content of the list is a MemberVariableField.
73-
message ListField {
74-
repeated MemberVariableField content = 1;
182+
// Plan node payload for a filter: a single condition expression.
message FilterNode {
183+
RexExpression condition = 1;
75184
}
76185

77-
// ListField defines the serialized format of a map field.
78-
// The key of the map is a string and the value of the map is a MemberVariableField.
79-
message MapField {
80-
map<string, MemberVariableField> content = 1;
186+
// Join type carried by JoinNode.joinRelType.
// INNER is number 0, the proto3 default for an unset field.
enum JoinRelType {
187+
INNER = 0;
188+
LEFT = 1;
189+
RIGHT = 2;
190+
FULL = 3;
191+
SEMI = 4;
192+
ANTI = 5;
81193
}
194+
195+
// Integer join keys for the left and right inputs of a join.
// NOTE(review): presumably column indices paired by position between the
// two lists -- confirm.
message JoinKeys {
196+
repeated int32 leftKeys = 1;
197+
repeated int32 rightKeys = 2;
198+
}
199+
200+
// Plan node payload for a join: the join type, the key lists, additional
// join clause expressions, hints, and the column names of both inputs.
message JoinNode {
201+
JoinRelType joinRelType = 1;
202+
JoinKeys joinKeys = 2;
203+
RexExpressionList joinClause = 3;
204+
// Named joinHints here; the equivalent NodeHint field is called nodeHint
// in TableScanNode and AggregateNode.
NodeHint joinHints = 4;
205+
repeated string leftColumnNames = 5;
206+
repeated string rightColumnNames = 6;
207+
}
208+
209+
// Plan node payload carrying literal rows as an opaque byte blob.
// NOTE(review): presumably a serialized Pinot DataTable -- the encoding is
// not defined by this schema; confirm against the serializer.
message LiteralValueNode {
210+
bytes dataTable = 1;
211+
}
212+
213+
// Plan node payload for a projection: the list of projected expressions.
message ProjectNode {
214+
RexExpressionList projects = 1;
215+
}
216+
217+
// Plan node payload holding rows of expressions: each entry of rows is one
// RexExpressionList.
message ValueNode {
218+
repeated RexExpressionList rows = 1;
219+
}
220+
221+
// Window frame type carried by WindowNode.windowFrameType.
// ROWS is number 0, the proto3 default for an unset field.
enum WindowFrameType {
222+
ROWS = 0;
223+
RANGE = 1;
224+
}
225+
226+
// Plan node payload for a window operation: partition and order
// expressions, order directions, aggregate calls, frame bounds, constants,
// and the frame type.
message WindowNode {
227+
RexExpressionList groupSet = 1;
228+
RexExpressionList orderSet = 2;
229+
// Directions are bare repeated enums here, whereas SortNode wraps the same
// information in DirectionList / NullDirectionList.
// NOTE(review): presumably parallel to orderSet by position -- confirm.
repeated Direction orderSetDirection = 3;
230+
repeated NullDirection orderSetNullDirection = 4;
231+
RexExpressionList aggCalls = 5;
232+
// NOTE(review): plain int32 bounds; how "unbounded" is encoded is not
// visible in this schema -- confirm against the serializer.
int32 lowerBound = 6;
233+
int32 upperBound = 7;
234+
RexExpressionList constants = 8;
235+
WindowFrameType windowFrameType = 9;
236+
}

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ private static PlanNode convertLogicalExchange(Exchange node, int currentStageId
130130
}
131131
}
132132
RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
133-
boolean isPrePartitioned = inputDistributionTrait != null
134-
&& inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
135-
&& inputDistributionTrait == node.getDistribution();
133+
boolean isPrePartitioned =
134+
inputDistributionTrait != null && inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
135+
&& inputDistributionTrait == node.getDistribution();
136136
List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
137137

138138
// Compute all the tables involved under this exchange node
@@ -169,7 +169,8 @@ private static PlanNode convertLogicalAggregate(LogicalAggregate node, int curre
169169
}
170170

171171
private static PlanNode convertLogicalProject(LogicalProject node, int currentStageId) {
172-
return new ProjectNode(currentStageId, toDataSchema(node.getRowType()), node.getProjects());
172+
return new ProjectNode(currentStageId, toDataSchema(node.getRowType()),
173+
node.getProjects().stream().map(RexExpressionUtils::fromRexNode).collect(Collectors.toList()));
173174
}
174175

175176
private static PlanNode convertLogicalFilter(LogicalFilter node, int currentStageId) {

0 commit comments

Comments
 (0)