diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 974a7196a6..be3556e812 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -9,8 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException -import se.scalablesolutions.akka.util.{Logging} -import java.util.UUID +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.serialization.Serializer /** diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 32687ee8dd..5f0a49910e 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -10,6 +10,7 @@ import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import se.scalablesolutions.akka.config.OneForOneStrategy private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ @@ -17,6 +18,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio self.id = "amqp-connection-%s".format(host) self.lifeCycle = Some(LifeCycle(Permanent)) + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + val reconnectionTimer = new Timer("%s-timer".format(self.id)) val connectionFactory: ConnectionFactory = new ConnectionFactory() diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala index 72ccab3cc1..3bc2cb20dd 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def connectionAndRecovery = { + @Test + def connectionAndRecovery = if (AMQPTest.enabled) { + val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch val reconnectedLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala index b9bf0e3dbe..0f6fadfcc4 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerChannelRecovery = { + @Test + def consumerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala index 34a135f091..9dccd43be8 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerConnectionRecovery = { + @Test + def consumerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala index c616630317..d48f38afc5 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessageManualAcknowledge = { + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) @@ -30,15 +30,21 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + val failLatch = new StandardLatch val acknowledgeLatch = new StandardLatch var deliveryTagCheck: Long = -1 val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor { case Delivery(payload, _, deliveryTag, _, sender) => { - deliveryTagCheck = deliveryTag - sender.foreach(_ ! Acknowledge(deliveryTag)) + if (!failLatch.isOpen) { + failLatch.open + error("Make it fail!") + } else { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } } case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open - }, selfAcknowledging = false, channelParameters = Some(channelParameters))) + }, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala index dd01e4729a..af94b0a515 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala index b2ad2e2e58..095a21fc86 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerChannelRecovery = { + @Test + def producerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala index c0463469c9..71bc08bdaa 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerConnectionRecovery = { + @Test + def producerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala index d426031230..ab9bb00e7c 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -19,8 +19,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParamete class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerMessage = { + @Test + def producerMessage = if (AMQPTest.enabled) { val connection: ActorRef = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 08f9f47a32..eebcfccce3 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.serialization.Serializer class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { @@ -28,7 +28,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging case Stopped => () } - val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic) + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val stringSerializer = new Serializer { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala new file mode 100644 index 0000000000..e50ab673f6 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +object AMQPTest { + def enabled = false +} \ No newline at end of file