Contributor

@jroper jroper commented Jul 15, 2017

See #19950 (comment) for context.

One major change from the API proposed is that this wraps factories of sources/flows/sinks, not plain sources/flows/sinks.

I did this because there's no equivalent of Flow.apply(() => Flow) (for Source there's lazily and for Sink there's lazyInit). This matters because very often you start with a graph that is not reusable. A perfect use case for a restartable flow is feeding the stream through a WebSocket using, for example, Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")). But that WebSocket flow is not reusable; a second attempt to materialize it will fail. Hence, we need a factory of flows to be able to use the API in this way.

Since the flow version was taking a factory, I thought the source and sink versions should also take a factory for consistency. Besides, neither Source.lazily nor Sink.lazyInit actually provides the semantics of just turning a factory into the direct instance; they also delay the invocation until the first pull/push, which is subtly but not insignificantly different.
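
To illustrate why a factory is needed, here is a minimal sketch in plain Scala that does not use Akka itself. `OneShotFlow` and `runWithRestarts` are hypothetical names that stand in for a non-reusable flow and for the restart wrapper; the backoff delay between restarts is left out to keep the sketch short.

```scala
// Hypothetical stand-in for a graph that can only be materialized once,
// such as the WebSocket client flow mentioned above.
final class OneShotFlow {
  private var materialized = false

  // Returns the processing function; fails on a second materialization.
  def materialize(): String => String = {
    if (materialized) throw new IllegalStateException("already materialized")
    materialized = true
    s => if (s == "boom") throw new RuntimeException("connection lost") else s.toUpperCase
  }
}

// Hypothetical restart wrapper (no backoff delay): when the running instance
// fails, it drops the failing element and asks the factory for a fresh instance.
def runWithRestarts(factory: () => OneShotFlow, inputs: List[String]): List[String] = {
  var running = factory().materialize()
  inputs.flatMap { in =>
    try Some(running(in))
    catch {
      case _: RuntimeException =>
        running = factory().materialize() // restart with a new instance
        None
    }
  }
}

// A real factory creates a new instance per restart, so the restart succeeds.
val viaFactory = runWithRestarts(() => new OneShotFlow, List("a", "boom", "b"))

// A "factory" that hands back the same instance fails on the first restart.
val shared = new OneShotFlow
val viaSharedInstance = scala.util.Try(runWithRestarts(() => shared, List("a", "boom", "b")))
```

With a plain instance instead of a factory, the wrapper would be in the same position as `viaSharedInstance`: the first restart would attempt a second materialization and fail.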

@akka-ci akka-ci added the validating and needs-attention labels and removed the validating label Jul 15, 2017
akka-ci commented Jul 15, 2017

Test FAILed.

@jroper jroper force-pushed the restartsourceflowsink branch from a468a45 to f5ba53f Compare July 16, 2017 05:57
@akka-ci akka-ci added the validating and tested labels and removed the needs-attention and validating labels Jul 16, 2017
akka-ci commented Jul 16, 2017

Test PASSed.

* Implemented restart flow/source/sink with exponential backoff.
* Also fixed invariance bug for Source.fromSourceCompletionStage.
@jroper jroper force-pushed the restartsourceflowsink branch from f5ba53f to 5d8a367 Compare July 16, 2017 11:21
@jroper jroper changed the title from "[WIP] Restart Flow/Source/Sink" to "Restart Flow/Source/Sink" Jul 16, 2017
Contributor Author

jroper commented Jul 16, 2017

I've updated this PR with full tests and documentation.

For the documentation, I've stubbed a few akka-http APIs so that I can show the feature with a realistic use case (it doesn't make sense to wrap a source like Source.single in a restarting source with backoff). I'm not sure if that's considered good practice in Akka or not, let me know if you'd like me to change it (and give suggestions for what I should demonstrate too).

I also fixed an invariance bug in the Source.fromSourceCompletionStage/fromFuture APIs.

@@ -184,13 +184,14 @@ object Source {
* Streams the elements of the given future source once it successfully completes.
* If the future fails the stream is failed.
*/
- def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
+ def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
Contributor Author

Future is covariant, but this is the Java API, so it needs to be variant at the call site.

Contributor

Good catch, should be a compatible change AFAICS

Contributor Author

Yeah, both source and binary compatible. I thought about doing this in a separate PR but I really wanted to use it in the docs.


/**
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
- def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion))
+ def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] =
+ new Source(scaladsl.Source.fromSourceCompletionStage(completion))
Contributor Author

CompletionStage is invariant and this is the Java API so this definitely needs to be variant at the call site.

@@ -275,7 +275,7 @@ object Source {
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
- def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
+ def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
Contributor Author

CompletionStage is invariant, so even though this is the Scala API it still needs to be variant at the call site.
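
As a minimal sketch of what "variant at the call site" buys, the following plain Scala example uses hypothetical stand-ins: `Stage` plays the role of an invariant container such as CompletionStage, and `BaseGraph`/`SpecificSource` play the role of Graph and a concrete source type. None of these names come from Akka.

```scala
// Hypothetical invariant container standing in for CompletionStage.
final class Stage[A](val value: A)

// Hypothetical stand-ins for Graph and a concrete subtype of it.
trait BaseGraph
final class SpecificSource extends BaseGraph

// Invariant parameter: accepts exactly Stage[BaseGraph] and nothing else.
def strict(stage: Stage[BaseGraph]): BaseGraph = stage.value

// Use-site variance: accepts a Stage of any subtype of BaseGraph.
def lenient(stage: Stage[_ <: BaseGraph]): BaseGraph = stage.value

val specific: Stage[SpecificSource] = new Stage(new SpecificSource)

// strict(specific) // does not compile: Stage[SpecificSource] is not a Stage[BaseGraph]
val accepted: BaseGraph = lenient(specific)
```

The old signatures correspond to `strict`, so a caller holding a stage of a concrete source type had no way to pass it; the new signatures correspond to `lenient`.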

@akka-ci akka-ci added the validating label and removed the tested label Jul 16, 2017
@akka-ci akka-ci added the tested label and removed the validating label Jul 16, 2017
akka-ci commented Jul 16, 2017

Test PASSed.

Contributor

@johanandren johanandren left a comment

Not a full review, only skimmed it and had one question, looks very nice.

probe.requestNext("b")
probe.requestNext("a")

created.get() should ===(3)
Contributor

Isn't this a problematic shortcoming, that a stream using RestartSource.withBackoff can never complete based on the source completing? It is not what I would spontaneously expect from such a stage.

Contributor Author

Is it what you'd expect from Akka's backoff supervisor? Because it's exactly the same behavior, and has exactly the same use cases. I'd like to use this feature for running things like CQRS event streams, Kafka subscriptions, and broadcasting ConductR event streams. You can't just stop processing all CQRS events in your system just because the source producing them stops. The source stopping (depending on the source) may indicate that the server that produced it is shutting down (perhaps due to an upgrade) and has cleanly closed its connections. An upgrade shouldn't cause me to stop doing what I need to do; I need to reestablish that connection (e.g., if it's a rolling upgrade that caused the stream to stop, then after I get disconnected I want to connect to another node).

Contributor

Hmm. Good point about the backoff supervisor, didn't think of that. I understood how it makes sense in your use case, the question was more if that is the only use case. Maybe some kind of alternative can be introduced in the future if it turns out it is useful/needed.

Contributor

@johanandren johanandren left a comment

LGTM with some minor comments, thanks @jroper !

supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.

This pattern is useful when the stage fails <a id="^1" href="#1">[1]</a> because some external resource is not available,
Contributor

Skip the footnote and just say "fails or completes" here. Also, should there really be a comma here (non native english speaker asking)?

Contributor Author

@jroper jroper Jul 20, 2017

A lot of this was copied verbatim from the backoff supervisor docs, but I'll update :)

> <a id="1" href="#^1">[1]</a> A failure can be indicated in two different ways; by the stream completing or failing.

The following snippet shows how to create a backoff supervisor using `akka.stream.scaladsl.RestartSource`
Contributor

@scala[akka.stream.scaladsl.RestartSource] and @java[akka.stream.javadsl.RestartSource] here

created.incrementAndGet()
Source(List("a", "b", "c"))
.map {
case "c" ⇒ sys.error("failed")
Contributor

throw TE("failed") gives less noise in the test output.

}

}
Contributor

Excellent test coverage. It would be good to also wrap each test case in akka.stream.testkit.Utils#assertAllStagesStopped to verify that no stream stages are leaking (I don't think there are any).

large spikes of traffic hitting the recovering server or other resource that they all need to contact.

In the same way, `akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow` can be used to
Contributor

Here as well

@ktoso ktoso added the "2 - pick next" label Jul 19, 2017
Contributor

@ktoso ktoso left a comment

This looks great, definitely needed and very nice ones. Just some minor nitpicks and let's merge after those cleaned up - we'll include it in @johanandren's docs change linking the various restart methods too

The following snippet shows how to create a backoff supervisor using `akka.stream.scaladsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a stream of Server Sent Events,
produced by akka-http. If the stream fails at any point, the request will be made again, in increasing
Contributor

fails or completes?

In the same way, `akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow` can be used to
supervise sinks and flows.
Contributor

A small note on when they would be restarted would be nice to add

and re-starting after the same configured interval. By adding additional randomness to the
re-start intervals the streams will start in slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.
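
The effect of the added randomness can be sketched in plain Scala as follows. This is only an illustration under assumptions: the helper `withJitter` is hypothetical, and the formula (scaling the base delay by a factor between 1 and 1 + randomFactor) is an assumed reading of how a random factor is applied, not a quote from the implementation in this PR.

```scala
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical helper: scales the base delay by a factor in [1, 1 + randomFactor).
def withJitter(base: FiniteDuration, randomFactor: Double, rnd: Random): FiniteDuration = {
  val factor = 1.0 + rnd.nextDouble() * randomFactor
  (base.toMillis * factor).toLong.millis
}

// Fixed seed only so that the sketch is repeatable.
val rnd = new Random(42)

// Five streams that all failed at the same moment and share a 3 second base delay.
// With randomFactor = 0.2 each restart lands somewhere between 3.0 and 3.6 seconds.
val jittered: List[FiniteDuration] = List.fill(5)(withJitter(3.seconds, 0.2, rnd))
```

Because each stream draws its own factor, the restarts are spread over the 3.0 to 3.6 second window instead of all hitting the recovering resource at exactly 3 seconds.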
Contributor

Good explanation :)

Contributor Author

I can't claim credit, it came from the backoff supervisor documentation :)

Contributor

:v)

static Materializer materializer;

// Mocking akka-http
public static class Http {
Contributor

Ah, I see.. since we're in streams and don't have the dep hm...
I think this may work fine, these APIs are not going to change, let's keep the mocked versions - it's a nice tangible example after all

@@ -0,0 +1,461 @@
package akka.stream.scaladsl
Contributor

Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>

// boring to add, I know, we'll try to add the sbt-header plugin soon again, we once failed at it due to some weirdness

*
* This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
* is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
* When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
Contributor

Add here "This can be triggered simply by the upstream, or externally by introducing a [[KillSwitch]] right before this Sink in the graph."

flowFactory.create().asScala
}.asJava
}
}
Contributor

great docs in general 👍

protected def backoff(): Unit

protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = {
val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subin")
Contributor

@ktoso ktoso Jul 20, 2017

s"RestartWithBackoff$name.subIn"

which will supervise the given `Source`. The `Source` in this case is a stream of Server Sent Events,
produced by akka-http. If the stream fails at any point, the request will be made again, in increasing
intervals of 3, 6, 12, 24 and finally 30 seconds:
Contributor

finally 30 seconds (at which it will remain capped due to the maxBackoff parameter)
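
The capped schedule can be sketched in plain Scala as follows. This is an illustration under assumptions: `backoffDelay` is a hypothetical helper, the doubling formula is inferred from the quoted intervals, the random factor is omitted, and the 3 second minBackoff and 30 second maxBackoff are the values implied by the sequence in the docs.

```scala
import scala.concurrent.duration._

// Hypothetical helper (jitter omitted): min(maxBackoff, minBackoff * 2^restartCount).
def backoffDelay(restartCount: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration): FiniteDuration = {
  val uncappedMillis = minBackoff.toMillis * math.pow(2, restartCount)
  if (uncappedMillis >= maxBackoff.toMillis) maxBackoff else uncappedMillis.toLong.millis
}

// Delays in seconds for the first six restarts, assuming minBackoff = 3s and maxBackoff = 30s.
val schedule = (0 to 5).map(n => backoffDelay(n, 3.seconds, 30.seconds).toSeconds)
// schedule: 3, 6, 12, 24, 30, 30
```

The fifth delay would be 48 seconds without the cap, which is why the sequence jumps from 24 to 30 and then stays there.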

Contributor Author

jroper commented Jul 20, 2017

I've updated the PR in response to the review.

@akka-ci akka-ci added the validating label Jul 20, 2017
@akka-ci akka-ci added the needs-attention label and removed the tested and validating labels Jul 20, 2017
akka-ci commented Jul 20, 2017

Test FAILed.

Contributor

ktoso commented Jul 20, 2017

Failure was formatting:


-      deadline.isOverdue() should be (true)
+      deadline.isOverdue() should be(true)

we started failing if a change is not formatted by scalariform (caused us weird hassles during merging).

Can you test:compile and push?

Looking good otherwise, ready to merge :)

@akka-ci akka-ci added the validating and needs-attention labels and removed the needs-attention and validating labels Jul 20, 2017
akka-ci commented Jul 20, 2017

Test FAILed.

@akka-ci akka-ci added the validating and tested labels and removed the needs-attention and validating labels Jul 20, 2017
akka-ci commented Jul 20, 2017

Test PASSed.

*
* This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
*
* @param minBackoff minimum (initial) duration until the child actor will
@QmQvl QmQvl Jul 27, 2017

Should have been child/wrapped/nested stream, I believe.

q10 commented Oct 9, 2017

Thank you so much for adding this pattern to Akka!
