Skip to content

Commit 92ac608

Browse files
Merge pull request ClickHouse#11330 from ClickHouse/fix-10241
More parallel MV processing.
2 parents c448070 + da85e1b commit 92ac608

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

src/DataStreams/PushingToViewsBlockOutputStream.cpp

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
153153
const Settings & settings = context.getSettingsRef();
154154
if (settings.parallel_view_processing && views.size() > 1)
155155
{
156-
// Push to views concurrently if enabled, and more than one view is attached
156+
// Push to views concurrently if enabled and more than one view is attached
157157
ThreadPool pool(std::min(size_t(settings.max_threads), views.size()));
158158
for (size_t view_num = 0; view_num < views.size(); ++view_num)
159159
{
@@ -208,6 +208,45 @@ void PushingToViewsBlockOutputStream::writeSuffix()
208208

209209
std::exception_ptr first_exception;
210210

211+
const Settings & settings = context.getSettingsRef();
212+
bool parallel_processing = false;
213+
214+
/// Run writeSuffix() for views in separate thread pool.
215+
/// In could have been done in PushingToViewsBlockOutputStream::process, however
216+
/// it is not good if insert into main table fail but into view succeed.
217+
if (settings.parallel_view_processing && views.size() > 1)
218+
{
219+
parallel_processing = true;
220+
221+
// Push to views concurrently if enabled and more than one view is attached
222+
ThreadPool pool(std::min(size_t(settings.max_threads), views.size()));
223+
auto thread_group = CurrentThread::getGroup();
224+
225+
for (auto & view : views)
226+
{
227+
if (view.exception)
228+
continue;
229+
230+
pool.scheduleOrThrowOnError([thread_group, &view]
231+
{
232+
setThreadName("PushingToViews");
233+
if (thread_group)
234+
CurrentThread::attachToIfDetached(thread_group);
235+
236+
try
237+
{
238+
view.out->writeSuffix();
239+
}
240+
catch (...)
241+
{
242+
view.exception = std::current_exception();
243+
}
244+
});
245+
}
246+
// Wait for concurrent view processing
247+
pool.wait();
248+
}
249+
211250
for (auto & view : views)
212251
{
213252
if (view.exception)
@@ -218,6 +257,9 @@ void PushingToViewsBlockOutputStream::writeSuffix()
218257
continue;
219258
}
220259

260+
if (parallel_processing)
261+
continue;
262+
221263
try
222264
{
223265
view.out->writeSuffix();

tests/performance/parallel_mv.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<test>
2+
<settings>
3+
<parallel_view_processing>1</parallel_view_processing>
4+
</settings>
5+
6+
<create_query>create table main_table (number UInt64) engine = MergeTree order by tuple();</create_query>
7+
<create_query>create materialized view mv_1 engine = MergeTree order by tuple() as
8+
select number, toString(number) from main_table where number % 13 != 0;</create_query>
9+
<create_query>create materialized view mv_2 engine = MergeTree order by tuple() as
10+
select number, toString(number) from main_table where number % 13 != 1;</create_query>
11+
<create_query>create materialized view mv_3 engine = MergeTree order by tuple() as
12+
select number, toString(number) from main_table where number % 13 != 3;</create_query>
13+
<create_query>create materialized view mv_4 engine = MergeTree order by tuple() as
14+
select number, toString(number) from main_table where number % 13 != 4;</create_query>
15+
16+
<!--<query>insert into main_table select number from numbers(100000)</query>-->
17+
<query>insert into main_table select number from numbers(1000000)</query>
18+
19+
<drop_query>drop table if exists main_table;</drop_query>
20+
<drop_query>drop table if exists mv_1;</drop_query>
21+
<drop_query>drop table if exists mv_2;</drop_query>
22+
<drop_query>drop table if exists mv_3;</drop_query>
23+
<drop_query>drop table if exists mv_4;</drop_query>
24+
</test>

0 commit comments

Comments
 (0)