cleaned up example session
This commit is contained in:
parent
a9ad5b8bdd
commit
4513b1be34
2 changed files with 44 additions and 39 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package com.scalablesolutions.akka.amqp
|
||||
|
||||
import java.lang.String
|
||||
import rabbitmq.client.{AMQP => RabbitMQ, _}
|
||||
import rabbitmq.client.ConnectionFactory
|
||||
|
||||
|
|
@ -15,7 +16,7 @@ import java.util.{Timer, TimerTask}
|
|||
|
||||
object AMQP {
|
||||
case class Message(val payload: AnyRef)
|
||||
case class AddListener(val listener: Actor)
|
||||
case class MessageConsumer(val listener: Actor)
|
||||
case class Reconnect(val delay: Long)
|
||||
|
||||
/**
|
||||
|
|
@ -23,8 +24,8 @@ object AMQP {
|
|||
* Usage:
|
||||
* <pre>
|
||||
* val params = new ConnectionParameters
|
||||
* params.setUsername("bob")
|
||||
* params.setPassword("wilber")
|
||||
* params.setUsername("barack")
|
||||
* params.setPassword("obama")
|
||||
* params.setVirtualHost("/")
|
||||
* params.setRequestedHeartbeat(0)
|
||||
* val client = new AMQP.Client(new ConnectionFactory(params), "localhost", 9889, "exchangeKey", "routingKey", Serializer.Java)
|
||||
|
|
@ -62,11 +63,11 @@ object AMQP {
|
|||
private var listeners: List[Actor] = Nil
|
||||
private val endpoint = this
|
||||
|
||||
def init(channel: Channel)
|
||||
def init(channel: Channel)
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case message: Message => listeners.foreach(_ ! message)
|
||||
case AddListener(listener) => listeners ::= listener
|
||||
case MessageConsumer(listener) => listeners ::= listener
|
||||
case Reconnect(delay) => reconnect(delay)
|
||||
case unknown => throw new IllegalArgumentException("Unknown message to AMQP dispatcher [" + unknown + "]")
|
||||
}
|
||||
|
|
@ -86,36 +87,47 @@ object AMQP {
|
|||
}
|
||||
}
|
||||
|
||||
class ExampleAMQPEnpointRunner {
|
||||
val params = new ConnectionParameters
|
||||
params.setUsername("guest")
|
||||
params.setPassword("guest")
|
||||
params.setVirtualHost("/")
|
||||
params.setRequestedHeartbeat(0)
|
||||
class ExampleConsumer(channel: Channel, listener: Actor)
|
||||
val endpoint = new AMQP.Endpoint(new ConnectionFactory(params), "localhost", 5672) {
|
||||
class ExampleAMQPSession {
|
||||
import AMQP._
|
||||
|
||||
val CONFIG = new ConnectionParameters
|
||||
CONFIG.setUsername("barack")
|
||||
CONFIG.setPassword("obama")
|
||||
CONFIG.setVirtualHost("/")
|
||||
CONFIG.setRequestedHeartbeat(0)
|
||||
|
||||
val EXCHANGE = "whitehouse.gov"
|
||||
val QUEUE = "marketing"
|
||||
val ROUTING_KEY = "newsletter"
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 8787
|
||||
val SERIALIZER = Serializer.Java
|
||||
|
||||
val endpoint = new Endpoint(new ConnectionFactory(CONFIG), HOSTNAME, PORT) {
|
||||
override def init(channel: Channel) = {
|
||||
channel.exchangeDeclare("mult", "direct")
|
||||
channel.queueDeclare("mult_queue")
|
||||
channel.queueBind("mult_queue", "mult", "routeroute")
|
||||
channel.basicConsume("mult_queue", false, new DefaultConsumer(channel) {
|
||||
private val serializer = Serializer.Java
|
||||
override def handleDelivery(tag: String, env: Envelope, props: RabbitMQ.BasicProperties, payload: Array[Byte]) {
|
||||
val routingKey = env.getRoutingKey
|
||||
val contentType = props.contentType
|
||||
val deliveryTag = env.getDeliveryTag
|
||||
listener ! AMQP.Message(serializer.in(payload, None))
|
||||
channel.basicAck(deliveryTag, false)
|
||||
channel.exchangeDeclare(EXCHANGE, "direct")
|
||||
channel.queueDeclare(QUEUE)
|
||||
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY)
|
||||
channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
|
||||
override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
|
||||
messageConsumer ! Message(SERIALIZER.in(payload, None))
|
||||
channel.basicAck(envelope.getDeliveryTag, false)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
val listener = new Actor() {
|
||||
endpoint.start
|
||||
|
||||
val messageConsumer = new Actor() {
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case AMQP.Message(payload) => println("Received message: " + payload)
|
||||
case Message(payload) => println("Received message: " + payload)
|
||||
}
|
||||
}
|
||||
endpoint.start
|
||||
listener.start
|
||||
endpoint ! AMQP.AddListener(listener)
|
||||
messageConsumer.start
|
||||
|
||||
endpoint ! MessageConsumer(messageConsumer)
|
||||
|
||||
val client = new Client(new ConnectionFactory(CONFIG), HOSTNAME, PORT, EXCHANGE, ROUTING_KEY, SERIALIZER)
|
||||
client.start
|
||||
client ! Message("The President: I'm going surfing")
|
||||
}
|
||||
|
|
|
|||
13
akka.iws
13
akka.iws
|
|
@ -2,15 +2,8 @@
|
|||
<project relativePaths="false" version="4">
|
||||
<component name="ChangeListManager">
|
||||
<list default="true" readonly="true" id="d8b7c92c-a9d3-4b9d-89b5-ef04194c68b8" name="Default" comment="">
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/kernel/akka-kernel.iml" afterPath="$PROJECT_DIR$/kernel/akka-kernel.iml" />
|
||||
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/embedded-repo/org/multiverse/multiverse/0.3/multiverse-0.3.jar" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/fun-test-java/akka-fun-test-java.iml" afterPath="$PROJECT_DIR$/fun-test-java/akka-fun-test-java.iml" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/pom.xml" afterPath="$PROJECT_DIR$/pom.xml" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/samples-scala/akka-samples-scala.iml" afterPath="$PROJECT_DIR$/samples-scala/akka-samples-scala.iml" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/samples-lift/akka-samples-lift.iml" afterPath="$PROJECT_DIR$/samples-lift/akka-samples-lift.iml" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/samples-java/akka-samples-java.iml" afterPath="$PROJECT_DIR$/samples-java/akka-samples-java.iml" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/akka.ipr" afterPath="$PROJECT_DIR$/akka.ipr" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/akka.iws" afterPath="$PROJECT_DIR$/akka.iws" />
|
||||
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/akka-amqp/src/main/scala/AMQP.scala" afterPath="$PROJECT_DIR$/akka-amqp/src/main/scala/AMQP.scala" />
|
||||
</list>
|
||||
<ignored path=".idea/workspace.xml" />
|
||||
<ignored path="akka.iws" />
|
||||
|
|
@ -68,7 +61,7 @@
|
|||
<file leaf-file-name="AMQP.scala" pinned="false" current="true" current-in-tab="true">
|
||||
<entry file="file://$PROJECT_DIR$/akka-amqp/src/main/scala/AMQP.scala">
|
||||
<provider selected="true" editor-type-id="text-editor">
|
||||
<state line="96" column="21" selection-start="3271" selection-end="3271" vertical-scroll-proportion="0.60444444">
|
||||
<state line="65" column="8" selection-start="2035" selection-end="2035" vertical-scroll-proportion="0.9288889">
|
||||
<folding />
|
||||
</state>
|
||||
</provider>
|
||||
|
|
@ -392,7 +385,7 @@
|
|||
</entry>
|
||||
<entry file="file://$PROJECT_DIR$/akka-amqp/src/main/scala/AMQP.scala">
|
||||
<provider selected="true" editor-type-id="text-editor">
|
||||
<state line="96" column="21" selection-start="3271" selection-end="3271" vertical-scroll-proportion="0.60444444">
|
||||
<state line="65" column="8" selection-start="2035" selection-end="2035" vertical-scroll-proportion="0.9288889">
|
||||
<folding />
|
||||
</state>
|
||||
</provider>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue