From 92eb5746fbc05d4bf29cd2ac8b15c3599c4ffd0e Mon Sep 17 00:00:00 2001 From: jboner Date: Tue, 27 Oct 2009 22:26:13 +0100 Subject: [PATCH] removed transparent serialization/deserialization on AMQP module --- akka-amqp/src/main/scala/AMQP.scala | 61 ++++++++++--------- akka-amqp/src/main/scala/ExampleSession.scala | 26 ++++---- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 4872799016..6ccab7c6c9 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -10,7 +10,6 @@ import com.rabbitmq.client.ConnectionFactory import se.scalablesolutions.akka.actor.{OneForOneStrategy, Actor} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.{HashCode, Logging} -import se.scalablesolutions.akka.serialization.Serializer import scala.collection.mutable.HashMap @@ -53,16 +52,16 @@ object AMQP extends Actor { sealed trait AMQPMessage private[akka] trait InternalAMQPMessage extends AMQPMessage - class Message(val payload: AnyRef, - val routingKey: String, - val mandatory: Boolean, - val immediate: Boolean, + class Message(val payload: Array[Byte], + val routingKey: String, + val mandatory: Boolean, + val immediate: Boolean, val properties: RabbitMQ.BasicProperties) extends AMQPMessage { - override def toString(): String = - "Message[payload=" + payload + - ", routingKey=" + routingKey + - ", mandatory=" + mandatory + - ", immediate=" + immediate + + override def toString(): String = + "Message[payload=" + payload + + ", routingKey=" + routingKey + + ", mandatory=" + mandatory + + ", immediate=" + immediate + ", properties=" + properties + "]" } @@ -70,17 +69,19 @@ object AMQP extends Actor { def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] = Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties)) - def apply(payload: AnyRef, routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message = + def apply(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message = new Message(payload, routingKey, mandatory, immediate, properties) - def apply(payload: AnyRef, routingKey: String): Message = + def apply(payload: Array[Byte], routingKey: String): Message = new Message(payload, routingKey, false, false, null) } - private[akka] case class MessageConsumerListener(queueName: String, - routingKey: String, - isUsingExistingQueue: Boolean, - actor: Actor) extends AMQPMessage { + case class MessageConsumerListener(queueName: String, + routingKey: String, + isUsingExistingQueue: Boolean, + actor: Actor) extends AMQPMessage { + def this(queueName: String, routingKey: String, actor: Actor) = this(queueName, routingKey, false, actor) + private[akka] var tag: Option[String] = None override def toString() = @@ -117,6 +118,9 @@ object AMQP extends Actor { { if (that.asInstanceOf[MessageConsumerListener].tag.isDefined) { that.asInstanceOf[MessageConsumerListener].tag.get == tag.get } else true} } } + object MessageConsumerListener { + def apply(queueName: String, routingKey: String, actor: Actor) = new MessageConsumerListener(queueName, routingKey, false, actor) + } case object Stop extends AMQPMessage @@ -156,7 +160,6 @@ object AMQP extends Actor { hostname: String, port: Int, exchangeName: String, - serializer: Serializer, returnListener: Option[ReturnListener], shutdownListener: Option[ShutdownListener], initReconnectDelay: Long): Producer = { @@ -164,7 +167,6 @@ object AMQP extends Actor { new ConnectionFactory(config), hostname, port, exchangeName, - serializer, returnListener, shutdownListener, initReconnectDelay) @@ -178,7 +180,6 @@ object AMQP extends Actor { port: Int, exchangeName: String, exchangeType: ExchangeType, - serializer: Serializer, shutdownListener: Option[ShutdownListener], initReconnectDelay: Long, passive: Boolean, @@ -189,7 +190,6 @@ object AMQP extends Actor { hostname, port, exchangeName, exchangeType, - serializer, shutdownListener, initReconnectDelay, passive, @@ -218,7 +218,6 @@ object AMQP extends Actor { val hostname: String, val port: Int, val exchangeName: String, - val serializer: Serializer, val returnListener: Option[ReturnListener], val shutdownListener: Option[ShutdownListener], val initReconnectDelay: Long) @@ -228,10 +227,12 @@ object AMQP extends Actor { log.info("AMQP.Producer [%s] is started", toString) + def newRPC(routingKey: String): RpcClient = new RpcClient(channel, exchangeName, routingKey) + def receive = { case message @ Message(payload, routingKey, mandatory, immediate, properties) => log.debug("Sending message [%s]", message) - channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) + channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]]) case Stop => disconnect stop @@ -278,7 +279,6 @@ object AMQP extends Actor { val port: Int, val exchangeName: String, val exchangeType: ExchangeType, - val serializer: Serializer, val shutdownListener: Option[ShutdownListener], val initReconnectDelay: Long, val passive: Boolean, @@ -299,7 +299,7 @@ object AMQP extends Actor { case listener: MessageConsumerListener => startLink(listener.actor) listeners.put(listener, listener) - setupConsumer(listener) + registerConsumer(listener) log.info("Message consumer listener is registered [%s]", listener) case CancelMessageConsumerListener(listener) => @@ -319,7 +319,7 @@ object AMQP extends Actor { case message @ Message(payload, routingKey, mandatory, immediate, properties) => log.debug("Sending message [%s]", message) - channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, serializer.out(payload)) + channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]]) case Reconnect(delay) => reconnect(delay) @@ -336,13 +336,14 @@ object AMQP extends Actor { channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, configurationArguments.asJava) - listeners.elements.toList.map(_._2).foreach(setupConsumer) + listeners.elements.toList.map(_._2).foreach(registerConsumer) if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) } - private def setupConsumer(listener: MessageConsumerListener) = { - log.debug("Adding MessageConsumerListener %s", listener.toString(exchangeName)) + private def registerConsumer(listener: MessageConsumerListener) = { + log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName)) if (!listener.isUsingExistingQueue) { + log.debug("Declaring and binding new queue for MessageConsumerListener [%s]", listener.queueName) channel.queueDeclare(listener.queueName) channel.queueBind(listener.queueName, exchangeName, listener.routingKey) } @@ -350,7 +351,9 @@ object AMQP extends Actor { val listenerTag = channel.basicConsume(listener.queueName, false, new DefaultConsumer(channel) with Logging { override def handleDelivery(tag: String, envelope: Envelope, properties: RabbitMQ.BasicProperties, payload: Array[Byte]) { try { - listener.actor ! Message(serializer.in(payload, None), envelope.getRoutingKey) + val mandatory = false // FIXME: where to find out if it's mandatory? + val immediate = false // FIXME: where to find out if it's immediate? + listener.actor ! Message(payload, envelope.getRoutingKey, mandatory, immediate, properties) channel.basicAck(envelope.getDeliveryTag, false) } catch { case cause => self ! Failure(cause) // pass on and re-throw exception in endpoint actor to trigger restart and reconnect diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 9f4d13aed9..3054080b8f 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -4,14 +4,12 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.actor.Actor import com.rabbitmq.client.ConnectionParameters object ExampleSession { import AMQP._ - val SERIALIZER = Serializer.Java val CONFIG = new ConnectionParameters val HOSTNAME = "localhost" val PORT = 5672 @@ -30,29 +28,29 @@ object ExampleSession { } def direct = { - val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, SERIALIZER, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "direct", false, new Actor() { + val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]()) + consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() { def receive: PartialFunction[Any, Unit] = { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload) + case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) } }) - val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, SERIALIZER, None, None, 100) - producer ! Message("@jonas_boner: You sucked!!", "direct") + val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100) + producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct") } def fanout = { - val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, SERIALIZER, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "", false, new Actor() { + val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]()) + consumer ! MessageConsumerListener("@george_bush", "", new Actor() { def receive: PartialFunction[Any, Unit] = { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", payload) + case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) } }) - consumer ! MessageConsumerListener("@barack_obama", "", false, new Actor() { + consumer ! MessageConsumerListener("@barack_obama", "", new Actor() { def receive: PartialFunction[Any, Unit] = { - case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", payload) + case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) } }) - val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, SERIALIZER, None, None, 100) - producer ! Message("@jonas_boner: I'm going surfing", "") + val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100) + producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") } } \ No newline at end of file