Skip to content

Commit f0eae2c

Browse files
committed
Even more checks and error messages
1 parent d2d641c commit f0eae2c

28 files changed

+54
-64
lines changed

src/AggregateFunctions/UniqExactSet.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ class UniqExactSet
6565
auto data_vec_atomic_index = std::make_shared<std::atomic_uint32_t>(0);
6666
auto thread_func = [data_vec, data_vec_atomic_index, &is_cancelled, thread_group = CurrentThread::getGroup()]()
6767
{
68-
ThreadGroupSwitcher switcher(thread_group);
69-
setThreadName("UniqExaConvert");
68+
ThreadGroupSwitcher switcher(thread_group, "UniqExaConvert");
7069

7170
while (true)
7271
{
@@ -129,8 +128,7 @@ class UniqExactSet
129128

130129
auto thread_func = [&lhs, &rhs, next_bucket_to_merge, is_cancelled, thread_group = CurrentThread::getGroup()]()
131130
{
132-
ThreadGroupSwitcher switcher(thread_group);
133-
setThreadName("UniqExactMerger");
131+
ThreadGroupSwitcher switcher(thread_group, "UniqExactMerger");
134132

135133
while (true)
136134
{

src/Common/ThreadPool.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include <Common/ThreadPool.h>
2+
3+
#include <Common/CurrentThread.h>
24
#include <Common/ProfileEvents.h>
35
#include <Common/setThreadName.h>
46
#include <Common/Exception.h>
@@ -769,6 +771,11 @@ void ThreadPoolImpl<Thread>::ThreadFromThreadPool::worker()
769771

770772
CurrentMetrics::Increment metric_active_pool_threads(parent_pool.metric_active_threads);
771773

774+
#ifdef DEBUG_OR_SANITIZER_BUILD
775+
DB::ThreadStatus * initial_thread = DB::current_thread;
776+
DB::ThreadGroupPtr initial_thread_group = DB::CurrentThread::getGroup();
777+
#endif
778+
772779
if constexpr (!std::is_same_v<Thread, std::thread>)
773780
{
774781
Stopwatch watch;
@@ -785,6 +792,12 @@ void ThreadPoolImpl<Thread>::ThreadFromThreadPool::worker()
785792
job_data->job();
786793
}
787794

795+
#ifdef DEBUG_OR_SANITIZER_BUILD
796+
DB::ThreadStatus * final_thread = DB::current_thread;
797+
DB::ThreadGroupPtr final_thread_group = DB::CurrentThread::getGroup();
798+
if (final_thread != initial_thread || final_thread_group != initial_thread_group)
799+
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Thread pool job changed current ThreadStatus pointer ({} -> {}) or ThreadGroup ({} -> {}).", initial_thread ? "non-nullptr" : "nullptr", final_thread ? "non-nullptr" : "nullptr", initial_thread_group ? "master_thread_id " + std::to_string(initial_thread_group->master_thread_id) : "nullptr", final_thread_group ? "master_thread_id " + std::to_string(final_thread_group->master_thread_id) : "nullptr");
800+
#endif
788801

789802
if (thread_trace_context.root_span.isTraceEnabled())
790803
{

src/Common/ThreadStatus.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ class ThreadGroup
144144
* Typically used for inheriting thread group when scheduling tasks on a thread pool:
145145
* pool->scheduleOrThrow([thread_group = CurrentThread::getGroup()]()
146146
* {
147-
* ThreadGroupSwitcher switcher(thread_group);
147+
* ThreadGroupSwitcher switcher(thread_group, "MyThread");
148148
* ...
149149
* });
150150
*/
@@ -156,7 +156,8 @@ class ThreadGroupSwitcher : private boost::noncopyable
156156
/// * If false, asserts that the thread is not already attached to a different group.
157157
/// Use this when running a task in a thread pool.
158158
/// * If true, remembers the current group and restores it in destructor.
159-
explicit ThreadGroupSwitcher(ThreadGroupPtr thread_group_, bool allow_existing_group = false) noexcept;
159+
/// If thread_name is not empty, calls setThreadName along the way; should be at most 15 bytes long.
160+
explicit ThreadGroupSwitcher(ThreadGroupPtr thread_group_, std::string_view thread_name, bool allow_existing_group = false) noexcept;
160161
~ThreadGroupSwitcher() noexcept;
161162

162163
private:

src/Common/threadPoolCallbackRunner.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ ThreadPoolCallbackRunnerUnsafe<Result, Callback> threadPoolCallbackRunnerUnsafe(
3232
{
3333
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, my_callback = std::move(callback)]() mutable -> Result
3434
{
35-
ThreadGroupSwitcher switcher(thread_group);
36-
setThreadName(thread_name.data());
35+
ThreadGroupSwitcher switcher(thread_group, thread_name);
3736

3837
SCOPE_EXIT_SAFE(
3938
{
@@ -157,8 +156,7 @@ class ThreadPoolCallbackRunnerLocal final
157156

158157
auto task_func = [task, thread_group = CurrentThread::getGroup(), my_thread_name = thread_name, my_callback = std::move(callback), promise]() mutable -> void
159158
{
160-
ThreadGroupSwitcher switcher(thread_group);
161-
setThreadName(my_thread_name.data());
159+
ThreadGroupSwitcher switcher(thread_group, my_thread_name);
162160

163161
TaskState expected = SCHEDULED;
164162
if (!task->state.compare_exchange_strong(expected, RUNNING))

src/Compression/ParallelCompressedWriteBuffer.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ void ParallelCompressedWriteBuffer::nextImpl()
4848
current_buffer->uncompressed_size = offset();
4949
pool.scheduleOrThrowOnError([this, my_current_buffer = current_buffer, thread_group = CurrentThread::getGroup()]
5050
{
51-
ThreadGroupSwitcher switcher(thread_group);
52-
setThreadName("ParallelCompres");
51+
ThreadGroupSwitcher switcher(thread_group, "ParallelCompres");
5352

5453
compress(my_current_buffer);
5554
});

src/Dictionaries/HashedDictionary.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,7 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
336336

337337
if (!pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
338338
{
339-
ThreadGroupSwitcher switcher(thread_group);
340-
setThreadName("HashedDictDtor");
339+
ThreadGroupSwitcher switcher(thread_group, "HashedDictDtor");
341340

342341
/// Do not account memory that was occupied by the dictionaries for the query/user context.
343342
MemoryTrackerBlockerInThread memory_blocker;

src/Dictionaries/HashedDictionaryParallelLoader.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ class HashedDictionaryParallelLoader : public boost::noncopyable
6868
{
6969
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
7070
{
71-
ThreadGroupSwitcher switcher(thread_group);
72-
setThreadName("HashedDictLoad");
71+
ThreadGroupSwitcher switcher(thread_group, "HashedDictLoad");
7372

7473
WorkerStatistic statistic;
7574
SCOPE_EXIT_SAFE(

src/Interpreters/Aggregator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2356,7 +2356,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
23562356
if (thread_pool)
23572357
thread_pool->scheduleOrThrowOnError([thread_id, &tasks, thread_group = CurrentThread::getGroup()]
23582358
{
2359-
ThreadGroupSwitcher switcher(thread_group);
2359+
ThreadGroupSwitcher switcher(thread_group, "");
23602360
tasks[thread_id]();
23612361
});
23622362
else
@@ -3123,7 +3123,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
31233123
if (thread_pool)
31243124
thread_pool->scheduleOrThrowOnError([bucket, &merge_bucket, aggregates_pool, thread_group = CurrentThread::getGroup()]
31253125
{
3126-
ThreadGroupSwitcher switcher(thread_group);
3126+
ThreadGroupSwitcher switcher(thread_group, "");
31273127
merge_bucket(bucket, aggregates_pool);
31283128
});
31293129
else

src/Interpreters/ConcurrentHashJoin.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,7 @@ void reserveSpaceInHashMaps(
134134
pool->scheduleOrThrow(
135135
[&, i, thread_group = CurrentThread::getGroup()]()
136136
{
137-
ThreadGroupSwitcher switcher(thread_group);
138-
setThreadName("ConcurrentJoin");
137+
ThreadGroupSwitcher switcher(thread_group, "ConcurrentJoin");
139138

140139
const auto & right_data = getData(hash_joins[i]);
141140
std::visit([&](auto & maps) { return reserve_space_in_buckets(maps, right_data->type, i); }, right_data->maps.at(0));
@@ -189,8 +188,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
189188
pool->scheduleOrThrow(
190189
[&, i, thread_group = CurrentThread::getGroup()]()
191190
{
192-
ThreadGroupSwitcher switcher(thread_group);
193-
setThreadName("ConcurrentJoin");
191+
ThreadGroupSwitcher switcher(thread_group, "ConcurrentJoin");
194192

195193
/// reserve is not needed anyway - either we will use fixed-size hash map or shared two-level map (then reserve will be done in a special way below)
196194
const size_t reserve_size = 0;
@@ -239,8 +237,7 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
239237
pool->scheduleOrThrow(
240238
[join = hash_joins[0], i, this, thread_group = CurrentThread::getGroup()]()
241239
{
242-
ThreadGroupSwitcher switcher(thread_group);
243-
setThreadName("ConcurrentJoin");
240+
ThreadGroupSwitcher switcher(thread_group, "ConcurrentJoin");
244241

245242
auto clear_space_in_buckets = [&](auto & maps, HashJoin::Type type, size_t idx)
246243
{

src/Interpreters/ExternalLoader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ class ExternalLoader::LoadingDispatcher : private boost::noncopyable
969969
/// Does the loading, possibly in the separate thread.
970970
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupPtr thread_group = {})
971971
{
972-
ThreadGroupSwitcher switcher(thread_group);
972+
ThreadGroupSwitcher switcher(thread_group, "");
973973

974974
/// Do not account memory that was occupied by the dictionaries for the query/user context.
975975
MemoryTrackerBlockerInThread memory_blocker;

0 commit comments

Comments
 (0)