diff --git a/akka-amqp/pom.xml b/akka-amqp/pom.xml index 5e67d7a046..06a15a41a1 100644 --- a/akka-amqp/pom.xml +++ b/akka-amqp/pom.xml @@ -20,11 +20,16 @@ se.scalablesolutions.akka 0.6 - + com.rabbitmq rabbitmq-client 0.9.1 + + commons-io + commons-io + 1.4 + diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index faf271059c..43bcf82959 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -8,17 +8,60 @@ import java.lang.String import rabbitmq.client.{AMQP => RabbitMQ, _} import rabbitmq.client.ConnectionFactory -import se.scalablesolutions.akka.kernel.Kernel import se.scalablesolutions.akka.kernel.actor.Actor +import se.scalablesolutions.akka.kernel.util.Logging import se.scalablesolutions.akka.serialization.Serializer import java.util.{Timer, TimerTask} +/** + * AMQP Actor API. Implements Client and Endpoint materialized as Actors. + * + *
+ *   val messageConsumer = new Actor() {
+ *     def receive: PartialFunction[Any, Unit] = {
+ *       case Message(payload) => log.debug("Received message: %s", payload)
+ *     }
+ *   }
+ *   messageConsumer.start
+ *
+ *   val endpoint = new Endpoint(
+ *     new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, Serializer.Java)
+ *   endpoint.start
+ *
+ *   // register message consumer
+ *   endpoint ! MessageConsumer(messageConsumer)
+ *
+ *   val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, Serializer.Java, None)
+ *   client.start
+ *   client ! Message("Hi")
+ * 
+ * + * @author Jonas Bonér + */ object AMQP { case class Message(val payload: AnyRef) case class MessageConsumer(val listener: Actor) case class Reconnect(val delay: Long) + class MessageNotDeliveredException(message: String) extends RuntimeException(message) + + sealed trait ExchangeType + object ExchangeType { + case object Direct extends ExchangeType { + override def toString = "direct" + } + case object Topic extends ExchangeType { + override def toString = "topic" + } + case object Fanout extends ExchangeType { + override def toString = "fanout" + } + case object Match extends ExchangeType { + override def toString = "match" + } + } + /** * AMQP client actor. * Usage: @@ -28,7 +71,7 @@ object AMQP { * params.setPassword("obama") * params.setVirtualHost("/") * params.setRequestedHeartbeat(0) - * val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 9889, "exchangeKey", "routingKey", Serializer.Java) + * val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 5672, "exchangeName", "routingKey", Serializer.Java, None) * client.start * client ! Message("hi") * @@ -41,9 +84,28 @@ object AMQP { port: Int, exchangeKey: String, routingKey: String, - serializer: Serializer) extends Actor { + serializer: Serializer, + returnListener: Option[ReturnListener]) extends Actor { private val connection = connectionFactory.newConnection(hostname, port) private val channel = connection.createChannel + returnListener match { + case Some(listener) => channel.setReturnListener(listener) + case None => channel.setReturnListener(new ReturnListener() { + def handleBasicReturn( + replyCode: Int, + replyText: String, + exchange: String, + routingKey: String, + properties: RabbitMQ.BasicProperties, + body: Array[Byte]) = { + throw new MessageNotDeliveredException( + "Could not deliver message [" + replyText + + "] with reply code [" + replyCode + + "] and routing key [" + routingKey + + "] to exchange [" + exchange + "]") + } + }) + } def receive: PartialFunction[Any, Unit] = { case Message(msg: AnyRef) => send(msg) @@ -55,26 +117,50 @@ object AMQP { /** * @author Jonas Bonér */ - abstract class Endpoint(connectionFactory: ConnectionFactory, hostname: String, port: Int) extends Actor { + class Endpoint( + connectionFactory: ConnectionFactory, + hostname: String, + port: Int, + exchangeName: String, + queueName: String, + routingKey: String, + exchangeType: ExchangeType, + serializer: Serializer) extends Actor { private var connection = connectionFactory.newConnection(hostname, port) private var channel = connection.createChannel private val reconnectionTimer = new Timer private var listeners: List[Actor] = Nil private val endpoint = this + setupChannel - def init(channel: Channel) + private def setupChannel = { + channel.exchangeDeclare(exchangeName, exchangeType.toString) + channel.queueDeclare(queueName) + channel.queueBind(queueName, exchangeName, routingKey) + channel.basicConsume(queueName, false, new DefaultConsumer(channel) with Logging { + override def handleDelivery( + tag: String, + envelope: Envelope, + properties: RabbitMQ.BasicProperties, + payload: Array[Byte]) { + endpoint ! Message(serializer.in(payload, None)) + channel.basicAck(envelope.getDeliveryTag, false) + } + }) + } def receive: PartialFunction[Any, Unit] = { case message: Message => listeners.foreach(_ ! message) case MessageConsumer(listener) => listeners ::= listener case Reconnect(delay) => reconnect(delay) - case unknown => throw new IllegalArgumentException("Unknown message to AMQP dispatcher [" + unknown + "]") + case unknown => throw new IllegalArgumentException("Unknown message to AMQP Endpoint [" + unknown + "]") } private def reconnect(delay: Long) = { try { connection = connectionFactory.newConnection(hostname, port) channel = connection.createChannel + setupChannel log.debug("Successfully reconnected to AMQP Server") } catch { case e: Exception => @@ -90,44 +176,29 @@ object ExampleAMQPSession { def main(args: Array[String]) = { import AMQP._ val CONFIG = new ConnectionParameters - CONFIG.setUsername("barack") - CONFIG.setPassword("obama") - CONFIG.setVirtualHost("/") - CONFIG.setRequestedHeartbeat(0) - val EXCHANGE = "whitehouse.gov" - val QUEUE = "marketing" - val ROUTING_KEY = "newsletter" + val QUEUE = "twitter" + val ROUTING_KEY = "@barack_obama" val HOSTNAME = "localhost" - val PORT = 8787 + val PORT = 5672 val SERIALIZER = Serializer.Java val messageConsumer = new Actor() { def receive: PartialFunction[Any, Unit] = { - case Message(payload) => println("Received message: " + payload) + case Message(payload) => log.debug("Received message: %s", payload) } } messageConsumer.start - val endpoint = new Endpoint(new ConnectionFactory(CONFIG), HOSTNAME, PORT) { - override def init(channel: Channel) = { - channel.exchangeDeclare(EXCHANGE, "direct") - channel.queueDeclare(QUEUE) - channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY) - channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) { - override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) { - messageConsumer ! Message(SERIALIZER.in(payload, None)) - channel.basicAck(envelope.getDeliveryTag, false) - } - }) - } - } + val endpoint = new Endpoint( + new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, QUEUE, ROUTING_KEY, ExchangeType.Direct, SERIALIZER) endpoint.start + // register message consumer endpoint ! MessageConsumer(messageConsumer) - val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER) + val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER, None) client.start - client ! Message("The President: I'm going surfing") + client ! Message(ROUTING_KEY + " I'm going surfing") } } \ No newline at end of file diff --git a/akka.ipr b/akka.ipr index 175d776ea3..3e6d55e9a9 100644 --- a/akka.ipr +++ b/akka.ipr @@ -542,17 +542,6 @@ - - - - - - - - - - - @@ -597,17 +586,6 @@ - - - - - - - - - - - @@ -663,248 +641,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -938,50 +674,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1004,28 +696,6 @@ - - - - - - - - - - - - - - - - - - - - - - @@ -1062,17 +732,6 @@ - - - - - - - - - - - @@ -1104,17 +763,6 @@ - - - - - - - - - - - @@ -1159,17 +807,6 @@ - - - - - - - - - - - @@ -1181,6 +818,319 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1269,6 +1219,17 @@ + + + + + + + + + + + @@ -1280,28 +1241,6 @@ - - - - - - - - - - - - - - - - - - - - - - @@ -1324,6 +1263,17 @@ + + + + + + + + + + + @@ -1346,6 +1296,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1379,6 +1362,226 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1445,6 +1648,17 @@ + + + + + + + + + + + @@ -1500,196 +1714,26 @@ - + - + - + - + - + - + - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + diff --git a/akka.iws b/akka.iws index 9fc47e8b25..84dc45b7bb 100644 --- a/akka.iws +++ b/akka.iws @@ -2,23 +2,19 @@ - - - - + + + + + + + - - - - - - - - + @@ -70,82 +66,193 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + + + + + + + + + + + + + + - - + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + - - + + - + - - + + - + - - + + - + @@ -168,14 +275,15 @@ @@ -206,230 +314,6 @@ @@ -499,15 +357,19 @@ + + + + + + - - + - - + @@ -616,7 +478,13 @@ - + - + - + - - + + + - @@ -716,83 +584,233 @@ - + - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - + - + - + - + - + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + diff --git a/fun-test-java/akka-fun-test-java.iml b/fun-test-java/akka-fun-test-java.iml deleted file mode 100644 index e09beb736b..0000000000 --- a/fun-test-java/akka-fun-test-java.iml +++ /dev/null @@ -1,85 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml deleted file mode 100644 index 807760933a..0000000000 --- a/kernel/akka-kernel.iml +++ /dev/null @@ -1,95 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml deleted file mode 100644 index 97ec95ec37..0000000000 --- a/samples-java/akka-samples-java.iml +++ /dev/null @@ -1,78 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/samples-lift/akka-samples-lift.iml b/samples-lift/akka-samples-lift.iml deleted file mode 100644 index 5b3e6a796f..0000000000 --- a/samples-lift/akka-samples-lift.iml +++ /dev/null @@ -1,96 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml deleted file mode 100644 index f88fe93371..0000000000 --- a/samples-scala/akka-samples-scala.iml +++ /dev/null @@ -1,83 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/util-java/akka-util-java.iml b/util-java/akka-util-java.iml deleted file mode 100644 index a80717f779..0000000000 --- a/util-java/akka-util-java.iml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - - - - - - - - - - - -