@@ -153,7 +153,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
153153 const Settings & settings = context.getSettingsRef ();
154154 if (settings.parallel_view_processing && views.size () > 1 )
155155 {
156- // Push to views concurrently if enabled, and more than one view is attached
156+ // Push to views concurrently if enabled and more than one view is attached
157157 ThreadPool pool (std::min (size_t (settings.max_threads ), views.size ()));
158158 for (size_t view_num = 0 ; view_num < views.size (); ++view_num)
159159 {
@@ -208,6 +208,45 @@ void PushingToViewsBlockOutputStream::writeSuffix()
208208
209209 std::exception_ptr first_exception;
210210
211+ const Settings & settings = context.getSettingsRef ();
212+ bool parallel_processing = false ;
213+
214+ // / Run writeSuffix() for views in separate thread pool.
215+ // / In could have been done in PushingToViewsBlockOutputStream::process, however
216+ // / it is not good if insert into main table fail but into view succeed.
217+ if (settings.parallel_view_processing && views.size () > 1 )
218+ {
219+ parallel_processing = true ;
220+
221+ // Push to views concurrently if enabled and more than one view is attached
222+ ThreadPool pool (std::min (size_t (settings.max_threads ), views.size ()));
223+ auto thread_group = CurrentThread::getGroup ();
224+
225+ for (auto & view : views)
226+ {
227+ if (view.exception )
228+ continue ;
229+
230+ pool.scheduleOrThrowOnError ([thread_group, &view]
231+ {
232+ setThreadName (" PushingToViews" );
233+ if (thread_group)
234+ CurrentThread::attachToIfDetached (thread_group);
235+
236+ try
237+ {
238+ view.out ->writeSuffix ();
239+ }
240+ catch (...)
241+ {
242+ view.exception = std::current_exception ();
243+ }
244+ });
245+ }
246+ // Wait for concurrent view processing
247+ pool.wait ();
248+ }
249+
211250 for (auto & view : views)
212251 {
213252 if (view.exception )
@@ -218,6 +257,9 @@ void PushingToViewsBlockOutputStream::writeSuffix()
218257 continue ;
219258 }
220259
260+ if (parallel_processing)
261+ continue ;
262+
221263 try
222264 {
223265 view.out ->writeSuffix ();
0 commit comments