Description
Hi! First of all, thanks for all the hard work on developing Alpakka. We're using it on a daily basis in our project and it's very useful 🙂
Also, this is my first issue submission, so if there's anything wrong or missing, please let me know.
Versions used
Akka version: 2.8.4
Alpakka AMQP: 6.0.2
Expected Behavior
With a restartable AMQP source, when the server closes the connection, the client should attempt to reconnect.
Actual Behavior
When using AmqpCachedConnectionProvider, once the server closes the connection, the client is unable to reconnect: each restart attempt fails with AlreadyClosedException (see the logs below).
Relevant logs
09:58:01.382 [test-akka.actor.default-dispatcher-5] WARN a.s.s.RestartWithBackoffSource - Restarting stream due to failure [5]: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart(AmqpConnectorLogic.scala:30)
at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart$(AmqpConnectorLogic.scala:27)
at akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1.preStart(AmqpSourceStage.scala:44)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:309)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Reproducible Test Case
The simplest reproduction I could come up with is the following (note: I tested against a local RabbitMQ instance):
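    import akka.actor.ActorSystem
    import akka.stream.{KillSwitches, RestartSettings}
    import akka.stream.alpakka.amqp.{AmqpCachedConnectionProvider, AmqpCredentials, AmqpDetailsConnectionProvider, NamedQueueSourceSettings}
    import akka.stream.alpakka.amqp.scaladsl.AmqpSource
    import akka.stream.scaladsl.{Keep, RestartSource, Sink}
    import scala.concurrent.duration._

    // Cached provider wrapping a plain connection to the local broker.
    // Client-side automatic recovery is disabled, so reconnecting is left to RestartSource.
    val amqpConnectionProvider: AmqpCachedConnectionProvider =
      AmqpCachedConnectionProvider(
        AmqpDetailsConnectionProvider("localhost", 5672)
          .withConnectionName("myConnection")
          .withAutomaticRecoveryEnabled(false)
          .withTopologyRecoveryEnabled(false)
          .withCredentials(AmqpCredentials("guest", "guest"))
      )

    val source = AmqpSource.committableSource(
      NamedQueueSourceSettings(
        amqpConnectionProvider,
        "myQueue"
      ),
      bufferSize = 10
    )

    // minBackoff = 1s, maxBackoff = 5s, randomFactor = 0.2
    val backoffSettings: RestartSettings = RestartSettings(1.second, 5.seconds, 0.2)

    implicit val actorSystem: ActorSystem = ActorSystem("test")

    RestartSource
      .withBackoff(backoffSettings) { () =>
        // Printed on every (re)start of the wrapped source
        println("Restarting AMQP source")
        source.map(_ => ())
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run()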
I think this is a regression introduced here: https://github.com/akka/alpakka/pull/2959/files
It looks like, after this change, the underlying provider is never asked to open a new connection again, so the cached (already closed) connection keeps being handed out. I might be misunderstanding the idea though, so feel free to correct me.
ℹ️ The same code works with Alpakka 5.0.
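To make the suspected failure mode concrete, here is a toy model in plain Scala. It is only a sketch of my understanding and not Alpakka's actual code: `FakeConnection`, `DirectProvider`, `NaiveCachedProvider` and `ValidatingCachedProvider` are hypothetical names made up for illustration. The naive cache keeps returning the connection the server already closed, while the validating variant asks the underlying provider for a new one.

```scala
// Hypothetical stand-ins for illustration only; none of these are Alpakka or RabbitMQ classes.
final class FakeConnection(val id: Int) {
  private var open = true
  def isOpen: Boolean = open
  // Simulates the broker force-closing the connection.
  def closeFromServer(): Unit = { open = false }
}

trait Provider {
  def get: FakeConnection
}

// Opens a fresh connection on every call.
final class DirectProvider extends Provider {
  private var nextId = 0
  def get: FakeConnection = {
    nextId += 1
    new FakeConnection(nextId)
  }
}

// Caches the first connection and never checks whether it is still open.
final class NaiveCachedProvider(underlying: Provider) extends Provider {
  private var cached: Option[FakeConnection] = None
  def get: FakeConnection = cached.getOrElse {
    val connection = underlying.get
    cached = Some(connection)
    connection
  }
}

// Same cache, but a closed connection is discarded and replaced.
final class ValidatingCachedProvider(underlying: Provider) extends Provider {
  private var cached: Option[FakeConnection] = None
  def get: FakeConnection = cached.filter(_.isOpen).getOrElse {
    val connection = underlying.get
    cached = Some(connection)
    connection
  }
}

object CachedProviderDemo {
  // Returns true if the provider hands out an open connection
  // after the server has closed the first one.
  def reconnects(provider: Provider): Boolean = {
    val first = provider.get
    first.closeFromServer()
    provider.get.isOpen
  }

  def main(args: Array[String]): Unit = {
    println(reconnects(new NaiveCachedProvider(new DirectProvider)))      // false
    println(reconnects(new ValidatingCachedProvider(new DirectProvider))) // true
  }
}
```

If the real cached provider behaves like the naive variant after the linked change, that would explain why every restart hits AlreadyClosedException in `createChannel`. Whether the right fix is a check like the one in the validating variant is an assumption on my side.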
I'll be happy to open a PR for this if you think this is an actual issue (as a novice here I might need some guidance though 🙂).
Cheers!