RxCpp and QT integration : any documentation? #438

@renaudcerrato

I just moved from Android to Qt, and I'm struggling to integrate RxCpp with the Qt event loop. I'm basically trying to obtain a scheduler similar to AndroidSchedulers.mainThread().

Reading through #154 and #337 (thanks @studoot), I found the undocumented set_notify_earlier_wakeup.

I managed to get this far:

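#include <chrono>
#include <iomanip>
#include <iostream>
#include <thread>

#include <QApplication>
#include <QTimer>

#include "rxcpp/rx.hpp"
namespace rx=rxcpp;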

class helper
{
public:
   helper(QObject *parent = Q_NULLPTR) : timer(parent)
   {
      rl.set_notify_earlier_wakeup([&](std::chrono::steady_clock::time_point when)
      {
         // Tell the timer to wake up at `when` if it's not already waking up earlier
         const auto ms = static_cast<int>(ms_until(when).count());
         if (!timer.isActive() || ms < timer.remainingTime())
         {
            timer.start(ms);
         }
      });
      timer.setSingleShot(true);
      timer.setTimerType(Qt::PreciseTimer);
      timer.connect(&timer, &QTimer::timeout, [&]() { onEventScheduled(); });
   }

   rxcpp::schedulers::run_loop rl;

private:
   // Flush the RxCpp run loop
   void onEventScheduled()
   {
      // Dispatch outstanding RxCpp events
      while (!rl.empty() && rl.peek().when < rl.now())
      {
         rl.dispatch();
      }
      // If there are outstanding events, set the timer to wakeup for the first one
      if (!rl.empty())
      {
         const auto time_till_next_event = ms_until(rl.peek().when);
         timer.start(static_cast<int>(time_till_next_event.count()));
      }
   }

   std::chrono::milliseconds ms_until(rxcpp::schedulers::run_loop::clock_type::time_point const& when) const
   {
      return ceil<std::chrono::milliseconds>(when - rl.now());
   }

   template <class To, class Rep, class Period>
   static inline To ceil(const std::chrono::duration<Rep, Period>& duration)
   {
      const auto as_To = std::chrono::duration_cast<To>(duration);
      return (as_To < duration) ? (as_To + To{1}) : as_To;
   }

   QTimer timer;
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);

    helper rl;

    std::cout << std::setw(16) << std::this_thread::get_id() << " <- main thread id" << std::endl;

    using namespace std::chrono;

    auto mainthread = rx::observe_on_run_loop(rl.rl);
    auto workthread = rx::synchronize_new_thread();

    rx::composite_subscription lifetime;

    rx::observable<>::interval(workthread.now() + milliseconds(100), milliseconds(400)).
        map([](int i){
            std::cout << std::setw(16) << std::this_thread::get_id() << ": " << i << std::endl;
            return i;
        }).
        take_until(rx::observable<>::timer(seconds(1))).
        subscribe_on(workthread).
        observe_on(mainthread).
        subscribe(lifetime, [&](int i){
            std::cout << std::setw(16) << std::this_thread::get_id() << ": " << i << std::endl;
        });

    return a.exec();
}

Unfortunately, I'm getting the following output:

5080 <- main thread id
11864: 1
QObject::startTimer: Timers cannot be started from another thread
11864: 2
11864: 3

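I'm rather new to Qt, and it looks like a timer can't be started from a thread other than the one it was created on. Judging by the thread ids above, the set_notify_earlier_wakeup callback is presumably being invoked on the worker thread (11864) rather than on the main thread (5080). Any idea how to do a proper integration?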
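To make the question concrete, here is a minimal sketch of the pattern I suspect is needed: the wake-up callback never touches the timer itself and only posts a request that the owning thread executes later. It uses the standard library only. `owner_thread_timer`, `main_thread_queue` and `demo_marshalled_wakeup` are hypothetical names standing in for QTimer, the Qt event queue and the RxCpp callback; none of them are Qt or RxCpp APIs. In Qt, a queued `QMetaObject::invokeMethod` call or a queued signal/slot connection would presumably play the role of `post`, but that mapping is an assumption and has not been tested against the code above.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for QTimer: it may only be started from the
// thread that created it, which mirrors the Qt warning in the output.
class owner_thread_timer {
public:
    void start(int ms) {
        assert(std::this_thread::get_id() == owner_ &&
               "timer started from another thread");
        last_ms_ = ms;
        ++starts_;
    }
    int starts() const { return starts_; }
    int last_ms() const { return last_ms_; }

private:
    std::thread::id owner_ = std::this_thread::get_id();
    int starts_ = 0;
    int last_ms_ = -1;
};

// Hypothetical stand-in for the Qt event queue: any thread may post,
// but only the owning thread drains it.
class main_thread_queue {
public:
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    // Called on the owning thread: block until a task arrives, then run it.
    void run_one() {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(m_);
            cv_.wait(lock, [this] { return !tasks_.empty(); });
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
};

// Simulates the wake-up callback firing on a worker thread. Returns how
// many times the timer was started; every start happens on the caller.
int demo_marshalled_wakeup() {
    owner_thread_timer timer;  // created on the calling ("main") thread
    main_thread_queue queue;

    // What the callback would do: only post, never call timer.start().
    auto notify_earlier_wakeup = [&](int ms) {
        queue.post([&timer, ms] { timer.start(ms); });
    };

    std::thread worker([&] { notify_earlier_wakeup(100); });
    queue.run_one();  // the owning thread executes the posted request
    worker.join();
    return timer.starts();
}
```

Calling `demo_marshalled_wakeup()` from the thread that plays the role of the GUI thread starts the timer exactly once, on that thread, even though the request originated on the worker. Whether the same shape carries over cleanly to QTimer and run_loop is exactly what I'm unsure about.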
