Skip to content

add run loop scheduler#154

Merged
kirkshoop merged 3 commits intoReactiveX:masterfrom
kirkshoop:runloop
Jun 24, 2015
Merged

add run loop scheduler#154
kirkshoop merged 3 commits intoReactiveX:masterfrom
kirkshoop:runloop

Conversation

@kirkshoop
Copy link
Copy Markdown
Member

I added the run loop scheduler to answer #151

Usage

#include "rxcpp/rx.hpp"
// create alias' to simplify code
// these are owned by the user so that
// conflicts can be managed by the user.
namespace rx=rxcpp;

int main()
{
    std::cout << std::setw(16) << std::this_thread::get_id() << " <- main thread id" << std::endl;

    using namespace std::chrono;

    rx::schedulers::run_loop rl;

    auto mainthread = rx::observe_on_run_loop(rl);
    auto workthread = rx::synchronize_new_thread();

    rx::composite_subscription lifetime;

    rx::observable<>::interval(workthread.now() + milliseconds(100), milliseconds(400)).
        map([](int i){
            std::cout << std::setw(16) << std::this_thread::get_id() << ": " << i << std::endl;
            return i;
        }).
        take_until(rx::observable<>::timer(seconds(1))).
        subscribe_on(workthread).
        observe_on(mainthread).
        subscribe(lifetime, [&](int i){
            std::cout << std::setw(16) << std::this_thread::get_id() << ": " << i << std::endl;
        });

    while (lifetime.is_subscribed() || !rl.empty()) {
        while (!rl.empty() && rl.peek().when < rl.now()) {
            rl.dispatch();
        }
    }
    return 0;
}

Output:

  0x7fff7b1cb300 <- main thread id
     0x102047000: 1
  0x7fff7b1cb300: 1
     0x102047000: 2
  0x7fff7b1cb300: 2
     0x102047000: 3
  0x7fff7b1cb300: 3

@kirkshoop
Copy link
Copy Markdown
Member Author

Added fix for #153

kirkshoop pushed a commit that referenced this pull request Jun 24, 2015
@kirkshoop kirkshoop merged commit 0ff6d6a into ReactiveX:master Jun 24, 2015
@kirkshoop kirkshoop deleted the runloop branch June 24, 2015 03:44
@aomader
Copy link
Copy Markdown

aomader commented Sep 11, 2015

@kirkshoop Quick question, how would you integrate your runloop with an existing runloop like e.g. Qt's. I'm trying to bind my observables to a QObject in a safe manner, i.e., using observe_on, but I'm unsure how to use it properly. Any hints or recommendations?

As a side note, it might make sense to aggregate such common problems in a document or wiki. I for myself like to collect all common problems and their solutions in the main readme file directly, this motives the usage better.

@kirkshoop
Copy link
Copy Markdown
Member Author

Hi!

The runloop scheduler is intended to allow the scheduled items to be dispatched in a loop - perhaps in the same loop with another event system - like Win32 window messages.

After some research QT appears to keep its event loop hidden inside the application object.
This means that runloop is not the best tool for the job.

It would not be terribly hard to build a scheduler for QT that would post an event (or signal a slot) for each scheduled action. this would leave the QT event loop untouched.

@aomader
Copy link
Copy Markdown

aomader commented Sep 16, 2015

@kirkshoop Thanks for the follow up.

My current solution involves a simple timer which is scheduled on Qt's main loop and dispatches the observable events in a repetitive fashion, something like this:

rx::schedulers::run_loop runLoop;
auto worker = rx::observe_on_run_loop(runLoop);

QTimer timer;
QObject::connect(&timer, &QTimer::timeout, [&runLoop]() {
    while (runLoop.empty() && runLoop.peek().when < runLoop.now()) {
        runLoop.dispatch();
    }
});
timer.start(10);

Obviously this isn't ideal and has a few downsides. First of, the dispatch loop could block the main loop for too long. This is easily solvable by implementing some sort of time limit per dispatch run. Second, using a timer and thus spinning the loop every x ms is relatively costly for low intensity applications and increases the energy usage for no reason. Instead of a polling mechanism it would be much better to have a pushing one.

A better approach would probably involve the usage of QCoreApplication::postEvent() to post a dispatch event to the main loop and a custom event handler for this types of events to run the dispatch loop. However, I'm not sure how to integrate the postEvent() with your schedulor. Is there a way to register a handler or some sort of callback which is called once a dispatchable event is available? The following block demonstrates my idea in pseudo code:

// install the dispatch which runs the dispatch loop like above
QApplication::installEventHandler(myDispatchEventHandler);

rx::schedulers::run_loop runLoop;
auto worker = rx::observe_on_run_loop(runLoop);

// FIXME: i'm unsure if this is possible with rxcpp
worker.on_new_event([]() {
    // might make sense to check if a similar event is already
    // queued on qts main loop and to drop this event, if so
    QApplication::postEvent(spinTheDispatchLoop);
});

// starts the main loop
QApplication::exec();

@studoot
Copy link
Copy Markdown
Contributor

studoot commented Jan 24, 2017

So, I wish I'd noticed this issue before this morning, when I went through similar thought processes, working out how I was going to integrate RxCpp and Qt. I came up with something very similar.

I use two Qt signals to trigger the Rx run loop:

  1. I use QAbstractEventDispatcher::aboutToBlock to run the dispatch loop in case there have been any newly queued schedulables during the last batch of processing Qt events. I figured this would work nicely for me, as most of my Rx stuff is dependent on

  2. I use a QTimer (and the associatedtimeout signal) when there's an amount of time until the next Rx scheduled time.

    In addition, this timer is used with a 5ms interval if there is no QAbstractEventDispatcher::instance available when initialising.

This scheme seems to work pretty nicely - both Qt and Rx get scheduled without any noticeable delays, and CPU usage is just noise on my 3.5GHz workstation.

As @b52 alludes to, it would be useful to get notification (via callback?) when new schedulable items are queued - that could be used to trigger Rx processing rather than using QAbstractEventDispatcher::aboutToBlock.

I'll also mention RxQt, which provides some nice wrappers for creating observables from Qt signals and events...

#include <nonstd/optional.hpp> // From https://github.com/martinmoene/optional-lite/
#include <QAbstractEventDispatcher>
#include <QTimer>
#include <rxcpp/rx.hpp>

class QtRxEventsProcessor
{
public:
   QtRxEventsProcessor() : useTimerForRegularWakeup_(false)
   {
      t_.setSingleShot(true);
      t_.setTimerType(Qt::PreciseTimer);
      t_.connect(&t_, &QTimer::timeout, [&]() { onEventScheduled(); });
      if (const auto eventDispatcher = QAbstractEventDispatcher::instance())
      {
         const auto dispatcherConnection =
             QObject::connect(eventDispatcher, &QAbstractEventDispatcher::aboutToBlock, [&]() { onEventScheduled(); });
         useTimerForRegularWakeup_ = !dispatcherConnection;
      }
      else
      {
         useTimerForRegularWakeup_ = true;
      }
      onEventScheduled();
   }

private:
   void onEventScheduled()
   {
      if (const auto oTimeTillNextEvent = ProcessRxEvents())
      {
         t_.stop();
         t_.setInterval(oTimeTillNextEvent->count());
         t_.start();
      }
      else if (useTimerForRegularWakeup_)
      {
         t_.stop();
         t_.setInterval(5);
         t_.start();
      }
   }
   nonstd::optional<std::chrono::milliseconds> ProcessRxEvents() const
   {
      using namespace std::chrono;
      using namespace nonstd;
      while (!rl_.empty() && rl_.peek().when < rl_.now())
      {
         rl_.dispatch();
      }
      return rl_.empty() ? nullopt
                         : optional<std::chrono::milliseconds>(duration_cast<milliseconds>(rl_.peek().when - rl_.now()));
   }
   rxcpp::schedulers::run_loop rl_;
   QTimer t_;
   bool useTimerForRegularWakeup_;
};

@kirkshoop
Copy link
Copy Markdown
Member Author

Yes, this looks great!

I think that adding a callback to run loop that is passed the when for each call to schedule makes sense. I would merge that PR. 😉

@studoot
Copy link
Copy Markdown
Contributor

studoot commented Jan 24, 2017

Hint taken! And will be followed up on...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants