Skip to content

Commit 29052b6

Browse files
authored
Merge pull request #7377 from azat/INSERT-Distributed-MATERIALIZED-cols
* Fix INSERT into Distributed non-local node with MATERIALIZED columns. The previous patch e527def ("Fix INSERT into Distributed() table with MATERIALIZED column") fixed it only for the case when the node is local, i.e. a direct insert. This patch addresses the problem when the node is not local (`is_local == false`), by erasing materialized columns on INSERT into Distributed. It fixes two cases, depending on the `insert_distributed_sync` setting: - `insert_distributed_sync=0` ``` Not found column value in block. There are only columns: date. Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5d6cf6 DB::Block::getByName(...) dbms/src/Core/Block.cpp:187 4. 0x7fffec2fe067 DB::NativeBlockInputStream::readImpl() dbms/src/DataStreams/NativeBlockInputStream.cpp:159 5. 0x7fffec2d223f DB::IBlockInputStream::read() dbms/src/DataStreams/IBlockInputStream.cpp:61 6. 0x7ffff7c6d40d DB::TCPHandler::receiveData() dbms/programs/server/TCPHandler.cpp:971 7. 0x7ffff7c6cc1d DB::TCPHandler::receivePacket() dbms/programs/server/TCPHandler.cpp:855 8. 0x7ffff7c6a1ef DB::TCPHandler::readDataNext(unsigned long const&, int const&) dbms/programs/server/TCPHandler.cpp:406 9. 0x7ffff7c6a41b DB::TCPHandler::readData(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:437 10. 0x7ffff7c6a5d9 DB::TCPHandler::processInsertQuery(DB::Settings const&) dbms/programs/server/TCPHandler.cpp:464 11. 
0x7ffff7c687b5 DB::TCPHandler::runImpl() dbms/programs/server/TCPHandler.cpp:257 ``` - `insert_distributed_sync=1` ``` 2019.10.18 13:23:22.114578 [ 44 ] {a78f669f-0b08-4337-abf8-d31e958f6d12} <Error> executeQuery: Code: 171, e.displayText() = DB::Exception: Block structure mismatch in RemoteBlockOutputStream stream: different number of columns: date Date UInt16(size = 1), value Date UInt16(size = 1) date Date UInt16(size = 0): Insertion status: Wrote 1 blocks and 0 rows on shard 0 replica 0, 127.0.0.1:59000 (average 0 ms per block) Wrote 0 blocks and 0 rows on shard 1 replica 0, 127.0.0.2:59000 (average 2 ms per block) (version 19.16.1.1) (from [::1]:3624) (in query: INSERT INTO distributed_00952 VALUES ), Stack trace: 2. 0x7ffff7be92e0 DB::Exception::Exception() dbms/src/Common/Exception.h:27 3. 0x7fffec5da4e9 DB::checkBlockStructure<void>(...)::{...}::operator()(...) const dbms/src/Core/Block.cpp:460 4. 0x7fffec5da671 void DB::checkBlockStructure<void>(...) dbms/src/Core/Block.cpp:467 5. 0x7fffec5d8d58 DB::assertBlocksHaveEqualStructure(...) dbms/src/Core/Block.cpp:515 6. 0x7fffec326630 DB::RemoteBlockOutputStream::write(DB::Block const&) dbms/src/DataStreams/RemoteBlockOutputStream.cpp:68 7. 0x7fffe98bd154 DB::DistributedBlockOutputStream::runWritingJob(DB::DistributedBlockOutputStream::JobReplica&, DB::Block const&)::{lambda()#1}::operator()() const dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp:280 <snip> ``` Fixes: #7365 Fixes: #5429 Refs: #6891 * Cover INSERT into Distributed with MATERIALIZED columns and !is_local node. I guess that adding a new cluster to server-test.xml is not required, but it won't hurt. * Update DistributedBlockOutputStream.cpp
2 parents f8a401b + 050de71 commit 29052b6

File tree

6 files changed

+86
-6
lines changed

6 files changed

+86
-6
lines changed

dbms/programs/server/config.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,21 @@
180180
<port>9000</port>
181181
</replica>
182182
</shard>
183-
</test_cluster_two_shards_localhost>
183+
</test_cluster_two_shards_localhost>
184+
<test_cluster_two_shards>
185+
<shard>
186+
<replica>
187+
<host>127.0.0.1</host>
188+
<port>9000</port>
189+
</replica>
190+
</shard>
191+
<shard>
192+
<replica>
193+
<host>127.0.0.2</host>
194+
<port>9000</port>
195+
</replica>
196+
</shard>
197+
</test_cluster_two_shards>
184198
<test_shard_localhost_secure>
185199
<shard>
186200
<replica>

dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,28 @@ void DistributedBlockOutputStream::writePrefix()
8181

8282
void DistributedBlockOutputStream::write(const Block & block)
8383
{
84+
Block ordinary_block{ block };
85+
86+
/* They are added by the AddingDefaultBlockOutputStream, and we will get
87+
* different number of columns eventually */
88+
for (const auto & col : storage.getColumns().getMaterialized())
89+
{
90+
if (ordinary_block.has(col.name))
91+
{
92+
ordinary_block.erase(col.name);
93+
LOG_DEBUG(log, storage.getTableName()
94+
<< ": column " + col.name + " will be removed, "
95+
<< "because it is MATERIALIZED");
96+
}
97+
}
98+
99+
84100
if (insert_sync)
85-
writeSync(block);
101+
writeSync(ordinary_block);
86102
else
87-
writeAsync(block);
103+
writeAsync(ordinary_block);
88104
}
89105

90-
91106
void DistributedBlockOutputStream::writeAsync(const Block & block)
92107
{
93108
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))

dbms/src/Storages/StorageDistributed.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include <Storages/StorageDistributed.h>
22

33
#include <DataStreams/OneBlockInputStream.h>
4-
#include <DataStreams/materializeBlock.h>
54

65
#include <Databases/IDatabase.h>
76

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
insert_distributed_sync=0
12
2018-08-01
23
2018-08-01
34
2018-08-01 2017-08-01
5+
2018-08-01 2017-08-01
6+
2018-08-01
7+
2018-08-01 2017-08-01
8+
insert_distributed_sync=1
9+
2018-08-01
10+
2018-08-01
11+
2018-08-01 2017-08-01
12+
2018-08-01 2017-08-01
13+
2018-08-01
14+
2018-08-01 2017-08-01
Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,42 @@
11
DROP TABLE IF EXISTS local_00952;
22
DROP TABLE IF EXISTS distributed_00952;
33

4+
--
5+
-- insert_distributed_sync=0
6+
--
7+
SELECT 'insert_distributed_sync=0';
8+
SET insert_distributed_sync=0;
9+
410
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
5-
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_00952, rand());
11+
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
12+
13+
INSERT INTO distributed_00952 VALUES ('2018-08-01');
14+
SYSTEM FLUSH DISTRIBUTED distributed_00952;
15+
16+
SELECT * FROM distributed_00952;
17+
SELECT date, value FROM distributed_00952;
18+
SELECT * FROM local_00952;
19+
SELECT date, value FROM local_00952;
620

21+
DROP TABLE distributed_00952;
22+
DROP TABLE local_00952;
23+
24+
--
25+
-- insert_distributed_sync=1
26+
--
27+
SELECT 'insert_distributed_sync=1';
728
SET insert_distributed_sync=1;
829

30+
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
31+
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
32+
933
INSERT INTO distributed_00952 VALUES ('2018-08-01');
34+
1035
SELECT * FROM distributed_00952;
36+
SELECT date, value FROM distributed_00952;
1137
SELECT * FROM local_00952;
1238
SELECT date, value FROM local_00952;
1339

1440
DROP TABLE distributed_00952;
1541
DROP TABLE local_00952;
42+

dbms/tests/server-test.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,20 @@
7575
</replica>
7676
</shard>
7777
</test_shard_localhost>
78+
<test_cluster_two_shards>
79+
<shard>
80+
<replica>
81+
<host>127.0.0.1</host>
82+
<port>59000</port>
83+
</replica>
84+
</shard>
85+
<shard>
86+
<replica>
87+
<host>127.0.0.2</host>
88+
<port>59000</port>
89+
</replica>
90+
</shard>
91+
</test_cluster_two_shards>
7892
<test_cluster_two_shards_localhost>
7993
<shard>
8094
<replica>

0 commit comments

Comments
 (0)