Description
When trying to design an algorithm that requires turning a regular rx chain into a diamond (imagine there's no multi-threading necessary here), there seems to be a lack of composability:
  A
 / \
B   C
 \ /
  D
  |
  E
(A, B, C, D, E are some linear chains)
The only working approach I could come up with looks like this:
auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;
E | subscribe(...);
A.connect(); // <<<<< not composable
The problem is that calling `connect` doesn't compose, so the whole graph can't just be expressed as an `observable<typeof(E)>`.
What's the best practice to solve this? Should we change some operators to make this easier?
RxJava has `autoConnect`, which rxcpp seems to lack. `ref_count` would almost work, except it calls `connect` too soon (when `B` is subscribed, without waiting for `C`).
Possible solutions:
- Add `auto_connect` to `connectable_observable` (problem: in RxJava it doesn't seem to ever call 'unsubscribe'. Not sure if this is acceptable?)
auto A = ... | publish() | auto_connect(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;
E | subscribe(...);
- Change `ref_count` to take a threshold value: `ref_count(N)` calls `connect()` once it has >=N subscribers and unsubscribes from the source when the count drops to 0. (Today's behavior is simply N=1.)
auto A = ... | publish() | ref_count(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;
E | subscribe(...);
- Change `ref_count` to take another observable: `ref_count(Obs)` subscribes to and unsubscribes from its source as normal, and also calls `Obs.connect()` once it has >=1 subscribers. (Today's behavior is simply Obs=source.)
auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;
E | subscribe(...);
- Add some kind of new operator to avoid `publish`?
auto A = <src> | fork(2);
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C);
auto E = D | ...;
E.subscribe(...);
/*
calls D.subscribe which:
1) calls B.subscribe which calls A.subscribe -> the fork doesn't let it through
2) calls C.subscribe which calls A.subscribe -> the fork N==2, so call the <src> subscribe
and then the whole graph is subscribed at <src>
*/
`fork(N)` waits for N subscribers before subscribing to the source. It forwards all observer callbacks to all of its subscribers, and it unsubscribes from the source when the subscriber count reaches 0. (This could also avoid all the mutex overhead of `publish`/`ref_count`.)
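To make the `fork(N)` idea more concrete, here is a rough sketch of the subscribe-gating behavior. It uses a toy, std-only model, not rxcpp's real types: `Observer`, `Observable`, `fork_n`, `map_values`, `merge_two` and `run_diamond` are all hypothetical names invented for this illustration. The model is synchronous, `int`-only, single-threaded, and ignores completion, errors and unsubscription, so it only shows when the source gets subscribed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-ins (NOT rxcpp types): an observable is just a function that
// accepts an on_next callback and pushes ints into it.
using Observer = std::function<void(int)>;
using Observable = std::function<void(Observer)>;

// Hypothetical fork_n: collect subscribers and subscribe to `source` exactly
// once, at the moment the n-th subscriber arrives. Every value is fanned out
// to all subscribers registered so far.
Observable fork_n(Observable source, std::size_t n) {
    assert(n >= 1);
    struct State {
        std::vector<Observer> observers;
        bool connected = false;
    };
    auto state = std::make_shared<State>();
    return [source, n, state](Observer observer) {
        state->observers.push_back(std::move(observer));
        if (!state->connected && state->observers.size() >= n) {
            state->connected = true;
            source([state](int value) {
                for (auto& o : state->observers) o(value);
            });
        }
    };
}

// Stand-in for one of the "linear chains" (B or C): transform each value.
Observable map_values(Observable source, std::function<int(int)> f) {
    return [source, f](Observer observer) {
        source([observer, f](int v) { observer(f(v)); });
    };
}

// Stand-in for merge: subscribe the same observer to both inputs, in order.
Observable merge_two(Observable a, Observable b) {
    return [a, b](Observer observer) {
        a(observer);
        b(observer);
    };
}

// Builds the A -> (B, C) -> D diamond and returns what a subscriber to D sees.
std::vector<int> run_diamond(std::size_t threshold) {
    // Cold, synchronous source: emits 1, 2, 3 as soon as it is subscribed.
    Observable source = [](Observer o) { for (int v : {1, 2, 3}) o(v); };
    Observable A = fork_n(source, threshold);
    Observable B = map_values(A, [](int v) { return v * 10; });
    Observable C = map_values(A, [](int v) { return v + 1; });
    Observable D = merge_two(B, C);
    std::vector<int> received;
    D([&received](int v) { received.push_back(v); });
    return received;
}
```

In this toy model, `run_diamond(2)` yields `10, 2, 20, 3, 30, 4`: the source is only subscribed once both branches are attached, so both see every value. `run_diamond(1)` yields just `10, 20, 30`, which mirrors the `ref_count` problem described above: the source runs as soon as `B` subscribes, and `C` arrives too late.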
What do you think? I'd be happy to write a PR if I knew what was the best way.
This has been bothering me for quite some time in some code I wrote for a project of mine. I ended up returning a `pair<observable<T>, connectable_observable<Y>>`, a design which "broke" immediately once I tried to integrate it into another, larger observable chain.