Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ private DataTypeConversionFunctions() {

@ScalarFunction
public static Object cast(Object value, String targetTypeLiteral) {
if (value == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need to handle null here if the scalar function doesn't have nullableParameters

return null;
}
try {
Class<?> clazz = value.getClass();
// TODO: Support cast for MV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public static boolean isNotNull(@Nullable Object obj) {
return !isNull(obj);
}

@ScalarFunction(nullableParameters = true, names = {"isNotTrue", "is_not_true"})
public static boolean isNotTrue(@Nullable Boolean obj) {
return !isTrue(obj);
}

@ScalarFunction(nullableParameters = true, names = {"isTrue", "is_true"})
public static boolean isTrue(@Nullable Boolean obj) {
return obj != null && obj;
}

@ScalarFunction(nullableParameters = true, names = {"isDistinctFrom", "is_distinct_from"})
public static boolean isDistinctFrom(@Nullable Object obj1, @Nullable Object obj2) {
if (obj1 == null && obj2 == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.calcite.schema.Table;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -70,7 +71,7 @@ public Table getTable(String name) {
+ "If you are running this via the a test environment, check to make sure you're "
+ "specifying the correct tables.");
}
return new PinotTable(schema);
return new PinotTable(schema, isNullEnabled(_tableCache, tableName));
}

/**
Expand Down Expand Up @@ -127,4 +128,20 @@ public boolean isMutable() {
public Schema snapshot(SchemaVersion version) {
return this;
}

private static boolean isNullEnabled(TableCache tableCache, String rawTableName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function gets a table property. Other similar functions are member functions. Do we want change this function to a member function for consistency?

TableConfig tableConfig = tableCache.getTableConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to get TableConfig for at least one type in a bunch of places in the code. Can we add a util method somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea but let's follow up?

TableNameBuilder.forType(org.apache.pinot.spi.config.table.TableType.REALTIME)
.tableNameWithType(rawTableName));
if (tableConfig == null) {
tableConfig = tableCache.getTableConfig(
TableNameBuilder.forType(org.apache.pinot.spi.config.table.TableType.OFFLINE)
.tableNameWithType(rawTableName));
}
if (tableConfig != null && tableConfig.getIndexingConfig() != null) {
return tableConfig.getIndexingConfig().isNullHandlingEnabled();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this address your concern @ankitsultana

} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@
* {@link RelDataType} of the table to the planner.
*/
public class PinotTable extends AbstractTable implements ScannableTable {
private Schema _schema;
private final Schema _schema;
private final boolean _enableNullSupport;

public PinotTable(Schema schema) {
public PinotTable(Schema schema, boolean enableNullSupport) {
_schema = schema;
_enableNullSupport = enableNullSupport;
}

@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
Preconditions.checkState(relDataTypeFactory instanceof TypeFactory);
TypeFactory typeFactory = (TypeFactory) relDataTypeFactory;
return typeFactory.createRelDataTypeFromSchema(_schema);
return typeFactory.createRelDataTypeFromSchema(_schema, _enableNullSupport);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,43 +43,44 @@ public TypeFactory(RelDataTypeSystem typeSystem) {
super(typeSystem);
}

public RelDataType createRelDataTypeFromSchema(Schema schema) {
public RelDataType createRelDataTypeFromSchema(Schema schema, boolean isNullSupportEnabled) {
Builder builder = new Builder(this);
for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) {
builder.add(e.getKey(), toRelDataType(e.getValue()));
builder.add(e.getKey(), toRelDataType(e.getValue(),
isNullSupportEnabled || e.getValue().isNullableField()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be "and"? isNullSupportEnabled && e.getValue().isNullableField()

Since fieldSpec is already passed to toRelDataType, we can also pass only isNullSupportEnabled here and in toRelDataType we can do:

        return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER)
                : createArrayType(createSqlType(SqlTypeName.INTEGER), -1), isNullSupportEnabled && fieldSpec.isNullableField());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no and to answer the previous comment as well.

we can either enable globally (using table config), or enable via column config

  • if global enable. then all fields are nullable.
  • if not global enable. we read column level nullability.
  • if neither is set, then the column is not nullable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a fallback mechanism to determine nullability?

  • A column level nullability can be true, false, or NULL(default).
  • A table level nullability can be true or false (default)
  • If the column level nullability is present, it determines the nullability.
  • If the column level nullability is not present, we fall back to the table level nullability setting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or the table level nullability can also be NULL (default). If both the column level nullability and table level nullability are NULL, we apply the current behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to propose the same @shenyu0127 proposed in June

}
return builder.build();
}

private RelDataType toRelDataType(FieldSpec fieldSpec) {
private RelDataType toRelDataType(FieldSpec fieldSpec, boolean isNullSupportEnabled) {
switch (fieldSpec.getDataType()) {
case INT:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER)
: createArrayType(createSqlType(SqlTypeName.INTEGER), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER)
: createArrayType(createSqlType(SqlTypeName.INTEGER), -1), isNullSupportEnabled);
case LONG:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT)
: createArrayType(createSqlType(SqlTypeName.BIGINT), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT)
: createArrayType(createSqlType(SqlTypeName.BIGINT), -1), isNullSupportEnabled);
case FLOAT:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.REAL)
: createArrayType(createSqlType(SqlTypeName.REAL), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.REAL)
: createArrayType(createSqlType(SqlTypeName.REAL), -1), isNullSupportEnabled);
case DOUBLE:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
: createArrayType(createSqlType(SqlTypeName.DOUBLE), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE)
: createArrayType(createSqlType(SqlTypeName.DOUBLE), -1), isNullSupportEnabled);
case BOOLEAN:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN)
: createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN)
: createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1), isNullSupportEnabled);
case TIMESTAMP:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP)
: createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP)
: createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1), isNullSupportEnabled);
case STRING:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR)
: createArrayType(createSqlType(SqlTypeName.VARCHAR), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR)
: createArrayType(createSqlType(SqlTypeName.VARCHAR), -1), isNullSupportEnabled);
case BYTES:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY)
: createArrayType(createSqlType(SqlTypeName.VARBINARY), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY)
: createArrayType(createSqlType(SqlTypeName.VARBINARY), -1), isNullSupportEnabled);
case BIG_DECIMAL:
return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL)
: createArrayType(createSqlType(SqlTypeName.DECIMAL), -1);
return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL)
: createArrayType(createSqlType(SqlTypeName.DECIMAL), -1), isNullSupportEnabled);
case JSON:
return createSqlType(SqlTypeName.VARCHAR);
case LIST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -55,11 +56,20 @@ public class QueryEnvironmentTestBase {
.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
.addMetric("col3", FieldSpec.DataType.INT, 0)
.addMetric("col4", FieldSpec.DataType.BIG_DECIMAL, 0)
.addMetric("xNullCol", FieldSpec.DataType.INT, 0)
.setSchemaName("defaultSchemaName");
TABLE_SCHEMAS.put("a_REALTIME", SCHEMA_BUILDER.setSchemaName("a").build());
TABLE_SCHEMAS.put("b_REALTIME", SCHEMA_BUILDER.setSchemaName("b").build());
TABLE_SCHEMAS.put("c_OFFLINE", SCHEMA_BUILDER.setSchemaName("c").build());
TABLE_SCHEMAS.put("d", SCHEMA_BUILDER.setSchemaName("d").build());
Schema schemaA = SCHEMA_BUILDER.setSchemaName("a").build();
schemaA.getFieldSpecFor("xNullCol").setNullableField(true);
Schema schemaB = SCHEMA_BUILDER.setSchemaName("b").build();
schemaA.getFieldSpecFor("xNullCol").setNullableField(true);
Schema schemaC = SCHEMA_BUILDER.setSchemaName("c").build();
schemaA.getFieldSpecFor("xNullCol").setNullableField(true);
Schema schemaD = SCHEMA_BUILDER.setSchemaName("d").build();
schemaA.getFieldSpecFor("xNullCol").setNullableField(true);
TABLE_SCHEMAS.put("a_REALTIME", schemaA);
TABLE_SCHEMAS.put("b_REALTIME", schemaB);
TABLE_SCHEMAS.put("c_OFFLINE", schemaC);
TABLE_SCHEMAS.put("d", schemaD);
}

protected QueryEnvironment _queryEnvironment;
Expand Down Expand Up @@ -143,9 +153,15 @@ protected Object[][] provideQueries() {

public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2,
Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2) {
return getQueryEnvironment(reducerPort, port1, port2, schemaMap, segmentMap1, segmentMap2, Collections.emptyMap());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Let's add a table or two which have nullHandlingEnabled: true in table-config. Anti semi join queries (NOT IN) kind of queries would turn into broadcast joins in that case.

}

public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2,
Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2,
Map<String, Boolean> nullHandlingMap) {
MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1, port2);
for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
factory.registerTable(entry.getValue(), entry.getKey());
factory.registerTable(entry.getValue(), entry.getKey(), nullHandlingMap.getOrDefault(entry.getKey(), false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be: nullHandlingMap.getOrDefault(entry.getKey(), true)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. by default nullability is disabled. this is the default behavior of all tables for pinot nowadays.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment was wrt // ALWAYS enable null handling in V2.

Copy link
Contributor Author

@walterddr walterddr Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i think the comment was not suppose to be there. what i meant was "nullhandling query option" was enabled always, not "null column is always registered for table" :-P

}
for (Map.Entry<String, List<String>> entry : segmentMap1.entrySet()) {
for (String segment : entry.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -53,6 +54,7 @@ public class MockRoutingManagerFactory {

private final HashMap<String, String> _tableNameMap;
private final Map<String, Schema> _schemaMap;
private final Map<String, Boolean> _nullHandlingMap;

private final Map<String, ServerInstance> _serverInstances;
private final Map<String, RoutingTable> _routingTableMap;
Expand All @@ -64,6 +66,7 @@ public MockRoutingManagerFactory(int... ports) {
_hybridTables = new ArrayList<>();
_serverInstances = new HashMap<>();
_schemaMap = new HashMap<>();
_nullHandlingMap = new HashMap<>();
_tableNameMap = new HashMap<>();
_routingTableMap = new HashMap<>();

Expand All @@ -73,14 +76,17 @@ public MockRoutingManagerFactory(int... ports) {
}
}

public MockRoutingManagerFactory registerTable(Schema schema, String tableName) {
public MockRoutingManagerFactory registerTable(Schema schema, String tableName, boolean enableNullHandling) {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == null) {
registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
registerTableNameWithType(schema, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName),
enableNullHandling);
registerTableNameWithType(schema, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName),
enableNullHandling);
_hybridTables.add(tableName);
} else {
registerTableNameWithType(schema, TableNameBuilder.forType(tableType).tableNameWithType(tableName));
registerTableNameWithType(schema, TableNameBuilder.forType(tableType).tableNameWithType(tableName),
enableNullHandling);
}
return this;
}
Expand Down Expand Up @@ -116,6 +122,15 @@ public TableCache buildTableCache() {
String schemaName = invocationOnMock.getArgument(0);
return _schemaMap.get(schemaName);
});
when(mock.getTableConfig(anyString())).thenAnswer(invocationOnMock -> {
String tableNameWithType = invocationOnMock.getArgument(0);
String tableName = TableNameBuilder.extractRawTableName(tableNameWithType);
return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
.setSchemaName(_schemaMap.get(tableName).getSchemaName())
.setTimeColumnName("ts")
.setNullHandlingEnabled(_nullHandlingMap.get(tableName))
.build();
});
return mock;
}

Expand All @@ -137,11 +152,12 @@ private static ServerInstance getServerInstance(String hostname, int nettyPort,
return new ServerInstance(instanceConfig);
}

private void registerTableNameWithType(Schema schema, String tableNameWithType) {
private void registerTableNameWithType(Schema schema, String tableNameWithType, boolean enableNullHandling) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
_tableNameMap.put(tableNameWithType, rawTableName);
_schemaMap.put(rawTableName, schema);
_schemaMap.put(tableNameWithType, schema);
_nullHandlingMap.put(rawTableName, enableNullHandling);
}

private static class FakeRoutingManager implements RoutingManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testRelDataTypeConversion() {
.addMultiValueDimension("STRING_ARRAY_COL", FieldSpec.DataType.STRING)
.addMultiValueDimension("BYTES_ARRAY_COL", FieldSpec.DataType.BYTES)
.build();
RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema);
RelDataType relDataTypeFromSchema = typeFactory.createRelDataTypeFromSchema(testSchema, false);
List<RelDataTypeField> fieldList = relDataTypeFromSchema.getFieldList();
for (RelDataTypeField field : fieldList) {
switch (field.getName()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,50 @@
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
{
"description": "Select with filters on nullable fields",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col3 + a.xNullCol FROM a WHERE a.col3 IS NOT NULL AND a.xNullCol IS NULL",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], EXPR$1=[+($2, $5)])",
"\n LogicalFilter(condition=[IS NULL($5)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
{
"description": "Select with semi/anti join relation correlate with nullable fields",
"sql": "EXPLAIN PLAN FOR SELECT a.col1 FROM a WHERE a.xNullCol NOT IN (SELECT b.xNullCol FROM b) AND a.col3 IN (SELECT c.col3 FROM c)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0])",
"\n LogicalJoin(condition=[=($1, $4)], joinType=[semi])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col1=[$0], col3=[$1])",
"\n LogicalFilter(condition=[OR(=($2, 0), AND(IS NULL($6), >=($3, $2), IS NOT NULL($4)))])",
"\n LogicalJoin(condition=[=($4, $5)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[4]])",
"\n LogicalProject(col1=[$0], col3=[$1], $f0=[$3], $f1=[$4], xNullCol0=[$2])",
"\n LogicalJoin(condition=[true], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[random])",
"\n LogicalProject(col1=[$0], col3=[$2], xNullCol=[$5])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[broadcast])",
"\n LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
"\n PinotLogicalExchange(distribution=[hash])",
"\n LogicalAggregate(group=[{}], agg#0=[COUNT()], agg#1=[COUNT($5)])",
"\n LogicalTableScan(table=[[b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(xNullCol=[$5], $f1=[true])",
"\n LogicalTableScan(table=[[b]])",
"\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalTableScan(table=[[c]])",
"\n"
]
}
]
}
Expand Down
Loading