55#include < scheduler.h>
66
77#include < random.h>
8- #include < reverselock.h>
98
109#include < assert.h>
1110#include < utility>
@@ -20,18 +19,9 @@ CScheduler::~CScheduler()
2019}
2120
2221
23- #if BOOST_VERSION < 105000
24- static boost::system_time toPosixTime (const boost::chrono::system_clock::time_point& t)
25- {
26- // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
27- // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
28- return boost::posix_time::from_time_t (0 ) + boost::posix_time::milliseconds (boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch ()).count ());
29- }
30- #endif
31-
3222void CScheduler::serviceQueue ()
3323{
34- boost::unique_lock<boost::mutex> lock (newTaskMutex);
24+ WAIT_LOCK (newTaskMutex, lock );
3525 ++nThreadsServicingQueue;
3626
3727 // newTaskMutex is locked throughout this loop EXCEPT
@@ -40,7 +30,7 @@ void CScheduler::serviceQueue()
4030 while (!shouldStop ()) {
4131 try {
4232 if (!shouldStop () && taskQueue.empty ()) {
43- reverse_lock<boost::unique_lock<boost::mutex> > rlock (lock);
33+ REVERSE_LOCK (lock);
4434 }
4535 while (!shouldStop () && taskQueue.empty ()) {
4636 // Wait until there is something to do.
@@ -50,21 +40,13 @@ void CScheduler::serviceQueue()
5040 // Wait until either there is a new task, or until
5141 // the time of the first item on the queue:
5242
53- // wait_until needs boost 1.50 or later; older versions have timed_wait:
54- #if BOOST_VERSION < 105000
55- while (!shouldStop () && !taskQueue.empty () &&
56- newTaskScheduled.timed_wait (lock, toPosixTime (taskQueue.begin ()->first ))) {
57- // Keep waiting until timeout
58- }
59- #else
60- // Some boost versions have a conflicting overload of wait_until that returns void.
61- // Explicitly use a template here to avoid hitting that overload.
6243 while (!shouldStop () && !taskQueue.empty ()) {
63- boost ::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin ()->first ;
64- if (newTaskScheduled.wait_until <> (lock, timeToWaitFor) == boost ::cv_status::timeout)
44+ std ::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin ()->first ;
45+ if (newTaskScheduled.wait_until (lock, timeToWaitFor) == std ::cv_status::timeout) {
6546 break ; // Exit loop after timeout, it means we reached the time of the event
47+ }
6648 }
67- # endif
49+
6850 // If there are multiple threads, the queue can empty while we're waiting (another
6951 // thread may service the task we were waiting on).
7052 if (shouldStop () || taskQueue.empty ())
@@ -76,7 +58,7 @@ void CScheduler::serviceQueue()
7658 {
7759 // Unlock before calling f, so it can reschedule itself or another task
7860 // without deadlocking:
79- reverse_lock<boost::unique_lock<boost::mutex> > rlock (lock);
61+ REVERSE_LOCK (lock);
8062 f ();
8163 }
8264 } catch (...) {
@@ -91,7 +73,7 @@ void CScheduler::serviceQueue()
9173void CScheduler::stop (bool drain)
9274{
9375 {
94- boost::unique_lock<boost::mutex> lock (newTaskMutex);
76+ LOCK (newTaskMutex);
9577 if (drain)
9678 stopWhenEmpty = true ;
9779 else
@@ -100,29 +82,29 @@ void CScheduler::stop(bool drain)
10082 newTaskScheduled.notify_all ();
10183}
10284
103- void CScheduler::schedule (CScheduler::Function f, boost ::chrono::system_clock::time_point t)
85+ void CScheduler::schedule (CScheduler::Function f, std ::chrono::system_clock::time_point t)
10486{
10587 {
106- boost::unique_lock<boost::mutex> lock (newTaskMutex);
88+ LOCK (newTaskMutex);
10789 taskQueue.insert (std::make_pair (t, f));
10890 }
10991 newTaskScheduled.notify_one ();
11092}
11193
11294void CScheduler::scheduleFromNow (CScheduler::Function f, int64_t deltaMilliSeconds)
11395{
114- schedule (f, boost ::chrono::system_clock::now () + boost ::chrono::milliseconds (deltaMilliSeconds));
96+ schedule (f, std ::chrono::system_clock::now () + std ::chrono::milliseconds (deltaMilliSeconds));
11597}
11698
117- void CScheduler::MockForward (boost ::chrono::seconds delta_seconds)
99+ void CScheduler::MockForward (std ::chrono::seconds delta_seconds)
118100{
119- assert (delta_seconds.count () > 0 && delta_seconds < boost ::chrono::hours{1 });
101+ assert (delta_seconds.count () > 0 && delta_seconds < std ::chrono::hours{1 });
120102
121103 {
122- boost::unique_lock<boost::mutex> lock (newTaskMutex);
104+ LOCK (newTaskMutex);
123105
124106 // use temp_queue to maintain updated schedule
125- std::multimap<boost ::chrono::system_clock::time_point, Function> temp_queue;
107+ std::multimap<std ::chrono::system_clock::time_point, Function> temp_queue;
126108
127109 for (const auto & element : taskQueue) {
128110 temp_queue.emplace_hint (temp_queue.cend (), element.first - delta_seconds, element.second );
@@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
147129 scheduleFromNow (std::bind (&Repeat, this , f, deltaMilliSeconds), deltaMilliSeconds);
148130}
149131
150- size_t CScheduler::getQueueInfo (boost ::chrono::system_clock::time_point &first,
151- boost ::chrono::system_clock::time_point &last) const
132+ size_t CScheduler::getQueueInfo (std ::chrono::system_clock::time_point &first,
133+ std ::chrono::system_clock::time_point &last) const
152134{
153- boost::unique_lock<boost::mutex> lock (newTaskMutex);
135+ LOCK (newTaskMutex);
154136 size_t result = taskQueue.size ();
155137 if (!taskQueue.empty ()) {
156138 first = taskQueue.begin ()->first ;
@@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
160142}
161143
162144bool CScheduler::AreThreadsServicingQueue () const {
163- boost::unique_lock<boost::mutex> lock (newTaskMutex);
145+ LOCK (newTaskMutex);
164146 return nThreadsServicingQueue;
165147}
166148
@@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
174156 if (m_are_callbacks_running) return ;
175157 if (m_callbacks_pending.empty ()) return ;
176158 }
177- m_pscheduler->schedule (std::bind (&SingleThreadedSchedulerClient::ProcessQueue, this ));
159+ m_pscheduler->schedule (std::bind (&SingleThreadedSchedulerClient::ProcessQueue, this ), std::chrono::system_clock::now () );
178160}
179161
180162void SingleThreadedSchedulerClient::ProcessQueue () {
0 commit comments