-
Notifications
You must be signed in to change notification settings - Fork 409
Alternative to using subjects (or behaviors) for (infinite) streams? #451
Description
Hi,
I am toying around using rxcpp and I am having some difficulties deciding on implementation. Another thing that confuses me is that in a lot of examples observables are finite, while I want to use them in a sort of 'infinite' way. I have multiple continuous sources of information (driver-objects for e.g. sensors) and they all have their own event loop. I want to use observables so I can combine their information and apply functions to them and pass them on.
So far I implemented subjects (and behaviors, but what is the difference exactly?) and 'manually' call the on_next() function from the subject's subscriber. However, reading this implies I should try avoiding this sort of scheme if I can. I have not figured out an alternative way so I am asking here :-) Or is subjects/behavior the appropriate way to go if you want to move information from such a driver to an observable? In other Rx implementations I see a from_event implementation? Is that something that might handle these drivers in a more rx-way?
Also; what I do now is to make the subject a member variables of the specific driver and subscribe to it (through get_observable()) from the parent scope; is that considered normal use?
I wrote up some minimal working example to demonstrate what I made; I was just wondering if this is sort of among the lines on how RX should be used.. :-)
#include<rxcpp/rx.hpp>
#include<thread>
#include<memory>
#include<chrono>
#include<string>
namespace rx=rxcpp;
namespace rxsub=rxcpp::subjects;
namespace rxu=rxcpp::util;
using namespace std::chrono_literals;
class driver {
public:
driver(int t, std::string msg) : t_(std::chrono::seconds(t)), msg_(msg) {
b_.reset(new rxcpp::subjects::behavior<std::string>(msg_));
}
void run()
{
for (;;) {
b_->get_subscriber().on_next(msg_);
std::this_thread::sleep_for(t_);
}
}
std::shared_ptr<rxcpp::subjects::behavior<std::string>> b_;
std::chrono::seconds t_;
std::string msg_;
};
class complextask
{
public:
complextask(int t) : t_(std::chrono::seconds(t)) {};
std::string calculate(std::string msg) {
std::this_thread::sleep_for(t_);
return std::string("after a lot of thinking: ") + msg;
}
std::chrono::seconds t_;
};
int main()
{
// generate information
driver foo(1, "foo");
driver bar(2, "bar");
// calculation task
complextask complex(5);
// give drivers their own thread
std::thread t_foo([&](){
foo.run();
});
std::thread t_bar([&](){
bar.run();
});
std::cout << "main thread\n";
auto barobs = bar.b_->get_observable();
auto fooobs = foo.b_->get_observable();
auto merged = barobs.merge(fooobs);
merged.subscribe([](std::string msg){
std::cout << msg << ", " << std::endl;
});
merged
.observe_on(rxcpp::observe_on_event_loop())
.subscribe([&](std::string msg){
std::cout << complex.calculate(msg) << std::endl;
});
t_foo.join();
t_bar.join();
return 0;
}