removed transparent serialization/deserialization on AMQP module

This commit is contained in:
jboner 2009-10-27 22:26:13 +01:00
parent 58fe1bf913
commit 92eb5746fb
2 changed files with 44 additions and 43 deletions

View file

@ -10,7 +10,6 @@ import com.rabbitmq.client.ConnectionFactory
import se.scalablesolutions.akka.actor.{OneForOneStrategy, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.serialization.Serializer
import scala.collection.mutable.HashMap
@ -53,16 +52,16 @@ object AMQP extends Actor {
sealed trait AMQPMessage
private[akka] trait InternalAMQPMessage extends AMQPMessage
class Message(val payload: AnyRef,
val routingKey: String,
val mandatory: Boolean,
val immediate: Boolean,
class Message(val payload: Array[Byte],
val routingKey: String,
val mandatory: Boolean,
val immediate: Boolean,
val properties: RabbitMQ.BasicProperties) extends AMQPMessage {
override def toString(): String =
"Message[payload=" + payload +
", routingKey=" + routingKey +
", mandatory=" + mandatory +
", immediate=" + immediate +
override def toString(): String =
"Message[payload=" + payload +
", routingKey=" + routingKey +
", mandatory=" + mandatory +
", immediate=" + immediate +
", properties=" + properties + "]"
}
@ -70,17 +69,19 @@ object AMQP extends Actor {
def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] =
Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties))
def apply(payload: AnyRef, routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message =
def apply(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message =
new Message(payload, routingKey, mandatory, immediate, properties)
def apply(payload: AnyRef, routingKey: String): Message =
def apply(payload: Array[Byte], routingKey: String): Message =
new Message(payload, routingKey, false, false, null)
}
private[akka] case class MessageConsumerListener(queueName: String,
routingKey: String,
isUsingExistingQueue: Boolean,
actor: Actor) extends AMQPMessage {
case class MessageConsumerListener(queueName: String,
routingKey: String,
isUsingExistingQueue: Boolean,
actor: Actor) extends AMQPMessage {
def this(queueName: String, routingKey: String, actor: Actor) = this(queueName, routingKey, false, actor)
private[akka] var tag: Option[String] = None
override def toString() =
@ -117,6 +118,9 @@ object AMQP extends Actor {
{ if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true}
}
}
object MessageConsumerListener {
def apply(queueName: String, routingKey: String, actor: Actor) = new MessageConsumerListener(queueName, routingKey, false, actor)
}
case object Stop extends AMQPMessage
@ -156,7 +160,6 @@ object AMQP extends Actor {
hostname: String,
port: Int,
exchangeName: String,
serializer: Serializer,
returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): Producer = {
@ -164,7 +167,6 @@ object AMQP extends Actor {
new ConnectionFactory(config),
hostname, port,
exchangeName,
serializer,
returnListener,
shutdownListener,
initReconnectDelay)
@ -178,7 +180,6 @@ object AMQP extends Actor {
port: Int,
exchangeName: String,
exchangeType: ExchangeType,
serializer: Serializer,
shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long,
passive: Boolean,
@ -189,7 +190,6 @@ object AMQP extends Actor {
hostname, port,
exchangeName,
exchangeType,
serializer,
shutdownListener,
initReconnectDelay,
passive,
@ -218,7 +218,6 @@ object AMQP extends Actor {
val hostname: String,
val port: Int,
val exchangeName: String,
val serializer: Serializer,
val returnListener: Option[ReturnListener],
val shutdownListener: Option[ShutdownListener],
val initReconnectDelay: Long)
@ -228,10 +227,12 @@ object AMQP extends Actor {
log.info("AMQP.Producer [%s] is started", toString)
def newRPC(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey)
def receive = {
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload))
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
case Stop =>
disconnect
stop
@ -278,7 +279,6 @@ object AMQP extends Actor {
val port: Int,
val exchangeName: String,
val exchangeType: ExchangeType,
val serializer: Serializer,
val shutdownListener: Option[ShutdownListener],
val initReconnectDelay: Long,
val passive: Boolean,
@ -299,7 +299,7 @@ object AMQP extends Actor {
case listener: MessageConsumerListener =>
startLink(listener.actor)
listeners.put(listener, listener)
setupConsumer(listener)
registerConsumer(listener)
log.info("Message consumer listener is registered [%s]", listener)
case CancelMessageConsumerListener(listener) =>
@ -319,7 +319,7 @@ object AMQP extends Actor {
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
log.debug("Sending message [%s]", message)
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload))
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
case Reconnect(delay) => reconnect(delay)
@ -336,13 +336,14 @@ object AMQP extends Actor {
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
passive, durable,
configurationArguments.asJava)
listeners.elements.toList.map(_._2).foreach(setupConsumer)
listeners.elements.toList.map(_._2).foreach(registerConsumer)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}
private def setupConsumer(listener: MessageConsumerListener) = {
log.debug("Adding MessageConsumerListener %s", listener.toString(exchangeName))
private def registerConsumer(listener: MessageConsumerListener) = {
log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
if (!listener.isUsingExistingQueue) {
log.debug("Declaring and binding new queue for MessageConsumerListener [%s]", listener.queueName)
channel.queueDeclare(listener.queueName)
channel.queueBind(listener.queueName, exchangeName, listener.routingKey)
}
@ -350,7 +351,9 @@ object AMQP extends Actor {
val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) {
try {
listener.actor ! Message(serializer.in(payload, None), envelope.getRoutingKey)
val mandatory = false // FIXME: where to find out if it's mandatory?
val immediate = false // FIXME: where to find out if it's immediate?
listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties)
channel.basicAck(envelope.getDeliveryTag, false)
} catch {
case cause => self ! Failure(cause) // pass on and re-throw exception in endpoint actor to trigger restart and reconnect

View file

@ -4,14 +4,12 @@
package se.scalablesolutions.akka.amqp
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.actor.Actor
import com.rabbitmq.client.ConnectionParameters
object ExampleSession {
import AMQP._
val SERIALIZER = Serializer.Java
val CONFIG = new ConnectionParameters
val HOSTNAME = "localhost"
val PORT = 5672
@ -30,29 +28,29 @@ object ExampleSession {
}
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", false, new Actor() {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, SERIALIZER, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!", "direct")
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct")
}
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", false, new Actor() {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload)
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
})
consumer ! MessageConsumerListener("@barack_obama", "", false, new Actor() {
consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
def receive: PartialFunction[Any, Unit] = {
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", payload)
case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
}
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, SERIALIZER, None, None, 100)
producer ! Message("@jonas_boner: I'm going surfing", "")
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
}
}