2020
2121import com .google .common .annotations .VisibleForTesting ;
2222import com .google .common .collect .ImmutableList ;
23+ import java .util .ArrayList ;
2324import java .util .Comparator ;
2425import java .util .LinkedList ;
2526import java .util .List ;
@@ -44,7 +45,8 @@ public class SortOperator extends MultiStageOperator {
4445 private final int _fetch ;
4546 private final int _offset ;
4647 private final DataSchema _dataSchema ;
47- private final PriorityQueue <Object []> _rows ;
48+ private final PriorityQueue <Object []> _priorityQueue ;
49+ private final ArrayList <Object []> _rows ;
4850 private final int _numRowsToKeep ;
4951
5052 private boolean _readyToConstruct ;
@@ -70,8 +72,15 @@ public SortOperator(MultiStageOperator upstreamOperator, List<RexExpression> col
7072 _upstreamErrorBlock = null ;
7173 _isSortedBlockConstructed = false ;
7274 _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity ;
73- _rows = new PriorityQueue <>(_numRowsToKeep ,
74- new SortComparator (collationKeys , collationDirections , dataSchema , false ));
75+ // When there are no collation keys, the SortOperator is a simple selection that only trims rows by limit & offset
76+ if (collationKeys .isEmpty ()) {
77+ _priorityQueue = null ;
78+ _rows = new ArrayList <>();
79+ } else {
80+ _priorityQueue = new PriorityQueue <>(_numRowsToKeep ,
81+ new SortComparator (collationKeys , collationDirections , dataSchema , false ));
82+ _rows = null ;
83+ }
7584 }
7685
7786 @ Override
@@ -107,16 +116,25 @@ private TransferableBlock produceSortedBlock() {
107116 }
108117
109118 if (!_isSortedBlockConstructed ) {
110- LinkedList <Object []> rows = new LinkedList <>();
111- while (_rows .size () > _offset ) {
112- Object [] row = _rows .poll ();
113- rows .addFirst (row );
114- }
115119 _isSortedBlockConstructed = true ;
116- if (rows .size () == 0 ) {
117- return TransferableBlockUtils .getEndOfStreamTransferableBlock ();
120+ if (_priorityQueue == null ) {
121+ if (_rows .size () > _offset ) {
122+ List <Object []> row = _rows .subList (_offset , _rows .size ());
123+ return new TransferableBlock (row , _dataSchema , DataBlock .Type .ROW );
124+ } else {
125+ return TransferableBlockUtils .getEndOfStreamTransferableBlock ();
126+ }
118127 } else {
119- return new TransferableBlock (rows , _dataSchema , DataBlock .Type .ROW );
128+ LinkedList <Object []> rows = new LinkedList <>();
129+ while (_priorityQueue .size () > _offset ) {
130+ Object [] row = _priorityQueue .poll ();
131+ rows .addFirst (row );
132+ }
133+ if (rows .size () == 0 ) {
134+ return TransferableBlockUtils .getEndOfStreamTransferableBlock ();
135+ } else {
136+ return new TransferableBlock (rows , _dataSchema , DataBlock .Type .ROW );
137+ }
120138 }
121139 } else {
122140 return TransferableBlockUtils .getEndOfStreamTransferableBlock ();
@@ -137,8 +155,19 @@ private void consumeInputBlocks() {
137155 }
138156
139157 List <Object []> container = block .getContainer ();
140- for (Object [] row : container ) {
141- SelectionOperatorUtils .addToPriorityQueue (row , _rows , _numRowsToKeep );
158+ if (_priorityQueue == null ) {
159+ // TODO: once push-down is done properly, we shouldn't get more than _numRowsToKeep rows
160+ if (_rows .size () <= _numRowsToKeep ) {
161+ if (_rows .size () + container .size () <= _numRowsToKeep ) {
162+ _rows .addAll (container );
163+ } else {
164+ _rows .addAll (container .subList (0 , _numRowsToKeep - _rows .size ()));
165+ }
166+ }
167+ } else {
168+ for (Object [] row : container ) {
169+ SelectionOperatorUtils .addToPriorityQueue (row , _priorityQueue , _numRowsToKeep );
170+ }
142171 }
143172 block = _upstreamOperator .nextBlock ();
144173 }
0 commit comments