diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala deleted file mode 100644 index 0d94c7a7dc..0000000000 --- a/akka-amqp/src/main/scala/AMQP.scala +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.amqp - -import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.actor.Actor._ -import se.scalablesolutions.akka.config.OneForOneStrategy -import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} -import java.lang.IllegalArgumentException -import se.scalablesolutions.akka.util.Logging -/** - * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. - * - * @see se.scalablesolutions.akka.amqp.ExampleSession - * - * @author Irmo Manie - */ -object AMQP { - case class ConnectionParameters( - host: String = ConnectionFactory.DEFAULT_HOST, - port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, - username: String = ConnectionFactory.DEFAULT_USER, - password: String = ConnectionFactory.DEFAULT_PASS, - virtualHost: String = ConnectionFactory.DEFAULT_VHOST, - initReconnectDelay: Long = 5000, - connectionCallback: Option[ActorRef] = None) - - case class ChannelParameters( - shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) - - case class ExchangeParameters( - exchangeName: String, - exchangeType: ExchangeType, - exchangeDurable: Boolean = false, - exchangeAutoDelete: Boolean = true, - exchangePassive: Boolean = false, - configurationArguments: Map[String, AnyRef] = Map()) - - case class ProducerParameters( - exchangeParameters: ExchangeParameters, - producerId: Option[String] = None, - returnListener: Option[ReturnListener] = None, - channelParameters: Option[ChannelParameters] = None) - - case class ConsumerParameters( - exchangeParameters: ExchangeParameters, - routingKey: String, - deliveryHandler: ActorRef, - queueName: Option[String] = None, - queueDurable: Boolean = false, - queueAutoDelete: Boolean = true, - queuePassive: Boolean = false, - queueExclusive: Boolean = false, - selfAcknowledging: Boolean = true, - channelParameters: Option[ChannelParameters] = None) { - if (queueDurable && queueName.isEmpty) { - throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") - } - } - - def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = { - val connection: ActorRef = supervisor.newConnection(connectionParameters) - connection ! Connect - connection - } - - def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = { - val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters)) - connection.startLink(producer) - producer ! Start - producer - } - - def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { - val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) - val handler = consumerParameters.deliveryHandler - if (handler.supervisor.isEmpty) consumer.startLink(handler) - connection.startLink(consumer) - consumer ! Start - consumer - } - - def newRpcClient[O,I]( - connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[O,I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) - connection.startLink(rpcActor) - rpcActor ! Start - rpcActor - } - - def newRpcServer[I,O]( - connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcServerSerializer[I,O], - requestHandler: I => O, - queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer - , channelParameters = channelParameters - , selfAcknowledging = false - , queueName = queueName)) - - } - - private val supervisor = new AMQPSupervisor - - class AMQPSupervisor extends Logging { - class AMQPSupervisorActor extends Actor { - import self._ - - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) - - def receive = { - case _ => {} // ignore all messages - } - } - - private val supervisor = actorOf(new AMQPSupervisorActor).start - - def newConnection(connectionParameters: ConnectionParameters): ActorRef = { - val connectionActor = actorOf(new FaultTolerantConnectionActor(connectionParameters)) - supervisor.startLink(connectionActor) - connectionActor - } - } - - trait FromBinary[T] { - def fromBinary(bytes: Array[Byte]): T - } - - trait ToBinary[T] { - def toBinary(t: T): Array[Byte] - } - - - case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) - - case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) -} diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala new file mode 100644 index 0000000000..cd73d27e03 --- /dev/null +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.Actor._ +import se.scalablesolutions.akka.config.OneForOneStrategy +import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} +import com.rabbitmq.client.AMQP.BasicProperties +import java.lang.{String, IllegalArgumentException} + +/** + * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. + * + * @see se.scalablesolutions.akka.amqp.ExampleSession + * + * @author Irmo Manie + */ +object AMQP { + case class ConnectionParameters( + host: String = ConnectionFactory.DEFAULT_HOST, + port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, + username: String = ConnectionFactory.DEFAULT_USER, + password: String = ConnectionFactory.DEFAULT_PASS, + virtualHost: String = ConnectionFactory.DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) + + case class ChannelParameters( + shutdownListener: Option[ShutdownListener] = None, + channelCallback: Option[ActorRef] = None) + + case class ExchangeParameters( + exchangeName: String, + exchangeType: ExchangeType, + exchangeDurable: Boolean = false, + exchangeAutoDelete: Boolean = true, + exchangePassive: Boolean = false, + configurationArguments: Map[String, AnyRef] = Map()) + + case class ProducerParameters( + exchangeParameters: ExchangeParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None, + channelParameters: Option[ChannelParameters] = None) + + case class ConsumerParameters( + exchangeParameters: ExchangeParameters, + routingKey: String, + deliveryHandler: ActorRef, + queueName: Option[String] = None, + queueDurable: Boolean = false, + queueAutoDelete: Boolean = true, + queuePassive: Boolean = false, + queueExclusive: Boolean = false, + selfAcknowledging: Boolean = true, + channelParameters: Option[ChannelParameters] = None) { + if (queueDurable && queueName.isEmpty) { + throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") + } + } + + def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = { + val connection = actorOf(new FaultTolerantConnectionActor(connectionParameters)) + supervisor.startLink(connection) + connection ! Connect + connection + } + + def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = { + val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters)) + connection.startLink(producer) + producer ! Start + producer + } + + def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { + val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) + val handler = consumerParameters.deliveryHandler + if (handler.supervisor.isEmpty) consumer.startLink(handler) + connection.startLink(consumer) + consumer ! Start + consumer + } + + /** + * Convenience + */ + class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) { + def send(request: O, replyTo: Option[String] = None) = { + val basicProperties = new BasicProperties + basicProperties.setReplyTo(replyTo.getOrElse(null)) + client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties)) + } + + def stop = client.stop + } + + def newStringProducer(connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + producerId: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): ProducerClient[String] = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + + val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) + val toBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + new ProducerClient(producerRef, rKey, toBinary) + } + + def newStringConsumer(connection: ActorRef, + exchange: String, + handler: String => Unit, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true): ActorRef = { + + val deliveryHandler = actor { + case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) + } + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + } + + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + producerId: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): ProducerClient[O] = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + + val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) + new ProducerClient(producerRef, rKey, new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }) + } + + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, + exchange: String, + handler: I => Unit, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = { + + val deliveryHandler = actor { + case Delivery(payload, _, _, _, _) => { + handler.apply(createProtobufFromBytes[I](payload)) + } + } + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + } + + /** + * Main supervisor + */ + + class AMQPSupervisorActor extends Actor { + import self._ + + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = List(classOf[Throwable]) + + def receive = { + case _ => {} // ignore all messages + } + } + + private val supervisor = actorOf(new AMQPSupervisorActor).start + + def shutdownAll = { + supervisor.shutdownLinkedActors + } + + /** + * Serialization stuff + */ + + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) + + private[amqp] def createProtobufFromBytes[I <: com.google.protobuf.Message](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { + manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I] + } +} diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala similarity index 91% rename from akka-amqp/src/main/scala/AMQPMessage.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala index 92cd95906a..34eb37aa14 100644 --- a/akka-amqp/src/main/scala/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala @@ -44,6 +44,9 @@ case object Stopped extends AMQPMessage // delivery messages case class Acknowledge(deliveryTag: Long) extends AMQPMessage case class Acknowledged(deliveryTag: Long) extends AMQPMessage +case class Reject(deliveryTag: Long) extends AMQPMessage +case class Rejected(deliveryTag: Long) extends AMQPMessage +class RejectionException(deliveryTag: Long) extends RuntimeException // internal messages private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala similarity index 83% rename from akka-amqp/src/main/scala/ConsumerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala index 90c8d7deec..b01f79f949 100644 --- a/akka-amqp/src/main/scala/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala @@ -23,6 +23,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) def specificMessageHandler = { case Acknowledge(deliveryTag) => acknowledgeDeliveryTag(deliveryTag, true) + case Reject(deliveryTag) => rejectDeliveryTag(deliveryTag, true) case message: Message => handleIllegalMessage("%s can't be used to send messages, ignoring message [%s]".format(this, message)) case unknown => @@ -82,6 +83,19 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) } } + private def rejectDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = { + log.debug("Rejecting message with delivery tag [%s]", deliveryTag) + // FIXME: when rabbitmq 1.9 arrives, basicReject should be available on the API and implemented instead of this + log.warning("Consumer is rejecting delivery with tag [%s] - " + + "for now this means we have to self terminate and kill the channel - see you in a second.") + channel.foreach{ch => + if (remoteAcknowledgement) { + deliveryHandler ! Rejected(deliveryTag) + } + } + throw new RejectionException(deliveryTag) + } + private def handleIllegalMessage(errorMessage: String) = { log.error(errorMessage) throw new IllegalArgumentException(errorMessage) @@ -94,7 +108,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) override def shutdown = { listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag))) - self.linkedActorsAsList.foreach(_.stop) + self.shutdownLinkedActors super.shutdown } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala similarity index 58% rename from akka-amqp/src/main/scala/ExampleSession.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 4fa1358a29..f35da954d0 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -4,44 +4,64 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer} import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.amqp.AMQP._ import java.lang.String +import se.scalablesolutions.akka.amqp.AMQP._ +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol object ExampleSession { def main(args: Array[String]) = { - println("==== DIRECT ===") + + printTopic("DIRECT") direct - TimeUnit.SECONDS.sleep(2) - - println("==== FANOUT ===") + printTopic("FANOUT") fanout - TimeUnit.SECONDS.sleep(2) - - println("==== TOPIC ===") + printTopic("TOPIC") topic - TimeUnit.SECONDS.sleep(2) - - println("==== CALLBACK ===") + printTopic("CALLBACK") callback - TimeUnit.SECONDS.sleep(2) + printTopic("EASY STRING PRODUCER AND CONSUMER") + easyStringProducerConsumer - println("==== RPC ===") + printTopic("EASY PROTOBUF PRODUCER AND CONSUMER") + easyProtobufProducerConsumer + + printTopic("RPC") rpc - TimeUnit.SECONDS.sleep(2) + printTopic("EASY STRING RPC") + easyStringRpc + + printTopic("EASY PROTOBUF RPC") + easyProtobufRpc + + printTopic("Happy hAkking :-)") + + // shutdown everything the amqp tree except the main AMQP supervisor + // all connections/consumers/producers will be stopped + AMQP.shutdownAll ActorRegistry.shutdownAll System.exit(0) } + def printTopic(topic: String) { + + println("") + println("==== " + topic + " ===") + println("") + TimeUnit.SECONDS.sleep(2) + } + def direct = { // defaults to amqp://guest:guest@localhost:5672/ @@ -115,7 +135,7 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) + val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor { @@ -129,6 +149,40 @@ object ExampleSession { connection.stop } + def easyStringProducerConsumer = { + val connection = AMQP.newConnection() + + val exchangeName = "easy.string" + + // listen by default to: + // exchange = exchangeName + // routingKey = .request + // queueName = .in + AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message)) + + // send by default to: + // exchange = exchangeName + // routingKey = .request + val producer = AMQP.newStringProducer(connection, exchangeName) + + producer.send("This shit is easy!") + } + + def easyProtobufProducerConsumer = { + val connection = AMQP.newConnection() + + val exchangeName = "easy.protobuf" + + def protobufMessageHandler(message: AddressProtocol) = { + log.info("Received "+message) + } + + AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler) + + val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName) + producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build) + } + def rpc = { val connection = AMQP.newConnection() @@ -146,7 +200,7 @@ object ExampleSession { def requestHandler(request: String) = 3 - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, requestHandler, queueName = Some("rpc.in.key.queue")) @@ -159,9 +213,56 @@ object ExampleSession { } val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) + val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) } + + def easyStringRpc = { + + val connection = AMQP.newConnection() + + val exchangeName = "easy.stringrpc" + + // listen by default to: + // exchange = exchangeName + // routingKey = .request + // queueName = .in + RPC.newStringRpcServer(connection, exchangeName, request => { + log.info("Got request: "+request) + "Response to: '"+request+"'" + }) + + // send by default to: + // exchange = exchangeName + // routingKey = .request + val stringRpcClient = RPC.newStringRpcClient(connection, exchangeName) + + val response = stringRpcClient.call("AMQP Rocks!") + log.info("Got response: "+response) + + stringRpcClient.callAsync("AMQP is dead easy") { + case response => log.info("This is handled async: "+response) + } + } + + def easyProtobufRpc = { + + val connection = AMQP.newConnection() + + val exchangeName = "easy.protobuf.rpc" + + def protobufRequestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } + + RPC.newProtobufRpcServer(connection, exchangeName, protobufRequestHandler) + + val stringRpcClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, exchangeName) + + val response = stringRpcClient.call(AddressProtocol.newBuilder.setHostname("localhost").setPort(4321).build) + + log.info("Got response: "+response) + } } diff --git a/akka-amqp/src/main/scala/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala similarity index 100% rename from akka-amqp/src/main/scala/ExchangeType.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala similarity index 100% rename from akka-amqp/src/main/scala/FaultTolerantChannelActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala similarity index 98% rename from akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 97c3074700..1e50a985be 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -107,7 +107,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio override def shutdown = { reconnectionTimer.cancel // make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed - self.linkedActorsAsList.foreach(_.stop) + self.shutdownLinkedActors disconnect } diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala similarity index 100% rename from akka-amqp/src/main/scala/ProducerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala new file mode 100644 index 0000000000..b51cbe407f --- /dev/null +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -0,0 +1,182 @@ +package se.scalablesolutions.akka.amqp.rpc + +import se.scalablesolutions.akka.amqp.AMQP._ +import com.google.protobuf.Message +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ +import se.scalablesolutions.akka.amqp._ + +object RPC { + + def newRpcClient[O, I](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[O, I], + channelParameters: Option[ChannelParameters] = None): ActorRef = { + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( + exchangeParameters, routingKey, serializer, channelParameters)) + connection.startLink(rpcActor) + rpcActor ! Start + rpcActor + } + + def newRpcServer[I, O](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: I => O, + queueName: Option[String] = None, + channelParameters: Option[ChannelParameters] = None): RpcServerHandle = { + val producer = newProducer(connection, ProducerParameters( + ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) + val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer, + channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName)) + RpcServerHandle(producer, consumer) + } + + case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) { + def stop = { + consumer.stop + producer.stop + } + } + + case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) + + case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) + + + /** + * RPC convenience + */ + class RpcClient[O, I](client: ActorRef){ + def call(request: O, timeout: Long = 5000): Option[I] = { + (client.!!(request, timeout)).as[I] + } + + def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = { + spawn { + val result = call(request, timeout) + responseHandler.apply(result) + } + } + def stop = client.stop + } + + def newProtobufRpcServer[I <: Message, O <: Message]( + connection: ActorRef, + exchange: String, + requestHandler: I => O, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = { + + val serializer = new RpcServerSerializer[I, O]( + new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }, new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }) + + startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + } + + def newProtobufRpcClient[O <: Message, I <: Message]( + connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + + + val serializer = new RpcClientSerializer[O, I]( + new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }, new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }) + + startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + } + + def newStringRpcServer(connection: ActorRef, + exchange: String, + requestHandler: String => String, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true): RpcServerHandle = { + + val serializer = new RpcServerSerializer[String, String]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]): String = { + new String(bytes) + } + }, new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }) + + startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + } + + def newStringRpcClient(connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): RpcClient[String, String] = { + + + val serializer = new RpcClientSerializer[String, String]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[String] { + def fromBinary(bytes: Array[Byte]): String = { + new String(bytes) + } + }) + + startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + } + + private def startClient[O, I](connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true, + serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + + val client = newRpcClient(connection, exchangeParameters, rKey, serializer) + new RpcClient(client) + } + + private def startServer[I, O](connection: ActorRef, + exchange: String, + requestHandler: I => O, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + serializer: RpcServerSerializer[I, O]): RpcServerHandle = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName)) + } +} + diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala similarity index 89% rename from akka-amqp/src/main/scala/RpcClientActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala index 0691e76884..5c717cb8bb 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala @@ -4,11 +4,9 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} - import com.rabbitmq.client.{Channel, RpcClient} -import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} +import rpc.RPC.RpcClientSerializer +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} class RpcClientActor[I,O]( exchangeParameters: ExchangeParameters, @@ -41,5 +39,11 @@ class RpcClientActor[I,O]( super.preRestart(reason) } + + override def shutdown = { + rpcClient.foreach(rpc => rpc.close) + super.shutdown + } + override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]" } diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala similarity index 94% rename from akka-amqp/src/main/scala/RpcServerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala index 309c7fa40c..5f6b4b713c 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala @@ -4,9 +4,9 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC.RpcServerSerializer import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer class RpcServerActor[I,O]( producer: ActorRef, diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala deleted file mode 100644 index c585675098..0000000000 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import se.scalablesolutions.akka.amqp._ -import se.scalablesolutions.akka.actor.Actor._ -import org.scalatest.matchers.MustMatchers -import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.amqp.AMQP._ - -class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { - @Test - def consumerMessage = if (AMQPTest.enabled) { - val connection = AMQP.newConnection() - try { - - val countDown = new CountDownLatch(3) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } - - val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) - val channelParameters = ChannelParameters(channelCallback - = Some(channelCallback)) - - val rpcServerSerializer = new RpcServerSerializer[String, Int]( - new FromBinary[String] { - def fromBinary(bytes: Array[Byte]) = new String(bytes) - }, new ToBinary[Int] { - def toBinary(t: Int) = Array(t.toByte) - }) - - def requestHandler(request: String) = 3 - - val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, - requestHandler, channelParameters = Some(channelParameters)) - - val rpcClientSerializer = new RpcClientSerializer[String, Int]( - new ToBinary[String] { - def toBinary(t: String) = t.getBytes - }, new FromBinary[Int] { - def fromBinary(bytes: Array[Byte]) = bytes.head.toInt - }) - - val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, - channelParameters = Some(channelParameters)) - - countDown.await(2, TimeUnit.SECONDS) must be(true) - val response = rpcClient !! "some_payload" - response must be(Some(3)) - } finally { - connection.stop - } - } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } -} diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala similarity index 81% rename from akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala index c1af35546a..f9d30227f0 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def connectionAndRecovery = if (AMQPTest.enabled) { + def connectionAndRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch @@ -45,15 +44,9 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi reconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) } finally { - connection.stop + AMQP.shutdownAll disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) } } - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala index a0b44f4739..31a90c8200 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ @@ -15,11 +12,13 @@ import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.Actor._ -class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test - def consumerChannelRecovery = if (AMQPTest.enabled) { + def consumerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { @@ -60,11 +59,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala index bf4885fea5..50c078a13a 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala @@ -1,25 +1,24 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.TimeUnit -import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ -class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def consumerConnectionRecovery = if (AMQPTest.enabled) { + def consumerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { @@ -79,11 +78,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala index 2dc4ee939b..011f287636 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ @@ -14,11 +11,13 @@ import org.junit.Test import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import org.scalatest.junit.JUnitSuite -class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers { @Test - def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) @@ -57,11 +56,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala new file mode 100644 index 0000000000..d00d09b480 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala @@ -0,0 +1,53 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp._ +import org.junit.Test +import se.scalablesolutions.akka.actor.ActorRef +import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import org.scalatest.junit.JUnitSuite + +class AMQPConsumerManualRejectTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + val connection = AMQP.newConnection() + try { + val countDown = new CountDownLatch(2) + val restartingLatch = new StandardLatch + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => restartingLatch.open + case Stopped => () + } + val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + + val rejectedLatch = new StandardLatch + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.reject.this", actor { + case Delivery(payload, _, deliveryTag, _, sender) => { + sender.foreach(_ ! Reject(deliveryTag)) + } + case Rejected(deliveryTag) => rejectedLatch.open + }, queueName = Some("self.reject.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) + + val producer = AMQP.newProducer(connection, + ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "manual.reject.this") + + rejectedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala similarity index 83% rename from akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala index 5d34f867d6..88661de58d 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala @@ -1,23 +1,22 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import se.scalablesolutions.akka.amqp._ import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers { @Test - def consumerMessage = if (AMQPTest.enabled) { + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() try { @@ -46,11 +45,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala similarity index 84% rename from akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala index 26b2d78393..e0ede02de3 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test - def producerChannelRecovery = if (AMQPTest.enabled) { + def producerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) @@ -53,11 +52,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala similarity index 84% rename from akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala index fe8259b208..ad756ff5f0 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,11 +11,13 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def producerConnectionRecovery = if (AMQPTest.enabled) { + def producerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { @@ -52,11 +51,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala similarity index 81% rename from akka-amqp/src/test/scala/AMQPProducerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala index 5b19df351f..7d485b1b8f 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.multiverse.api.latches.StandardLatch @@ -16,11 +13,13 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.String import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerMessageTest extends JUnitSuite with MustMatchers { @Test - def producerMessage = if (AMQPTest.enabled) { + def producerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection: ActorRef = AMQP.newConnection() try { @@ -41,11 +40,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala new file mode 100644 index 0000000000..5d03dae5c2 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala @@ -0,0 +1,43 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.amqp.rpc.RPC +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol + +class AMQPProtobufProducerConsumerTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + val responseLatch = new StandardLatch + + RPC.newProtobufRpcServer(connection, "protoexchange", requestHandler) + + val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build + + def responseHandler(response: AddressProtocol) = { + assert(response.getHostname == request.getHostname.reverse) + responseLatch.open + } + AMQP.newProtobufConsumer(connection, "", responseHandler, Some("proto.reply.key")) + + val producer = AMQP.newProtobufProducer[AddressProtocol](connection, "protoexchange") + producer.send(request, Some("proto.reply.key")) + + responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala new file mode 100644 index 0000000000..7de8044314 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala @@ -0,0 +1,61 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +import se.scalablesolutions.akka.amqp._ +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer} +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + val countDown = new CountDownLatch(3) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () + } + + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) + val channelParameters = ChannelParameters(channelCallback + = Some(channelCallback)) + + val rpcServerSerializer = new RpcServerSerializer[String, Int]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + }, new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + }) + + def requestHandler(request: String) = 3 + + val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + requestHandler, channelParameters = Some(channelParameters)) + + val rpcClientSerializer = new RpcClientSerializer[String, Int]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + }) + + val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + channelParameters = Some(channelParameters)) + + countDown.await(2, TimeUnit.SECONDS) must be(true) + val response = rpcClient !! "some_payload" + response must be(Some(3)) + } +} diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala new file mode 100644 index 0000000000..6b796374a6 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala @@ -0,0 +1,49 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol +import org.junit.Test +import se.scalablesolutions.akka.amqp.rpc.RPC +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit + +class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + RPC.newProtobufRpcServer(connection, "protoservice", requestHandler) + + val protobufClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, "protoservice") + + val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build + + protobufClient.call(request) match { + case Some(response) => assert(response.getHostname == request.getHostname.reverse) + case None => fail("no response") + } + + val aSyncLatch = new StandardLatch + protobufClient.callAsync(request) { + case Some(response) => { + assert(response.getHostname == request.getHostname.reverse) + aSyncLatch.open + } + case None => fail("no response") + } + + aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + + } + + def requestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala new file mode 100644 index 0000000000..0a55fda954 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala @@ -0,0 +1,47 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import se.scalablesolutions.akka.amqp.rpc.RPC +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit + +class AMQPRpcStringTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + RPC.newStringRpcServer(connection, "stringservice", requestHandler) + + val protobufClient = RPC.newStringRpcClient(connection, "stringservice") + + val request = "teststring" + + protobufClient.call(request) match { + case Some(response) => assert(response == request.reverse) + case None => fail("no response") + } + + val aSyncLatch = new StandardLatch + protobufClient.callAsync(request) { + case Some(response) => { + assert(response == request.reverse) + aSyncLatch.open + } + case None => fail("no response") + } + + aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: String): String= { + request.reverse + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala new file mode 100644 index 0000000000..bbb77c51a7 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala @@ -0,0 +1,44 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.amqp.rpc.RPC + +class AMQPStringProducerConsumerTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + val responseLatch = new StandardLatch + + RPC.newStringRpcServer(connection, "stringexchange", requestHandler) + + val request = "somemessage" + + def responseHandler(response: String) = { + + assert(response == request.reverse) + responseLatch.open + } + AMQP.newStringConsumer(connection, "", responseHandler, Some("string.reply.key")) + + val producer = AMQP.newStringProducer(connection, "stringexchange") + producer.send(request, Some("string.reply.key")) + + responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: String): String= { + println("###### Reverse") + request.reverse + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala similarity index 51% rename from akka-amqp/src/test/scala/AMQPTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 5ff9157bc5..2930ce4e68 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -4,6 +4,16 @@ package se.scalablesolutions.akka.amqp.test +import se.scalablesolutions.akka.amqp.AMQP object AMQPTest { + def enabled = false + + def withCleanEndState(action: => Unit) { + try { + action + } finally { + AMQP.shutdownAll + } + } } diff --git a/config/logback.xml b/config/logback.xml index 40faeefb3c..5ab49da1c3 100755 --- a/config/logback.xml +++ b/config/logback.xml @@ -27,7 +27,7 @@ ./logs/akka.log.%d{yyyy-MM-dd-HH} - + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e0874b9e9c..e546ff0f80 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -374,6 +374,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { val commons_io = Dependencies.commons_io val rabbit = Dependencies.rabbit + val protobuf = Dependencies.protobuf // testing val junit = Dependencies.junit