Skip to content

Commit 74fd91a

Browse files
authored
Fix transformation to string for BOOLEAN and TIMESTAMP (#9287)
1 parent d0b4f23 commit 74fd91a

File tree

5 files changed

+228
-316
lines changed

5 files changed

+228
-316
lines changed

pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java

Lines changed: 103 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.Closeable;
2222
import java.io.IOException;
2323
import java.math.BigDecimal;
24+
import java.sql.Timestamp;
2425
import java.util.Arrays;
2526
import java.util.HashMap;
2627
import java.util.Map;
@@ -32,7 +33,7 @@
3233
import org.apache.pinot.segment.spi.index.reader.Dictionary;
3334
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
3435
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
35-
import org.apache.pinot.spi.data.FieldSpec;
36+
import org.apache.pinot.spi.data.FieldSpec.DataType;
3637
import org.apache.pinot.spi.trace.Tracing;
3738
import org.apache.pinot.spi.utils.BytesUtils;
3839

@@ -66,10 +67,11 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
6667
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
6768
String column = entry.getKey();
6869
DataSource dataSource = entry.getValue();
70+
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
6971
ColumnValueReader columnValueReader =
70-
new ColumnValueReader(dataSource.getForwardIndex(), dataSource.getDictionary());
72+
new ColumnValueReader(dataSource.getForwardIndex(), dataSource.getDictionary(),
73+
dataSourceMetadata.getDataType());
7174
_columnValueReaderMap.put(column, columnValueReader);
72-
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
7375
if (!dataSourceMetadata.isSingleValue()) {
7476
maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, dataSourceMetadata.getMaxNumValuesPerMVEntry());
7577
}
@@ -427,16 +429,16 @@ public void fetchNumValues(String column, int[] inDocIds, int length, int[] outN
427429
private class ColumnValueReader implements Closeable {
428430
final ForwardIndexReader _reader;
429431
final Dictionary _dictionary;
430-
final FieldSpec.DataType _dataType;
432+
final DataType _dataType;
431433
final boolean _singleValue;
432434

433435
boolean _readerContextCreated;
434436
ForwardIndexReaderContext _readerContext;
435437

436-
/**
 * Creates a value reader for a single column.
 *
 * @param reader forward index reader of the column; also queried for whether the column is single-valued
 * @param dictionary dictionary of the column, or {@code null} when none is available
 * @param dataType data type of the column, as supplied by the caller (the enclosing constructor passes the
 *                 type reported by the data source metadata)
 */
ColumnValueReader(ForwardIndexReader reader, @Nullable Dictionary dictionary, DataType dataType) {
  _dataType = dataType;
  _dictionary = dictionary;
  _singleValue = reader.isSingleValue();
  _reader = reader;
}
442444

@@ -550,9 +552,19 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
550552
if (_dictionary != null) {
551553
int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get();
552554
_reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
553-
_dictionary.readStringValues(dictIdBuffer, length, valueBuffer);
555+
if (_dataType == DataType.BOOLEAN) {
556+
for (int i = 0; i < length; i++) {
557+
valueBuffer[i] = Boolean.toString(_dictionary.getIntValue(dictIdBuffer[i]) == 1);
558+
}
559+
} else if (_dataType == DataType.TIMESTAMP) {
560+
for (int i = 0; i < length; i++) {
561+
valueBuffer[i] = new Timestamp(_dictionary.getLongValue(dictIdBuffer[i])).toString();
562+
}
563+
} else {
564+
_dictionary.readStringValues(dictIdBuffer, length, valueBuffer);
565+
}
554566
} else {
555-
switch (_reader.getStoredType()) {
567+
switch (_dataType) {
556568
case INT:
557569
for (int i = 0; i < length; i++) {
558570
valueBuffer[i] = Integer.toString(_reader.getInt(docIds[i], readerContext));
@@ -573,7 +585,23 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
573585
valueBuffer[i] = Double.toString(_reader.getDouble(docIds[i], readerContext));
574586
}
575587
break;
588+
case BIG_DECIMAL:
589+
for (int i = 0; i < length; i++) {
590+
valueBuffer[i] = _reader.getBigDecimal(docIds[i], readerContext).toPlainString();
591+
}
592+
break;
593+
case BOOLEAN:
594+
for (int i = 0; i < length; i++) {
595+
valueBuffer[i] = Boolean.toString(_reader.getInt(docIds[i], readerContext) == 1);
596+
}
597+
break;
598+
case TIMESTAMP:
599+
for (int i = 0; i < length; i++) {
600+
valueBuffer[i] = new Timestamp(_reader.getLong(docIds[i], readerContext)).toString();
601+
}
602+
break;
576603
case STRING:
604+
case JSON:
577605
for (int i = 0; i < length; i++) {
578606
valueBuffer[i] = _reader.getString(docIds[i], readerContext);
579607
}
@@ -622,23 +650,25 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
622650

623651
void readDictIdsMV(int[] docIds, int length, int[][] dictIdsBuffer) {
624652
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
653+
ForwardIndexReaderContext readerContext = getReaderContext();
625654
for (int i = 0; i < length; i++) {
626-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
655+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
627656
dictIdsBuffer[i] = Arrays.copyOfRange(_reusableMVDictIds, 0, numValues);
628657
}
629658
}
630659

631660
void readIntValuesMV(int[] docIds, int length, int[][] valuesBuffer) {
632661
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
662+
ForwardIndexReaderContext readerContext = getReaderContext();
633663
if (_dictionary != null) {
634664
for (int i = 0; i < length; i++) {
635-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
665+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
636666
int[] values = new int[numValues];
637667
_dictionary.readIntValues(_reusableMVDictIds, numValues, values);
638668
valuesBuffer[i] = values;
639669
}
640670
} else {
641-
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
671+
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, readerContext);
642672
}
643673
}
644674

@@ -650,15 +680,16 @@ void readIntValuesMV(TransformEvaluator evaluator, int[] docIds, int length, int
650680

651681
void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) {
652682
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
683+
ForwardIndexReaderContext readerContext = getReaderContext();
653684
if (_dictionary != null) {
654685
for (int i = 0; i < length; i++) {
655-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
686+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
656687
long[] values = new long[numValues];
657688
_dictionary.readLongValues(_reusableMVDictIds, numValues, values);
658689
valuesBuffer[i] = values;
659690
}
660691
} else {
661-
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
692+
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, readerContext);
662693
}
663694
}
664695

@@ -670,15 +701,16 @@ void readLongValuesMV(TransformEvaluator evaluator, int[] docIds, int length, lo
670701

671702
void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) {
672703
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
704+
ForwardIndexReaderContext readerContext = getReaderContext();
673705
if (_dictionary != null) {
674706
for (int i = 0; i < length; i++) {
675-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
707+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
676708
float[] values = new float[numValues];
677709
_dictionary.readFloatValues(_reusableMVDictIds, numValues, values);
678710
valuesBuffer[i] = values;
679711
}
680712
} else {
681-
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
713+
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, readerContext);
682714
}
683715
}
684716

@@ -690,15 +722,16 @@ void readFloatValuesMV(TransformEvaluator evaluator, int[] docIds, int length, f
690722

691723
void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) {
692724
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
725+
ForwardIndexReaderContext readerContext = getReaderContext();
693726
if (_dictionary != null) {
694727
for (int i = 0; i < length; i++) {
695-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
728+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
696729
double[] values = new double[numValues];
697730
_dictionary.readDoubleValues(_reusableMVDictIds, numValues, values);
698731
valuesBuffer[i] = values;
699732
}
700733
} else {
701-
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
734+
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, readerContext);
702735
}
703736
}
704737

@@ -710,15 +743,62 @@ void readDoubleValuesMV(TransformEvaluator evaluator, int[] docIds, int length,
710743

711744
void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) {
712745
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
746+
ForwardIndexReaderContext readerContext = getReaderContext();
713747
if (_dictionary != null) {
714-
for (int i = 0; i < length; i++) {
715-
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
716-
String[] values = new String[numValues];
717-
_dictionary.readStringValues(_reusableMVDictIds, numValues, values);
718-
valuesBuffer[i] = values;
748+
if (_dataType == DataType.BOOLEAN) {
749+
for (int i = 0; i < length; i++) {
750+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
751+
int[] intValues = new int[numValues];
752+
_dictionary.readIntValues(_reusableMVDictIds, numValues, intValues);
753+
String[] values = new String[numValues];
754+
for (int j = 0; j < numValues; j++) {
755+
values[i] = Boolean.toString(intValues[i] == 1);
756+
}
757+
valuesBuffer[i] = values;
758+
}
759+
} else if (_dataType == DataType.TIMESTAMP) {
760+
for (int i = 0; i < length; i++) {
761+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
762+
long[] longValues = new long[numValues];
763+
_dictionary.readLongValues(_reusableMVDictIds, numValues, longValues);
764+
String[] values = new String[numValues];
765+
for (int j = 0; j < numValues; j++) {
766+
values[i] = new Timestamp(longValues[i]).toString();
767+
}
768+
valuesBuffer[i] = values;
769+
}
770+
} else {
771+
for (int i = 0; i < length; i++) {
772+
int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, readerContext);
773+
String[] values = new String[numValues];
774+
_dictionary.readStringValues(_reusableMVDictIds, numValues, values);
775+
valuesBuffer[i] = values;
776+
}
719777
}
720778
} else {
721-
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
779+
if (_dataType == DataType.BOOLEAN) {
780+
int[] intValueBuffer = new int[_maxNumValuesPerMVEntry];
781+
for (int i = 0; i < length; i++) {
782+
int numValues = _reader.getIntMV(docIds[i], intValueBuffer, readerContext);
783+
String[] values = new String[numValues];
784+
for (int j = 0; j < numValues; j++) {
785+
values[i] = Boolean.toString(intValueBuffer[i] == 1);
786+
}
787+
valuesBuffer[i] = values;
788+
}
789+
} else if (_dataType == DataType.TIMESTAMP) {
790+
long[] longValueBuffer = new long[_maxNumValuesPerMVEntry];
791+
for (int i = 0; i < length; i++) {
792+
int numValues = _reader.getLongMV(docIds[i], longValueBuffer, readerContext);
793+
String[] values = new String[numValues];
794+
for (int j = 0; j < numValues; j++) {
795+
values[i] = new Timestamp(longValueBuffer[i]).toString();
796+
}
797+
valuesBuffer[i] = values;
798+
}
799+
} else {
800+
_reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, readerContext);
801+
}
722802
}
723803
}
724804

0 commit comments

Comments
 (0)