Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,13 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider =
copy(automaticRelease = automaticRelease)

private lazy val connection = provider.get

@tailrec
override def get: Connection = state.get match {
case Empty =>
if (state.compareAndSet(Empty, Connecting)) {
try {
val connection = provider.get
if (!state.compareAndSet(Connecting, Connected(connection, 1)))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while creating the connection."
Expand All @@ -392,26 +393,33 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
case Closing => get
}

@tailrec
override def release(connection: Connection): Unit = state.get match {
case Empty => throw new IllegalStateException("There is no connection to release.")
case Connecting => release(connection)
case c @ Connected(cachedConnection, clients) =>
if (cachedConnection != connection)
throw new IllegalArgumentException("Can't release a connection that's not owned by this provider")

if (clients == 1 || !automaticRelease) {
if (state.compareAndSet(c, Closing)) {
provider.release(connection)
if (!state.compareAndSet(Closing, Empty))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection."
)
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) release(connection)
override def release(connectionForRelease: Connection): Unit = {

@tailrec
def releaseRecursive(connectionForRelease: Connection, provider: AmqpConnectionProvider): Unit = {
state.get match {
case Empty => throw new IllegalStateException("There is no connection to release.")
case Connecting => releaseRecursive(connectionForRelease, provider)
case c @ Connected(cachedConnection, clients) =>
if (cachedConnection != connectionForRelease)
throw new IllegalArgumentException("Can't release a connection that's not owned by this provider")

if (clients == 1 || !automaticRelease) {
if (state.compareAndSet(c, Closing)) {
provider.release(connectionForRelease)
if (!state.compareAndSet(Closing, Empty))
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection."
)
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(connectionForRelease, provider)
}
case Closing => releaseRecursive(connectionForRelease, provider)
}
case Closing => release(connection)
}
releaseRecursive(connectionForRelease, provider)
}

private def copy(automaticRelease: Boolean): AmqpCachedConnectionProvider =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.{Future, Promise}
* the queue named in the replyTo options of the message instead of from settings declared at construction.
*/
@InternalApi
private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings)
private[amqp] final class AmqpReplyToSinkStage(replyToSettings: AmqpReplyToSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

val in = Inlet[WriteMessage]("AmqpReplyToSink.in")
Expand All @@ -31,7 +31,7 @@ private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val streamCompletion = Promise[Done]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {
override val settings = stage.settings
override val settings = stage.replyToSettings

override def whenConnected(): Unit = pull(in)

Expand Down Expand Up @@ -73,7 +73,7 @@ private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings
elem.properties.orNull,
elem.bytes.toArray
)
} else if (settings.failIfReplyToMissing) {
} else if (replyToSettings.failIfReplyToMissing) {
onFailure(new RuntimeException("Reply-to header was not set"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import scala.util.Success
* can be overridden per message by including `expectedReplies` in the the header of the [[akka.stream.alpakka.amqp.WriteMessage]]
*/
@InternalApi
private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSize: Int, responsesPerMessage: Int = 1)
private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings,
bufferSize: Int,
responsesPerMessage: Int = 1)
extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
stage =>

Expand All @@ -43,9 +45,9 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSi
val streamCompletion = Promise[String]()
(new GraphStageLogic(shape) with AmqpConnectorLogic {

override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
override val settings = stage.writeSettings
private val exchange = writeSettings.exchange.getOrElse("")
private val routingKey = writeSettings.routingKey.getOrElse("")
private val queue = mutable.Queue[CommittableReadResult]()
private var queueName: String = _
private var unackedMessages = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.{Future, Promise}
* instead of complete [[WriteResult]] (possibly it would be less confusing for users), but [[WriteResult]] is used
* for consistency with other variants and to make the flow ready for any possible future [[WriteResult]] extensions.
*/
@InternalApi private[amqp] final class AmqpSimpleFlowStage[T](settings: AmqpWriteSettings)
@InternalApi private[amqp] final class AmqpSimpleFlowStage[T](writeSettings: AmqpWriteSettings)
extends GraphStageWithMaterializedValue[FlowShape[(WriteMessage, T), (WriteResult, T)], Future[Done]] { stage =>

private val in: Inlet[(WriteMessage, T)] = Inlet(Logging.simpleName(this) + ".in")
Expand All @@ -35,13 +35,13 @@ import scala.concurrent.{Future, Promise}

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val streamCompletion = Promise[Done]()
(new AbstractAmqpFlowStageLogic[T](settings, streamCompletion, shape) {
(new AbstractAmqpFlowStageLogic[T](writeSettings, streamCompletion, shape) {
override def publish(message: WriteMessage, passThrough: T): Unit = {
log.debug("Publishing message {}.", message)

channel.basicPublish(
settings.exchange.getOrElse(""),
message.routingKey.orElse(settings.routingKey).getOrElse(""),
message.routingKey.orElse(writeSettings.routingKey).getOrElse(""),
message.mandatory,
message.immediate,
message.properties.orNull,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import akka.stream.alpakka.amqp.impl
import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult}
import akka.stream.scaladsl.Source

import scala.concurrent.ExecutionContext

object AmqpSource {
private implicit val executionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

/**
* Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
Expand Down
6 changes: 4 additions & 2 deletions amqp/src/test/scala/akka/stream/alpakka/amqp/AmqpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.ExecutionContext

abstract class AmqpSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with LogCapturing {

implicit val system = ActorSystem(this.getClass.getSimpleName)
implicit val executionContext = ExecutionContexts.parasitic
implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)
implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

override protected def afterAll(): Unit =
system.terminate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.stream.alpakka.amqp.scaladsl
import akka.Done
import akka.stream.alpakka.amqp._
import akka.stream.scaladsl.{Flow, FlowWithContext, Keep, Sink, Source}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.util.ByteString
Expand Down Expand Up @@ -309,7 +310,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
val input = Vector("one", "two", "three", "four", "five")
val expectedOutput = input.map(s => (WriteResult.confirmed, s))

val (completion, probe) =
val (completion: Future[Done], probe: TestSubscriber.Probe[(WriteResult, String)]) =
Source(input)
.map(s => (WriteMessage(ByteString(s)), s))
.viaMat(flow)(Keep.right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.amqp.scaladsl

import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.alpakka.amqp.{
Expand All @@ -23,7 +22,7 @@ import com.rabbitmq.client.{AddressResolver, Connection, ConnectionFactory, Shut
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterEach

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import org.scalatest.matchers.should.Matchers
Expand All @@ -41,7 +40,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
with LogCapturing {

override implicit val patienceConfig = PatienceConfig(10.seconds)
private implicit val executionContext = ExecutionContexts.parasitic
private implicit val executionContext: ExecutionContext = ExecutionContexts.parasitic

val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ TaskKey[Unit]("verifyCodeFmt") := {

addCommandAlias("verifyCodeStyle", "headerCheck; verifyCodeFmt")

lazy val amqp = alpakkaProject("amqp", "amqp", Dependencies.Amqp)
lazy val amqp = alpakkaProject("amqp", "amqp", Dependencies.Amqp, Scala3.settings)

lazy val avroparquet =
alpakkaProject("avroparquet", "avroparquet", Dependencies.AvroParquet)
Expand Down