Alternative to using subjects (or behaviors) for (infinite) streams? #451

@thorstink

Hi,

I am experimenting with rxcpp and having some difficulty deciding on an implementation. One thing that confuses me is that in many examples the observables are finite, while I want to use them in a sort of 'infinite' way. I have multiple continuous sources of information (driver objects for, e.g., sensors), and each has its own event loop. I want to use observables so that I can combine their output, apply functions to it, and pass the results on.

So far I have used subjects (and behaviors, but what exactly is the difference?) and 'manually' call on_next() on the subject's subscriber. However, reading this implies I should avoid this sort of scheme if I can. I have not figured out an alternative, so I am asking here :-) Or are subjects/behaviors the appropriate way to go if you want to move information from such a driver into an observable? In other Rx implementations I see a from_event function. Is that something that might handle these drivers in a more Rx-like way?
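
To make the subject/behavior question concrete, here is a small toy model of the one difference I am fairly sure about: a behavior is constructed with an initial value and hands its most recent value to every new subscriber, while a plain subject only forwards values emitted after the subscription. The classes toy_subject and toy_behavior and both helper functions are hypothetical names made up for this sketch. They are not rxcpp types, and they leave out threading, unsubscription, on_error and on_completed.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a plain subject (NOT rxcpp::subjects::subject):
// subscribers only receive values emitted after they subscribed.
template <typename T>
class toy_subject {
public:
    using handler = std::function<void(const T&)>;

    void subscribe(handler h) { handlers_.push_back(std::move(h)); }

    void on_next(const T& value) {
        for (const auto& h : handlers_) h(value);
    }

private:
    std::vector<handler> handlers_;
};

// Toy stand-in for a behavior (NOT rxcpp::subjects::behavior):
// it always holds a current value and replays it on subscribe.
template <typename T>
class toy_behavior {
public:
    using handler = std::function<void(const T&)>;

    explicit toy_behavior(T initial) : current_(std::move(initial)) {}

    void subscribe(handler h) {
        h(current_);                        // replay the latest value first
        handlers_.push_back(std::move(h));
    }

    void on_next(const T& value) {
        current_ = value;                   // remember it for late subscribers
        for (const auto& h : handlers_) h(value);
    }

private:
    T current_;
    std::vector<handler> handlers_;
};

// Emit "first", subscribe, emit "second": the subscriber misses "first".
std::vector<std::string> late_subscriber_to_subject() {
    std::vector<std::string> seen;
    toy_subject<std::string> s;
    s.on_next("first");
    s.subscribe([&seen](const std::string& v) { seen.push_back(v); });
    s.on_next("second");
    return seen;
}

// Same sequence with the behavior: the subscriber is handed "first" on subscribe.
std::vector<std::string> late_subscriber_to_behavior() {
    std::vector<std::string> seen;
    toy_behavior<std::string> b("initial");
    b.on_next("first");
    b.subscribe([&seen](const std::string& v) { seen.push_back(v); });
    b.on_next("second");
    return seen;
}
```

In this model the late subscriber to the subject sees only "second", while the late subscriber to the behavior sees "first" and then "second". For a driver, that would mean a consumer attaching late still gets the last known reading right away.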

Also, what I do now is make the subject a member variable of the specific driver and subscribe to it (through get_observable()) from the parent scope. Is that considered normal use?

I wrote a minimal working example to demonstrate what I made; I was just wondering whether this is along the lines of how Rx should be used :-)

#include <rxcpp/rx.hpp>
#include <chrono>
#include <iostream>  // std::cout is used in main()
#include <memory>
#include <string>
#include <thread>
namespace rx=rxcpp;
namespace rxsub=rxcpp::subjects;
namespace rxu=rxcpp::util;
using namespace std::chrono_literals;

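// Stand-in for a sensor driver with its own event loop.
class driver {
public:
    driver(int t, std::string msg) : t_(std::chrono::seconds(t)), msg_(msg) {
        // The behavior starts out holding msg_ as its current value.
        b_.reset(new rxcpp::subjects::behavior<std::string>(msg_));
    }
    // Runs forever: push msg_ into the behavior every t_ seconds.
    void run()
    {
        for (;;) {
            b_->get_subscriber().on_next(msg_);
            std::this_thread::sleep_for(t_);
        }
    }
    std::shared_ptr<rxcpp::subjects::behavior<std::string>> b_;
    std::chrono::seconds t_;
    std::string msg_;
};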

// Simulates an expensive computation by sleeping for t_ seconds.
class complextask
{
public:
    complextask(int t) : t_(std::chrono::seconds(t)) {}
    std::string calculate(std::string msg) {
        std::this_thread::sleep_for(t_);
        return std::string("after a lot of thinking: ") + msg;
    }
    std::chrono::seconds t_;
};

int main()
{
    // generate information
    driver foo(1, "foo");
    driver bar(2, "bar"); 
    // calculation task
    complextask complex(5);

    // give drivers their own thread
    std::thread t_foo([&](){
        foo.run();
    });

    std::thread t_bar([&](){
        bar.run();
    });

    std::cout << "main thread\n";
    auto barobs = bar.b_->get_observable();
    auto fooobs = foo.b_->get_observable();
    auto merged = barobs.merge(fooobs);

    merged.subscribe([](std::string msg){
            std::cout << msg << ", " << std::endl;
        });

    merged
        .observe_on(rxcpp::observe_on_event_loop())
        .subscribe([&](std::string msg){
            std::cout << complex.calculate(msg) << std::endl;
        });

    t_foo.join();
    t_bar.join();
    return 0;
}
