11#include < vector>
22#include < Interpreters/Squashing.h>
3+ #include " Common/Logger.h"
4+ #include " Common/logger_useful.h"
35#include < Common/CurrentThread.h>
46#include < base/defines.h>
57
@@ -16,14 +18,15 @@ Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_bloc
1618 , min_block_size_bytes(min_block_size_bytes_)
1719 , header(header_)
1820{
21+ LOG_TEST (getLogger (" Squashing" ), " header columns {}" , header.columns ());
1922}
2023
2124Chunk Squashing::flush ()
2225{
2326 if (!accumulated)
2427 return {};
2528
26- auto result = convertToChunk (accumulated. extract ());
29+ auto result = convertToChunk (extract ());
2730 chassert (result);
2831 return result;
2932}
@@ -43,6 +46,8 @@ Chunk Squashing::squash(Chunk && input_chunk)
4346
4447Chunk Squashing::add (Chunk && input_chunk)
4548{
49+ LOG_TEST (getLogger (" Squashing" ), " add columns {} rows {}" , input_chunk.getNumColumns (), input_chunk.getNumRows ());
50+
4651 if (!input_chunk)
4752 return {};
4853
@@ -53,11 +58,11 @@ Chunk Squashing::add(Chunk && input_chunk)
5358 if (!accumulated)
5459 {
5560 accumulated.add (std::move (input_chunk));
56- return convertToChunk (accumulated. extract ());
61+ return convertToChunk (extract ());
5762 }
5863
5964 // / Return accumulated data (maybe it has small size) and place new block to accumulated data.
60- Chunk res_chunk = convertToChunk (accumulated. extract ());
65+ Chunk res_chunk = convertToChunk (extract ());
6166 accumulated.add (std::move (input_chunk));
6267 return res_chunk;
6368 }
@@ -66,7 +71,7 @@ Chunk Squashing::add(Chunk && input_chunk)
6671 if (isEnoughSize ())
6772 {
6873 // / Return accumulated data and place new block to accumulated data.
69- Chunk res_chunk = convertToChunk (accumulated. extract ());
74+ Chunk res_chunk = convertToChunk (extract ());
7075 accumulated.add (std::move (input_chunk));
7176 return res_chunk;
7277 }
@@ -76,21 +81,25 @@ Chunk Squashing::add(Chunk && input_chunk)
7681
7782 // / If accumulated data is big enough, we send it
7883 if (isEnoughSize ())
79- return convertToChunk (accumulated. extract ());
84+ return convertToChunk (extract ());
8085
8186 return {};
8287}
8388
84- Chunk Squashing::convertToChunk (std::vector<Chunk> && chunks ) const
89+ Chunk Squashing::convertToChunk (CurrentData && data ) const
8590{
86- if (chunks.empty ())
91+ LOG_TEST (getLogger (" Squashing" ), " convertToChunk {}" , data.chunks .size ());
92+
93+ if (data.chunks .empty ())
8794 return {};
8895
8996 auto info = std::make_shared<ChunksToSquash>();
90- info->chunks = std::move (chunks);
97+ info->chunks = std::move (data. chunks );
9198
9299 // It is imortant that chunk is not empty, it has to have columns even if they are empty
93- auto aggr_chunk = Chunk (header.getColumns (), 0 );
100+ // Sometimes there are could be no columns in header but not empty rows in chunks
101+ // That happens when we intend to add defaults for the missing columns after
102+ auto aggr_chunk = Chunk (header.getColumns (), header.columns () ? 0 : data.getRows ());
94103 aggr_chunk.getChunkInfos ().add (std::move (info));
95104 chassert (aggr_chunk);
96105 return aggr_chunk;
@@ -149,17 +158,18 @@ bool Squashing::isEnoughSize(const Chunk & chunk) const
149158 return isEnoughSize (chunk.getNumRows (), chunk.bytes ());
150159}
151160
152- void Squashing::CurrentSize ::add (Chunk && chunk)
161+ void Squashing::CurrentData ::add (Chunk && chunk)
153162{
154163 rows += chunk.getNumRows ();
155164 bytes += chunk.bytes ();
156165 chunks.push_back (std::move (chunk));
157166}
158167
159- std::vector<Chunk> Squashing::CurrentSize ::extract ()
168+ Squashing::CurrentData Squashing::extract ()
160169{
161- auto result = std::move (chunks );
162- * this = {};
170+ auto result = std::move (accumulated );
171+ accumulated = {};
163172 return result;
164173}
174+
165175}
0 commit comments