Skip to content

Commit 61fc919

Browse files
authored
[multistage] Support IN and NOT-IN Clauses (#9374)
* [multistage] Support IN Clause With 1 Argument * Working in/not-in * Refactor RexExpressionUtils after rebasing with master
1 parent a949f95 commit 61fc919

File tree

6 files changed

+117
-20
lines changed

6 files changed

+117
-20
lines changed

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pinot.query.planner.logical;
2020

21-
import com.google.common.base.Preconditions;
2221
import java.math.BigDecimal;
2322
import java.util.List;
2423
import java.util.stream.Collectors;
@@ -29,7 +28,6 @@
2928
import org.apache.calcite.rex.RexLiteral;
3029
import org.apache.calcite.rex.RexNode;
3130
import org.apache.calcite.sql.SqlKind;
32-
import org.apache.calcite.sql.type.SqlTypeName;
3331
import org.apache.calcite.util.NlsString;
3432
import org.apache.pinot.common.utils.PinotDataType;
3533
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -52,30 +50,24 @@ static RexExpression toRexExpression(RexNode rexNode) {
5250
} else if (rexNode instanceof RexLiteral) {
5351
RexLiteral rexLiteral = ((RexLiteral) rexNode);
5452
FieldSpec.DataType dataType = toDataType(rexLiteral.getType());
55-
return new RexExpression.Literal(dataType, rexLiteral.getTypeName(),
56-
toRexValue(dataType, rexLiteral.getValue()));
53+
return new RexExpression.Literal(dataType, toRexValue(dataType, rexLiteral.getValue()));
5754
} else if (rexNode instanceof RexCall) {
5855
RexCall rexCall = (RexCall) rexNode;
59-
List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
60-
.collect(Collectors.toList());
61-
return toRexExpression(rexCall, operands);
56+
return toRexExpression(rexCall);
6257
} else {
6358
throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());
6459
}
6560
}
6661

67-
static RexExpression toRexExpression(RexCall rexCall, List<RexExpression> operands) {
62+
static RexExpression toRexExpression(RexCall rexCall) {
6863
switch (rexCall.getKind()) {
6964
case CAST:
70-
// CAST is being rewritten into "rexCall.CAST<targetType>(inputValue)",
71-
// - e.g. result type has already been converted into the CAST RexCall, so we assert single operand.
72-
Preconditions.checkState(operands.size() == 1, "CAST takes exactly 2 arguments");
73-
RelDataType castType = rexCall.getType();
74-
// add the 2nd argument as the source type info.
75-
operands.add(new Literal(FieldSpec.DataType.STRING, rexCall.getOperands().get(0).getType().getSqlTypeName(),
76-
toPinotDataType(rexCall.getOperands().get(0).getType()).name()));
77-
return new RexExpression.FunctionCall(rexCall.getKind(), toDataType(rexCall.getType()), "CAST", operands);
65+
return RexExpressionUtils.handleCast(rexCall);
66+
case SEARCH:
67+
return RexExpressionUtils.handleSearch(rexCall);
7868
default:
69+
List<RexExpression> operands =
70+
rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
7971
return new RexExpression.FunctionCall(rexCall.getKind(), toDataType(rexCall.getType()),
8072
rexCall.getOperator().getName(), operands);
8173
}
@@ -186,7 +178,7 @@ class Literal implements RexExpression {
186178
public Literal() {
187179
}
188180

189-
public Literal(FieldSpec.DataType dataType, SqlTypeName sqlTypeName, @Nullable Object value) {
181+
public Literal(FieldSpec.DataType dataType, @Nullable Object value) {
190182
_sqlKind = SqlKind.LITERAL;
191183
_dataType = dataType;
192184
_value = value;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.query.planner.logical;
20+
21+
import com.google.common.base.Preconditions;
22+
import com.google.common.collect.Range;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
import org.apache.calcite.rel.type.RelDataType;
28+
import org.apache.calcite.rex.RexCall;
29+
import org.apache.calcite.rex.RexInputRef;
30+
import org.apache.calcite.rex.RexLiteral;
31+
import org.apache.calcite.rex.RexNode;
32+
import org.apache.calcite.sql.SqlKind;
33+
import org.apache.calcite.util.Sarg;
34+
import org.apache.commons.lang3.NotImplementedException;
35+
import org.apache.pinot.spi.data.FieldSpec;
36+
37+
38+
public class RexExpressionUtils {
39+
40+
private RexExpressionUtils() {
41+
}
42+
43+
static RexExpression handleCast(RexCall rexCall) {
44+
// CAST is being rewritten into "rexCall.CAST<targetType>(inputValue)",
45+
// - e.g. result type has already been converted into the CAST RexCall, so we assert single operand.
46+
List<RexExpression> operands =
47+
rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
48+
Preconditions.checkState(operands.size() == 1, "CAST takes exactly 2 arguments");
49+
RelDataType castType = rexCall.getType();
50+
// add the 2nd argument as the source type info.
51+
operands.add(new RexExpression.Literal(FieldSpec.DataType.STRING,
52+
RexExpression.toPinotDataType(rexCall.getOperands().get(0).getType()).name()));
53+
return new RexExpression.FunctionCall(rexCall.getKind(), RexExpression.toDataType(rexCall.getType()), "CAST",
54+
operands);
55+
}
56+
57+
// TODO: Add support for range filter expressions (e.g. a > 0 and a < 30)
58+
static RexExpression handleSearch(RexCall rexCall) {
59+
List<RexNode> operands = rexCall.getOperands();
60+
RexInputRef rexInputRef = (RexInputRef) operands.get(0);
61+
RexLiteral rexLiteral = (RexLiteral) operands.get(1);
62+
FieldSpec.DataType dataType = RexExpression.toDataType(rexLiteral.getType());
63+
Sarg sarg = rexLiteral.getValueAs(Sarg.class);
64+
if (sarg.isPoints()) {
65+
return new RexExpression.FunctionCall(SqlKind.IN, dataType, SqlKind.IN.name(), toFunctionOperands(rexInputRef,
66+
sarg.rangeSet.asRanges(), dataType));
67+
} else if (sarg.isComplementedPoints()) {
68+
return new RexExpression.FunctionCall(SqlKind.NOT_IN, dataType, SqlKind.NOT_IN.name(),
69+
toFunctionOperands(rexInputRef, sarg.rangeSet.complement().asRanges(), dataType));
70+
} else {
71+
throw new NotImplementedException("Range is not implemented yet");
72+
}
73+
}
74+
75+
private static List<RexExpression> toFunctionOperands(RexInputRef rexInputRef, Set<Range> ranges,
76+
FieldSpec.DataType dataType) {
77+
List<RexExpression> result = new ArrayList<>(ranges.size() + 1);
78+
result.add(RexExpression.toRexExpression(rexInputRef));
79+
for (Range range : ranges) {
80+
result.add(new RexExpression.Literal(dataType, RexExpression.toRexValue(dataType, range.lowerEndpoint())));
81+
}
82+
return result;
83+
}
84+
}

pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ private Object[][] provideQueriesWithException() {
255255
new Object[]{"SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 = c.col3", "Table 'b' not found"},
256256
// non-agg column not being grouped
257257
new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not being grouped"},
258+
// empty IN clause fails compilation
259+
new Object[]{"SELECT a.col1 FROM a WHERE a.col1 IN ()", "Encountered \"\" at line"},
260+
// range filter queries are not supported right now
261+
new Object[]{"SELECT a.col1 FROM a WHERE a.col1 > 'x' AND a.col1 < 'y'", "Range is not implemented yet"}
258262
};
259263
}
260264

pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ protected Object[][] provideQueries() {
6767
new Object[]{"SELECT dateTrunc('DAY', a.ts + b.ts) FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2"},
6868
new Object[]{"SELECT a.col2, a.col3 FROM a JOIN b ON a.col1 = b.col1 "
6969
+ " WHERE a.col3 >= 0 GROUP BY a.col2, a.col3"},
70+
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 WHERE a.col2 IN ('foo', 'bar') AND"
71+
+ " b.col2 NOT IN ('alice', 'charlie')"},
7072
};
7173
}
7274
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import javax.annotation.Nullable;
28-
import org.apache.calcite.sql.type.SqlTypeName;
2928
import org.apache.pinot.common.request.context.ExpressionContext;
3029
import org.apache.pinot.common.utils.DataSchema;
3130
import org.apache.pinot.core.common.Operator;
@@ -98,8 +97,7 @@ public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, DataSche
9897
private RexExpression toAggregationFunctionOperand(RexExpression rexExpression) {
9998
List<RexExpression> functionOperands = ((RexExpression.FunctionCall) rexExpression).getFunctionOperands();
10099
Preconditions.checkState(functionOperands.size() < 2);
101-
return functionOperands.size() > 0 ? functionOperands.get(0)
102-
: new RexExpression.Literal(FieldSpec.DataType.INT, SqlTypeName.INTEGER, 1);
100+
return functionOperands.size() > 0 ? functionOperands.get(0) : new RexExpression.Literal(FieldSpec.DataType.INT, 1);
103101
}
104102

105103
@Override

pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ private Object[][] provideTestSqlAndRowCount() {
109109
new Object[]{"SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
110110
+ " WHERE a.col3 >= 0 AND a.col2 = 'alice' AND b.col3 >= 0", 3},
111111

112+
// Join query with IN and Not-IN clause. Table A's side of join will return 9 rows and Table B's side will
113+
// return 2 rows. Join will be only on col1=bar and since A will return 3 rows with that value and B will return
114+
// 1 row, the final output will have 3 rows.
115+
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
116+
+ " WHERE a.col1 IN ('foo', 'bar', 'alice') AND b.col2 NOT IN ('foo', 'alice')", 3},
117+
118+
// Same query as above but written using OR/AND instead of IN.
119+
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
120+
+ " WHERE (a.col1 = 'foo' OR a.col1 = 'bar' OR a.col1 = 'alice') AND b.col2 != 'foo'"
121+
+ " AND b.col2 != 'alice'", 3},
122+
123+
// Same as above but with single argument IN clauses. Left side of the join returns 3 rows, and the right side
124+
// returns 5 rows. Only key where join succeeds is col1=foo, and since table B has only 1 row with that value,
125+
// the number of rows should be 3.
126+
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
127+
+ " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')", 3},
128+
112129
// Projection pushdown
113130
new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 3},
114131

0 commit comments

Comments
 (0)