Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e336a90
Add regexp support
Mar 1, 2024
e25b479
Json index based extraction
Mar 1, 2024
56bc02a
Ordering fix
Mar 1, 2024
6ba201c
Add range filter
Mar 1, 2024
d1abd9b
Lint
Mar 1, 2024
444ee8d
Fix tests
Mar 2, 2024
f507797
Delay materialization until transform call
Mar 3, 2024
579661f
Extract literal type in range predicate
Mar 3, 2024
a5ed67d
Fix comparison
Mar 3, 2024
1db872c
Fix tests
Mar 3, 2024
73c40ee
Fix ordering
Mar 3, 2024
900ee14
Remove todo
Mar 4, 2024
6fb5066
Add support for null default value
Mar 4, 2024
7f164f5
Fixes
Mar 4, 2024
7249c71
Add comments
Mar 4, 2024
f139de0
Add tests
Mar 4, 2024
03007af
Remove rangeDataType from equality
Mar 4, 2024
1124a51
Add ITs
Mar 5, 2024
4bf276f
Merge branch 'master' of github.com:apache/pinot into jsonExtractIndexMv
Mar 8, 2024
f585f54
Lint
Mar 8, 2024
cf52952
Merge branch 'master' of github.com:apache/pinot into jsonExtractIndexMv
Mar 15, 2024
3ba6ba2
Merge branch 'master' of github.com:apache/pinot into jsonExtractIndexMv
Mar 18, 2024
f4a9129
Merge branch 'master' of github.com:apache/pinot into jsonExtractIndexMv
Mar 20, 2024
7d83f24
Consolidate with JsonExtractIndex
Mar 20, 2024
dcd8ca7
Lint
Mar 20, 2024
a7ec601
Lint
Mar 20, 2024
37d66a4
Move filter logic to new PR
Mar 21, 2024
e5a64bf
Lint
Mar 21, 2024
de04d1a
Review comments
Mar 21, 2024
c782fc0
Review comments
Mar 21, 2024
01b8280
Remove visibleForTesting
Mar 21, 2024
bc60fd0
Javadoc edit
Mar 21, 2024
dfc7096
Minor refactor
Mar 21, 2024
deda568
Comments
Mar 21, 2024
8bebd78
Review comments
Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class JsonExtractIndexTransformFunction extends BaseTransformFunction {
private TransformResultMetadata _resultMetadata;
private JsonIndexReader _jsonIndexReader;
private Object _defaultValue;
private Map<String, RoaringBitmap> _matchingDocsMap;
private Map<String, RoaringBitmap> _valueToMatchingDocsMap;

@Override
public String getName() {
Expand Down Expand Up @@ -90,15 +90,12 @@ public void init(List<TransformFunction> arguments, Map<String, ColumnContext> c
}
String resultsType = ((LiteralTransformFunction) thirdArgument).getStringLiteral().toUpperCase();
boolean isSingleValue = !resultsType.endsWith("_ARRAY");
// TODO: will support array type; the underlying jsonIndexReader.getMatchingDocsMap supports the json path [*]
if (!isSingleValue) {
throw new IllegalArgumentException("jsonExtractIndex only supports single value type");
}
if (isSingleValue && inputJsonPath.contains("[*]")) {
throw new IllegalArgumentException("[*] syntax in json path is unsupported as json_extract_index"
+ "currently does not support returning array types");
throw new IllegalArgumentException(
"[*] syntax in json path is unsupported for singleValue field json_extract_index");
}
DataType dataType = DataType.valueOf(resultsType);
DataType dataType = isSingleValue ? DataType.valueOf(resultsType)
: DataType.valueOf(resultsType.substring(0, resultsType.length() - 6));

if (arguments.size() == 4) {
TransformFunction fourthArgument = arguments.get(3);
Expand All @@ -108,8 +105,12 @@ public void init(List<TransformFunction> arguments, Map<String, ColumnContext> c
_defaultValue = dataType.convert(((LiteralTransformFunction) fourthArgument).getStringLiteral());
}

_resultMetadata = new TransformResultMetadata(dataType, true, false);
_matchingDocsMap = _jsonIndexReader.getMatchingDocsMap(_jsonPathString);
_resultMetadata = new TransformResultMetadata(dataType, isSingleValue, false);
_valueToMatchingDocsMap = _jsonIndexReader.getMatchingFlattenedDocsMap(_jsonPathString);
if (isSingleValue) {
// For single value result type, it's more efficient to use original docIDs map
_jsonIndexReader.convertFlattenedDocIdsToDocIds(_valueToMatchingDocsMap);
}
}

@Override
Expand All @@ -122,8 +123,8 @@ public int[] transformToIntValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initIntValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[inputDocIds[i]];
if (value == null) {
Expand All @@ -144,8 +145,8 @@ public long[] transformToLongValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initLongValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -166,8 +167,8 @@ public float[] transformToFloatValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initFloatValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -188,8 +189,8 @@ public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initDoubleValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -210,8 +211,8 @@ public BigDecimal[] transformToBigDecimalValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initBigDecimalValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -232,8 +233,8 @@ public String[] transformToStringValuesSV(ValueBlock valueBlock) {
int numDocs = valueBlock.getNumDocs();
int[] inputDocIds = valueBlock.getDocIds();
initStringValuesSV(numDocs);
String[] valuesFromIndex =
_jsonIndexReader.getValuesForKeyAndDocs(valueBlock.getDocIds(), _matchingDocsMap);
String[] valuesFromIndex = _jsonIndexReader.getValuesSV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap, false);
for (int i = 0; i < numDocs; i++) {
String value = valuesFromIndex[i];
if (value == null) {
Expand All @@ -251,26 +252,80 @@ public String[] transformToStringValuesSV(ValueBlock valueBlock) {

@Override
public int[][] transformToIntValuesMV(ValueBlock valueBlock) {
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initIntValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);

for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
_intValuesMV[i] = new int[value.length];
for (int j = 0; j < value.length; j++) {
_intValuesMV[i][j] = Integer.parseInt(value[j]);
}
}
return _intValuesMV;
}

@Override
public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initLongValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
_longValuesMV[i] = new long[value.length];
for (int j = 0; j < value.length; j++) {
_longValuesMV[i][j] = Long.parseLong(value[j]);
}
}
return _longValuesMV;
}

@Override
public float[][] transformToFloatValuesMV(ValueBlock valueBlock) {
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initFloatValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
_floatValuesMV[i] = new float[value.length];
for (int j = 0; j < value.length; j++) {
_floatValuesMV[i][j] = Float.parseFloat(value[j]);
}
}
return _floatValuesMV;
}

@Override
public double[][] transformToDoubleValuesMV(ValueBlock valueBlock) {
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initDoubleValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
_doubleValuesMV[i] = new double[value.length];
for (int j = 0; j < value.length; j++) {
_doubleValuesMV[i][j] = Double.parseDouble(value[j]);
}
}
return _doubleValuesMV;
}

@Override
public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
throw new UnsupportedOperationException("jsonExtractIndex does not support transforming to multi-value columns");
int numDocs = valueBlock.getNumDocs();
initStringValuesMV(numDocs);
String[][] valuesFromIndex = _jsonIndexReader.getValuesMV(valueBlock.getDocIds(), valueBlock.getNumDocs(),
_valueToMatchingDocsMap);
for (int i = 0; i < numDocs; i++) {
String[] value = valuesFromIndex[i];
_stringValuesMV[i] = new String[value.length];
System.arraycopy(value, 0, _stringValuesMV[i], 0, value.length);
}
return _stringValuesMV;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.operator.transform.function;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.math.BigDecimal;
import java.sql.Timestamp;
Expand Down Expand Up @@ -46,6 +48,7 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
Expand Down Expand Up @@ -155,10 +158,13 @@ public void setUp()
_stringSVValues[i] = df.format(_intSVValues[i] * RANDOM.nextDouble());
_jsonSVValues[i] = String.format(
"{\"intVal\":%s, \"longVal\":%s, \"floatVal\":%s, \"doubleVal\":%s, \"bigDecimalVal\":%s, "
+ "\"stringVal\":\"%s\", "
+ "\"stringVal\":\"%s\", \"arrayField\": [{\"arrIntField\": 1, \"arrStringField\": \"abc\"}, "
+ "{\"arrIntField\": 2, \"arrStringField\": \"xyz\"},"
+ "{\"arrIntField\": 5, \"arrStringField\": \"wxy\"},"
+ "{\"arrIntField\": 0}], "
+ "\"intVals\":[0,1], \"longVals\":[0,1], \"floatVals\":[0.0,1.0], \"doubleVals\":[0.0,1.0], "
+ "\"bigDecimalVals\":[0.0,1.0], \"stringVals\":[\"0\",\"1\"]}",
RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), RANDOM.nextDouble(),
RANDOM.nextInt(), RANDOM.nextLong(), RANDOM.nextFloat(), RANDOM.nextDouble(),
BigDecimal.valueOf(RANDOM.nextDouble()).multiply(BigDecimal.valueOf(RANDOM.nextInt())),
df.format(RANDOM.nextInt() * RANDOM.nextDouble()));
_stringAlphaNumericSVValues[i] = RandomStringUtils.randomAlphanumeric(26);
Expand Down Expand Up @@ -276,7 +282,7 @@ public void setUp()
.addSingleValueDimension(DOUBLE_SV_COLUMN, FieldSpec.DataType.DOUBLE)
.addMetric(BIG_DECIMAL_SV_COLUMN, FieldSpec.DataType.BIG_DECIMAL)
.addSingleValueDimension(STRING_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(JSON_STRING_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(JSON_STRING_SV_COLUMN, FieldSpec.DataType.STRING, 5000, "{}")
.addSingleValueDimension(STRING_SV_NULL_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_ALPHANUM_SV_COLUMN, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_ALPHANUM_NULL_SV_COLUMN, FieldSpec.DataType.STRING)
Expand All @@ -303,10 +309,19 @@ public void setUp()
.addDateTime(TIMESTAMP_COLUMN, FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.addDateTime(TIMESTAMP_COLUMN_NULL, FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();

List<FieldConfig> fieldConfigList = new ArrayList<>();
ObjectNode jsonIndexProps = JsonNodeFactory.instance.objectNode();
jsonIndexProps.put("disableCrossArrayUnnest", true);
ObjectNode indexNode = JsonNodeFactory.instance.objectNode();
indexNode.put("json", jsonIndexProps);
FieldConfig jsonFieldConfig =
new FieldConfig(JSON_STRING_SV_COLUMN, FieldConfig.EncodingType.DICTIONARY, null, null, null, null, indexNode,
null, null);
fieldConfigList.add(jsonFieldConfig);
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN)
.setJsonIndexColumns(List.of(JSON_STRING_SV_COLUMN))
.setNullHandlingEnabled(true).build();
.setFieldConfigList(fieldConfigList).setNullHandlingEnabled(true).build();

SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR_PATH);
Expand Down
Loading