1+
2+ /* ********************************************************
3+ *
4+ * Copyright (C) 2014 by Vitaliy Vitsentiy
5+ *
6+ * Licensed under the Apache License, Version 2.0 (the "License");
7+ * you may not use this file except in compliance with the License.
8+ * You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ *
18+ *********************************************************/
19+
20+
21+ #ifndef __ctpl_thread_pool_H__
22+ #define __ctpl_thread_pool_H__
23+
24+ #include < functional>
25+ #include < thread>
26+ #include < atomic>
27+ #include < vector>
28+ #include < memory>
29+ #include < exception>
30+ #include < future>
31+ #include < mutex>
32+ #include < boost/lockfree/queue.hpp>
33+
34+
35+ #ifndef _ctplThreadPoolLength_
36+ #define _ctplThreadPoolLength_ 100
37+ #endif
38+
39+
40+ // thread pool to run user's functors with signature
41+ // ret func(int id, other_params)
42+ // where id is the index of the thread that runs the functor
43+ // ret is some return type
44+
45+
46+ namespace ctpl {
47+
48+ class thread_pool {
49+
50+ public:
51+
52+ thread_pool () : q(_ctplThreadPoolLength_) { this ->init (); }
53+ thread_pool (int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this ->init (); this ->resize (nThreads); }
54+
55+ // the destructor waits for all the functions in the queue to be finished
56+ ~thread_pool () {
57+ this ->stop (true );
58+ }
59+
60+ // get the number of running threads in the pool
61+ int size () { return static_cast <int >(this ->threads .size ()); }
62+
63+ // number of idle threads
64+ int n_idle () { return this ->nWaiting ; }
65+ std::thread & get_thread (int i) { return *this ->threads [i]; }
66+
67+ // change the number of threads in the pool
68+ // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
69+ // nThreads must be >= 0
70+ void resize (int nThreads) {
71+ if (!this ->isStop && !this ->isDone ) {
72+ int oldNThreads = static_cast <int >(this ->threads .size ());
73+ if (oldNThreads <= nThreads) { // if the number of threads is increased
74+ this ->threads .resize (nThreads);
75+ this ->flags .resize (nThreads);
76+
77+ for (int i = oldNThreads; i < nThreads; ++i) {
78+ this ->flags [i] = std::make_shared<std::atomic<bool >>(false );
79+ this ->set_thread (i);
80+ }
81+ }
82+ else { // the number of threads is decreased
83+ for (int i = oldNThreads - 1 ; i >= nThreads; --i) {
84+ *this ->flags [i] = true ; // this thread will finish
85+ this ->threads [i]->detach ();
86+ }
87+ {
88+ // stop the detached threads that were waiting
89+ std::unique_lock<std::mutex> lock (this ->mutex );
90+ this ->cv .notify_all ();
91+ }
92+ this ->threads .resize (nThreads); // safe to delete because the threads are detached
93+ this ->flags .resize (nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
94+ }
95+ }
96+ }
97+
98+ // empty the queue
99+ void clear_queue () {
100+ std::function<void (int id)> * _f;
101+ while (this ->q .pop (_f))
102+ delete _f; // empty the queue
103+ }
104+
105+ // pops a functional wraper to the original function
106+ std::function<void (int )> pop () {
107+ std::function<void (int id)> * _f = nullptr ;
108+ this ->q .pop (_f);
109+ std::unique_ptr<std::function<void (int id)>> func (_f); // at return, delete the function even if an exception occurred
110+
111+ std::function<void (int )> f;
112+ if (_f)
113+ f = *_f;
114+ return f;
115+ }
116+
117+
118+ // wait for all computing threads to finish and stop all threads
119+ // may be called asyncronously to not pause the calling thread while waiting
120+ // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
121+ void stop (bool isWait = false ) {
122+ if (!isWait) {
123+ if (this ->isStop )
124+ return ;
125+ this ->isStop = true ;
126+ for (int i = 0 , n = this ->size (); i < n; ++i) {
127+ *this ->flags [i] = true ; // command the threads to stop
128+ }
129+ this ->clear_queue (); // empty the queue
130+ }
131+ else {
132+ if (this ->isDone || this ->isStop )
133+ return ;
134+ this ->isDone = true ; // give the waiting threads a command to finish
135+ }
136+ {
137+ std::unique_lock<std::mutex> lock (this ->mutex );
138+ this ->cv .notify_all (); // stop all waiting threads
139+ }
140+ for (int i = 0 ; i < static_cast <int >(this ->threads .size ()); ++i) { // wait for the computing threads to finish
141+ if (this ->threads [i]->joinable ())
142+ this ->threads [i]->join ();
143+ }
144+ // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145+ // therefore delete them here
146+ this ->clear_queue ();
147+ this ->threads .clear ();
148+ this ->flags .clear ();
149+ }
150+
151+ template <typename F, typename ... Rest>
152+ auto push (F && f, Rest&&... rest) ->std::future<decltype(f(0 , rest...))> {
153+ auto pck = std::make_shared<std::packaged_task<decltype (f (0 , rest...))(int )>>(
154+ std::bind (std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
155+ );
156+
157+ auto _f = new std::function<void (int id)>([pck](int id) {
158+ (*pck)(id);
159+ });
160+ this ->q .push (_f);
161+
162+ std::unique_lock<std::mutex> lock (this ->mutex );
163+ this ->cv .notify_one ();
164+
165+ return pck->get_future ();
166+ }
167+
168+ // run the user's function that excepts argument int - id of the running thread. returned value is templatized
169+ // operator returns std::future, where the user can get the result and rethrow the catched exceptins
170+ template <typename F>
171+ auto push (F && f) ->std::future<decltype(f(0 ))> {
172+ auto pck = std::make_shared<std::packaged_task<decltype (f (0 ))(int )>>(std::forward<F>(f));
173+
174+ auto _f = new std::function<void (int id)>([pck](int id) {
175+ (*pck)(id);
176+ });
177+ this ->q .push (_f);
178+
179+ std::unique_lock<std::mutex> lock (this ->mutex );
180+ this ->cv .notify_one ();
181+
182+ return pck->get_future ();
183+ }
184+
185+
186+ private:
187+
188+ // deleted
189+ thread_pool (const thread_pool &);// = delete;
190+ thread_pool (thread_pool &&);// = delete;
191+ thread_pool & operator =(const thread_pool &);// = delete;
192+ thread_pool & operator =(thread_pool &&);// = delete;
193+
194+ void set_thread (int i) {
195+ std::shared_ptr<std::atomic<bool >> flag (this ->flags [i]); // a copy of the shared ptr to the flag
196+ auto f = [this , i, flag/* a copy of the shared ptr to the flag */ ]() {
197+ std::atomic<bool > & _flag = *flag;
198+ std::function<void (int id)> * _f;
199+ bool isPop = this ->q .pop (_f);
200+ while (true ) {
201+ while (isPop) { // if there is anything in the queue
202+ std::unique_ptr<std::function<void (int id)>> func (_f); // at return, delete the function even if an exception occurred
203+ (*_f)(i);
204+
205+ if (_flag)
206+ return ; // the thread is wanted to stop, return even if the queue is not empty yet
207+ else
208+ isPop = this ->q .pop (_f);
209+ }
210+
211+ // the queue is empty here, wait for the next command
212+ std::unique_lock<std::mutex> lock (this ->mutex );
213+ ++this ->nWaiting ;
214+ this ->cv .wait (lock, [this , &_f, &isPop, &_flag](){ isPop = this ->q .pop (_f); return isPop || this ->isDone || _flag; });
215+ --this ->nWaiting ;
216+
217+ if (!isPop)
218+ return ; // if the queue is empty and this->isDone == true or *flag then return
219+ }
220+ };
221+ this ->threads [i].reset (new std::thread (f)); // compiler may not support std::make_unique()
222+ }
223+
224+ void init () { this ->nWaiting = 0 ; this ->isStop = false ; this ->isDone = false ; }
225+
226+ std::vector<std::unique_ptr<std::thread>> threads;
227+ std::vector<std::shared_ptr<std::atomic<bool >>> flags;
228+ mutable boost::lockfree::queue<std::function<void (int id)> *> q;
229+ std::atomic<bool > isDone;
230+ std::atomic<bool > isStop;
231+ std::atomic<int > nWaiting; // how many threads are waiting
232+
233+ std::mutex mutex;
234+ std::condition_variable cv;
235+ };
236+
237+ }
238+
239+ #endif // __ctpl_thread_pool_H__
0 commit comments