Skip to content

Commit d574085

Browse files
authored
Support conjugates for scalar functions, add more scalar functions (#8582)
This PR adds support for being able to use complex scalar function expressions for fitlterConfig. In addition to common comparison functions, support for AND, OR and NOT has been added
1 parent b317246 commit d574085

File tree

4 files changed

+346
-10
lines changed

4 files changed

+346
-10
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.function.scalar;
20+
21+
import org.apache.pinot.spi.annotations.ScalarFunction;
22+
23+
public class ComparisonFunctions {
24+
25+
private static final double DOUBLE_COMPARISON_TOLERANCE = 1e-7d;
26+
27+
private ComparisonFunctions() {
28+
}
29+
30+
@ScalarFunction
31+
public static boolean greaterThan(double a, double b) {
32+
return a > b;
33+
}
34+
35+
@ScalarFunction
36+
public static boolean greaterThanOrEqual(double a, double b) {
37+
return a >= b;
38+
}
39+
40+
@ScalarFunction
41+
public static boolean lessThan(double a, double b) {
42+
return a < b;
43+
}
44+
45+
@ScalarFunction
46+
public static boolean lessThanOrEqual(double a, double b) {
47+
return a <= b;
48+
}
49+
50+
@ScalarFunction
51+
public static boolean notEquals(double a, double b) {
52+
return Math.abs(a - b) >= DOUBLE_COMPARISON_TOLERANCE;
53+
}
54+
55+
@ScalarFunction
56+
public static boolean equals(double a, double b) {
57+
// To avoid approximation errors
58+
return Math.abs(a - b) < DOUBLE_COMPARISON_TOLERANCE;
59+
}
60+
61+
@ScalarFunction
62+
public static boolean between(double val, double a, double b) {
63+
return val > a && val < b;
64+
}
65+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.function.scalar;
20+
21+
import javax.annotation.Nullable;
22+
import org.apache.pinot.spi.annotations.ScalarFunction;
23+
24+
public class ObjectFunctions {
25+
private ObjectFunctions() {
26+
}
27+
28+
@ScalarFunction(nullableParameters = true)
29+
public static boolean isNull(@Nullable Object obj) {
30+
return obj == null;
31+
}
32+
33+
@ScalarFunction(nullableParameters = true)
34+
public static boolean isNotNull(@Nullable Object obj) {
35+
return !isNull(obj);
36+
}
37+
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/function/InbuiltFunctionEvaluator.java

Lines changed: 99 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.segment.local.function;
2020

21+
import com.google.common.base.Preconditions;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324
import org.apache.commons.lang3.StringUtils;
@@ -68,17 +69,27 @@ private ExecutableNode planExecution(ExpressionContext expression) {
6869
childNodes[i] = planExecution(arguments.get(i));
6970
}
7071
String functionName = function.getFunctionName();
71-
FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(functionName, numArguments);
72-
if (functionInfo == null) {
73-
if (FunctionRegistry.containsFunction(functionName)) {
74-
throw new IllegalStateException(
75-
String.format("Unsupported function: %s with %d parameters", functionName, numArguments));
76-
} else {
77-
throw new IllegalStateException(
78-
String.format("Unsupported function: %s not found", functionName));
79-
}
72+
switch (functionName) {
73+
case "and":
74+
return new AndExecutionNode(childNodes);
75+
case "or":
76+
return new OrExecutionNode(childNodes);
77+
case "not":
78+
Preconditions.checkState(numArguments == 1, "NOT function expects 1 argument, got: %s", numArguments);
79+
return new NotExecutionNode(childNodes[0]);
80+
default:
81+
FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(functionName, numArguments);
82+
if (functionInfo == null) {
83+
if (FunctionRegistry.containsFunction(functionName)) {
84+
throw new IllegalStateException(
85+
String.format("Unsupported function: %s with %d parameters", functionName, numArguments));
86+
} else {
87+
throw new IllegalStateException(
88+
String.format("Unsupported function: %s not found", functionName));
89+
}
90+
}
91+
return new FunctionExecutionNode(functionInfo, childNodes);
8092
}
81-
return new FunctionExecutionNode(functionInfo, childNodes);
8293
default:
8394
throw new IllegalStateException();
8495
}
@@ -106,6 +117,84 @@ private interface ExecutableNode {
106117
Object execute(Object[] values);
107118
}
108119

120+
private static class NotExecutionNode implements ExecutableNode {
121+
private final ExecutableNode _argumentNode;
122+
123+
NotExecutionNode(ExecutableNode argumentNode) {
124+
_argumentNode = argumentNode;
125+
}
126+
127+
@Override
128+
public Object execute(GenericRow row) {
129+
return !((Boolean) _argumentNode.execute(row));
130+
}
131+
132+
@Override
133+
public Object execute(Object[] values) {
134+
return !((Boolean) _argumentNode.execute(values));
135+
}
136+
}
137+
138+
private static class OrExecutionNode implements ExecutableNode {
139+
private final ExecutableNode[] _argumentNodes;
140+
141+
OrExecutionNode(ExecutableNode[] argumentNodes) {
142+
_argumentNodes = argumentNodes;
143+
}
144+
145+
@Override
146+
public Object execute(GenericRow row) {
147+
for (ExecutableNode executableNode :_argumentNodes) {
148+
Boolean res = (Boolean) executableNode.execute(row);
149+
if (res) {
150+
return true;
151+
}
152+
}
153+
return false;
154+
}
155+
156+
@Override
157+
public Object execute(Object[] values) {
158+
for (ExecutableNode executableNode :_argumentNodes) {
159+
Boolean res = (Boolean) executableNode.execute(values);
160+
if (res) {
161+
return true;
162+
}
163+
}
164+
return false;
165+
}
166+
}
167+
168+
private static class AndExecutionNode implements ExecutableNode {
169+
private final ExecutableNode[] _argumentNodes;
170+
171+
AndExecutionNode(ExecutableNode[] argumentNodes) {
172+
_argumentNodes = argumentNodes;
173+
}
174+
175+
@Override
176+
public Object execute(GenericRow row) {
177+
for (ExecutableNode executableNode :_argumentNodes) {
178+
Boolean res = (Boolean) executableNode.execute(row);
179+
if (!res) {
180+
return false;
181+
}
182+
}
183+
return true;
184+
}
185+
186+
@Override
187+
public Object execute(Object[] values) {
188+
for (ExecutableNode executableNode :_argumentNodes) {
189+
Boolean res = (Boolean) executableNode.execute(values);
190+
if (!res) {
191+
return false;
192+
}
193+
}
194+
return true;
195+
}
196+
}
197+
109198
private static class FunctionExecutionNode implements ExecutableNode {
110199
final FunctionInvoker _functionInvoker;
111200
final FunctionInfo _functionInfo;

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ private static GenericRow getRecord() {
8181
record.putValue("svStringWithLengthLimit", "123");
8282
record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
8383
record.putValue("mvString2", new Object[]{123, 123L, 123f, 123.0, "123"});
84+
record.putValue("svNullString", null);
8485
return record;
8586
}
8687

@@ -178,6 +179,150 @@ record = transformer.transform(record);
178179
}
179180
}
180181

182+
@Test
183+
public void testScalarOps() {
184+
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
185+
186+
// expression true, filtered
187+
GenericRow genericRow = getRecord();
188+
tableConfig.setIngestionConfig(
189+
new IngestionConfig(null, null,
190+
new FilterConfig("svInt = 123"), null, null));
191+
RecordTransformer transformer = new FilterTransformer(tableConfig);
192+
transformer.transform(genericRow);
193+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
194+
195+
// expression true, filtered
196+
genericRow = getRecord();
197+
tableConfig.setIngestionConfig(
198+
new IngestionConfig(null, null,
199+
new FilterConfig("svDouble > 120"), null, null));
200+
transformer = new FilterTransformer(tableConfig);
201+
transformer.transform(genericRow);
202+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
203+
204+
// expression true, filtered
205+
genericRow = getRecord();
206+
tableConfig.setIngestionConfig(
207+
new IngestionConfig(null, null,
208+
new FilterConfig("svDouble >= 123"), null, null));
209+
transformer = new FilterTransformer(tableConfig);
210+
transformer.transform(genericRow);
211+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
212+
213+
// expression true, filtered
214+
genericRow = getRecord();
215+
tableConfig.setIngestionConfig(
216+
new IngestionConfig(null, null,
217+
new FilterConfig("svDouble < 200"), null, null));
218+
transformer = new FilterTransformer(tableConfig);
219+
transformer.transform(genericRow);
220+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
221+
222+
// expression true, filtered
223+
genericRow = getRecord();
224+
tableConfig.setIngestionConfig(
225+
new IngestionConfig(null, null,
226+
new FilterConfig("svDouble <= 123"), null, null));
227+
transformer = new FilterTransformer(tableConfig);
228+
transformer.transform(genericRow);
229+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
230+
231+
// expression true, filtered
232+
genericRow = getRecord();
233+
tableConfig.setIngestionConfig(
234+
new IngestionConfig(null, null,
235+
new FilterConfig("svLong != 125"), null, null));
236+
transformer = new FilterTransformer(tableConfig);
237+
transformer.transform(genericRow);
238+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
239+
240+
// expression true, filtered
241+
genericRow = getRecord();
242+
tableConfig.setIngestionConfig(
243+
new IngestionConfig(null, null,
244+
new FilterConfig("svLong = 123"), null, null));
245+
transformer = new FilterTransformer(tableConfig);
246+
transformer.transform(genericRow);
247+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
248+
249+
// expression true, filtered
250+
genericRow = getRecord();
251+
tableConfig.setIngestionConfig(
252+
new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 125)"), null, null));
253+
transformer = new FilterTransformer(tableConfig);
254+
transformer.transform(genericRow);
255+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
256+
}
257+
258+
private GenericRow getNullColumnsRecord() {
259+
GenericRow record = new GenericRow();
260+
record.putValue("svNullString", null);
261+
record.putValue("svInt", (byte) 123);
262+
263+
record.putValue("mvLong", Collections.singletonList(123f));
264+
record.putValue("mvNullFloat", null);
265+
return record;
266+
}
267+
268+
@Test
269+
public void testObjectOps() {
270+
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
271+
272+
// expression true, filtered
273+
GenericRow genericRow = getNullColumnsRecord();
274+
tableConfig.setIngestionConfig(
275+
new IngestionConfig(null, null, new FilterConfig("svNullString is null"), null, null));
276+
RecordTransformer transformer = new FilterTransformer(tableConfig);
277+
transformer.transform(genericRow);
278+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
279+
280+
// expression true, filtered
281+
genericRow = getNullColumnsRecord();
282+
tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("svInt is not null"), null, null));
283+
transformer = new FilterTransformer(tableConfig);
284+
transformer.transform(genericRow);
285+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
286+
287+
// expression true, filtered
288+
genericRow = getNullColumnsRecord();
289+
tableConfig.setIngestionConfig(new IngestionConfig(null, null, new FilterConfig("mvLong is not null"), null, null));
290+
transformer = new FilterTransformer(tableConfig);
291+
transformer.transform(genericRow);
292+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
293+
294+
// expression true, filtered
295+
genericRow = getNullColumnsRecord();
296+
tableConfig.setIngestionConfig(
297+
new IngestionConfig(null, null, new FilterConfig("mvNullFloat is null"), null, null));
298+
transformer = new FilterTransformer(tableConfig);
299+
transformer.transform(genericRow);
300+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
301+
}
302+
303+
@Test
304+
public void testLogicalScalarOps() {
305+
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
306+
307+
// expression true, filtered
308+
GenericRow genericRow = getRecord();
309+
tableConfig.setIngestionConfig(
310+
new IngestionConfig(null, null,
311+
new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null));
312+
RecordTransformer transformer = new FilterTransformer(tableConfig);
313+
transformer.transform(genericRow);
314+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
315+
316+
// expression true, filtered
317+
genericRow = getRecord();
318+
tableConfig.setIngestionConfig(
319+
new IngestionConfig(null, null,
320+
new FilterConfig("svInt = 125 OR svLong <= 200"), null, null));
321+
transformer = new FilterTransformer(tableConfig);
322+
transformer.transform(genericRow);
323+
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
324+
}
325+
181326
@Test
182327
public void testNullValueTransformer() {
183328
RecordTransformer transformer = new NullValueTransformer(TABLE_CONFIG, SCHEMA);

0 commit comments

Comments
 (0)