inefficient implementation of blocking_subscribe #430

@nikobarli

While debugging an issue, I noticed that the implementation of blocking_subscribe in rx-observable.hpp looks odd. The predicate lambda passed to std::condition_variable::wait spins on a memory location instead of returning false and letting the wait block:

        cs.add(
            [&, track](){
                // OSX geting invalid x86 op if notify_one is after the disposed = true
                // presumably because the condition_variable may already have been awakened
                // and is now sitting in a while loop on disposed
                wake.notify_one();
                track->disposed = true;
            });

        std::unique_lock<std::mutex> guard(lock);
        source.subscribe(std::move(scbr));

        wake.wait(guard,
            [&, track](){
                // this is really not good.
                // false wakeups were never followed by true wakeups so..

                // anyways this gets triggered before disposed is set now so wait.
                while (!track->disposed) {
                    ++track->false_wakes;
                }
                ++track->true_wakes;
                return true;
            });

Looking at the comment and the diff against the previous revision, I understand that you made this change because of a problem in the OSX environment.

Below is the code from the previous revision.

        std::mutex lock;
        std::condition_variable wake;
        std::atomic_bool disposed;
        auto scbr = make_subscriber<T>(std::forward<ArgN>(an)...);
        auto cs = scbr.get_subscription();
        cs.add([&](){
            disposed = true;
            wake.notify_one();
        });

        std::unique_lock<std::mutex> guard(lock);
        source.subscribe(scbr);
        wake.wait(guard, [&](){return !!disposed;});

Looking at this older code, I suspect the original problem you experienced on OSX was caused by a race condition:

Thread 1: get lock and subscribe
Thread 1: check (!!disposed) -> false, so it needs to wait on the condition_variable
Thread 2: set disposed = true
Thread 2: wake.notify_one();
Thread 1: enter wait() -> never gets notified
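
The interleaving above is possible because the predicate overload of std::condition_variable::wait behaves like a loop around the plain wait. A minimal sketch of that expansion, with the window marked, may make the race easier to see. The helper name wait_expanded is hypothetical and is not part of RxCpp:

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not part of RxCpp): spells out what
// wake.wait(guard, pred) does internally.
template <class Pred>
void wait_expanded(std::condition_variable& wake,
                   std::unique_lock<std::mutex>& guard,
                   Pred pred) {
    while (!pred()) {
        // Window: the mutex is still held here, but a notifier that never
        // takes the mutex can set the flag and call notify_one() right now.
        // Nobody is blocked yet, so that notification has no effect.
        wake.wait(guard); // atomically releases the mutex and blocks
    }
}
```

If the notifier has to acquire the same mutex before notifying, it cannot run inside that window, because the waiter only releases the mutex once it is blocked inside wait.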

I think a better way to solve this is to lock the same mutex inside the unsubscription handler. With that in place, you don't actually need the "disposed" flag.

        auto cs = scbr.get_subscription();
        cs.add(
            [&](){
                {
                    // Acquire the lock once before notifying.
                    // This guarantees the subscriber has entered wake.wait().
                    std::unique_lock<std::mutex> guard(lock);
                }
                wake.notify_one();
            });

        std::unique_lock<std::mutex> guard(lock);
        source.subscribe(std::move(scbr));
        wake.wait(guard);
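
For anyone who wants to try the locking idea outside of RxCpp, here is a self-contained sketch. It is only an illustration under stated assumptions: a std::thread stands in for the thread that runs the unsubscription handler, and the names wait_for_disposal and on_dispose are hypothetical. Unlike the snippet above, this variant keeps a plain bool guarded by the mutex, because a bare wake.wait(guard) is allowed to return on a spurious wakeup.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the wait inside blocking_subscribe; not RxCpp code.
bool wait_for_disposal() {
    std::mutex lock;
    std::condition_variable wake;
    bool disposed = false; // guarded by lock, so it does not need to be atomic

    // Plays the role of the unsubscription handler.
    auto on_dispose = [&]() {
        {
            // Taking the lock means the waiter is either not yet holding it
            // or already blocked inside wait(), never in between.
            std::lock_guard<std::mutex> guard(lock);
            disposed = true;
        }
        wake.notify_one();
    };

    std::unique_lock<std::mutex> guard(lock);

    // Assumption: disposal happens on another thread, shortly after subscribing.
    std::thread worker([&]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        on_dispose();
    });

    // The predicate only reads the flag; it does not spin.
    wake.wait(guard, [&]() { return disposed; });
    guard.unlock();

    // Join before returning so wake and lock outlive the notifier.
    worker.join();
    return disposed;
}
```

Calling wait_for_disposal() blocks until the worker has run the handler and then returns true. Note that this sketch, like the snippet above, assumes the handler runs on a different thread; if it ran on the subscribing thread while the lock was held, locking the same std::mutex again would deadlock.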
