-
Notifications
You must be signed in to change notification settings - Fork 1.4k
[multistage] proper support for column-level null handling #10423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import org.apache.calcite.schema.Table; | ||
| import org.apache.pinot.common.config.provider.TableCache; | ||
| import org.apache.pinot.common.function.FunctionRegistry; | ||
| import org.apache.pinot.spi.config.table.TableConfig; | ||
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
@@ -70,7 +71,7 @@ public Table getTable(String name) { | |
| + "If you are running this via the a test environment, check to make sure you're " | ||
| + "specifying the correct tables."); | ||
| } | ||
| return new PinotTable(schema); | ||
| return new PinotTable(schema, isNullEnabled(_tableCache, tableName)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -127,4 +128,20 @@ public boolean isMutable() { | |
| public Schema snapshot(SchemaVersion version) { | ||
| return this; | ||
| } | ||
|
|
||
| private static boolean isNullEnabled(TableCache tableCache, String rawTableName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function gets a table property. Other similar functions are member functions. Do we want to change this function to a member function for consistency? |
||
| TableConfig tableConfig = tableCache.getTableConfig( | ||
|
||
| TableNameBuilder.forType(org.apache.pinot.spi.config.table.TableType.REALTIME) | ||
| .tableNameWithType(rawTableName)); | ||
| if (tableConfig == null) { | ||
| tableConfig = tableCache.getTableConfig( | ||
| TableNameBuilder.forType(org.apache.pinot.spi.config.table.TableType.OFFLINE) | ||
| .tableNameWithType(rawTableName)); | ||
| } | ||
| if (tableConfig != null && tableConfig.getIndexingConfig() != null) { | ||
| return tableConfig.getIndexingConfig().isNullHandlingEnabled(); | ||
|
||
| } else { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,43 +43,44 @@ public TypeFactory(RelDataTypeSystem typeSystem) { | |
| super(typeSystem); | ||
| } | ||
|
|
||
| public RelDataType createRelDataTypeFromSchema(Schema schema) { | ||
| public RelDataType createRelDataTypeFromSchema(Schema schema, boolean isNullSupportEnabled) { | ||
| Builder builder = new Builder(this); | ||
| for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) { | ||
| builder.add(e.getKey(), toRelDataType(e.getValue())); | ||
| builder.add(e.getKey(), toRelDataType(e.getValue(), | ||
| isNullSupportEnabled || e.getValue().isNullableField())); | ||
|
||
| } | ||
| return builder.build(); | ||
| } | ||
|
|
||
| private RelDataType toRelDataType(FieldSpec fieldSpec) { | ||
| private RelDataType toRelDataType(FieldSpec fieldSpec, boolean isNullSupportEnabled) { | ||
| switch (fieldSpec.getDataType()) { | ||
| case INT: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER) | ||
| : createArrayType(createSqlType(SqlTypeName.INTEGER), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.INTEGER) | ||
| : createArrayType(createSqlType(SqlTypeName.INTEGER), -1), isNullSupportEnabled); | ||
| case LONG: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT) | ||
| : createArrayType(createSqlType(SqlTypeName.BIGINT), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BIGINT) | ||
| : createArrayType(createSqlType(SqlTypeName.BIGINT), -1), isNullSupportEnabled); | ||
| case FLOAT: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.REAL) | ||
| : createArrayType(createSqlType(SqlTypeName.REAL), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.REAL) | ||
| : createArrayType(createSqlType(SqlTypeName.REAL), -1), isNullSupportEnabled); | ||
| case DOUBLE: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE) | ||
| : createArrayType(createSqlType(SqlTypeName.DOUBLE), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DOUBLE) | ||
| : createArrayType(createSqlType(SqlTypeName.DOUBLE), -1), isNullSupportEnabled); | ||
| case BOOLEAN: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN) | ||
| : createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.BOOLEAN) | ||
| : createArrayType(createSqlType(SqlTypeName.BOOLEAN), -1), isNullSupportEnabled); | ||
| case TIMESTAMP: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP) | ||
| : createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.TIMESTAMP) | ||
| : createArrayType(createSqlType(SqlTypeName.TIMESTAMP), -1), isNullSupportEnabled); | ||
| case STRING: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR) | ||
| : createArrayType(createSqlType(SqlTypeName.VARCHAR), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARCHAR) | ||
| : createArrayType(createSqlType(SqlTypeName.VARCHAR), -1), isNullSupportEnabled); | ||
| case BYTES: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY) | ||
| : createArrayType(createSqlType(SqlTypeName.VARBINARY), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.VARBINARY) | ||
| : createArrayType(createSqlType(SqlTypeName.VARBINARY), -1), isNullSupportEnabled); | ||
| case BIG_DECIMAL: | ||
| return fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL) | ||
| : createArrayType(createSqlType(SqlTypeName.DECIMAL), -1); | ||
| return createTypeWithNullability(fieldSpec.isSingleValueField() ? createSqlType(SqlTypeName.DECIMAL) | ||
| : createArrayType(createSqlType(SqlTypeName.DECIMAL), -1), isNullSupportEnabled); | ||
| case JSON: | ||
| return createSqlType(SqlTypeName.VARCHAR); | ||
| case LIST: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
@@ -55,11 +56,20 @@ public class QueryEnvironmentTestBase { | |
| .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS") | ||
| .addMetric("col3", FieldSpec.DataType.INT, 0) | ||
| .addMetric("col4", FieldSpec.DataType.BIG_DECIMAL, 0) | ||
| .addMetric("xNullCol", FieldSpec.DataType.INT, 0) | ||
| .setSchemaName("defaultSchemaName"); | ||
| TABLE_SCHEMAS.put("a_REALTIME", SCHEMA_BUILDER.setSchemaName("a").build()); | ||
| TABLE_SCHEMAS.put("b_REALTIME", SCHEMA_BUILDER.setSchemaName("b").build()); | ||
| TABLE_SCHEMAS.put("c_OFFLINE", SCHEMA_BUILDER.setSchemaName("c").build()); | ||
| TABLE_SCHEMAS.put("d", SCHEMA_BUILDER.setSchemaName("d").build()); | ||
| // Build one schema per test table and mark "xNullCol" as nullable on each of them, | ||
| // so that column-level null handling is exercised for every registered table. | ||
| // NOTE(review): if SCHEMA_BUILDER.build() hands back a shared Schema instance, these | ||
| // variables alias the same object - confirm against Schema.SchemaBuilder. | ||
| Schema schemaA = SCHEMA_BUILDER.setSchemaName("a").build(); | ||
| schemaA.getFieldSpecFor("xNullCol").setNullableField(true); | ||
| Schema schemaB = SCHEMA_BUILDER.setSchemaName("b").build(); | ||
| schemaB.getFieldSpecFor("xNullCol").setNullableField(true); | ||
| Schema schemaC = SCHEMA_BUILDER.setSchemaName("c").build(); | ||
| schemaC.getFieldSpecFor("xNullCol").setNullableField(true); | ||
| Schema schemaD = SCHEMA_BUILDER.setSchemaName("d").build(); | ||
| schemaD.getFieldSpecFor("xNullCol").setNullableField(true); | ||
| TABLE_SCHEMAS.put("a_REALTIME", schemaA); | ||
| TABLE_SCHEMAS.put("b_REALTIME", schemaB); | ||
| TABLE_SCHEMAS.put("c_OFFLINE", schemaC); | ||
| TABLE_SCHEMAS.put("d", schemaD); | ||
| } | ||
|
|
||
| protected QueryEnvironment _queryEnvironment; | ||
|
|
@@ -143,9 +153,15 @@ protected Object[][] provideQueries() { | |
|
|
||
| public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2, | ||
| Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2) { | ||
| return getQueryEnvironment(reducerPort, port1, port2, schemaMap, segmentMap1, segmentMap2, Collections.emptyMap()); | ||
|
||
| } | ||
|
|
||
| public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2, | ||
| Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, Map<String, List<String>> segmentMap2, | ||
| Map<String, Boolean> nullHandlingMap) { | ||
| MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1, port2); | ||
| for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) { | ||
| factory.registerTable(entry.getValue(), entry.getKey()); | ||
| factory.registerTable(entry.getValue(), entry.getKey(), nullHandlingMap.getOrDefault(entry.getKey(), false)); | ||
|
||
| } | ||
| for (Map.Entry<String, List<String>> entry : segmentMap1.entrySet()) { | ||
| for (String segment : entry.getValue()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't need to handle null here if the scalar function doesn't have
`nullableParameters`.