Skip to content

Commit ef1c7da

Browse files
authored
Merge pull request #10757 from ClickHouse/fix-parallel-mv
Fix parallel MV
2 parents 9ae37a0 + 4613788 commit ef1c7da

File tree

4 files changed

+47
-2
lines changed

4 files changed

+47
-2
lines changed

src/DataStreams/PushingToViewsBlockOutputStream.cpp

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
9090
else
9191
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr());
9292

93-
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out)});
93+
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr});
9494
}
9595

9696
/// Do not push to destination table if the flag is set
@@ -162,7 +162,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
162162
{
163163
// Process sequentially
164164
for (size_t view_num = 0; view_num < views.size(); ++view_num)
165+
{
165166
process(block, view_num);
167+
168+
if (views[view_num].exception)
169+
std::rethrow_exception(views[view_num].exception);
170+
}
166171
}
167172
}
168173

@@ -190,8 +195,18 @@ void PushingToViewsBlockOutputStream::writeSuffix()
190195
if (output)
191196
output->writeSuffix();
192197

198+
std::exception_ptr first_exception;
199+
193200
for (auto & view : views)
194201
{
202+
if (view.exception)
203+
{
204+
if (!first_exception)
205+
first_exception = view.exception;
206+
207+
continue;
208+
}
209+
195210
try
196211
{
197212
view.out->writeSuffix();
@@ -202,6 +217,9 @@ void PushingToViewsBlockOutputStream::writeSuffix()
202217
throw;
203218
}
204219
}
220+
221+
if (first_exception)
222+
std::rethrow_exception(first_exception);
205223
}
206224

207225
void PushingToViewsBlockOutputStream::flush()
@@ -270,7 +288,11 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
270288
catch (Exception & ex)
271289
{
272290
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
273-
throw;
291+
view.exception = std::current_exception();
292+
}
293+
catch (...)
294+
{
295+
view.exception = std::current_exception();
274296
}
275297
}
276298

src/DataStreams/PushingToViewsBlockOutputStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class PushingToViewsBlockOutputStream : public IBlockOutputStream
4040
ASTPtr query;
4141
StorageID table_id;
4242
BlockOutputStreamPtr out;
43+
std::exception_ptr exception;
4344
};
4445

4546
std::vector<ViewInfo> views;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
10
2+
10
3+
0
4+
10
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
drop table if exists testX;
2+
drop table if exists testXA;
3+
drop table if exists testXB;
4+
drop table if exists testXC;
5+
6+
create table testX (A Int64) engine=MergeTree order by tuple();
7+
8+
create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX;
9+
create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX;
10+
create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX;
11+
12+
set parallel_view_processing=1;
13+
insert into testX select number from numbers(10); -- {serverError 395}
14+
15+
select count() from testX;
16+
select count() from testXA;
17+
select count() from testXB;
18+
select count() from testXC;

0 commit comments

Comments
 (0)