Restart Flow/Source/Sink #23367
Test FAILed.
Force-pushed from a468a45 to f5ba53f.
Test PASSed.
* Implemented restart flow/source/sink with exponential backoff.
* Also fixed invariance bug for Source.fromSourceCompletionStage.

Force-pushed from f5ba53f to 5d8a367.
I've updated this PR with full tests and documentation. For the documentation, I've stubbed a few akka-http APIs so that I can show the feature with a realistic use case (it doesn't make sense to wrap a source like Source.single in a restarting source with backoff). I'm not sure whether that's considered good practice in Akka; let me know if you'd like me to change it (and suggest what I should demonstrate too). I also fixed an invariance bug in the Source.fromSourceCompletionStage/fromFuture APIs.
@@ -184,13 +184,14 @@ object Source {
* Streams the elements of the given future source once it successfully completes.
* If the future fails the stream is failed.
*/
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
Future is covariant in Scala, but this is the Java API, and Java does not see Scala's declaration-site variance, so it needs to be variant at the call site.
Good catch, should be a compatible change AFAICS
Yeah, both source and binary compatible. I thought about doing this in a separate PR but I really wanted to use it in the docs.
/**
 * Streams the elements of an asynchronous source once its given `completion` stage completes.
 * If the `completion` fails the stream is failed with that exception.
 */
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion))
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] =
  new Source(scaladsl.Source.fromSourceCompletionStage(completion))
CompletionStage is invariant and this is the Java API, so this definitely needs to be variant at the call site.
@@ -275,7 +275,7 @@ object Source {
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
CompletionStage is invariant, so even though this is the Scala API it still needs to be variant at the call site.
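To make the variance comments concrete, here is a small self-contained sketch using only the JDK and the Scala standard library (no Akka). `Shape` and `Circle` are hypothetical stand-ins for `Graph[SourceShape[T], M]` and one of its subtypes. It only shows how a parameter of the invariant type `CompletionStage[T]` rejects an already-typed stage of a subtype, while the `_ <: ...` bound (seen from Java as `? extends ...`) accepts it.

```scala
import java.util.concurrent.{CompletableFuture, CompletionStage}

object VarianceSketch {
  // Hypothetical types standing in for Graph[SourceShape[T], M] and a subtype of it.
  trait Shape { def name: String }
  final case class Circle(name: String) extends Shape

  // Invariant parameter: only a CompletionStage[Shape] is accepted,
  // because the Java interface CompletionStage[T] is invariant in T.
  def invariant(stage: CompletionStage[Shape]): String =
    stage.toCompletableFuture.get().name

  // Use-site variance (`_ <: Shape` in Scala, `? extends Shape` in Java):
  // a stage of any subtype of Shape is accepted.
  def useSiteVariant(stage: CompletionStage[_ <: Shape]): String =
    stage.toCompletableFuture.get().name

  def main(args: Array[String]): Unit = {
    val circle: CompletionStage[Circle] = CompletableFuture.completedFuture(Circle("circle"))
    // invariant(circle) // does not compile: CompletionStage[Circle] is not a CompletionStage[Shape]
    println(useSiteVariant(circle)) // prints "circle"
  }
}
```

Scala's own `Future[+T]` is declared covariant, which is why only the Java-facing `Future` signature and the `CompletionStage` signatures need the explicit bound.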
Test PASSed.
Not a full review, only skimmed it and had one question, looks very nice.
probe.requestNext("b")
probe.requestNext("a")

created.get() should ===(3)
Isn't it a problematic shortcoming that a stream using `RestartSource.withBackoff` can never complete based on the source completing? It is not what I would spontaneously expect from such a stage.
Is it what you'd expect from Akka's backoff supervisor? Because it's exactly the same behavior, and has exactly the same use cases. I'd like to use this feature for running things like CQRS event streams, Kafka subscriptions, and broadcasting ConductR event streams. You can't just stop processing all CQRS events in your system just because the source producing them stops. The source stopping (depending on the source) may indicate that the server that produced it is shutting down (perhaps due to an upgrade) and has cleanly closed its connections. An upgrade shouldn't cause me to stop doing what I need to do; I need to re-establish that connection (e.g., if a rolling upgrade caused the stream to stop, then after I get disconnected I want to connect to another node).
Hmm. Good point about the backoff supervisor, didn't think of that. I understood how it makes sense in your use case, the question was more if that is the only use case. Maybe some kind of alternative can be introduced in the future if it turns out it is useful/needed.
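The semantics discussed above can be modeled synchronously, without Akka and without any backoff delay, assuming a "source" is represented as a factory of iterators. Both completion and failure of the current iterator lead to a fresh one from the factory, so the outer iterator never completes on its own; the consumer ends it simply by not pulling any more. `restarting` is a hypothetical helper for illustration, not the `RestartSource` API.

```scala
object RestartSemanticsSketch {
  // Hypothetical helper: completion (hasNext == false) and failure (a
  // RuntimeException) of the current inner iterator both lead to a fresh
  // iterator from the factory, so the result never completes by itself.
  // If the factory only ever yields empty iterators this loops forever;
  // a real implementation would wait with a backoff before each restart.
  def restarting[T](factory: () => Iterator[T]): Iterator[T] = new Iterator[T] {
    private var current: Iterator[T] = factory()
    private var buffered: Option[T] = None

    // Pull until one element is buffered, restarting the inner iterator as needed.
    private def fill(): Unit =
      while (buffered.isEmpty) {
        try {
          if (current.hasNext) buffered = Some(current.next())
          else current = factory() // inner iterator completed: restart it
        } catch {
          case _: RuntimeException => current = factory() // inner iterator failed: restart it
        }
      }

    def hasNext: Boolean = { fill(); true }

    def next(): T = {
      fill()
      val element = buffered.get
      buffered = None
      element
    }
  }
}
```

Taking five elements from a factory of `Iterator("a", "b")` yields `a, b, a, b, a` and invokes the factory three times, mirroring the `created.get() should ===(3)` check in the test excerpt.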
LGTM with some minor comments, thanks @jroper !
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.

This pattern is useful when the stage fails <a id="^1" href="#1">[1]</a> because some external resource is not available,
Skip the footnote and just say "fails or completes" here. Also, should there really be a comma here (non-native English speaker asking)?
A lot of this was copied verbatim from the backoff supervisor docs, but I'll update :)
✅
> <a id="1" href="#^1">[1]</a> A failure can be indicated in two different ways: by the stream completing or failing.

The following snippet shows how to create a backoff supervisor using `akka.stream.scaladsl.RestartSource`
@scala[akka.stream.scaladsl.RestartSource] and @java[akka.stream.javadsl.RestartSource] here
✅
created.incrementAndGet()
Source(List("a", "b", "c"))
  .map {
    case "c" ⇒ sys.error("failed")
`throw TE("failed")` gives less noise in the test output.
✅
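The reviewer's point is that a dedicated test exception keeps the test log quiet. A sketch of the idea, assuming `TE` is essentially a `RuntimeException` mixed with `scala.util.control.NoStackTrace`; `TestFailure` below is a hypothetical stand-in, not the testkit class. For comparison, `sys.error(msg)` throws a plain `RuntimeException(msg)`, which carries a full stack trace.

```scala
import scala.util.control.NoStackTrace

// Hypothetical stand-in for the testkit's TE: NoStackTrace suppresses
// stack-trace capture, so a logged failure is a single line.
final case class TestFailure(message: String) extends RuntimeException(message) with NoStackTrace

object TestFailureSketch {
  def main(args: Array[String]): Unit = {
    val noisy = new RuntimeException("failed") // what sys.error("failed") throws
    val quiet = TestFailure("failed")
    println(s"sys.error-style frames: ${noisy.getStackTrace.length}")
    println(s"NoStackTrace frames:    ${quiet.getStackTrace.length}") // 0 unless suppression is disabled via a system property
  }
}
```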
}

}
Excellent test coverage. It would be good to also wrap each test case in `akka.stream.testkit.Utils#assertAllStagesStopped` to verify that no stream stages are leaking (I don't think any are).
✅
large spikes of traffic hitting the recovering server or other resource that they all need to contact.

In the same way, `akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow` can be used to
Here as well
✅
This looks great, definitely needed and very nice ones. Just some minor nitpicks; let's merge once those are cleaned up. We'll include it in @johanandren's docs change linking the various restart methods too.
The following snippet shows how to create a backoff supervisor using `akka.stream.scaladsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a stream of Server Sent Events,
produced by akka-http. If the stream fails at any point, the request will be made again, in increasing
fails or completes?
✅
In the same way, `akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow` can be used to
supervise sinks and flows.
A small note on when they would be restarted would be nice to add
✅
and re-starting after the same configured interval. By adding additional randomness to the
re-start intervals the streams will start at slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.
Good explanation :)
I can't claim credit, it came from the backoff supervisor documentation :)
:v)
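The jitter argument above can be illustrated with a small simulation. This is a sketch, not Akka's implementation: it assumes a restart delay is the base interval multiplied by a random factor in [1.0, 1.0 + randomFactor), and `JitterSketch`, `jittered` and `restartDelays` are hypothetical names. A fixed seed keeps the run deterministic.

```scala
import java.util.Random

object JitterSketch {
  // One stream's restart delay: the base delay stretched by a random
  // factor in [1.0, 1.0 + randomFactor).
  def jittered(baseMillis: Long, randomFactor: Double, rnd: Random): Long =
    (baseMillis * (1.0 + rnd.nextDouble() * randomFactor)).toLong

  // Restart delays for `streams` streams that all failed at the same moment.
  def restartDelays(streams: Int, baseMillis: Long, randomFactor: Double, seed: Long): Seq[Long] = {
    val rnd = new Random(seed)
    Seq.fill(streams)(jittered(baseMillis, randomFactor, rnd))
  }

  def main(args: Array[String]): Unit = {
    // Without jitter every stream reconnects after exactly 3000 ms, all at once.
    println(s"without jitter: ${restartDelays(5, 3000L, 0.0, seed = 1L)}")
    // With randomFactor 0.2 the reconnects are spread over [3000, 3600) ms.
    println(s"with jitter:    ${restartDelays(5, 3000L, 0.2, seed = 1L)}")
  }
}
```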
static Materializer materializer;

// Mocking akka-http
public static class Http {
Ah, I see.. since we're in streams and don't have the dep hm...
I think this may work fine, these APIs are not going to change, let's keep the mocked versions - it's a nice tangible example after all
@@ -0,0 +1,461 @@
package akka.stream.scaladsl
Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
// boring to add, I know; we'll try to add the sbt-header plugin again soon (we once failed at it due to some weirdness)
✅
 *
 * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
 * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
 * When that happens, the wrapped [[Source]], if currently running, will be cancelled, and it will not be restarted.
Add here "This can be triggered simply by the upstream, or externally by introducing a [[KillSwitch]] right before this Sink in the graph."
✅
flowFactory.create().asScala
}.asJava
}
}
great docs in general 👍
protected def backoff(): Unit

protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = {
  val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subin")
s"RestartWithBackoff$name.subIn"
✅
which will supervise the given `Source`. The `Source` in this case is a stream of Server Sent Events,
produced by akka-http. If the stream fails at any point, the request will be made again, in increasing
intervals of 3, 6, 12, 24 and finally 30 seconds:
finally 30 seconds (at which it will remain capped due to the maxBackoff parameter)
✅
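The interval sequence quoted in the docs (3, 6, 12, 24 and finally 30 seconds) corresponds to doubling from a 3 second minimum with a 30 second cap. Below is a minimal sketch of that calculation under those assumptions; `BackoffSketch.delay` is a hypothetical helper written for illustration and is not claimed to match `akka.pattern.Backoff` line for line (for instance, in where exactly the random factor is applied).

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before the restart with 0-based index `restartCount`: doubles from
  // minBackoff, is capped at maxBackoff, then is stretched by a random factor
  // in [1.0, 1.0 + randomFactor). In this sketch the jitter is applied after
  // the cap, so a delay can exceed maxBackoff when randomFactor > 0.
  def delay(restartCount: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = {
    // math.pow overflows to Infinity for huge counts; math.min still caps it.
    val exponential = minBackoff.toMillis.toDouble * math.pow(2.0, restartCount.toDouble)
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    val jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
    (capped * jitter).toLong.millis
  }

  def main(args: Array[String]): Unit =
    // With no jitter, prints 3, 6, 12, 24, 30 and 30 (seconds), one per line.
    (0 to 5).foreach(n => println(delay(n, 3.seconds, 30.seconds, randomFactor = 0.0).toSeconds))
}
```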
I've updated the PR in response to the review.
Test FAILed.
Failure was formatting: we started failing the build if a change is not formatted by scalariform (it caused us weird hassles during merging). Can you …
Looking good otherwise, ready to merge :)
Test FAILed.
Test PASSed.
 *
 * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
 *
 * @param minBackoff minimum (initial) duration until the child actor will
Should have been child/wrapped/nested stream, I believe.
Thank you so much for adding this pattern to Akka!
See #19950 (comment) for context.
One major change from the API proposed is that this wraps factories of sources/flows/sinks, not plain sources/flows/sinks.
I did this basically because there's no equivalent of `Flow.apply(() => Flow)` (for Source there's `lazily` and for Sink there's `lazyInit`). This matters because very often you start with a graph that is not reusable. A perfect use case for a restartable flow is where you want to feed the stream through a WebSocket using, for example, `Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))`. But that WebSocket flow is not reusable: a second attempt to materialize it will fail. Hence, we need a factory of flows to be able to use the API in this way.

Since the flow version was taking a factory, I thought the source and sink versions should also take a factory for consistency. And besides, neither `Source.lazily` nor `Sink.lazyInit` actually provides the semantics of just turning a factory into the direct instance; they also delay the invocation until the first pull/push, which is subtly but not insignificantly different.
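The reusability argument can be modeled without Akka. In this sketch `OneShotGraph` is a hypothetical stand-in for a graph that refuses a second materialization, as the WebSocket client flow is described above. Restarting by re-materializing the same instance fails on the second attempt, while restarting through a factory gets a fresh instance every time.

```scala
object FactorySketch {
  // Hypothetical stand-in for a non-reusable graph (e.g. a WebSocket client flow):
  // materializing it a second time throws.
  final class OneShotGraph(val id: Int) {
    private var materialized = false
    def materialize(): Int = {
      if (materialized) throw new IllegalStateException(s"graph $id cannot be materialized twice")
      materialized = true
      id
    }
  }

  // Restarting by re-materializing the same instance breaks on the second attempt.
  def restartSameInstance(graph: OneShotGraph, attempts: Int): Either[String, List[Int]] =
    try Right(List.fill(attempts)(graph.materialize()))
    catch { case e: IllegalStateException => Left(e.getMessage) }

  // Restarting through a factory materializes a fresh instance on every attempt.
  def restartFromFactory(factory: () => OneShotGraph, attempts: Int): List[Int] =
    List.fill(attempts)(factory().materialize())
}
```

This is the same reason the restart wrappers take a `() => Flow` (and, for consistency, source and sink factories) rather than a single graph instance.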