From be4be45a8d3728bb51ee0bbb7201b4417e286556 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 17:31:37 +0200 Subject: [PATCH 01/13] - moved all into package folder structure - added simple protobuf based rpc convenience --- .../scalablesolutions/akka/amqp}/AMQP.scala | 53 ++------- .../akka/amqp}/AMQPMessage.scala | 0 .../akka/amqp}/ConsumerActor.scala | 0 .../akka/amqp}/ExampleSession.scala | 6 +- .../akka/amqp}/ExchangeType.scala | 0 .../amqp}/FaultTolerantChannelActor.scala | 0 .../amqp}/FaultTolerantConnectionActor.scala | 0 .../akka/amqp}/ProducerActor.scala | 0 .../scalablesolutions/akka/amqp/rpc/RPC.scala | 110 ++++++++++++++++++ .../akka/amqp/rpc}/RpcClientActor.scala | 6 +- .../akka/amqp/rpc}/RpcServerActor.scala | 2 +- .../test}/AMQPConnectionRecoveryTest.scala | 17 +-- .../AMQPConsumerChannelRecoveryTest.scala | 18 +-- .../AMQPConsumerConnectionRecoveryTest.scala | 20 +--- .../AMQPConsumerManualAcknowledgeTest.scala | 18 +-- .../amqp/test}/AMQPConsumerMessageTest.scala | 18 +-- .../AMQPProducerChannelRecoveryTest.scala | 18 +-- .../AMQPProducerConnectionRecoveryTest.scala | 18 +-- .../amqp/test}/AMQPProducerMessageTest.scala | 18 +-- .../amqp/test}/AMQPRpcClientServerTest.scala | 25 ++-- .../akka/amqp/test/AMQPRpcProtobufTest.scala | 35 ++++++ .../akka/amqp/test}/AMQPTest.scala | 2 +- project/build/AkkaProject.scala | 50 ++++---- 23 files changed, 238 insertions(+), 196 deletions(-) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/AMQP.scala (69%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/AMQPMessage.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ConsumerActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ExampleSession.scala (94%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ExchangeType.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/FaultTolerantChannelActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/FaultTolerantConnectionActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ProducerActor.scala (100%) create mode 100644 akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp/rpc}/RpcClientActor.scala (89%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp/rpc}/RpcServerActor.scala (94%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConnectionRecoveryTest.scala (85%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerChannelRecoveryTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerConnectionRecoveryTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerManualAcknowledgeTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerMessageTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerChannelRecoveryTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerConnectionRecoveryTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerMessageTest.scala (84%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPRpcClientServerTest.scala (77%) create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPTest.scala (87%) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala similarity index 69% rename from akka-amqp/src/main/scala/AMQP.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index b1e08ae752..0823f306cd 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -1,15 +1,16 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.actor.Actor._ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ + /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * @@ -80,33 +81,6 @@ object AMQP { consumer } - def newRpcClient[O,I](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[O,I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) - connection.startLink(rpcActor) - rpcActor ! Start - rpcActor - } - - def newRpcServer[I,O](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcServerSerializer[I,O], - requestHandler: I => O, - queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer - , channelParameters = channelParameters - , selfAcknowledging = false - , queueName = queueName)) - - } - private val supervisor = new AMQPSupervisor class AMQPSupervisor extends Logging { @@ -129,17 +103,4 @@ object AMQP { connectionActor } } - - trait FromBinary[T] { - def fromBinary(bytes: Array[Byte]): T - } - - trait ToBinary[T] { - def toBinary(t: T): Array[Byte] - } - - - case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) - - case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) } diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala similarity index 100% rename from akka-amqp/src/main/scala/AMQPMessage.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala similarity index 100% rename from akka-amqp/src/main/scala/ConsumerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala similarity index 94% rename from akka-amqp/src/main/scala/ExampleSession.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 4fa1358a29..f2ae5e8295 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -4,6 +4,8 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, ToBinary, FromBinary} import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -146,7 +148,7 @@ object ExampleSession { def requestHandler(request: String) = 3 - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, requestHandler, queueName = Some("rpc.in.key.queue")) @@ -159,7 +161,7 @@ object ExampleSession { } val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) + val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) diff --git a/akka-amqp/src/main/scala/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala similarity index 100% rename from akka-amqp/src/main/scala/ExchangeType.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala similarity index 100% rename from akka-amqp/src/main/scala/FaultTolerantChannelActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala similarity index 100% rename from akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala similarity index 100% rename from akka-amqp/src/main/scala/ProducerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala new file mode 100644 index 0000000000..a711a01cc5 --- /dev/null +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -0,0 +1,110 @@ +package se.scalablesolutions.akka.amqp.rpc + +import se.scalablesolutions.akka.amqp.AMQP._ +import com.google.protobuf.Message +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ +import se.scalablesolutions.akka.amqp._ + +object RPC { + + def newRpcClient[O, I](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[O, I], + channelParameters: Option[ChannelParameters] = None): ActorRef = { + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters)) + connection.startLink(rpcActor) + rpcActor ! Start + rpcActor + } + + def newRpcServer[I, O](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: I => O, + queueName: Option[String] = None, + channelParameters: Option[ChannelParameters] = None) = { + val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) + val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + , channelParameters = channelParameters + , selfAcknowledging = false + , queueName = queueName)) + + } + + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + + case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) + + case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) + + + /** + * RPC convenience + */ + trait RpcClient[O, I] { + def callService(request: O, timeout: Long = 5000): Option[I] + } + + private class RpcServiceClient[O, I](client: ActorRef) extends RpcClient[O, I] { + def callService(request: O, timeout: Long = 5000): Option[I] = { + (client.!!(request, timeout)).as[I] + } + } + + private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) + + def startProtobufServer[I <: Message, O <: Message]( + connection: ActorRef, serviceName: String, requestHandler: I => O)(implicit manifest: Manifest[I]) = { + + val serializer = new RpcServerSerializer[I, O]( + new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }, new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }) + + val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) + val routingKey = "%s.request".format(serviceName) + val queueName = "%s.in".format(routingKey) + + newRpcServer[I, O](connection, exchangeParameters, routingKey, serializer, requestHandler, + queueName = Some(queueName)) + } + + def startProtobufClient[O <: Message, I <: Message]( + connection: ActorRef, serviceName: String)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + + val serializer = new RpcClientSerializer[O, I]( + new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }, new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }) + + val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) + val routingKey = "%s.request".format(serviceName) + val queueName = "%s.in".format(routingKey) + + val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer) + new RpcServiceClient[O, I](client) + } + + private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { + manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I] + } +} \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala similarity index 89% rename from akka-amqp/src/main/scala/RpcClientActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala index 2935982a67..40665194fc 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala @@ -4,11 +4,9 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} - import com.rabbitmq.client.{Channel, RpcClient} -import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} +import rpc.RPC.RpcClientSerializer +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, routingKey: String, diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala similarity index 94% rename from akka-amqp/src/main/scala/RpcServerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala index 99d74d9b56..7c1cc01d01 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala @@ -4,9 +4,9 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC.RpcServerSerializer import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala similarity index 85% rename from akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala index c1af35546a..ef2ee5b80f 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def connectionAndRecovery = if (AMQPTest.enabled) { @@ -50,10 +49,4 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi } } - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala index a0b44f4739..3c21fa36af 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ @@ -15,8 +12,10 @@ import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.Actor._ -class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test def consumerChannelRecovery = if (AMQPTest.enabled) { @@ -60,11 +59,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala index bf4885fea5..301489bd8f 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala @@ -1,22 +1,21 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.TimeUnit -import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ -class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def consumerConnectionRecovery = if (AMQPTest.enabled) { @@ -79,11 +78,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala index 2dc4ee939b..a18db40d5f 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ @@ -14,8 +11,10 @@ import org.junit.Test import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import org.scalatest.junit.JUnitSuite -class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers { @Test def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { @@ -57,11 +56,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala index 5d34f867d6..9eb7aa9158 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala @@ -1,20 +1,19 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import se.scalablesolutions.akka.amqp._ import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers { @Test def consumerMessage = if (AMQPTest.enabled) { @@ -46,11 +45,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala index 26b2d78393..3aec05c122 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test def producerChannelRecovery = if (AMQPTest.enabled) { @@ -53,11 +52,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala index fe8259b208..b1a4cc1027 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def producerConnectionRecovery = if (AMQPTest.enabled) { @@ -52,11 +51,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala similarity index 84% rename from akka-amqp/src/test/scala/AMQPProducerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala index 5b19df351f..587f03ec92 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.multiverse.api.latches.StandardLatch @@ -16,8 +13,10 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.String import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerMessageTest extends JUnitSuite with MustMatchers { @Test def producerMessage = if (AMQPTest.enabled) { @@ -41,11 +40,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala similarity index 77% rename from akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala index c585675098..bf0ee6610d 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala @@ -1,19 +1,21 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import se.scalablesolutions.akka.amqp._ +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, FromBinary, ToBinary} import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { +class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers { + @Test def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() @@ -39,7 +41,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging def requestHandler(request: String) = 3 - val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, requestHandler, channelParameters = Some(channelParameters)) val rpcClientSerializer = new RpcClientSerializer[String, Int]( @@ -49,7 +51,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging def fromBinary(bytes: Array[Byte]) = bytes.head.toInt }) - val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be(true) @@ -59,11 +61,4 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala new file mode 100644 index 0000000000..b0ca3f8ffe --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala @@ -0,0 +1,35 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol +import org.junit.Test +import se.scalablesolutions.akka.amqp.rpc.RPC + +class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) { + + val connection = AMQP.newConnection() + + RPC.startProtobufServer(connection, "protoservice", requestHandler) + + val protobufClient = RPC.startProtobufClient[AddressProtocol, AddressProtocol](connection, "protoservice") + + val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build + + protobufClient.callService(request) match { + case Some(response) => assert(response.getHostname == request.getHostname.reverse) + case None => fail("no response") + } + } + + def requestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala similarity index 87% rename from akka-amqp/src/test/scala/AMQPTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 5ff9157bc5..8cad27ba60 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = false + def enabled = true } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 02f72d7f0e..7e3477027f 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -41,15 +41,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { - lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") - lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") +// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") +// lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) - lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") - lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") - lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") - lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") +// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") +// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") +// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") +// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") +// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") +// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") } // ------------------------------------------------------------------------------------------------------------------- @@ -60,23 +60,26 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- import Repositories._ - lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) - lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) - lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) - // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) - lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) - lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) - lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) - lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) - lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) - lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) - lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) - lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) - lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) - lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) - lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) +// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) +// lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) +// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) +// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) +// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) +// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) +// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) +// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) +// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) +// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) +// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) +// lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) +// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) +// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) +// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! + val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository" + val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public" + val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots" // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- @@ -358,6 +361,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { val commons_io = Dependencies.commons_io val rabbit = Dependencies.rabbit + val protobuf = Dependencies.protobuf // testing val junit = Dependencies.junit From 533319d6679a28f9caec3b377344b4c41faac30a Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 17:32:01 +0200 Subject: [PATCH 02/13] disable ampq tests --- .../scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 8cad27ba60..5ff9157bc5 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = true + def enabled = false } From b1fe483c53fcf77da6ac917cb85615692eb7b981 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 17:33:55 +0200 Subject: [PATCH 03/13] remove rpcclient trait... --- .../scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index a711a01cc5..3097e64cd9 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -52,11 +52,7 @@ object RPC { /** * RPC convenience */ - trait RpcClient[O, I] { - def callService(request: O, timeout: Long = 5000): Option[I] - } - - private class RpcServiceClient[O, I](client: ActorRef) extends RpcClient[O, I] { + class RpcClient[O, I](client: ActorRef){ def callService(request: O, timeout: Long = 5000): Option[I] = { (client.!!(request, timeout)).as[I] } @@ -101,7 +97,7 @@ object RPC { val queueName = "%s.in".format(routingKey) val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer) - new RpcServiceClient[O, I](client) + new RpcClient[O, I](client) } private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { From 75c2c147eec9001e06c9d3df6eb6f0a98376b574 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 9 Aug 2010 10:28:19 +0200 Subject: [PATCH 04/13] added optional routingkey and queuename to parameters --- .../scalablesolutions/akka/amqp/rpc/RPC.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 3097e64cd9..26887dbff4 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -61,7 +61,9 @@ object RPC { private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) def startProtobufServer[I <: Message, O <: Message]( - connection: ActorRef, serviceName: String, requestHandler: I => O)(implicit manifest: Manifest[I]) = { + connection: ActorRef, exchange: String, requestHandler: I => O, + routingKey: Option[String] = None, + queueName: Option[String] = None)(implicit manifest: Manifest[I]) = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -72,16 +74,18 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) - val routingKey = "%s.request".format(serviceName) - val queueName = "%s.in".format(routingKey) + val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) - newRpcServer[I, O](connection, exchangeParameters, routingKey, serializer, requestHandler, - queueName = Some(queueName)) + newRpcServer[I, O](connection, exchangeParameters, rKey, serializer, requestHandler, + queueName = Some(qName)) } def startProtobufClient[O <: Message, I <: Message]( - connection: ActorRef, serviceName: String)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + connection: ActorRef, exchange: String, + routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + val serializer = new RpcClientSerializer[O, I]( new ToBinary[O] { @@ -92,11 +96,10 @@ object RPC { } }) - val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) - val routingKey = "%s.request".format(serviceName) - val queueName = "%s.in".format(routingKey) + val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) - val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer) + val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer) new RpcClient[O, I](client) } From 6a586d1198373becf9ca6f7fa99a249f9c45ff34 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 9 Aug 2010 10:42:00 +0200 Subject: [PATCH 05/13] undo local repo settings (for the 25953467296th time :S ) --- project/build/AkkaProject.scala | 49 ++++++++++++++++----------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 7e3477027f..bf1e9e6478 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -41,15 +41,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { -// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") -// lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") + lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") + lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) -// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") -// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") -// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") -// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") -// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") -// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") + lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") + lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") + lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") + lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") + lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") } // ------------------------------------------------------------------------------------------------------------------- @@ -60,26 +60,23 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- import Repositories._ -// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) -// lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) -// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) -// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) -// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) -// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) -// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) -// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) -// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) -// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) -// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) -// lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) -// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) -// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) -// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) + lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) + lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) + lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) + // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) + lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) + lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) + lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) + lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) + lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) + lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) + lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) + lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) + lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) + lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) + lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! - val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository" - val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public" - val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots" // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- From 4770846eb7ffdf5a5c5b59649c818f9339b61ca2 Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 10 Aug 2010 10:39:56 +0200 Subject: [PATCH 06/13] add durablility and auto-delete with defaults to rpc and with passive = true for client --- .../scalablesolutions/akka/amqp/rpc/RPC.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 26887dbff4..60a1a6733c 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -13,7 +13,8 @@ object RPC { routingKey: String, serializer: RpcClientSerializer[O, I], channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters)) + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( + exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor @@ -26,9 +27,10 @@ object RPC { requestHandler: I => O, queueName: Option[String] = None, channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val producer = newProducer(connection, ProducerParameters( + ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters , selfAcknowledging = false , queueName = queueName)) @@ -63,7 +65,8 @@ object RPC { def startProtobufServer[I <: Message, O <: Message]( connection: ActorRef, exchange: String, requestHandler: I => O, routingKey: Option[String] = None, - queueName: Option[String] = None)(implicit manifest: Manifest[I]) = { + queueName: Option[String] = None, + durable = false, autoDelete = true)(implicit manifest: Manifest[I]) = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -74,7 +77,8 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) val rKey = routingKey.getOrElse("%s.request".format(exchange)) val qName = queueName.getOrElse("%s.in".format(rKey)) @@ -84,7 +88,8 @@ object RPC { def startProtobufClient[O <: Message, I <: Message]( connection: ActorRef, exchange: String, - routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + routingKey: Option[String] = None, + durable = false, autoDelete = true, passive = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { val serializer = new RpcClientSerializer[O, I]( @@ -96,7 +101,8 @@ object RPC { } }) - val exchangeParameters = new ExchangeParameters(exchange, ExchangeType.Topic) + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive) val rKey = routingKey.getOrElse("%s.request".format(exchange)) val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer) From 9815ef7d3a09e9b7e8e2c016c64e136a9a8491d4 Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 10 Aug 2010 11:30:56 +0200 Subject: [PATCH 07/13] types seem to help the parameter declaration :S --- .../main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 60a1a6733c..9a40139596 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -66,7 +66,7 @@ object RPC { connection: ActorRef, exchange: String, requestHandler: I => O, routingKey: Option[String] = None, queueName: Option[String] = None, - durable = false, autoDelete = true)(implicit manifest: Manifest[I]) = { + durable: Boolean = false, autoDelete: Boolean = true)(implicit manifest: Manifest[I]) = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -89,7 +89,8 @@ object RPC { def startProtobufClient[O <: Message, I <: Message]( connection: ActorRef, exchange: String, routingKey: Option[String] = None, - durable = false, autoDelete = true, passive = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + durable: Boolean = false, autoDelete: Boolean = true, + passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { val serializer = new RpcClientSerializer[O, I]( From 93e88305c5541ea21e16b36227c41a7de032ff25 Mon Sep 17 00:00:00 2001 From: momania Date: Wed, 11 Aug 2010 10:09:04 +0200 Subject: [PATCH 08/13] manual rejection of delivery (for now by making it fail until new rabbitmq version has basicReject) --- .../se/scalablesolutions/akka/amqp/AMQP.scala | 13 ----- .../akka/amqp/AMQPMessage.scala | 3 ++ .../akka/amqp/ConsumerActor.scala | 14 +++++ .../test/AMQPConsumerManualRejectTest.scala | 53 +++++++++++++++++++ 4 files changed, 70 insertions(+), 13 deletions(-) create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index df9c6ee79f..0ebfca40c3 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -105,17 +105,4 @@ object AMQP { connectionActor } } - - trait FromBinary[T] { - def fromBinary(bytes: Array[Byte]): T - } - - trait ToBinary[T] { - def toBinary(t: T): Array[Byte] - } - - - case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) - - case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala index 92cd95906a..34eb37aa14 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala @@ -44,6 +44,9 @@ case object Stopped extends AMQPMessage // delivery messages case class Acknowledge(deliveryTag: Long) extends AMQPMessage case class Acknowledged(deliveryTag: Long) extends AMQPMessage +case class Reject(deliveryTag: Long) extends AMQPMessage +case class Rejected(deliveryTag: Long) extends AMQPMessage +class RejectionException(deliveryTag: Long) extends RuntimeException // internal messages private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala index 90c8d7deec..39aa346274 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala @@ -23,6 +23,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) def specificMessageHandler = { case Acknowledge(deliveryTag) => acknowledgeDeliveryTag(deliveryTag, true) + case Reject(deliveryTag) => rejectDeliveryTag(deliveryTag, true) case message: Message => handleIllegalMessage("%s can't be used to send messages, ignoring message [%s]".format(this, message)) case unknown => @@ -82,6 +83,19 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) } } + private def rejectDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = { + log.debug("Rejecting message with delivery tag [%s]", deliveryTag) + // FIXME: when rabbitmq 1.9 arrives, basicReject should be available on the API and implemented instead of this + log.warning("Consumer is rejecting delivery with tag [%s] - " + + "for now this means we have to self terminate and kill the channel - see you in a second.") + channel.foreach{ch => + if (remoteAcknowledgement) { + deliveryHandler ! Rejected(deliveryTag) + } + } + throw new RejectionException(deliveryTag) + } + private def handleIllegalMessage(errorMessage: String) = { log.error(errorMessage) throw new IllegalArgumentException(errorMessage) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala new file mode 100644 index 0000000000..e928cb8ea9 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala @@ -0,0 +1,53 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +import se.scalablesolutions.akka.actor.Actor._ +import org.scalatest.matchers.MustMatchers +import se.scalablesolutions.akka.amqp._ +import org.junit.Test +import se.scalablesolutions.akka.actor.ActorRef +import java.util.concurrent.{CountDownLatch, TimeUnit} +import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import org.scalatest.junit.JUnitSuite + +class AMQPConsumerManualRejectTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { + val connection = AMQP.newConnection() + try { + val countDown = new CountDownLatch(2) + val restartingLatch = new StandardLatch + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => restartingLatch.open + case Stopped => () + } + val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + + val rejectedLatch = new StandardLatch + val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.reject.this", actor { + case Delivery(payload, _, deliveryTag, _, sender) => { + sender.foreach(_ ! Reject(deliveryTag)) + } + case Rejected(deliveryTag) => rejectedLatch.open + }, queueName = Some("self.reject.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) + + val producer = AMQP.newProducer(connection, + ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "manual.reject.this") + + rejectedLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + restartingLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } finally { + connection.stop + } + } +} \ No newline at end of file From 6393a9419583413521474bbc0319bb032bdbac86 Mon Sep 17 00:00:00 2001 From: momania Date: Wed, 11 Aug 2010 14:49:21 +0200 Subject: [PATCH 09/13] added async call with partial function callback to rpcclient --- .../se/scalablesolutions/akka/amqp/rpc/RPC.scala | 9 ++++++++- .../akka/amqp/test/AMQPRpcProtobufTest.scala | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 9a40139596..cb25444185 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -55,9 +55,16 @@ object RPC { * RPC convenience */ class RpcClient[O, I](client: ActorRef){ - def callService(request: O, timeout: Long = 5000): Option[I] = { + def call(request: O, timeout: Long = 5000): Option[I] = { (client.!!(request, timeout)).as[I] } + + def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = { + spawn { + val result = call(request, timeout) + responseHandler.apply(result) + } + } } private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala index b0ca3f8ffe..b936b821fe 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala @@ -9,6 +9,8 @@ import se.scalablesolutions.akka.amqp.AMQP import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol import org.junit.Test import se.scalablesolutions.akka.amqp.rpc.RPC +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { @@ -23,10 +25,21 @@ class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build - protobufClient.callService(request) match { + protobufClient.call(request) match { case Some(response) => assert(response.getHostname == request.getHostname.reverse) case None => fail("no response") } + + val aSyncLatch = new StandardLatch + protobufClient.callAsync(request) { + case Some(response) => { + assert(response.getHostname == request.getHostname.reverse) + aSyncLatch.open + } + case None => fail("no response") + } + + aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) } def requestHandler(request: AddressProtocol): AddressProtocol = { From 1ecbae920ee05266e91675633b40a1a1b585e9d5 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 12 Aug 2010 10:53:39 +0200 Subject: [PATCH 10/13] added shutdownAll to be able to kill the whole actor tree, incl the amqp supervisor --- .../se/scalablesolutions/akka/amqp/AMQP.scala | 35 ++++++++----------- .../test/AMQPConnectionRecoveryTest.scala | 2 +- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 0ebfca40c3..9651054b90 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException -import se.scalablesolutions.akka.util.Logging + /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * @@ -62,7 +62,8 @@ object AMQP { } def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = { - val connection: ActorRef = supervisor.newConnection(connectionParameters) + val connection = actorOf(new FaultTolerantConnectionActor(connectionParameters)) + supervisor.startLink(connection) connection ! Connect connection } @@ -83,26 +84,20 @@ object AMQP { consumer } - private val supervisor = new AMQPSupervisor + class AMQPSupervisorActor extends Actor { + import self._ - class AMQPSupervisor extends Logging { - class AMQPSupervisorActor extends Actor { - import self._ + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = List(classOf[Throwable]) - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Throwable]) - - def receive = { - case _ => {} // ignore all messages - } - } - - private val supervisor = actorOf(new AMQPSupervisorActor).start - - def newConnection(connectionParameters: ConnectionParameters): ActorRef = { - val connectionActor = actorOf(new FaultTolerantConnectionActor(connectionParameters)) - supervisor.startLink(connectionActor) - connectionActor + def receive = { + case _ => {} // ignore all messages } } + + private val supervisor = actorOf(new AMQPSupervisorActor).start + + def shutdownAll = { + supervisor.stop + } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala index ef2ee5b80f..a0dc21b7dc 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala @@ -44,7 +44,7 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers { reconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) } finally { - connection.stop + AMQP.shutdownAll disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) } } From 295664731e238266566706a68f8e8117584c5fa5 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 12 Aug 2010 11:07:36 +0200 Subject: [PATCH 11/13] shutdown linked actors too when shutting down supervisor --- .../src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala | 4 ++++ .../scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala | 2 +- .../akka/amqp/FaultTolerantConnectionActor.scala | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 9651054b90..b265cb38a1 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -93,6 +93,10 @@ object AMQP { def receive = { case _ => {} // ignore all messages } + + override def shutdown = { + self.shutdownLinkedActors + } } private val supervisor = actorOf(new AMQPSupervisorActor).start diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala index 39aa346274..b01f79f949 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala @@ -108,7 +108,7 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) override def shutdown = { listenerTag.foreach(tag => channel.foreach(_.basicCancel(tag))) - self.linkedActorsAsList.foreach(_.stop) + self.shutdownLinkedActors super.shutdown } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 97c3074700..1e50a985be 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -107,7 +107,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio override def shutdown = { reconnectionTimer.cancel // make sure shutdown is called on all linked actors so they can do channel cleanup before connection is killed - self.linkedActorsAsList.foreach(_.stop) + self.shutdownLinkedActors disconnect } From 50e5e5e2987c725b7f8bfb99437a5e76b84b9911 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 12 Aug 2010 16:16:18 +0200 Subject: [PATCH 12/13] making it more easy to start string and protobuf base consumers, producers and rpc style --- .../se/scalablesolutions/akka/amqp/AMQP.scala | 183 ++++++++++++++---- .../akka/amqp/ExampleSession.scala | 131 +++++++++++-- .../scalablesolutions/akka/amqp/rpc/RPC.scala | 125 ++++++++---- .../akka/amqp/rpc/RpcClientActor.scala | 6 + .../test/AMQPConnectionRecoveryTest.scala | 2 +- .../AMQPConsumerChannelRecoveryTest.scala | 2 +- .../AMQPConsumerConnectionRecoveryTest.scala | 2 +- .../AMQPConsumerManualAcknowledgeTest.scala | 2 +- .../test/AMQPConsumerManualRejectTest.scala | 2 +- .../amqp/test/AMQPConsumerMessageTest.scala | 2 +- .../AMQPProducerChannelRecoveryTest.scala | 2 +- .../AMQPProducerConnectionRecoveryTest.scala | 2 +- .../amqp/test/AMQPProducerMessageTest.scala | 2 +- .../AMQPProtobufProducerConsumerTest.scala | 43 ++++ .../amqp/test/AMQPRpcClientServerTest.scala | 81 ++++---- .../akka/amqp/test/AMQPRpcProtobufTest.scala | 7 +- .../akka/amqp/test/AMQPRpcStringTest.scala | 47 +++++ .../test/AMQPStringProducerConsumerTest.scala | 44 +++++ .../akka/amqp/test/AMQPTest.scala | 15 +- config/logback.xml | 2 +- 20 files changed, 561 insertions(+), 141 deletions(-) create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index b265cb38a1..cd73d27e03 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.amqp @@ -8,7 +8,8 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} -import java.lang.IllegalArgumentException +import com.rabbitmq.client.AMQP.BasicProperties +import java.lang.{String, IllegalArgumentException} /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. @@ -19,43 +20,43 @@ import java.lang.IllegalArgumentException */ object AMQP { case class ConnectionParameters( - host: String = ConnectionFactory.DEFAULT_HOST, - port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, - username: String = ConnectionFactory.DEFAULT_USER, - password: String = ConnectionFactory.DEFAULT_PASS, - virtualHost: String = ConnectionFactory.DEFAULT_VHOST, - initReconnectDelay: Long = 5000, - connectionCallback: Option[ActorRef] = None) + host: String = ConnectionFactory.DEFAULT_HOST, + port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, + username: String = ConnectionFactory.DEFAULT_USER, + password: String = ConnectionFactory.DEFAULT_PASS, + virtualHost: String = ConnectionFactory.DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) case class ChannelParameters( - shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) + shutdownListener: Option[ShutdownListener] = None, + channelCallback: Option[ActorRef] = None) case class ExchangeParameters( - exchangeName: String, - exchangeType: ExchangeType, - exchangeDurable: Boolean = false, - exchangeAutoDelete: Boolean = true, - exchangePassive: Boolean = false, - configurationArguments: Map[String, AnyRef] = Map()) + exchangeName: String, + exchangeType: ExchangeType, + exchangeDurable: Boolean = false, + exchangeAutoDelete: Boolean = true, + exchangePassive: Boolean = false, + configurationArguments: Map[String, AnyRef] = Map()) case class ProducerParameters( - exchangeParameters: ExchangeParameters, - producerId: Option[String] = None, - returnListener: Option[ReturnListener] = None, - channelParameters: Option[ChannelParameters] = None) + exchangeParameters: ExchangeParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None, + channelParameters: Option[ChannelParameters] = None) case class ConsumerParameters( - exchangeParameters: ExchangeParameters, - routingKey: String, - deliveryHandler: ActorRef, - queueName: Option[String] = None, - queueDurable: Boolean = false, - queueAutoDelete: Boolean = true, - queuePassive: Boolean = false, - queueExclusive: Boolean = false, - selfAcknowledging: Boolean = true, - channelParameters: Option[ChannelParameters] = None) { + exchangeParameters: ExchangeParameters, + routingKey: String, + deliveryHandler: ActorRef, + queueName: Option[String] = None, + queueDurable: Boolean = false, + queueAutoDelete: Boolean = true, + queuePassive: Boolean = false, + queueExclusive: Boolean = false, + selfAcknowledging: Boolean = true, + channelParameters: Option[ChannelParameters] = None) { if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -84,6 +85,102 @@ object AMQP { consumer } + /** + * Convenience + */ + class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) { + def send(request: O, replyTo: Option[String] = None) = { + val basicProperties = new BasicProperties + basicProperties.setReplyTo(replyTo.getOrElse(null)) + client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties)) + } + + def stop = client.stop + } + + def newStringProducer(connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + producerId: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): ProducerClient[String] = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + + val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) + val toBinary = new ToBinary[String] { + def toBinary(t: String) = t.getBytes + } + new ProducerClient(producerRef, rKey, toBinary) + } + + def newStringConsumer(connection: ActorRef, + exchange: String, + handler: String => Unit, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true): ActorRef = { + + val deliveryHandler = actor { + case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) + } + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + } + + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + producerId: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): ProducerClient[O] = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + + val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) + new ProducerClient(producerRef, rKey, new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }) + } + + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, + exchange: String, + handler: I => Unit, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = { + + val deliveryHandler = actor { + case Delivery(payload, _, _, _, _) => { + handler.apply(createProtobufFromBytes[I](payload)) + } + } + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + } + + /** + * Main supervisor + */ + class AMQPSupervisorActor extends Actor { import self._ @@ -93,15 +190,29 @@ object AMQP { def receive = { case _ => {} // ignore all messages } - - override def shutdown = { - self.shutdownLinkedActors - } } private val supervisor = actorOf(new AMQPSupervisorActor).start def shutdownAll = { - supervisor.stop + supervisor.shutdownLinkedActors + } + + /** + * Serialization stuff + */ + + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) + + private[amqp] def createProtobufFromBytes[I <: com.google.protobuf.Message](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { + manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I] } } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index f2ae5e8295..f35da954d0 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -5,45 +5,63 @@ package se.scalablesolutions.akka.amqp import rpc.RPC -import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, ToBinary, FromBinary} +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer} import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.amqp.AMQP._ import java.lang.String +import se.scalablesolutions.akka.amqp.AMQP._ +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol object ExampleSession { def main(args: Array[String]) = { - println("==== DIRECT ===") + + printTopic("DIRECT") direct - TimeUnit.SECONDS.sleep(2) - - println("==== FANOUT ===") + printTopic("FANOUT") fanout - TimeUnit.SECONDS.sleep(2) - - println("==== TOPIC ===") + printTopic("TOPIC") topic - TimeUnit.SECONDS.sleep(2) - - println("==== CALLBACK ===") + printTopic("CALLBACK") callback - TimeUnit.SECONDS.sleep(2) + printTopic("EASY STRING PRODUCER AND CONSUMER") + easyStringProducerConsumer - println("==== RPC ===") + printTopic("EASY PROTOBUF PRODUCER AND CONSUMER") + easyProtobufProducerConsumer + + printTopic("RPC") rpc - TimeUnit.SECONDS.sleep(2) + printTopic("EASY STRING RPC") + easyStringRpc + + printTopic("EASY PROTOBUF RPC") + easyProtobufRpc + + printTopic("Happy hAkking :-)") + + // shutdown everything the amqp tree except the main AMQP supervisor + // all connections/consumers/producers will be stopped + AMQP.shutdownAll ActorRegistry.shutdownAll System.exit(0) } + def printTopic(topic: String) { + + println("") + println("==== " + topic + " ===") + println("") + TimeUnit.SECONDS.sleep(2) + } + def direct = { // defaults to amqp://guest:guest@localhost:5672/ @@ -117,7 +135,7 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) + val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor { @@ -131,6 +149,40 @@ object ExampleSession { connection.stop } + def easyStringProducerConsumer = { + val connection = AMQP.newConnection() + + val exchangeName = "easy.string" + + // listen by default to: + // exchange = exchangeName + // routingKey = .request + // queueName = .in + AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message)) + + // send by default to: + // exchange = exchangeName + // routingKey = .request + val producer = AMQP.newStringProducer(connection, exchangeName) + + producer.send("This shit is easy!") + } + + def easyProtobufProducerConsumer = { + val connection = AMQP.newConnection() + + val exchangeName = "easy.protobuf" + + def protobufMessageHandler(message: AddressProtocol) = { + log.info("Received "+message) + } + + AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler) + + val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName) + producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build) + } + def rpc = { val connection = AMQP.newConnection() @@ -166,4 +218,51 @@ object ExampleSession { val response = (rpcClient !! "rpc_request") log.info("Response: " + response) } + + def easyStringRpc = { + + val connection = AMQP.newConnection() + + val exchangeName = "easy.stringrpc" + + // listen by default to: + // exchange = exchangeName + // routingKey = .request + // queueName = .in + RPC.newStringRpcServer(connection, exchangeName, request => { + log.info("Got request: "+request) + "Response to: '"+request+"'" + }) + + // send by default to: + // exchange = exchangeName + // routingKey = .request + val stringRpcClient = RPC.newStringRpcClient(connection, exchangeName) + + val response = stringRpcClient.call("AMQP Rocks!") + log.info("Got response: "+response) + + stringRpcClient.callAsync("AMQP is dead easy") { + case response => log.info("This is handled async: "+response) + } + } + + def easyProtobufRpc = { + + val connection = AMQP.newConnection() + + val exchangeName = "easy.protobuf.rpc" + + def protobufRequestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } + + RPC.newProtobufRpcServer(connection, exchangeName, protobufRequestHandler) + + val stringRpcClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, exchangeName) + + val response = stringRpcClient.call(AddressProtocol.newBuilder.setHostname("localhost").setPort(4321).build) + + log.info("Got response: "+response) + } } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index cb25444185..b51cbe407f 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -26,26 +26,22 @@ object RPC { serializer: RpcServerSerializer[I, O], requestHandler: I => O, queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { + channelParameters: Option[ChannelParameters] = None): RpcServerHandle = { val producer = newProducer(connection, ProducerParameters( ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer - , channelParameters = channelParameters - , selfAcknowledging = false - , queueName = queueName)) - + val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer, + channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName)) + RpcServerHandle(producer, consumer) } - trait FromBinary[T] { - def fromBinary(bytes: Array[Byte]): T + case class RpcServerHandle(producer: ActorRef, consumer: ActorRef) { + def stop = { + consumer.stop + producer.stop + } } - trait ToBinary[T] { - def toBinary(t: T): Array[Byte] - } - - case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) @@ -65,15 +61,17 @@ object RPC { responseHandler.apply(result) } } + def stop = client.stop } - private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - - def startProtobufServer[I <: Message, O <: Message]( - connection: ActorRef, exchange: String, requestHandler: I => O, + def newProtobufRpcServer[I <: Message, O <: Message]( + connection: ActorRef, + exchange: String, + requestHandler: I => O, routingKey: Option[String] = None, queueName: Option[String] = None, - durable: Boolean = false, autoDelete: Boolean = true)(implicit manifest: Manifest[I]) = { + durable: Boolean = false, + autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -84,19 +82,15 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) - val qName = queueName.getOrElse("%s.in".format(rKey)) - - newRpcServer[I, O](connection, exchangeParameters, rKey, serializer, requestHandler, - queueName = Some(qName)) + startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) } - def startProtobufClient[O <: Message, I <: Message]( - connection: ActorRef, exchange: String, + def newProtobufRpcClient[O <: Message, I <: Message]( + connection: ActorRef, + exchange: String, routingKey: Option[String] = None, - durable: Boolean = false, autoDelete: Boolean = true, + durable: Boolean = false, + autoDelete: Boolean = true, passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { @@ -109,15 +103,80 @@ object RPC { } }) + startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + } + + def newStringRpcServer(connection: ActorRef, + exchange: String, + requestHandler: String => String, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true): RpcServerHandle = { + + val serializer = new RpcServerSerializer[String, String]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]): String = { + new String(bytes) + } + }, new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }) + + startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + } + + def newStringRpcClient(connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true): RpcClient[String, String] = { + + + val serializer = new RpcClientSerializer[String, String]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[String] { + def fromBinary(bytes: Array[Byte]): String = { + new String(bytes) + } + }) + + startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + } + + private def startClient[O, I](connection: ActorRef, + exchange: String, + routingKey: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + passive: Boolean = true, + serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive) val rKey = routingKey.getOrElse("%s.request".format(exchange)) - val client = newRpcClient[O, I](connection, exchangeParameters, rKey, serializer) - new RpcClient[O, I](client) + val client = newRpcClient(connection, exchangeParameters, rKey, serializer) + new RpcClient(client) } - private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { - manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I] + private def startServer[I, O](connection: ActorRef, + exchange: String, + requestHandler: I => O, + routingKey: Option[String] = None, + queueName: Option[String] = None, + durable: Boolean = false, + autoDelete: Boolean = true, + serializer: RpcServerSerializer[I, O]): RpcServerHandle = { + + val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, + exchangeDurable = durable, exchangeAutoDelete = autoDelete) + val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val qName = queueName.getOrElse("%s.in".format(rKey)) + + newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName)) } -} \ No newline at end of file +} + diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala index b70090c63e..5c717cb8bb 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala @@ -39,5 +39,11 @@ class RpcClientActor[I,O]( super.preRestart(reason) } + + override def shutdown = { + rpcClient.foreach(rpc => rpc.close) + super.shutdown + } + override def toString = "AMQP.RpcClient[exchange=" +exchangeName + ", routingKey=" + routingKey+ "]" } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala index a0dc21b7dc..f9d30227f0 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala @@ -17,7 +17,7 @@ import org.junit.Test class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def connectionAndRecovery = if (AMQPTest.enabled) { + def connectionAndRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala index 3c21fa36af..31a90c8200 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala @@ -18,7 +18,7 @@ import se.scalablesolutions.akka.actor.Actor._ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test - def consumerChannelRecovery = if (AMQPTest.enabled) { + def consumerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala index 301489bd8f..50c078a13a 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala @@ -18,7 +18,7 @@ import Actor._ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def consumerConnectionRecovery = if (AMQPTest.enabled) { + def consumerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala index a18db40d5f..011f287636 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala @@ -17,7 +17,7 @@ import org.scalatest.junit.JUnitSuite class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers { @Test - def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala index e928cb8ea9..d00d09b480 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualRejectTest.scala @@ -17,7 +17,7 @@ import org.scalatest.junit.JUnitSuite class AMQPConsumerManualRejectTest extends JUnitSuite with MustMatchers { @Test - def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala index 9eb7aa9158..88661de58d 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala @@ -16,7 +16,7 @@ import org.junit.Test class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers { @Test - def consumerMessage = if (AMQPTest.enabled) { + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala index 3aec05c122..e0ede02de3 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala @@ -17,7 +17,7 @@ import org.junit.Test class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test - def producerChannelRecovery = if (AMQPTest.enabled) { + def producerChannelRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala index b1a4cc1027..ad756ff5f0 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala @@ -17,7 +17,7 @@ import org.junit.Test class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test - def producerConnectionRecovery = if (AMQPTest.enabled) { + def producerConnectionRecovery = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala index 587f03ec92..7d485b1b8f 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala @@ -19,7 +19,7 @@ import org.junit.Test class AMQPProducerMessageTest extends JUnitSuite with MustMatchers { @Test - def producerMessage = if (AMQPTest.enabled) { + def producerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection: ActorRef = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala new file mode 100644 index 0000000000..5d03dae5c2 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTest.scala @@ -0,0 +1,43 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.amqp.rpc.RPC +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol + +class AMQPProtobufProducerConsumerTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + val responseLatch = new StandardLatch + + RPC.newProtobufRpcServer(connection, "protoexchange", requestHandler) + + val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build + + def responseHandler(response: AddressProtocol) = { + assert(response.getHostname == request.getHostname.reverse) + responseLatch.open + } + AMQP.newProtobufConsumer(connection, "", responseHandler, Some("proto.reply.key")) + + val producer = AMQP.newProtobufProducer[AddressProtocol](connection, "protoexchange") + producer.send(request, Some("proto.reply.key")) + + responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala index bf0ee6610d..7de8044314 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.amqp.test import se.scalablesolutions.akka.amqp._ import rpc.RPC -import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, FromBinary, ToBinary} +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer} import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -15,50 +15,47 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers { - + @Test - def consumerMessage = if (AMQPTest.enabled) { + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + val connection = AMQP.newConnection() - try { - val countDown = new CountDownLatch(3) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } - - val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) - val channelParameters = ChannelParameters(channelCallback - = Some(channelCallback)) - - val rpcServerSerializer = new RpcServerSerializer[String, Int]( - new FromBinary[String] { - def fromBinary(bytes: Array[Byte]) = new String(bytes) - }, new ToBinary[Int] { - def toBinary(t: Int) = Array(t.toByte) - }) - - def requestHandler(request: String) = 3 - - val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, - requestHandler, channelParameters = Some(channelParameters)) - - val rpcClientSerializer = new RpcClientSerializer[String, Int]( - new ToBinary[String] { - def toBinary(t: String) = t.getBytes - }, new FromBinary[Int] { - def fromBinary(bytes: Array[Byte]) = bytes.head.toInt - }) - - val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, - channelParameters = Some(channelParameters)) - - countDown.await(2, TimeUnit.SECONDS) must be(true) - val response = rpcClient !! "some_payload" - response must be(Some(3)) - } finally { - connection.stop + val countDown = new CountDownLatch(3) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () } + + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) + val channelParameters = ChannelParameters(channelCallback + = Some(channelCallback)) + + val rpcServerSerializer = new RpcServerSerializer[String, Int]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + }, new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + }) + + def requestHandler(request: String) = 3 + + val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + requestHandler, channelParameters = Some(channelParameters)) + + val rpcClientSerializer = new RpcClientSerializer[String, Int]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + }) + + val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + channelParameters = Some(channelParameters)) + + countDown.await(2, TimeUnit.SECONDS) must be(true) + val response = rpcClient !! "some_payload" + response must be(Some(3)) } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala index b936b821fe..6b796374a6 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala @@ -15,13 +15,13 @@ import java.util.concurrent.TimeUnit class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { @Test - def consumerMessage = if (AMQPTest.enabled) { + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { val connection = AMQP.newConnection() - RPC.startProtobufServer(connection, "protoservice", requestHandler) + RPC.newProtobufRpcServer(connection, "protoservice", requestHandler) - val protobufClient = RPC.startProtobufClient[AddressProtocol, AddressProtocol](connection, "protoservice") + val protobufClient = RPC.newProtobufRpcClient[AddressProtocol, AddressProtocol](connection, "protoservice") val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build @@ -40,6 +40,7 @@ class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { } aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } def requestHandler(request: AddressProtocol): AddressProtocol = { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala new file mode 100644 index 0000000000..0a55fda954 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTest.scala @@ -0,0 +1,47 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import se.scalablesolutions.akka.amqp.rpc.RPC +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit + +class AMQPRpcStringTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + RPC.newStringRpcServer(connection, "stringservice", requestHandler) + + val protobufClient = RPC.newStringRpcClient(connection, "stringservice") + + val request = "teststring" + + protobufClient.call(request) match { + case Some(response) => assert(response == request.reverse) + case None => fail("no response") + } + + val aSyncLatch = new StandardLatch + protobufClient.callAsync(request) { + case Some(response) => { + assert(response == request.reverse) + aSyncLatch.open + } + case None => fail("no response") + } + + aSyncLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: String): String= { + request.reverse + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala new file mode 100644 index 0000000000..bbb77c51a7 --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTest.scala @@ -0,0 +1,44 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.amqp.rpc.RPC + +class AMQPStringProducerConsumerTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) AMQPTest.withCleanEndState { + + val connection = AMQP.newConnection() + + val responseLatch = new StandardLatch + + RPC.newStringRpcServer(connection, "stringexchange", requestHandler) + + val request = "somemessage" + + def responseHandler(response: String) = { + + assert(response == request.reverse) + responseLatch.open + } + AMQP.newStringConsumer(connection, "", responseHandler, Some("string.reply.key")) + + val producer = AMQP.newStringProducer(connection, "stringexchange") + producer.send(request, Some("string.reply.key")) + + responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) + } + + def requestHandler(request: String): String= { + println("###### Reverse") + request.reverse + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 5ff9157bc5..5af6200add 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -4,6 +4,19 @@ package se.scalablesolutions.akka.amqp.test +import se.scalablesolutions.akka.amqp.AMQP +import se.scalablesolutions.akka.actor.ActorRegistry +import java.util.concurrent.TimeUnit + object AMQPTest { - def enabled = false + + def enabled = true + + def withCleanEndState(action: => Unit) { + try { + action + } finally { + AMQP.shutdownAll + } + } } diff --git a/config/logback.xml b/config/logback.xml index 40faeefb3c..5ab49da1c3 100755 --- a/config/logback.xml +++ b/config/logback.xml @@ -27,7 +27,7 @@ ./logs/akka.log.%d{yyyy-MM-dd-HH} - + From 072425edad9912728be50332cd29396d0ef2e994 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 12 Aug 2010 16:18:11 +0200 Subject: [PATCH 13/13] disable tests again --- .../scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 5af6200add..2930ce4e68 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -5,12 +5,9 @@ package se.scalablesolutions.akka.amqp.test import se.scalablesolutions.akka.amqp.AMQP -import se.scalablesolutions.akka.actor.ActorRegistry -import java.util.concurrent.TimeUnit - object AMQPTest { - def enabled = true + def enabled = false def withCleanEndState(action: => Unit) { try {