- make consumer restart when delegated handling actor fails

- made single object to flag test enable/disable
This commit is contained in:
momania 2010-07-14 10:18:01 +02:00
parent 8170b9741f
commit 4d0b503bf0
12 changed files with 43 additions and 24 deletions

View file

@ -9,8 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import java.lang.IllegalArgumentException
import se.scalablesolutions.akka.util.{Logging}
import java.util.UUID
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
/**

View file

@ -10,6 +10,7 @@ import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
import se.scalablesolutions.akka.config.OneForOneStrategy
private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor {
import connectionParameters._
@ -17,6 +18,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
self.id = "amqp-connection-%s".format(host)
self.lifeCycle = Some(LifeCycle(Permanent))
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
val reconnectionTimer = new Timer("%s-timer".format(self.id))
val connectionFactory: ConnectionFactory = new ConnectionFactory()

View file

@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers
class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def connectionAndRecovery = {
@Test
def connectionAndRecovery = if (AMQPTest.enabled) {
val connectedLatch = new StandardLatch
val reconnectingLatch = new StandardLatch
val reconnectedLatch = new StandardLatch

View file

@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._
class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerChannelRecovery = {
@Test
def consumerChannelRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._
class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerConnectionRecovery = {
@Test
def consumerConnectionRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete
class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessageManualAcknowledge = {
@Test
def consumerMessageManualAcknowledge = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
val countDown = new CountDownLatch(2)
@ -30,15 +30,21 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val failLatch = new StandardLatch
val acknowledgeLatch = new StandardLatch
var deliveryTagCheck: Long = -1
val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor {
case Delivery(payload, _, deliveryTag, _, sender) => {
deliveryTagCheck = deliveryTag
sender.foreach(_ ! Acknowledge(deliveryTag))
if (!failLatch.isOpen) {
failLatch.open
error("Make it fail!")
} else {
deliveryTagCheck = deliveryTag
sender.foreach(_ ! Acknowledge(deliveryTag))
}
}
case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open
}, selfAcknowledging = false, channelParameters = Some(channelParameters)))
}, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters)))

View file

@ -16,8 +16,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete
class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessage = {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {

View file

@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter
class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerChannelRecovery = {
@Test
def producerChannelRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))

View file

@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter
class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerConnectionRecovery = {
@Test
def producerConnectionRecovery = if (AMQPTest.enabled) {
val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50))
try {

View file

@ -19,8 +19,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParamete
class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def producerMessage = {
@Test
def producerMessage = if (AMQPTest.enabled) {
val connection: ActorRef = AMQP.newConnection()
try {

View file

@ -16,8 +16,8 @@ import se.scalablesolutions.akka.serialization.Serializer
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
// @Test
def consumerMessage = {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
try {
@ -28,7 +28,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic)
val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val stringSerializer = new Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes)

View file

@ -0,0 +1,9 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.amqp.test
object AMQPTest {
def enabled = false
}