diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index ddb7e678a5..5b497bd1d4 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -4,15 +4,19 @@ package se.scalablesolutions.akka.amqp -import java.lang.String import com.rabbitmq.client.{AMQP => RabbitMQ, _} import com.rabbitmq.client.ConnectionFactory -import se.scalablesolutions.akka.kernel.actor.Actor -import se.scalablesolutions.akka.kernel.util.Logging -import se.scalablesolutions.akka.serialization.Serializer +import kernel.actor.{OneForOneStrategy, Actor} +import kernel.config.ScalaConfig._ +import kernel.util.Logging +import serialization.Serializer +import org.scala_tools.javautils.Imports._ + +import java.util.concurrent.ConcurrentHashMap import java.util.{Timer, TimerTask} +import java.io.IOException /** * AMQP Actor API. Implements Client and Endpoint materialized as Actors. @@ -25,34 +29,46 @@ import java.util.{Timer, TimerTask} * } * messageConsumer.start * - * val endpoint = new Endpoint( - * new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, Serializer.Java) - * endpoint.start + * val endpoint = AMQP.newEndpoint(CONFIG, HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, Serializer.Java, None, 100) * * // register message consumer * endpoint ! MessageConsumer(messageConsumer) * - * val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, Serializer.Java, None) - * client.start - * client ! Message("Hi") + * val client = AMQP.newClient(CONFIG, HOSTNAME, PORT, EXCHANGE, Serializer.Java, None, None, 100) + * client ! Message("Hi", ROUTING_KEY) * * * @author Jonas Bonér */ -object AMQP { - case class Message(val payload: AnyRef) - case class MessageConsumer(val listener: Actor) - case class Reconnect(val delay: Long) +object AMQP extends Actor { + private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor] + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = true + start + + class Message(val payload: AnyRef, val routingKey: String, val mandatory: Boolean, val immediate: Boolean) + object Message { + def unapply(message: Message): Option[Tuple4[AnyRef, String, Boolean, Boolean]] = + Some((message.payload, message.routingKey, message.mandatory, message.immediate)) + def apply(payload: AnyRef, routingKey: String, mandatory: Boolean, immediate: Boolean): Message = + new Message(payload, routingKey, mandatory, immediate) + def apply(payload: AnyRef, routingKey: String): Message = + new Message(payload, routingKey, false, false) + } + case class MessageConsumer(listener: Actor) + case class Reconnect(delay: Long) + case class Failure(cause: Throwable) + case object Stop class MessageNotDeliveredException(message: String) extends RuntimeException(message) - sealed trait ExchangeType + sealed trait ExchangeType object ExchangeType { case object Direct extends ExchangeType { override def toString = "direct" } case object Topic extends ExchangeType { - override def toString = "topic" + override def toString = "topic" } case object Fanout extends ExchangeType { override def toString = "fanout" @@ -62,6 +78,61 @@ object AMQP { } } + def newClient( + config: ConnectionParameters, + hostname: String, + port: Int, + exchangeName: String, + serializer: Serializer, + returnListener: Option[ReturnListener], + shutdownListener: Option[ShutdownListener], + initReconnectDelay: Long): Client = { + val client = new Client( + new ConnectionFactory(config), + hostname, port, + exchangeName, + serializer, + returnListener, + shutdownListener, + initReconnectDelay) + startLink(client) + client + } + + def newEndpoint( + config: ConnectionParameters, + hostname: String, + port: Int, + exchangeName: String, + queueName: String, + routingKey: String, + exchangeType: ExchangeType, + serializer: Serializer, + shutdownListener: Option[ShutdownListener], + initReconnectDelay: Long): Endpoint = { + val endpoint = new Endpoint( + new ConnectionFactory(config), + hostname, port, + exchangeName, queueName, routingKey, + exchangeType, + serializer, + shutdownListener, + initReconnectDelay) + startLink(endpoint) + endpoint + } + + def stopConnection(connection: FaultTolerantConnectionActor) = { + connection ! Stop + unlink(connection) + connections.remove(connection) + } + + override def shutdown = { + connections.values.asScala.foreach(_ ! Stop) + stop + } + /** * AMQP client actor. * Usage: @@ -71,24 +142,24 @@ object AMQP { * params.setPassword("obama") * params.setVirtualHost("/") * params.setRequestedHeartbeat(0) - * val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 5672, "exchangeName", "routingKey", Serializer.Java, None, None) - * client.start + * val client = AMQP.newClient(params, "localhost", 5672, "exchangeName", Serializer.Java, None, None, 100) * client ! Message("hi") * * * @author Jonas Bonér */ - class Client( - connectionFactory: ConnectionFactory, - hostname: String, - port: Int, + class Client private[amqp] ( + val connectionFactory: ConnectionFactory, + val hostname: String, + val port: Int, exchangeKey: String, - routingKey: String, serializer: Serializer, returnListener: Option[ReturnListener], - shutdownListener: Option[ShutdownListener]) extends Actor { - private val connection = connectionFactory.newConnection(hostname, port) - private val channel = connection.createChannel + shutdownListener: Option[ShutdownListener], + val initReconnectDelay: Long) + extends FaultTolerantConnectionActor { + var connection = connectionFactory.newConnection(hostname, port) + var channel = connection.createChannel returnListener match { case Some(listener) => channel.setReturnListener(listener) case None => channel.setReturnListener(new ReturnListener() { @@ -110,34 +181,38 @@ object AMQP { if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) def receive: PartialFunction[Any, Unit] = { - case Message(msg: AnyRef) => send(msg) + case Message(payload, routingKey, mandatory, immediate) => + channel.basicPublish(exchangeKey, routingKey, mandatory, immediate, null, serializer.out(payload)) + case Stop => + disconnect; stop } - protected def send(message: AnyRef) = channel.basicPublish(exchangeKey, routingKey, null, serializer.out(message)) + def setupChannel = {} } /** * @author Jonas Bonér */ - class Endpoint( - connectionFactory: ConnectionFactory, - hostname: String, - port: Int, + class Endpoint private[amqp] ( + val connectionFactory: ConnectionFactory, + val hostname: String, + val port: Int, exchangeName: String, queueName: String, routingKey: String, exchangeType: ExchangeType, serializer: Serializer, - shutdownListener: Option[ShutdownListener]) extends Actor { - private var connection = connectionFactory.newConnection(hostname, port) - private var channel = connection.createChannel - private val reconnectionTimer = new Timer - private var listeners: List[Actor] = Nil - private val endpoint = this + shutdownListener: Option[ShutdownListener], + val initReconnectDelay: Long) + extends FaultTolerantConnectionActor { + var connection = connectionFactory.newConnection(hostname, port) + var channel = connection.createChannel + var listeners: List[Actor] = Nil + val endpoint = this if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) setupChannel - private def setupChannel = { + def setupChannel = { channel.exchangeDeclare(exchangeName, exchangeType.toString) channel.queueDeclare(queueName) channel.queueBind(queueName, exchangeName, routingKey) @@ -147,33 +222,80 @@ object AMQP { envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) { - endpoint ! Message(serializer.in(payload, None)) - channel.basicAck(envelope.getDeliveryTag, false) + try { + endpoint ! Message(serializer.in(payload, None), envelope.getRoutingKey) + channel.basicAck(envelope.getDeliveryTag, false) + } catch { + case cause => endpoint ! Failure(cause) // pass on and rethrow exception in endpoint actor to trigger restart and reconnect + } } }) } def receive: PartialFunction[Any, Unit] = { - case message: Message => listeners.foreach(_ ! message) case MessageConsumer(listener) => listeners ::= listener - case Reconnect(delay) => reconnect(delay) - case unknown => throw new IllegalArgumentException("Unknown message to AMQP Endpoint [" + unknown + "]") + case message: Message => listeners.foreach(_ ! message) + case Reconnect(delay) => reconnect(delay) + case Failure(cause) => throw cause + case Stop => disconnect; stop + case unknown => throw new IllegalArgumentException("Unknown message to AMQP Endpoint [" + unknown + "]") + } + } + + trait FaultTolerantConnectionActor extends Actor { + lifeCycleConfig = Some(LifeCycle(Permanent, 100)) + + val reconnectionTimer = new Timer + + var connection: Connection + var channel: Channel + + val connectionFactory: ConnectionFactory + val hostname: String + val port: Int + val initReconnectDelay: Long + + def setupChannel + + protected def disconnect = { + try { + channel.close + } catch { + case e: IOException => log.error("Could not close AMQP channel %s:%s", hostname, port) + case _ => () + } + try { + connection.close + log.debug("Disconnected AMQP connection at %s:%s", hostname, port) + } catch { + case e: IOException => log.error("Could not close AMQP connection %s:%s", hostname, port) + case _ => () + } } - private def reconnect(delay: Long) = { + protected def reconnect(delay: Long) = { + disconnect try { connection = connectionFactory.newConnection(hostname, port) channel = connection.createChannel setupChannel - log.debug("Successfully reconnected to AMQP Server") + log.debug("Successfully reconnected to AMQP Server %s:%s", hostname, port) } catch { case e: Exception => val waitInMillis = delay * 2 - log.debug("Trying to reconnect to AMQP server in %n milliseconds" + waitInMillis) - reconnectionTimer.schedule(new TimerTask() {override def run = endpoint ! Reconnect(waitInMillis)}, delay) + val self = this + log.debug("Trying to reconnect to AMQP server in %n milliseconds", waitInMillis) + reconnectionTimer.schedule(new TimerTask() { override def run = self ! Reconnect(waitInMillis) }, delay) } } + + override def preRestart(reason: AnyRef, config: Option[AnyRef]) = disconnect + override def postRestart(reason: AnyRef, config: Option[AnyRef]) = reconnect(initReconnectDelay) } + + def receive: PartialFunction[Any, Unit] = { + case _ => {} // ignore all messages + } } object ExampleAMQPSession { @@ -189,20 +311,23 @@ object ExampleAMQPSession { val messageConsumer = new Actor() { def receive: PartialFunction[Any, Unit] = { - case Message(payload) => log.debug("Received message: %s", payload) + case Message(payload, _, _, _) => log.debug("Received message: %s", payload) } } messageConsumer.start - val endpoint = new Endpoint( - new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, SERIALIZER, None) - endpoint.start + val endpoint = AMQP.newEndpoint(CONFIG, HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, SERIALIZER, None, 100) // register message consumer endpoint ! MessageConsumer(messageConsumer) - val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER, None, None) - client.start - client ! Message(ROUTING_KEY + " I'm going surfing") + val client = AMQP.newClient(CONFIG, HOSTNAME, PORT, EXCHANGE, SERIALIZER, None, None, 100) + client ! Message(ROUTING_KEY + " I'm going surfing", ROUTING_KEY) + Thread.sleep(1000) + client ! Message(ROUTING_KEY + " I'm going surfing", ROUTING_KEY) + Thread.sleep(1000) + client ! Message(ROUTING_KEY + " I'm going surfing", ROUTING_KEY) + Thread.sleep(1000) + client ! Message(ROUTING_KEY + " I'm going surfing", ROUTING_KEY) } } \ No newline at end of file diff --git a/akka.ipr b/akka.ipr index 3e6d55e9a9..569452eef7 100644 --- a/akka.ipr +++ b/akka.ipr @@ -1362,6 +1362,17 @@ + + + + + + + + + + + @@ -1582,6 +1593,17 @@ + + + + + + + + + + + @@ -1714,28 +1736,6 @@ - - - - - - - - - - - - - - - - - - - - - - diff --git a/akka.iws b/akka.iws index 84dc45b7bb..772e484768 100644 --- a/akka.iws +++ b/akka.iws @@ -2,13 +2,7 @@ - - - - - - - + @@ -69,59 +63,8 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + @@ -130,20 +73,7 @@ - - - - - - - - - - - - - - + @@ -152,72 +82,7 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -225,7 +90,25 @@ - + + + + + + + + + + + + + + + + + + + @@ -240,15 +123,6 @@ - - - - - - - - - @@ -268,7 +142,6 @@ @@ -319,24 +193,10 @@ @@ -467,7 +327,7 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -762,9 +486,30 @@ + + + + + + + + + + + + + + + + + + + + + - + @@ -776,9 +521,23 @@ + + + + + + + + + + + + + + - + diff --git a/kernel/src/main/scala/util/Scheduler.scala b/kernel/src/main/scala/util/Scheduler.scala index 2d7bfed445..f8e77fe797 100644 --- a/kernel/src/main/scala/util/Scheduler.scala +++ b/kernel/src/main/scala/util/Scheduler.scala @@ -15,6 +15,7 @@ package se.scalablesolutions.akka.kernel.util import java.util.concurrent._ import kernel.actor.{OneForOneStrategy, Actor} +import kernel.config.ScalaConfig._ import org.scala_tools.javautils.Imports._ @@ -26,6 +27,8 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * which is licensed under the Apache 2 License. */ class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { + lifeCycleConfig = Some(LifeCycle(Permanent, 100)) + def receive: PartialFunction[Any, Unit] = { case UnSchedule => Scheduler.stopSupervising(this)