Diamond graphs for observable aren't composable? #484

@iam

When designing an algorithm that requires turning a regular rx chain into a diamond (assume no multi-threading is needed here), there seems to be a lack of composability:

     A
   /   \
  B     C
   \   /
     D
     |
     E

(A, B, C, D, E are some linear chains)

The only working approach I could come up with looks like this:

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
A.connect();                    // <<<<< not composable

The problem is that calling connect doesn't compose, so the whole graph can't just be expressed as an observable<typeof(E)>.

What's the best practice to solve this? Should we change some operators to make this easier?


RxJava has autoConnect, which rxcpp seems to lack.

ref_count would almost work, except that it calls connect too soon: as soon as B subscribes, before C has had a chance to subscribe.
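
To make the timing problem concrete, here is a minimal single-threaded model in plain C++. It does not use rxcpp: `Observer`, `Source`, `Published` and the `threshold` parameter are hypothetical stand-ins for an observer, a cold observable, publish() and ref_count, and the source is assumed to emit synchronously as soon as it is subscribed. Unsubscription and completion are not modeled, so treat it as a sketch of the ordering issue only.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

namespace model {

using Observer = std::function<void(int)>;
using Source   = std::function<void(Observer)>;

// Stand-in for publish(): subscribers are only recorded; nothing flows
// until connect() subscribes once to the underlying source.
struct Published {
    Source source;
    std::vector<Observer> observers;

    void subscribe(Observer o) { observers.push_back(std::move(o)); }
    void connect() {
        source([this](int v) {
            for (auto& o : observers) o(v);
        });
    }
};

// Stand-in for ref_count with a hypothetical threshold: connect() fires when
// the subscriber count reaches `threshold` (threshold == 1 models today).
inline Source ref_count(std::shared_ptr<Published> p, std::size_t threshold) {
    assert(threshold >= 1);
    return [p, threshold](Observer o) {
        p->subscribe(std::move(o));
        if (p->observers.size() == threshold) p->connect();
    };
}

// Builds the diamond with B = x * 10 and C = x + 1, returns what D emits.
inline std::vector<int> run_diamond(std::size_t threshold) {
    auto published = std::make_shared<Published>();
    published->source = [](Observer o) { for (int i = 1; i <= 3; ++i) o(i); };
    Source A = ref_count(published, threshold);

    Source B = [A](Observer o) { A([o](int v) { o(v * 10); }); };
    Source C = [A](Observer o) { A([o](int v) { o(v + 1); }); };
    Source D = [B, C](Observer o) { B(o); C(o); };  // merge: subscribe B, then C

    std::vector<int> seen;
    D([&seen](int v) { seen.push_back(v); });
    return seen;
}

}  // namespace model
```

In this model, `model::run_diamond(1)` returns {10, 20, 30}: the source has already run to the end by the time C subscribes, so the C branch sees nothing. `model::run_diamond(2)` returns {10, 2, 20, 3, 30, 4}, which is the behavior the diamond needs.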


Possible solutions:

  • Add auto_connect to connectable_observable (problem: in RxJava it never seems to unsubscribe from the source. Not sure whether that is acceptable.)
auto A = ... | publish() | auto_connect(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
  • Change ref_count to take a threshold value
    ref_count(N): calls connect() once it has >= N subscribers and
    unsubscribes from the source when the subscriber count drops back to 0.
    (Today's behavior is simply N=1.)
auto A = ... | publish() | ref_count(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E | subscribe(...);
  • Change ref_count to take another observable
    ref_count(Obs): subscribes to and unsubscribes from its own source as normal, and also calls Obs.connect() once it has >= 1 subscribers.
    (Today's behavior is simply Obs=source.)
auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
  • Add some kind of new operator to avoid publish?
auto A = <src> | fork(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;

E.subscribe(...);
/*
  calls D.subscribe which:
  1) calls B.subscribe which calls A.subscribe -> the fork doesn't let it through
  2) calls C.subscribe which calls A.subscribe -> the fork has now seen N==2 subscribers, so it subscribes to <src>

  and then the whole graph is subscribed at <src>
*/

fork(N) waits for N subscribers before subscribing to the source. It forwards every observer callback to all of its subscribers, and it unsubscribes from the source when the subscriber count reaches 0.

(and this could also avoid all the mutex overhead of publish/ref_count).
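
A rough sketch of the fork(N) idea, again as a plain C++ model rather than rxcpp code. `Observer`, `Source`, `fork`, `Result` and `run_diamond` are hypothetical names chosen for illustration. The model assumes a single thread (so there is no mutex), forwards only values, and leaves out the unsubscribe-at-0 part and error/completion callbacks.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

namespace sketch {

using Observer = std::function<void(int)>;
using Source   = std::function<void(Observer)>;

// Hypothetical fork(n): holds subscribers back until n have arrived, then
// subscribes to src exactly once and forwards each value to all of them.
inline Source fork(Source src, std::size_t n) {
    assert(n >= 1);
    auto observers = std::make_shared<std::vector<Observer>>();
    return [src, n, observers](Observer o) {
        observers->push_back(std::move(o));
        if (observers->size() == n) {
            src([observers](int v) {
                for (auto& obs : *observers) obs(v);
            });
        }
    };
}

struct Result {
    std::vector<int> values;   // what the subscriber of E received
    int source_subscriptions;  // how many times <src> itself was subscribed
};

// Builds A..E as in the diamond above and subscribes to E only.
inline Result run_diamond() {
    int subscriptions = 0;
    Source src = [&subscriptions](Observer o) {
        ++subscriptions;
        for (int i = 1; i <= 3; ++i) o(i);
    };

    Source A = fork(src, 2);
    Source B = [A](Observer o) { A([o](int v) { o(v * 10); }); };
    Source C = [A](Observer o) { A([o](int v) { o(v + 1); }); };
    Source D = [B, C](Observer o) { B(o); C(o); };             // merge
    Source E = [D](Observer o) { D([o](int v) { o(-v); }); };  // some linear chain

    Result r{{}, 0};
    E([&r](int v) { r.values.push_back(v); });  // no separate connect() call
    r.source_subscriptions = subscriptions;
    return r;
}

}  // namespace sketch
```

Here `sketch::run_diamond()` yields the values {-10, -2, -20, -3, -30, -4} with exactly one subscription to the source. E is an ordinary `Source`, so the whole graph can be handed around and subscribed to like any other chain, which is the composability I'm after.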


What do you think? I'd be happy to write a PR if I knew what was the best way.

This has been bothering me for quite some time in code I wrote for a project of mine. I ended up returning a pair<observable<T>, connectable_observable<Y>>, a design that "broke" immediately once I tried to integrate it into another, larger observable chain.
