Description
While debugging an issue, I found that the implementation of blocking_subscribe in rx-observable.hpp looks suspicious. The predicate lambda passed to std::condition_variable::wait busy-waits on a memory location:
cs.add(
    [&, track](){
        // OSX geting invalid x86 op if notify_one is after the disposed = true
        // presumably because the condition_variable may already have been awakened
        // and is now sitting in a while loop on disposed
        wake.notify_one();
        track->disposed = true;
    });
std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
wake.wait(guard,
    [&, track](){
        // this is really not good.
        // false wakeups were never followed by true wakeups so..
        // anyways this gets triggered before disposed is set now so wait.
        while (!track->disposed) {
            ++track->false_wakes;
        }
        ++track->true_wakes;
        return true;
    });
Judging from the comment and the diff against the previous revision, I understand that you made this change because of a problem on OS X.
Below is the code from the previous revision:
std::mutex lock;
std::condition_variable wake;
std::atomic_bool disposed;
auto scbr = make_subscriber<T>(std::forward<ArgN>(an)...);
auto cs = scbr.get_subscription();
cs.add([&](){
    disposed = true;
    wake.notify_one();
});
std::unique_lock<std::mutex> guard(lock);
source.subscribe(scbr);
wake.wait(guard, [&](){return !!disposed;});
Looking at this older code, I suspect the original problem you experienced on OS X was caused by a race condition, because the handler sets the flag and notifies without holding the mutex:
Thread 1: acquires the lock and subscribes
Thread 1: checks (!!disposed) -> false, so it needs to wait on the condition_variable
Thread 2: sets disposed = true
Thread 2: calls wake.notify_one()
Thread 1: enters wait() -> never gets notified
I think a better way to solve this is to lock the same mutex inside the unsubscription handler. With that, you don't actually need the "disposed" flag.
auto cs = scbr.get_subscription();
cs.add(
    [&](){
        {
            // Just make sure to acquire the lock once before notifying.
            // This guarantees the subscriber has entered wake.wait().
            std::unique_lock<std::mutex> guard(lock);
        }
        wake.notify_one();
    });
std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
wake.wait(guard);
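To make the locking idea easier to try outside RxCpp, here is a minimal standalone sketch. It is not the library's code: `wait_for_dispose` and `on_dispose` are hypothetical names, and a plain `std::thread` stands in for `source.subscribe(...)`. It is also a variation on the snippet above, because it keeps a `disposed` flag (written under the same mutex) so that a spurious wakeup cannot end the wait early.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for blocking_subscribe: blocks the calling thread
// until another thread signals disposal, then reports whether it was seen.
bool wait_for_dispose() {
    std::mutex lock;
    std::condition_variable wake;
    bool disposed = false;  // only written while `lock` is held

    // Plays the role of the handler registered with cs.add(...).
    auto on_dispose = [&]() {
        {
            // Taking the lock means the waiter is either not yet checking
            // the predicate or is already blocked inside wake.wait().
            std::lock_guard<std::mutex> guard(lock);
            disposed = true;
        }
        wake.notify_one();
    };

    std::unique_lock<std::mutex> guard(lock);
    std::thread worker(on_dispose);  // assumption: disposal happens on another thread
    wake.wait(guard, [&]() { return disposed; });
    assert(disposed);                // the predicate form only returns once the flag is set
    guard.unlock();
    worker.join();                   // keep `wake` and `lock` alive until notify_one() returns
    return disposed;
}
```

Calling `wait_for_dispose()` should return `true` every time, since the flag cannot change between the predicate check and the moment the waiter blocks. One caveat of this sketch: it assumes the handler runs on a different thread. If the source completed synchronously on the subscribing thread, the handler would try to lock a `std::mutex` that thread already owns, so that case would need separate handling.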