=str 20448 splitAfter should emit substreams in a lazy way #21306
Can one of the repo owners verify this patch?
OK TO TEST
Yeah, I don't see how it can "simplify protocol a lot" either.
Test PASSed.
I think that's a very old comment that remained here (from before we had any
LGTM, thanks
Cool, need one more LGTM
}

override def onDownstreamFinish(): Unit = {
  // If the substream is already cancelled or it has not been handed out, we can go away
  if (!substreamWaitingToBePushed || substreamCancelled) completeStage()
  if (substreamSource == null || substreamWaitingToBePushed || substreamCancelled) completeStage()
`eq` to be consistent with the other null comparison?
Yeah, I'll fix this
Started looking at it but need to spend more time on it to understand the stage, will try to do that soon.
Looks correct, but I don't see why the condition had to be reversed on L432
if (substreamSource eq null) {
  //can be already pulled from substream in case split after
  if (!hasBeenPulled(in)) pull(in)
} else if (substreamWaitingToBePushed) pushSubstreamSource()
this condition has been reversed (it was negated before). What is the reason?
@drewhk In the legacy code the variable should really have been named `substreamWasPushed`, because of the following:
if (!substreamWaitingToBePushed) {
push(out, Source.fromGraph(substreamSource.source))
...
substreamWaitingToBePushed = true
}
There are three states. In the first, `substreamSource` is null because no data has arrived from upstream yet. In the second, data has arrived but the substream has not been pushed downstream yet, so `substreamWaitingToBePushed` should be true in this state, but it is false. The third state is when the substream has been pushed.
So I reversed the value of `substreamWaitingToBePushed` throughout the split stage.
My assumption is that `substreamWaitingToBePushed` was copied from the `groupBy` stage with some misconception.
Let me know if I missed something.
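To make the intended meaning of the flag concrete, here is a minimal sketch of the three situations described above, written as plain Scala with no Akka dependency. The names `SubstreamState`, `SubstreamStates.of`, and `hasSource` are hypothetical and only illustrate the reversed semantics; this is not the code of the split stage itself.

```scala
// Hypothetical model of the flag semantics after the reversal; not the actual stage code.
sealed trait SubstreamState
case object NoSubstream extends SubstreamState       // substreamSource eq null: nothing arrived from upstream yet
case object WaitingToBePushed extends SubstreamState // substream created but not yet handed downstream
case object Pushed extends SubstreamState            // substream already handed downstream

object SubstreamStates {
  // `hasSource` stands in for `substreamSource ne null` (an assumption made for this sketch).
  def of(hasSource: Boolean, substreamWaitingToBePushed: Boolean): SubstreamState =
    if (!hasSource) NoSubstream
    else if (substreamWaitingToBePushed) WaitingToBePushed
    else Pushed
}
```

With this reading, a true flag always means "a substream exists and still has to be pushed", which is the case the `else if (substreamWaitingToBePushed) pushSubstreamSource()` branch handles.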
OK, I see. Thank you for the explanation.
To sum up: @drewhk had some concern about reversing `substreamWaitingToBePushed` (it seems the name did not correspond to the flag's purpose). The rest seems to be small nitpicks.
Let me know if it's ok to rebase
only needs a rebase now.
489dbd4 to 98f5e52
PR is rebased and ready for merge.
Test PASSed.
LGTM AFAICS, Thanks @agolubev !
ported from akka/akka#21306 closes 3222
Ref #20448
Actually, I cannot see a good way of simplifying WebSocket because of this change. Any thoughts?