From 0a169fbc9699bfeb15e2381583cbd48958bc9cd6 Mon Sep 17 00:00:00 2001 From: jboner Date: Tue, 27 Oct 2009 12:32:06 +0100 Subject: [PATCH] Added possibility of sending reply messages directly by sending them to the AMQP.Consumer --- akka-amqp/src/main/scala/AMQP.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 5928d74a80..55ca1a8e99 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -201,7 +201,7 @@ object AMQP extends Actor { log.info("AMQP.Producer [%s] is started", toString) - def receive: PartialFunction[Any, Unit] = { + def receive = { case message @ Message(payload, routingKey, mandatory, immediate, properties) => log.debug("Sending message [%s]", message) channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) @@ -304,7 +304,7 @@ object AMQP extends Actor { listener.tag = Some(listenerTag) } - def receive: PartialFunction[Any, Unit] = { + def receive = { case listener: MessageConsumerListener => startLink(listener.actor) listeners.put(listener, listener) @@ -326,9 +326,16 @@ object AMQP extends Actor { } } + case message @ Message(payload, routingKey, mandatory, immediate, properties) => + log.debug("Sending message [%s]", message) + channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) + case Reconnect(delay) => reconnect(delay) + case Failure(cause) => log.error(cause, ""); throw cause + case Stop => disconnect; stop + case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") }