AMQP cached connection provider does not reconnect #3023

@bturos

Hi! First of all, thanks for all the hard work on developing Alpakka. We're using it on a daily basis in our project and it's very useful 🙂
Also, this is my first issue submission, so if there's anything wrong or missing, please let me know.

Versions used

Akka version: 2.8.4
Alpakka AMQP: 6.0.2

Expected Behavior

With a restartable AMQP source, when the server closes the connection, the client should attempt to reconnect.

Actual Behavior

When using AmqpCachedConnectionProvider, once the server closes the connection, the client is unable to reconnect: every restart of the source fails with AlreadyClosedException (see the logs below).

Relevant logs

09:58:01.382 [test-akka.actor.default-dispatcher-5] WARN  a.s.s.RestartWithBackoffSource - Restarting stream due to failure [5]: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
	at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
	at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart(AmqpConnectorLogic.scala:30)
	at akka.stream.alpakka.amqp.impl.AmqpConnectorLogic.preStart$(AmqpConnectorLogic.scala:27)
	at akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1.preStart(AmqpSourceStage.scala:44)
	at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:309)
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
	at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:770)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:788)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Reproducible Test Case

The simplest reproduction I could come up with is the following (note: I tested against a local RabbitMQ instance):

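import akka.actor.ActorSystem
import akka.stream.{KillSwitches, RestartSettings}
import akka.stream.alpakka.amqp.{
  AmqpCachedConnectionProvider,
  AmqpCredentials,
  AmqpDetailsConnectionProvider,
  NamedQueueSourceSettings
}
import akka.stream.alpakka.amqp.scaladsl.AmqpSource
import akka.stream.scaladsl.{Keep, RestartSource, Sink}

import scala.concurrent.duration._

// Cached provider wrapping a plain connection to the local broker.
// Client-side automatic recovery and topology recovery are both disabled.
val amqpConnectionProvider: AmqpCachedConnectionProvider =
  AmqpCachedConnectionProvider(
    AmqpDetailsConnectionProvider("localhost", 5672)
      .withConnectionName("myConnection")
      .withAutomaticRecoveryEnabled(false)
      .withTopologyRecoveryEnabled(false)
      .withCredentials(AmqpCredentials("guest", "guest"))
  )

val source = AmqpSource.committableSource(
  NamedQueueSourceSettings(
    amqpConnectionProvider,
    "myQueue"
  ),
  bufferSize = 10
)

// minBackoff = 1 second, maxBackoff = 5 seconds, randomFactor = 0.2
val backoffSettings: RestartSettings = RestartSettings(1.second, 5.seconds, 0.2)

implicit val actorSystem: ActorSystem = ActorSystem("test")

// The factory below runs on every (re)start, so the message is printed each time
RestartSource
  .withBackoff(backoffSettings) { () =>
    println("Restarting AMQP source")
    source.map(_ => ())
  }
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.left)
  .run()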

I think this is a regression introduced here: https://github.com/akka/alpakka/pull/2959/files
It looks like, after this change, the internal provider is never asked to open a new connection again once the cached one has been closed. I might be misunderstanding the intent, though, so feel free to correct me.
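
To make the suspected failure mode concrete, here is a minimal sketch in plain Scala. Everything in it (`FakeConnection`, `FreshProvider`, `NaiveCachedProvider`, `CheckedCachedProvider`, `survivesServerClose`) is a hypothetical stand-in I made up for illustration. It does not use or reproduce the real state handling in AmqpCachedConnectionProvider; it only models "a cache that keeps handing out a closed connection" against "a cache that checks isOpen first".

```scala
// Hypothetical stand-ins: none of these types come from Alpakka or the RabbitMQ client.
final class FakeConnection(val id: Int) {
  private var open = true
  def isOpen: Boolean = open
  def close(): Unit = { open = false }
  // Mirrors the failure in the stack trace: a closed connection cannot create channels.
  def createChannel(): String = {
    if (!open) throw new IllegalStateException(s"connection $id is already closed")
    s"channel-on-connection-$id"
  }
}

trait Provider { def get: FakeConnection }

// Opens a brand-new connection on every call.
final class FreshProvider extends Provider {
  private var counter = 0
  def get: FakeConnection = { counter += 1; new FakeConnection(counter) }
}

// Suspected behaviour: the first connection is cached and handed out forever.
final class NaiveCachedProvider(underlying: Provider) extends Provider {
  private var cached: Option[FakeConnection] = None
  def get: FakeConnection = synchronized {
    cached.getOrElse { val c = underlying.get; cached = Some(c); c }
  }
}

// One possible fix: discard the cached connection once it reports itself closed.
final class CheckedCachedProvider(underlying: Provider) extends Provider {
  private var cached: Option[FakeConnection] = None
  def get: FakeConnection = synchronized {
    cached.filter(_.isOpen).getOrElse { val c = underlying.get; cached = Some(c); c }
  }
}

object CachedProviderDemo {
  // Simulates "server closes the connection, then the stream restarts".
  // Returns true if a channel can still be created after the restart.
  def survivesServerClose(provider: Provider): Boolean = {
    provider.get.close() // stands in for the server-side close
    scala.util.Try(provider.get.createChannel()).isSuccess
  }

  def main(args: Array[String]): Unit = {
    println(survivesServerClose(new NaiveCachedProvider(new FreshProvider)))   // false
    println(survivesServerClose(new CheckedCachedProvider(new FreshProvider))) // true
  }
}
```

In this toy model the naive cache fails on every restart, much like the repeated AlreadyClosedException in the logs above, while the checked cache asks the underlying provider for a new connection. Whether the real provider can be fixed this simply is an open question, since this model leaves out any state the real provider tracks beyond the cached connection.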

ℹ️ The same logic works with Alpakka 5.0.

I'll be happy to open a PR for this if you think this is an actual issue (as a novice here, I might need some guidance though 🙂).

Cheers!
