2121import java .io .Closeable ;
2222import java .io .IOException ;
2323import java .math .BigDecimal ;
24+ import java .sql .Timestamp ;
2425import java .util .Arrays ;
2526import java .util .HashMap ;
2627import java .util .Map ;
3233import org .apache .pinot .segment .spi .index .reader .Dictionary ;
3334import org .apache .pinot .segment .spi .index .reader .ForwardIndexReader ;
3435import org .apache .pinot .segment .spi .index .reader .ForwardIndexReaderContext ;
35- import org .apache .pinot .spi .data .FieldSpec ;
36+ import org .apache .pinot .spi .data .FieldSpec . DataType ;
3637import org .apache .pinot .spi .trace .Tracing ;
3738import org .apache .pinot .spi .utils .BytesUtils ;
3839
@@ -66,10 +67,11 @@ public DataFetcher(Map<String, DataSource> dataSourceMap) {
6667 for (Map .Entry <String , DataSource > entry : dataSourceMap .entrySet ()) {
6768 String column = entry .getKey ();
6869 DataSource dataSource = entry .getValue ();
70+ DataSourceMetadata dataSourceMetadata = dataSource .getDataSourceMetadata ();
6971 ColumnValueReader columnValueReader =
70- new ColumnValueReader (dataSource .getForwardIndex (), dataSource .getDictionary ());
72+ new ColumnValueReader (dataSource .getForwardIndex (), dataSource .getDictionary (),
73+ dataSourceMetadata .getDataType ());
7174 _columnValueReaderMap .put (column , columnValueReader );
72- DataSourceMetadata dataSourceMetadata = dataSource .getDataSourceMetadata ();
7375 if (!dataSourceMetadata .isSingleValue ()) {
7476 maxNumValuesPerMVEntry = Math .max (maxNumValuesPerMVEntry , dataSourceMetadata .getMaxNumValuesPerMVEntry ());
7577 }
@@ -427,16 +429,16 @@ public void fetchNumValues(String column, int[] inDocIds, int length, int[] outN
427429 private class ColumnValueReader implements Closeable {
428430 final ForwardIndexReader _reader ;
429431 final Dictionary _dictionary ;
430- final FieldSpec . DataType _dataType ;
432+ final DataType _dataType ;
431433 final boolean _singleValue ;
432434
433435 boolean _readerContextCreated ;
434436 ForwardIndexReaderContext _readerContext ;
435437
436- ColumnValueReader (ForwardIndexReader reader , @ Nullable Dictionary dictionary ) {
438+ ColumnValueReader (ForwardIndexReader reader , @ Nullable Dictionary dictionary , DataType dataType ) {
437439 _reader = reader ;
438440 _dictionary = dictionary ;
439- _dataType = reader . getStoredType () ;
441+ _dataType = dataType ;
440442 _singleValue = reader .isSingleValue ();
441443 }
442444
@@ -550,9 +552,19 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
550552 if (_dictionary != null ) {
551553 int [] dictIdBuffer = THREAD_LOCAL_DICT_IDS .get ();
552554 _reader .readDictIds (docIds , length , dictIdBuffer , readerContext );
553- _dictionary .readStringValues (dictIdBuffer , length , valueBuffer );
555+ if (_dataType == DataType .BOOLEAN ) {
556+ for (int i = 0 ; i < length ; i ++) {
557+ valueBuffer [i ] = Boolean .toString (_dictionary .getIntValue (dictIdBuffer [i ]) == 1 );
558+ }
559+ } else if (_dataType == DataType .TIMESTAMP ) {
560+ for (int i = 0 ; i < length ; i ++) {
561+ valueBuffer [i ] = new Timestamp (_dictionary .getLongValue (dictIdBuffer [i ])).toString ();
562+ }
563+ } else {
564+ _dictionary .readStringValues (dictIdBuffer , length , valueBuffer );
565+ }
554566 } else {
555- switch (_reader . getStoredType () ) {
567+ switch (_dataType ) {
556568 case INT :
557569 for (int i = 0 ; i < length ; i ++) {
558570 valueBuffer [i ] = Integer .toString (_reader .getInt (docIds [i ], readerContext ));
@@ -573,7 +585,23 @@ void readStringValues(int[] docIds, int length, String[] valueBuffer) {
573585 valueBuffer [i ] = Double .toString (_reader .getDouble (docIds [i ], readerContext ));
574586 }
575587 break ;
588+ case BIG_DECIMAL :
589+ for (int i = 0 ; i < length ; i ++) {
590+ valueBuffer [i ] = _reader .getBigDecimal (docIds [i ], readerContext ).toPlainString ();
591+ }
592+ break ;
593+ case BOOLEAN :
594+ for (int i = 0 ; i < length ; i ++) {
595+ valueBuffer [i ] = Boolean .toString (_reader .getInt (docIds [i ], readerContext ) == 1 );
596+ }
597+ break ;
598+ case TIMESTAMP :
599+ for (int i = 0 ; i < length ; i ++) {
600+ valueBuffer [i ] = new Timestamp (_reader .getLong (docIds [i ], readerContext )).toString ();
601+ }
602+ break ;
576603 case STRING :
604+ case JSON :
577605 for (int i = 0 ; i < length ; i ++) {
578606 valueBuffer [i ] = _reader .getString (docIds [i ], readerContext );
579607 }
@@ -622,23 +650,25 @@ void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) {
622650
623651 void readDictIdsMV (int [] docIds , int length , int [][] dictIdsBuffer ) {
624652 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
653+ ForwardIndexReaderContext readerContext = getReaderContext ();
625654 for (int i = 0 ; i < length ; i ++) {
626- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext () );
655+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
627656 dictIdsBuffer [i ] = Arrays .copyOfRange (_reusableMVDictIds , 0 , numValues );
628657 }
629658 }
630659
631660 void readIntValuesMV (int [] docIds , int length , int [][] valuesBuffer ) {
632661 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
662+ ForwardIndexReaderContext readerContext = getReaderContext ();
633663 if (_dictionary != null ) {
634664 for (int i = 0 ; i < length ; i ++) {
635- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext () );
665+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
636666 int [] values = new int [numValues ];
637667 _dictionary .readIntValues (_reusableMVDictIds , numValues , values );
638668 valuesBuffer [i ] = values ;
639669 }
640670 } else {
641- _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , getReaderContext () );
671+ _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , readerContext );
642672 }
643673 }
644674
@@ -650,15 +680,16 @@ void readIntValuesMV(TransformEvaluator evaluator, int[] docIds, int length, int
650680
651681 void readLongValuesMV (int [] docIds , int length , long [][] valuesBuffer ) {
652682 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
683+ ForwardIndexReaderContext readerContext = getReaderContext ();
653684 if (_dictionary != null ) {
654685 for (int i = 0 ; i < length ; i ++) {
655- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext () );
686+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
656687 long [] values = new long [numValues ];
657688 _dictionary .readLongValues (_reusableMVDictIds , numValues , values );
658689 valuesBuffer [i ] = values ;
659690 }
660691 } else {
661- _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , getReaderContext () );
692+ _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , readerContext );
662693 }
663694 }
664695
@@ -670,15 +701,16 @@ void readLongValuesMV(TransformEvaluator evaluator, int[] docIds, int length, lo
670701
671702 void readFloatValuesMV (int [] docIds , int length , float [][] valuesBuffer ) {
672703 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
704+ ForwardIndexReaderContext readerContext = getReaderContext ();
673705 if (_dictionary != null ) {
674706 for (int i = 0 ; i < length ; i ++) {
675- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext () );
707+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
676708 float [] values = new float [numValues ];
677709 _dictionary .readFloatValues (_reusableMVDictIds , numValues , values );
678710 valuesBuffer [i ] = values ;
679711 }
680712 } else {
681- _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , getReaderContext () );
713+ _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , readerContext );
682714 }
683715 }
684716
@@ -690,15 +722,16 @@ void readFloatValuesMV(TransformEvaluator evaluator, int[] docIds, int length, f
690722
691723 void readDoubleValuesMV (int [] docIds , int length , double [][] valuesBuffer ) {
692724 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
725+ ForwardIndexReaderContext readerContext = getReaderContext ();
693726 if (_dictionary != null ) {
694727 for (int i = 0 ; i < length ; i ++) {
695- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext () );
728+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
696729 double [] values = new double [numValues ];
697730 _dictionary .readDoubleValues (_reusableMVDictIds , numValues , values );
698731 valuesBuffer [i ] = values ;
699732 }
700733 } else {
701- _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , getReaderContext () );
734+ _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , readerContext );
702735 }
703736 }
704737
@@ -710,15 +743,62 @@ void readDoubleValuesMV(TransformEvaluator evaluator, int[] docIds, int length,
710743
711744 void readStringValuesMV (int [] docIds , int length , String [][] valuesBuffer ) {
712745 Tracing .activeRecording ().setInputDataType (_dataType , _singleValue );
746+ ForwardIndexReaderContext readerContext = getReaderContext ();
713747 if (_dictionary != null ) {
714- for (int i = 0 ; i < length ; i ++) {
715- int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , getReaderContext ());
716- String [] values = new String [numValues ];
717- _dictionary .readStringValues (_reusableMVDictIds , numValues , values );
718- valuesBuffer [i ] = values ;
748+ if (_dataType == DataType .BOOLEAN ) {
749+ for (int i = 0 ; i < length ; i ++) {
750+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
751+ int [] intValues = new int [numValues ];
752+ _dictionary .readIntValues (_reusableMVDictIds , numValues , intValues );
753+ String [] values = new String [numValues ];
754+ for (int j = 0 ; j < numValues ; j ++) {
755+ values [i ] = Boolean .toString (intValues [i ] == 1 );
756+ }
757+ valuesBuffer [i ] = values ;
758+ }
759+ } else if (_dataType == DataType .TIMESTAMP ) {
760+ for (int i = 0 ; i < length ; i ++) {
761+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
762+ long [] longValues = new long [numValues ];
763+ _dictionary .readLongValues (_reusableMVDictIds , numValues , longValues );
764+ String [] values = new String [numValues ];
765+ for (int j = 0 ; j < numValues ; j ++) {
766+ values [i ] = new Timestamp (longValues [i ]).toString ();
767+ }
768+ valuesBuffer [i ] = values ;
769+ }
770+ } else {
771+ for (int i = 0 ; i < length ; i ++) {
772+ int numValues = _reader .getDictIdMV (docIds [i ], _reusableMVDictIds , readerContext );
773+ String [] values = new String [numValues ];
774+ _dictionary .readStringValues (_reusableMVDictIds , numValues , values );
775+ valuesBuffer [i ] = values ;
776+ }
719777 }
720778 } else {
721- _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , getReaderContext ());
779+ if (_dataType == DataType .BOOLEAN ) {
780+ int [] intValueBuffer = new int [_maxNumValuesPerMVEntry ];
781+ for (int i = 0 ; i < length ; i ++) {
782+ int numValues = _reader .getIntMV (docIds [i ], intValueBuffer , readerContext );
783+ String [] values = new String [numValues ];
784+ for (int j = 0 ; j < numValues ; j ++) {
785+ values [i ] = Boolean .toString (intValueBuffer [i ] == 1 );
786+ }
787+ valuesBuffer [i ] = values ;
788+ }
789+ } else if (_dataType == DataType .TIMESTAMP ) {
790+ long [] longValueBuffer = new long [_maxNumValuesPerMVEntry ];
791+ for (int i = 0 ; i < length ; i ++) {
792+ int numValues = _reader .getLongMV (docIds [i ], longValueBuffer , readerContext );
793+ String [] values = new String [numValues ];
794+ for (int j = 0 ; j < numValues ; j ++) {
795+ values [i ] = new Timestamp (longValueBuffer [i ]).toString ();
796+ }
797+ valuesBuffer [i ] = values ;
798+ }
799+ } else {
800+ _reader .readValuesMV (docIds , length , _maxNumValuesPerMVEntry , valuesBuffer , readerContext );
801+ }
722802 }
723803 }
724804
0 commit comments