Skip to content

Commit 376472c

Browse files
committed
add test when chunk with 0 columns has to be produced in squashing
1 parent 5e4a244 commit 376472c

File tree

7 files changed

+71
-23
lines changed

7 files changed

+71
-23
lines changed

src/Core/Settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ class IColumn;
952952

953953
#define OBSOLETE_SETTINGS(M, ALIAS) \
954954
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
955-
MAKE_OBSOLETE(M, Bool, update_insert_deduplication_token_in_dependent_materialized_views, 1) \
955+
MAKE_OBSOLETE(M, Bool, update_insert_deduplication_token_in_dependent_materialized_views, 0) \
956956
MAKE_OBSOLETE(M, UInt64, max_memory_usage_for_all_queries, 0) \
957957
MAKE_OBSOLETE(M, UInt64, multiple_joins_rewriter_version, 0) \
958958
MAKE_OBSOLETE(M, Bool, enable_debug_queries, false) \

src/Interpreters/Squashing.cpp

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <vector>
22
#include <Interpreters/Squashing.h>
3+
#include "Common/Logger.h"
4+
#include "Common/logger_useful.h"
35
#include <Common/CurrentThread.h>
46
#include <base/defines.h>
57

@@ -16,14 +18,15 @@ Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_bloc
1618
, min_block_size_bytes(min_block_size_bytes_)
1719
, header(header_)
1820
{
21+
LOG_TEST(getLogger("Squashing"), "header columns {}", header.columns());
1922
}
2023

2124
Chunk Squashing::flush()
2225
{
2326
if (!accumulated)
2427
return {};
2528

26-
auto result = convertToChunk(accumulated.extract());
29+
auto result = convertToChunk(extract());
2730
chassert(result);
2831
return result;
2932
}
@@ -43,6 +46,8 @@ Chunk Squashing::squash(Chunk && input_chunk)
4346

4447
Chunk Squashing::add(Chunk && input_chunk)
4548
{
49+
LOG_TEST(getLogger("Squashing"), "add columns {} rows {}", input_chunk.getNumColumns(), input_chunk.getNumRows());
50+
4651
if (!input_chunk)
4752
return {};
4853

@@ -53,11 +58,11 @@ Chunk Squashing::add(Chunk && input_chunk)
5358
if (!accumulated)
5459
{
5560
accumulated.add(std::move(input_chunk));
56-
return convertToChunk(accumulated.extract());
61+
return convertToChunk(extract());
5762
}
5863

5964
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
60-
Chunk res_chunk = convertToChunk(accumulated.extract());
65+
Chunk res_chunk = convertToChunk(extract());
6166
accumulated.add(std::move(input_chunk));
6267
return res_chunk;
6368
}
@@ -66,7 +71,7 @@ Chunk Squashing::add(Chunk && input_chunk)
6671
if (isEnoughSize())
6772
{
6873
/// Return accumulated data and place new block to accumulated data.
69-
Chunk res_chunk = convertToChunk(accumulated.extract());
74+
Chunk res_chunk = convertToChunk(extract());
7075
accumulated.add(std::move(input_chunk));
7176
return res_chunk;
7277
}
@@ -76,21 +81,25 @@ Chunk Squashing::add(Chunk && input_chunk)
7681

7782
/// If accumulated data is big enough, we send it
7883
if (isEnoughSize())
79-
return convertToChunk(accumulated.extract());
84+
return convertToChunk(extract());
8085

8186
return {};
8287
}
8388

84-
Chunk Squashing::convertToChunk(std::vector<Chunk> && chunks) const
89+
Chunk Squashing::convertToChunk(CurrentData && data) const
8590
{
86-
if (chunks.empty())
91+
LOG_TEST(getLogger("Squashing"), "convertToChunk {}", data.chunks.size());
92+
93+
if (data.chunks.empty())
8794
return {};
8895

8996
auto info = std::make_shared<ChunksToSquash>();
90-
info->chunks = std::move(chunks);
97+
info->chunks = std::move(data.chunks);
9198

9299
// It is important that chunk is not empty, it has to have columns even if they are empty
93-
auto aggr_chunk = Chunk(header.getColumns(), 0);
100+
// Sometimes there could be no columns in the header while the chunks still contain a non-zero number of rows
101+
// That happens when we intend to add defaults for the missing columns afterwards
102+
auto aggr_chunk = Chunk(header.getColumns(), header.columns() ? 0 : data.getRows());
94103
aggr_chunk.getChunkInfos().add(std::move(info));
95104
chassert(aggr_chunk);
96105
return aggr_chunk;
@@ -149,17 +158,18 @@ bool Squashing::isEnoughSize(const Chunk & chunk) const
149158
return isEnoughSize(chunk.getNumRows(), chunk.bytes());
150159
}
151160

152-
void Squashing::CurrentSize::add(Chunk && chunk)
161+
void Squashing::CurrentData::add(Chunk && chunk)
153162
{
154163
rows += chunk.getNumRows();
155164
bytes += chunk.bytes();
156165
chunks.push_back(std::move(chunk));
157166
}
158167

159-
std::vector<Chunk> Squashing::CurrentSize::extract()
168+
Squashing::CurrentData Squashing::extract()
160169
{
161-
auto result = std::move(chunks);
162-
*this = {};
170+
auto result = std::move(accumulated);
171+
accumulated = {};
163172
return result;
164173
}
174+
165175
}

src/Interpreters/Squashing.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,33 +49,33 @@ class Squashing
4949
const Block & getHeader() const { return header; }
5050

5151
private:
52-
class CurrentSize
52+
struct CurrentData
5353
{
5454
std::vector<Chunk> chunks = {};
5555
size_t rows = 0;
5656
size_t bytes = 0;
5757

58-
public:
5958
explicit operator bool () const { return !chunks.empty(); }
6059
size_t getRows() const { return rows; }
6160
size_t getBytes() const { return bytes; }
6261
void add(Chunk && chunk);
63-
std::vector<Chunk> extract();
6462
};
6563

6664
const size_t min_block_size_rows;
6765
const size_t min_block_size_bytes;
6866
Block header;
6967

70-
CurrentSize accumulated;
68+
CurrentData accumulated;
7169

7270
static Chunk squash(std::vector<Chunk> && input_chunks, Chunk::ChunkInfoCollection && infos);
7371

7472
bool isEnoughSize() const;
7573
bool isEnoughSize(size_t rows, size_t bytes) const;
7674
bool isEnoughSize(const Chunk & chunk) const;
7775

78-
Chunk convertToChunk(std::vector<Chunk> && chunks) const;
76+
CurrentData extract();
77+
78+
Chunk convertToChunk(CurrentData && data) const;
7979
};
8080

8181
}

src/Processors/Transforms/DeduplicationTokenTransforms.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,11 @@ void CheckTokenTransform::transform(Chunk & chunk)
148148
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
149149

150150
if (!token_info)
151+
{
151152
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug);
153+
}
152154

153-
LOG_DEBUG(log, "debug: {}, token: {}", debug, token_info->debugToken());
155+
LOG_TEST(log, "debug: {}, token: {}, columns {} rows {}", debug, token_info->debugToken(), chunk.getNumColumns(), chunk.getNumRows());
154156
}
155157
#endif
156158

tests/queries/0_stateless/01275_parallel_mv.sql.j2

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ drop table if exists testXC;
1212

1313
create table testX (A Int64) engine=MergeTree order by tuple();
1414

15-
create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
16-
create materialized view testXB engine=MergeTree order by tuple() as select sleep(0.2), throwIf(A=1) from testX;
17-
create materialized view testXC engine=MergeTree order by tuple() as select sleep(0.1) from testX;
15+
create materialized view testXA engine=MergeTree order by tuple() as select sleepEachRow(0.2) from testX;
16+
create materialized view testXB engine=MergeTree order by tuple() as select sleepEachRow(0.4), throwIf(A=1) from testX;
17+
create materialized view testXC engine=MergeTree order by tuple() as select sleepEachRow(0.2) from testX;
1818

1919
-- { echoOn }
2020
{% for parallel_view_processing in [0, 1] %}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- { echo ON }
2+
CREATE TABLE src (x UInt8) ENGINE = Memory;
3+
CREATE TABLE dst (x UInt8) ENGINE = Memory;
4+
CREATE MATERIALIZED VIEW mv1 TO dst AS SELECT * FROM src;
5+
INSERT INTO src VALUES (0);
6+
SELECT * from dst;
7+
0
8+
TRUNCATE TABLE dst;
9+
--DROP TABLE src SYNC;
10+
--CREATE TABLE src (y String) ENGINE = MergeTree order by tuple();
11+
ALTER TABLE src ADD COLUMN y UInt8;
12+
ALTER TABLE src DROP COLUMN x;
13+
INSERT INTO src VALUES (0);
14+
SELECT * from dst;
15+
0
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
DROP TABLE IF EXISTS mv;
2+
DROP TABLE IF EXISTS src;
3+
DROP TABLE IF EXISTS dst;
4+
5+
-- { echo ON }
6+
CREATE TABLE src (x UInt8) ENGINE = Memory;
7+
CREATE TABLE dst (x UInt8) ENGINE = Memory;
8+
CREATE MATERIALIZED VIEW mv1 TO dst AS SELECT * FROM src;
9+
10+
INSERT INTO src VALUES (0);
11+
SELECT * from dst;
12+
13+
TRUNCATE TABLE dst;
14+
15+
--DROP TABLE src SYNC;
16+
--CREATE TABLE src (y String) ENGINE = MergeTree order by tuple();
17+
ALTER TABLE src ADD COLUMN y UInt8;
18+
ALTER TABLE src DROP COLUMN x;
19+
20+
INSERT INTO src VALUES (0);
21+
SELECT * from dst;

0 commit comments

Comments
 (0)