From 4513b1be345fdbc7ac3516aa60bcddd71228a6bb Mon Sep 17 00:00:00 2001 From: jboner Date: Thu, 27 Aug 2009 18:42:39 +0200 Subject: [PATCH] cleaned up example session --- akka-amqp/src/main/scala/AMQP.scala | 70 +++++++++++++++++------------ akka.iws | 13 ++---- 2 files changed, 44 insertions(+), 39 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 74d0350a17..7e46ca1b9f 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -4,6 +4,7 @@ package com.scalablesolutions.akka.amqp +import java.lang.String import rabbitmq.client.{AMQP => RabbitMQ, _} import rabbitmq.client.ConnectionFactory @@ -15,7 +16,7 @@ import java.util.{Timer, TimerTask} object AMQP { case class Message(val payload: AnyRef) - case class AddListener(val listener: Actor) + case class MessageConsumer(val listener: Actor) case class Reconnect(val delay: Long) /** @@ -23,8 +24,8 @@ object AMQP { * Usage: *
    * val params = new ConnectionParameters
-   * params.setUsername("bob")
-   * params.setPassword("wilber")
+   * params.setUsername("barack")
+   * params.setPassword("obama")
    * params.setVirtualHost("/")
    * params.setRequestedHeartbeat(0)
    * val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 9889, "exchangeKey", "routingKey", Serializer.Java)
@@ -62,11 +63,11 @@ object AMQP {
     private var listeners: List[Actor] = Nil
     private val endpoint = this
 
-    def init(channel: Channel)
+    def  init(channel: Channel)
 
     def receive: PartialFunction[Any, Unit] = {
       case message: Message => listeners.foreach(_ ! message)
-      case AddListener(listener) => listeners ::= listener
+      case MessageConsumer(listener) => listeners ::= listener
       case Reconnect(delay) => reconnect(delay)
       case unknown => throw new IllegalArgumentException("Unknown message to AMQP dispatcher [" + unknown + "]")
     }
@@ -86,36 +87,47 @@ object AMQP {
   }
 }
 
-class ExampleAMQPEnpointRunner {
-  val params = new ConnectionParameters
-  params.setUsername("guest")
-  params.setPassword("guest")
-  params.setVirtualHost("/")
-  params.setRequestedHeartbeat(0)
-  class ExampleConsumer(channel: Channel, listener: Actor)
-  val endpoint = new AMQP.Endpoint(new ConnectionFactory(params), "localhost", 5672) {
+class ExampleAMQPSession {
+  import AMQP._
+  
+  val CONFIG = new ConnectionParameters
+  CONFIG.setUsername("barack")
+  CONFIG.setPassword("obama")
+  CONFIG.setVirtualHost("/")
+  CONFIG.setRequestedHeartbeat(0)
+
+  val EXCHANGE = "whitehouse.gov"
+  val QUEUE = "marketing"
+  val ROUTING_KEY = "newsletter"
+  val HOSTNAME = "localhost"
+  val PORT = 8787
+  val SERIALIZER = Serializer.Java
+
+  val endpoint = new Endpoint(new ConnectionFactory(CONFIG), HOSTNAME, PORT) {
     override def init(channel: Channel) = {
-      channel.exchangeDeclare("mult", "direct")
-      channel.queueDeclare("mult_queue")
-      channel.queueBind("mult_queue", "mult", "routeroute")
-      channel.basicConsume("mult_queue", false, new DefaultConsumer(channel) {
-        private val serializer = Serializer.Java
-        override def handleDelivery(tag: String, env: Envelope, props: RabbitMQ.BasicProperties, payload: Array[Byte]) {
-          val routingKey = env.getRoutingKey
-          val contentType = props.contentType
-          val deliveryTag = env.getDeliveryTag
-          listener ! AMQP.Message(serializer.in(payload, None))
-          channel.basicAck(deliveryTag, false)
+      channel.exchangeDeclare(EXCHANGE, "direct")
+      channel.queueDeclare(QUEUE)
+      channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY)
+      channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
+        override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
+          messageConsumer ! Message(SERIALIZER.in(payload, None))
+          channel.basicAck(envelope.getDeliveryTag, false)
         }
       })
     }
   }
-  val listener = new Actor() {
+  endpoint.start
+
+  val messageConsumer = new Actor() {
     def receive: PartialFunction[Any, Unit] = {
-      case AMQP.Message(payload) => println("Received message: " + payload)
+      case Message(payload) => println("Received message: " + payload)
     }
   }
-  endpoint.start
-  listener.start
-  endpoint ! AMQP.AddListener(listener)
+  messageConsumer.start
+
+  endpoint ! MessageConsumer(messageConsumer)
+
+  val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER)
+  client.start
+  client ! Message("The President: I'm going surfing")
 }
diff --git a/akka.iws b/akka.iws
index d1208ab19f..0292fc4550 100644
--- a/akka.iws
+++ b/akka.iws
@@ -2,15 +2,8 @@
 
   
     
-      
-      
-      
-      
-      
-      
-      
-      
       
+      
     
     
     
@@ -68,7 +61,7 @@
       
         
           
-            
+            
               
             
           
@@ -392,7 +385,7 @@
     
     
       
-        
+