Skip to content

Commit 9f9a3d7

Browse files
committed
Add ctpl header only library
>>> backports dash@47a162255260019bd4733d4336c235b01019df9f A simple C++ thread pool library https://github.com/vit-vit/CTPL Commit: 437e135dbd94eb65b45533d9ce8ee28b5bd37b6d
1 parent 129446a commit 9f9a3d7

File tree

2 files changed

+240
-0
lines changed

2 files changed

+240
-0
lines changed

src/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ BITCOIN_CORE_H = \
189189
core_io.h \
190190
cuckoocache.h \
191191
crypter.h \
192+
ctpl.h \
192193
cyclingvector.h \
193194
evo/deterministicmns.h \
194195
evo/evodb.h \

src/ctpl.h

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
2+
/*********************************************************
3+
*
4+
* Copyright (C) 2014 by Vitaliy Vitsentiy
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*********************************************************/
19+
20+
21+
#ifndef __ctpl_thread_pool_H__
22+
#define __ctpl_thread_pool_H__
23+
24+
#include <functional>
25+
#include <thread>
26+
#include <atomic>
27+
#include <vector>
28+
#include <memory>
29+
#include <exception>
30+
#include <future>
31+
#include <mutex>
32+
#include <boost/lockfree/queue.hpp>
33+
34+
35+
#ifndef _ctplThreadPoolLength_
36+
#define _ctplThreadPoolLength_ 100
37+
#endif
38+
39+
40+
// thread pool to run user's functors with signature
41+
// ret func(int id, other_params)
42+
// where id is the index of the thread that runs the functor
43+
// ret is some return type
44+
45+
46+
namespace ctpl {
47+
48+
class thread_pool {
49+
50+
public:
51+
52+
thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
53+
thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
54+
55+
// the destructor waits for all the functions in the queue to be finished
56+
~thread_pool() {
57+
this->stop(true);
58+
}
59+
60+
// get the number of running threads in the pool
61+
int size() { return static_cast<int>(this->threads.size()); }
62+
63+
// number of idle threads
64+
int n_idle() { return this->nWaiting; }
65+
std::thread & get_thread(int i) { return *this->threads[i]; }
66+
67+
// change the number of threads in the pool
68+
// should be called from one thread, otherwise be careful to not interleave, also with this->stop()
69+
// nThreads must be >= 0
70+
void resize(int nThreads) {
71+
if (!this->isStop && !this->isDone) {
72+
int oldNThreads = static_cast<int>(this->threads.size());
73+
if (oldNThreads <= nThreads) { // if the number of threads is increased
74+
this->threads.resize(nThreads);
75+
this->flags.resize(nThreads);
76+
77+
for (int i = oldNThreads; i < nThreads; ++i) {
78+
this->flags[i] = std::make_shared<std::atomic<bool>>(false);
79+
this->set_thread(i);
80+
}
81+
}
82+
else { // the number of threads is decreased
83+
for (int i = oldNThreads - 1; i >= nThreads; --i) {
84+
*this->flags[i] = true; // this thread will finish
85+
this->threads[i]->detach();
86+
}
87+
{
88+
// stop the detached threads that were waiting
89+
std::unique_lock<std::mutex> lock(this->mutex);
90+
this->cv.notify_all();
91+
}
92+
this->threads.resize(nThreads); // safe to delete because the threads are detached
93+
this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
94+
}
95+
}
96+
}
97+
98+
// empty the queue
99+
void clear_queue() {
100+
std::function<void(int id)> * _f;
101+
while (this->q.pop(_f))
102+
delete _f; // empty the queue
103+
}
104+
105+
// pops a functional wraper to the original function
106+
std::function<void(int)> pop() {
107+
std::function<void(int id)> * _f = nullptr;
108+
this->q.pop(_f);
109+
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
110+
111+
std::function<void(int)> f;
112+
if (_f)
113+
f = *_f;
114+
return f;
115+
}
116+
117+
118+
// wait for all computing threads to finish and stop all threads
119+
// may be called asyncronously to not pause the calling thread while waiting
120+
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
121+
void stop(bool isWait = false) {
122+
if (!isWait) {
123+
if (this->isStop)
124+
return;
125+
this->isStop = true;
126+
for (int i = 0, n = this->size(); i < n; ++i) {
127+
*this->flags[i] = true; // command the threads to stop
128+
}
129+
this->clear_queue(); // empty the queue
130+
}
131+
else {
132+
if (this->isDone || this->isStop)
133+
return;
134+
this->isDone = true; // give the waiting threads a command to finish
135+
}
136+
{
137+
std::unique_lock<std::mutex> lock(this->mutex);
138+
this->cv.notify_all(); // stop all waiting threads
139+
}
140+
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
141+
if (this->threads[i]->joinable())
142+
this->threads[i]->join();
143+
}
144+
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145+
// therefore delete them here
146+
this->clear_queue();
147+
this->threads.clear();
148+
this->flags.clear();
149+
}
150+
151+
template<typename F, typename... Rest>
152+
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
153+
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154+
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
155+
);
156+
157+
auto _f = new std::function<void(int id)>([pck](int id) {
158+
(*pck)(id);
159+
});
160+
this->q.push(_f);
161+
162+
std::unique_lock<std::mutex> lock(this->mutex);
163+
this->cv.notify_one();
164+
165+
return pck->get_future();
166+
}
167+
168+
// run the user's function that excepts argument int - id of the running thread. returned value is templatized
169+
// operator returns std::future, where the user can get the result and rethrow the catched exceptins
170+
template<typename F>
171+
auto push(F && f) ->std::future<decltype(f(0))> {
172+
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
173+
174+
auto _f = new std::function<void(int id)>([pck](int id) {
175+
(*pck)(id);
176+
});
177+
this->q.push(_f);
178+
179+
std::unique_lock<std::mutex> lock(this->mutex);
180+
this->cv.notify_one();
181+
182+
return pck->get_future();
183+
}
184+
185+
186+
private:
187+
188+
// deleted
189+
thread_pool(const thread_pool &);// = delete;
190+
thread_pool(thread_pool &&);// = delete;
191+
thread_pool & operator=(const thread_pool &);// = delete;
192+
thread_pool & operator=(thread_pool &&);// = delete;
193+
194+
void set_thread(int i) {
195+
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
196+
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
197+
std::atomic<bool> & _flag = *flag;
198+
std::function<void(int id)> * _f;
199+
bool isPop = this->q.pop(_f);
200+
while (true) {
201+
while (isPop) { // if there is anything in the queue
202+
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
203+
(*_f)(i);
204+
205+
if (_flag)
206+
return; // the thread is wanted to stop, return even if the queue is not empty yet
207+
else
208+
isPop = this->q.pop(_f);
209+
}
210+
211+
// the queue is empty here, wait for the next command
212+
std::unique_lock<std::mutex> lock(this->mutex);
213+
++this->nWaiting;
214+
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
215+
--this->nWaiting;
216+
217+
if (!isPop)
218+
return; // if the queue is empty and this->isDone == true or *flag then return
219+
}
220+
};
221+
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
222+
}
223+
224+
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
225+
226+
std::vector<std::unique_ptr<std::thread>> threads;
227+
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
228+
mutable boost::lockfree::queue<std::function<void(int id)> *> q;
229+
std::atomic<bool> isDone;
230+
std::atomic<bool> isStop;
231+
std::atomic<int> nWaiting; // how many threads are waiting
232+
233+
std::mutex mutex;
234+
std::condition_variable cv;
235+
};
236+
237+
}
238+
239+
#endif // __ctpl_thread_pool_H__

0 commit comments

Comments
 (0)