diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 74d0350a17..7e46ca1b9f 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -4,6 +4,7 @@ package com.scalablesolutions.akka.amqp +import java.lang.String import rabbitmq.client.{AMQP => RabbitMQ, _} import rabbitmq.client.ConnectionFactory @@ -15,7 +16,7 @@ import java.util.{Timer, TimerTask} object AMQP { case class Message(val payload: AnyRef) - case class AddListener(val listener: Actor) + case class MessageConsumer(val listener: Actor) case class Reconnect(val delay: Long) /** @@ -23,8 +24,8 @@ object AMQP { * Usage: *
* val params = new ConnectionParameters
- * params.setUsername("bob")
- * params.setPassword("wilber")
+ * params.setUsername("barack")
+ * params.setPassword("obama")
* params.setVirtualHost("/")
* params.setRequestedHeartbeat(0)
* val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 9889, "exchangeKey", "routingKey", Serializer.Java)
@@ -62,11 +63,11 @@ object AMQP {
private var listeners: List[Actor] = Nil
private val endpoint = this
- def init(channel: Channel)
+ def init(channel: Channel)
def receive: PartialFunction[Any, Unit] = {
case message: Message => listeners.foreach(_ ! message)
- case AddListener(listener) => listeners ::= listener
+ case MessageConsumer(listener) => listeners ::= listener
case Reconnect(delay) => reconnect(delay)
case unknown => throw new IllegalArgumentException("Unknown message to AMQP dispatcher [" + unknown + "]")
}
@@ -86,36 +87,47 @@ object AMQP {
}
}
-class ExampleAMQPEnpointRunner {
- val params = new ConnectionParameters
- params.setUsername("guest")
- params.setPassword("guest")
- params.setVirtualHost("/")
- params.setRequestedHeartbeat(0)
- class ExampleConsumer(channel: Channel, listener: Actor)
- val endpoint = new AMQP.Endpoint(new ConnectionFactory(params), "localhost", 5672) {
+class ExampleAMQPSession {
+ import AMQP._
+
+ val CONFIG = new ConnectionParameters
+ CONFIG.setUsername("barack")
+ CONFIG.setPassword("obama")
+ CONFIG.setVirtualHost("/")
+ CONFIG.setRequestedHeartbeat(0)
+
+ val EXCHANGE = "whitehouse.gov"
+ val QUEUE = "marketing"
+ val ROUTING_KEY = "newsletter"
+ val HOSTNAME = "localhost"
+ val PORT = 8787
+ val SERIALIZER = Serializer.Java
+
+ val endpoint = new Endpoint(new ConnectionFactory(CONFIG), HOSTNAME, PORT) {
override def init(channel: Channel) = {
- channel.exchangeDeclare("mult", "direct")
- channel.queueDeclare("mult_queue")
- channel.queueBind("mult_queue", "mult", "routeroute")
- channel.basicConsume("mult_queue", false, new DefaultConsumer(channel) {
- private val serializer = Serializer.Java
- override def handleDelivery(tag: String, env: Envelope, props: RabbitMQ.BasicProperties, payload: Array[Byte]) {
- val routingKey = env.getRoutingKey
- val contentType = props.contentType
- val deliveryTag = env.getDeliveryTag
- listener ! AMQP.Message(serializer.in(payload, None))
- channel.basicAck(deliveryTag, false)
+ channel.exchangeDeclare(EXCHANGE, "direct")
+ channel.queueDeclare(QUEUE)
+ channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY)
+ channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
+ override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
+ messageConsumer ! Message(SERIALIZER.in(payload, None))
+ channel.basicAck(envelope.getDeliveryTag, false)
}
})
}
}
- val listener = new Actor() {
+ endpoint.start
+
+ val messageConsumer = new Actor() {
def receive: PartialFunction[Any, Unit] = {
- case AMQP.Message(payload) => println("Received message: " + payload)
+ case Message(payload) => println("Received message: " + payload)
}
}
- endpoint.start
- listener.start
- endpoint ! AMQP.AddListener(listener)
+ messageConsumer.start
+
+ endpoint ! MessageConsumer(messageConsumer)
+
+ val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER)
+ client.start
+ client ! Message("The President: I'm going surfing")
}
diff --git a/akka.iws b/akka.iws
index d1208ab19f..0292fc4550 100644
--- a/akka.iws
+++ b/akka.iws
@@ -2,15 +2,8 @@
-
-
-
-
-
-
-
-
+
@@ -68,7 +61,7 @@
-
+
@@ -392,7 +385,7 @@
-
+