From 7a8caacf286841f911e71ed11ecb356613cc48d6 Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 27 Jul 2010 11:06:48 +0200 Subject: [PATCH 1/4] no need for the dummy tests anymore --- akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala | 7 ------- .../src/test/scala/AMQPConsumerChannelRecoveryTest.scala | 7 ------- .../test/scala/AMQPConsumerConnectionRecoveryTest.scala | 7 ------- .../test/scala/AMQPConsumerManualAcknowledgeTest.scala | 7 ------- akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala | 7 ------- .../src/test/scala/AMQPProducerChannelRecoveryTest.scala | 7 ------- .../test/scala/AMQPProducerConnectionRecoveryTest.scala | 7 ------- akka-amqp/src/test/scala/AMQPProducerMessageTest.scala | 7 ------- akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala | 8 -------- 9 files changed, 64 deletions(-) diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala index 3bc2cb20dd..a853cd4014 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -49,11 +49,4 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi disconnectedLatch.tryAwait(2, TimeUnit.SECONDS) must be(true) } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala index 0f6fadfcc4..4c2bdf0126 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -60,11 +60,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala index 9dccd43be8..454bd6ba31 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -79,11 +79,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala index d48f38afc5..22ecc649a1 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -57,11 +57,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala index af94b0a515..cbfe447adf 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -46,11 +46,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala index 095a21fc86..8cc9e9c867 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -53,11 +53,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala index 71bc08bdaa..d432e82def 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -52,11 +52,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala index ab9bb00e7c..01d7888b9b 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -41,11 +41,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 7dbfb4becd..18586202e9 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,7 +11,6 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { @@ -61,11 +60,4 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } \ No newline at end of file From 17e97ead5ab7990327fda9d7289e119b2dad1941 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 13:35:44 +0200 Subject: [PATCH 2/4] - made rpc handler reqular function instead of partial function - add queuename as optional parameter for rpc server (for i.e. loadbalancing purposes) --- akka-amqp/src/main/scala/ExampleSession.scala | 8 ++-- akka-amqp/src/main/scala/RpcServerActor.scala | 2 +- .../test/scala/AMQPRpcClientServerTest.scala | 44 +++++++++---------- akka-amqp/src/test/scala/AMQPTest.scala | 2 +- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 97571e2783..4fa1358a29 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -144,10 +144,10 @@ object ExampleSession { } val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { - case "rpc_request" => 3 - case _ => error("unknown request") - }) + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + requestHandler, queueName = Some("rpc.in.key.queue")) /** Client */ diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index c64ef9058e..99d74d9b56 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor { +class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { log.info("%s started", this) diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index dcaec4cd06..c585675098 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,11 +11,9 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { - @Test def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() @@ -32,31 +30,31 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val serverFromBinary = new FromBinary[String] { - def fromBinary(bytes: Array[Byte]) = new String(bytes) - } - val serverToBinary = new ToBinary[Int] { - def toBinary(t: Int) = Array(t.toByte) - } - val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { - case "some_payload" => 3 - case _ => error("unknown request") - }, channelParameters = Some(channelParameters)) + val rpcServerSerializer = new RpcServerSerializer[String, Int]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + }, new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + }) - val clientToBinary = new ToBinary[String] { - def toBinary(t: String) = t.getBytes - } - val clientFromBinary = new FromBinary[Int] { - def fromBinary(bytes: Array[Byte]) = bytes.head.toInt - } - val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + requestHandler, channelParameters = Some(channelParameters)) + + val rpcClientSerializer = new RpcClientSerializer[String, Int]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + }) + + val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) - countDown.await(2, TimeUnit.SECONDS) must be (true) + countDown.await(2, TimeUnit.SECONDS) must be(true) val response = rpcClient !! "some_payload" - response must be (Some(3)) + response must be(Some(3)) } finally { connection.stop } diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala index 5ff9157bc5..8cad27ba60 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = false + def enabled = true } From f6d86ed79c021659efac506814a9f3208fa9d82d Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 13:36:18 +0200 Subject: [PATCH 3/4] - move helper object actors in specs companion object to avoid clashes with the server spec (where the helpers have the same name) --- .../ClientInitiatedRemoteActorSpec.scala | 73 ++++++++++--------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index e9ed8d4fa5..7ff46ab910 100644 --- a/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -9,54 +9,57 @@ import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.actor.{ActorRef, Actor} import Actor._ -case class Send(actor: Actor) +object ClientInitiatedRemoteActorSpec { + case class Send(actor: Actor) -object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) -} -class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown + object RemoteActorSpecActorUnidirectional { + val latch = new CountDownLatch(1) } -} + class RemoteActorSpecActorUnidirectional extends Actor { + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") + def receive = { + case "OneWay" => + RemoteActorSpecActorUnidirectional.latch.countDown + } } -} -class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") + class RemoteActorSpecActorBidirectional extends Actor { + def receive = { + case "Hello" => + self.reply("World") + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } } -} -object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) -} -class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ + class SendOneWayAndReplyReceiverActor extends Actor { + def receive = { + case "Hello" => + self.reply("World") + } + } - def sendOff = sendTo ! "Hello" + object SendOneWayAndReplySenderActor { + val latch = new CountDownLatch(1) + } + class SendOneWayAndReplySenderActor extends Actor { + var state: Option[AnyRef] = None + var sendTo: ActorRef = _ + var latch: CountDownLatch = _ - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown + def sendOff = sendTo ! "Hello" + + def receive = { + case msg: AnyRef => + state = Some(msg) + SendOneWayAndReplySenderActor.latch.countDown + } } } class ClientInitiatedRemoteActorSpec extends JUnitSuite { + import ClientInitiatedRemoteActorSpec._ se.scalablesolutions.akka.config.Config.config val HOSTNAME = "localhost" From 09d7cc75eb3c2b45735f29fdaa12bbd1c77d1d08 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 13:39:24 +0200 Subject: [PATCH 4/4] - forgot the api commit - disable tests again :S --- akka-amqp/src/main/scala/AMQP.scala | 6 ++++-- akka-amqp/src/test/scala/AMQPTest.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 0bfbd93063..b1e08ae752 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -95,13 +95,15 @@ object AMQP { exchangeParameters: ExchangeParameters, routingKey: String, serializer: RpcServerSerializer[I,O], - requestHandler: PartialFunction[I, O], + requestHandler: I => O, + queueName: Option[String] = None, channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters - , selfAcknowledging = false)) + , selfAcknowledging = false + , queueName = queueName)) } diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala index 8cad27ba60..5ff9157bc5 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = true + def enabled = false }