From be4be45a8d3728bb51ee0bbb7201b4417e286556 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 6 Aug 2010 17:31:37 +0200 Subject: [PATCH] - moved all into package folder structure - added simple protobuf based rpc convenience --- .../scalablesolutions/akka/amqp}/AMQP.scala | 53 ++------- .../akka/amqp}/AMQPMessage.scala | 0 .../akka/amqp}/ConsumerActor.scala | 0 .../akka/amqp}/ExampleSession.scala | 6 +- .../akka/amqp}/ExchangeType.scala | 0 .../amqp}/FaultTolerantChannelActor.scala | 0 .../amqp}/FaultTolerantConnectionActor.scala | 0 .../akka/amqp}/ProducerActor.scala | 0 .../scalablesolutions/akka/amqp/rpc/RPC.scala | 110 ++++++++++++++++++ .../akka/amqp/rpc}/RpcClientActor.scala | 6 +- .../akka/amqp/rpc}/RpcServerActor.scala | 2 +- .../test}/AMQPConnectionRecoveryTest.scala | 17 +-- .../AMQPConsumerChannelRecoveryTest.scala | 18 +-- .../AMQPConsumerConnectionRecoveryTest.scala | 20 +--- .../AMQPConsumerManualAcknowledgeTest.scala | 18 +-- .../amqp/test}/AMQPConsumerMessageTest.scala | 18 +-- .../AMQPProducerChannelRecoveryTest.scala | 18 +-- .../AMQPProducerConnectionRecoveryTest.scala | 18 +-- .../amqp/test}/AMQPProducerMessageTest.scala | 18 +-- .../amqp/test}/AMQPRpcClientServerTest.scala | 25 ++-- .../akka/amqp/test/AMQPRpcProtobufTest.scala | 35 ++++++ .../akka/amqp/test}/AMQPTest.scala | 2 +- project/build/AkkaProject.scala | 50 ++++---- 23 files changed, 238 insertions(+), 196 deletions(-) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/AMQP.scala (69%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/AMQPMessage.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ConsumerActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ExampleSession.scala (94%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ExchangeType.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/FaultTolerantChannelActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/FaultTolerantConnectionActor.scala (100%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp}/ProducerActor.scala (100%) create mode 100644 akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp/rpc}/RpcClientActor.scala (89%) rename akka-amqp/src/main/scala/{ => se/scalablesolutions/akka/amqp/rpc}/RpcServerActor.scala (94%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConnectionRecoveryTest.scala (85%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerChannelRecoveryTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerConnectionRecoveryTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerManualAcknowledgeTest.scala (88%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPConsumerMessageTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerChannelRecoveryTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerConnectionRecoveryTest.scala (86%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPProducerMessageTest.scala (84%) rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPRpcClientServerTest.scala (77%) create mode 100644 akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala rename akka-amqp/src/test/scala/{ => se/scalablesolutions/akka/amqp/test}/AMQPTest.scala (87%) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala similarity index 69% rename from akka-amqp/src/main/scala/AMQP.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index b1e08ae752..0823f306cd 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -1,15 +1,16 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.actor.{Actor, ActorRef} -import se.scalablesolutions.akka.actor.Actor._ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ + /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. * @@ -80,33 +81,6 @@ object AMQP { consumer } - def newRpcClient[O,I](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[O,I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { - val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) - connection.startLink(rpcActor) - rpcActor ! Start - rpcActor - } - - def newRpcServer[I,O](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcServerSerializer[I,O], - requestHandler: I => O, - queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { - val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) - val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer - , channelParameters = channelParameters - , selfAcknowledging = false - , queueName = queueName)) - - } - private val supervisor = new AMQPSupervisor class AMQPSupervisor extends Logging { @@ -129,17 +103,4 @@ object AMQP { connectionActor } } - - trait FromBinary[T] { - def fromBinary(bytes: Array[Byte]): T - } - - trait ToBinary[T] { - def toBinary(t: T): Array[Byte] - } - - - case class RpcClientSerializer[O,I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) - - case class RpcServerSerializer[I,O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) } diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala similarity index 100% rename from akka-amqp/src/main/scala/AMQPMessage.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala similarity index 100% rename from akka-amqp/src/main/scala/ConsumerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala similarity index 94% rename from akka-amqp/src/main/scala/ExampleSession.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 4fa1358a29..f2ae5e8295 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -4,6 +4,8 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, ToBinary, FromBinary} import se.scalablesolutions.akka.actor.{Actor, ActorRegistry} import Actor._ import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -146,7 +148,7 @@ object ExampleSession { def requestHandler(request: String) = 3 - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, requestHandler, queueName = Some("rpc.in.key.queue")) @@ -159,7 +161,7 @@ object ExampleSession { } val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) + val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) diff --git a/akka-amqp/src/main/scala/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala similarity index 100% rename from akka-amqp/src/main/scala/ExchangeType.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala similarity index 100% rename from akka-amqp/src/main/scala/FaultTolerantChannelActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala similarity index 100% rename from akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala similarity index 100% rename from akka-amqp/src/main/scala/ProducerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala new file mode 100644 index 0000000000..a711a01cc5 --- /dev/null +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -0,0 +1,110 @@ +package se.scalablesolutions.akka.amqp.rpc + +import se.scalablesolutions.akka.amqp.AMQP._ +import com.google.protobuf.Message +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ +import se.scalablesolutions.akka.amqp._ + +object RPC { + + def newRpcClient[O, I](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[O, I], + channelParameters: Option[ChannelParameters] = None): ActorRef = { + val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](exchangeParameters, routingKey, serializer, channelParameters)) + connection.startLink(rpcActor) + rpcActor ! Start + rpcActor + } + + def newRpcServer[I, O](connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: I => O, + queueName: Option[String] = None, + channelParameters: Option[ChannelParameters] = None) = { + val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) + val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer + , channelParameters = channelParameters + , selfAcknowledging = false + , queueName = queueName)) + + } + + trait FromBinary[T] { + def fromBinary(bytes: Array[Byte]): T + } + + trait ToBinary[T] { + def toBinary(t: T): Array[Byte] + } + + + case class RpcClientSerializer[O, I](toBinary: ToBinary[O], fromBinary: FromBinary[I]) + + case class RpcServerSerializer[I, O](fromBinary: FromBinary[I], toBinary: ToBinary[O]) + + + /** + * RPC convenience + */ + trait RpcClient[O, I] { + def callService(request: O, timeout: Long = 5000): Option[I] + } + + private class RpcServiceClient[O, I](client: ActorRef) extends RpcClient[O, I] { + def callService(request: O, timeout: Long = 5000): Option[I] = { + (client.!!(request, timeout)).as[I] + } + } + + private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) + + def startProtobufServer[I <: Message, O <: Message]( + connection: ActorRef, serviceName: String, requestHandler: I => O)(implicit manifest: Manifest[I]) = { + + val serializer = new RpcServerSerializer[I, O]( + new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }, new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }) + + val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) + val routingKey = "%s.request".format(serviceName) + val queueName = "%s.in".format(routingKey) + + newRpcServer[I, O](connection, exchangeParameters, routingKey, serializer, requestHandler, + queueName = Some(queueName)) + } + + def startProtobufClient[O <: Message, I <: Message]( + connection: ActorRef, serviceName: String)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + + val serializer = new RpcClientSerializer[O, I]( + new ToBinary[O] { + def toBinary(t: O) = t.toByteArray + }, new FromBinary[I] { + def fromBinary(bytes: Array[Byte]): I = { + createProtobufFromBytes[I](bytes) + } + }) + + val exchangeParameters = new ExchangeParameters(serviceName, ExchangeType.Topic) + val routingKey = "%s.request".format(serviceName) + val queueName = "%s.in".format(routingKey) + + val client = newRpcClient[O, I](connection, exchangeParameters, routingKey, serializer) + new RpcServiceClient[O, I](client) + } + + private def createProtobufFromBytes[I](bytes: Array[Byte])(implicit manifest: Manifest[I]): I = { + manifest.erasure.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[I] + } +} \ No newline at end of file diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala similarity index 89% rename from akka-amqp/src/main/scala/RpcClientActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala index 2935982a67..40665194fc 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala @@ -4,11 +4,9 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} - import com.rabbitmq.client.{Channel, RpcClient} -import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} +import rpc.RPC.RpcClientSerializer +import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameters} class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, routingKey: String, diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala similarity index 94% rename from akka-amqp/src/main/scala/RpcServerActor.scala rename to akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala index 99d74d9b56..7c1cc01d01 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcServerActor.scala @@ -4,9 +4,9 @@ package se.scalablesolutions.akka.amqp +import rpc.RPC.RpcServerSerializer import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties -import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala similarity index 85% rename from akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala index c1af35546a..ef2ee5b80f 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def connectionAndRecovery = if (AMQPTest.enabled) { @@ -50,10 +49,4 @@ class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Loggi } } - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala index a0b44f4739..3c21fa36af 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ @@ -15,8 +12,10 @@ import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.Actor._ -class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test def consumerChannelRecovery = if (AMQPTest.enabled) { @@ -60,11 +59,4 @@ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala index bf4885fea5..301489bd8f 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerConnectionRecoveryTest.scala @@ -1,22 +1,21 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import se.scalablesolutions.akka.actor.Actor._ import org.multiverse.api.latches.StandardLatch import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.TimeUnit -import se.scalablesolutions.akka.actor.ActorRef import org.junit.Test import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import Actor._ -class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def consumerConnectionRecovery = if (AMQPTest.enabled) { @@ -79,11 +78,4 @@ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala similarity index 88% rename from akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala index 2dc4ee939b..a18db40d5f 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerManualAcknowledgeTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp._ @@ -14,8 +11,10 @@ import org.junit.Test import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.multiverse.api.latches.StandardLatch +import org.scalatest.junit.JUnitSuite -class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers { @Test def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { @@ -57,11 +56,4 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala index 5d34f867d6..9eb7aa9158 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTest.scala @@ -1,20 +1,19 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import se.scalablesolutions.akka.amqp._ import org.multiverse.api.latches.StandardLatch import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParameters, ChannelParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers { @Test def consumerMessage = if (AMQPTest.enabled) { @@ -46,11 +45,4 @@ class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala index 26b2d78393..3aec05c122 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerChannelRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers { @Test def producerChannelRecovery = if (AMQPTest.enabled) { @@ -53,11 +52,4 @@ class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala similarity index 86% rename from akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala index fe8259b208..b1a4cc1027 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerConnectionRecoveryTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{Actor, ActorRef} import org.multiverse.api.latches.StandardLatch @@ -14,8 +11,10 @@ import com.rabbitmq.client.ShutdownSignalException import se.scalablesolutions.akka.amqp._ import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters, ProducerParameters, ConnectionParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers { @Test def producerConnectionRecovery = if (AMQPTest.enabled) { @@ -52,11 +51,4 @@ class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers wi connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala similarity index 84% rename from akka-amqp/src/test/scala/AMQPProducerMessageTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala index 5b19df351f..587f03ec92 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProducerMessageTest.scala @@ -1,12 +1,9 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.ActorRef import org.multiverse.api.latches.StandardLatch @@ -16,8 +13,10 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.String import org.scalatest.matchers.MustMatchers import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParameters} +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { +class AMQPProducerMessageTest extends JUnitSuite with MustMatchers { @Test def producerMessage = if (AMQPTest.enabled) { @@ -41,11 +40,4 @@ class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala similarity index 77% rename from akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala index c585675098..bf0ee6610d 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTest.scala @@ -1,19 +1,21 @@ +package se.scalablesolutions.akka.amqp.test + /** * Copyright (C) 2009-2010 Scalable Solutions AB */ -package se.scalablesolutions.akka.amqp.test - -import se.scalablesolutions.akka.util.Logging -import org.scalatest.junit.JUnitSuite -import org.junit.Test import se.scalablesolutions.akka.amqp._ +import rpc.RPC +import rpc.RPC.{RpcClientSerializer, RpcServerSerializer, FromBinary, ToBinary} import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} import se.scalablesolutions.akka.amqp.AMQP._ +import org.scalatest.junit.JUnitSuite +import org.junit.Test -class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { +class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers { + @Test def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() @@ -39,7 +41,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging def requestHandler(request: String) = 3 - val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, requestHandler, channelParameters = Some(channelParameters)) val rpcClientSerializer = new RpcClientSerializer[String, Int]( @@ -49,7 +51,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging def fromBinary(bytes: Array[Byte]) = bytes.head.toInt }) - val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + val rpcClient = RPC.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) countDown.await(2, TimeUnit.SECONDS) must be(true) @@ -59,11 +61,4 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging connection.stop } } - - @Test - def dummy { - // amqp tests need local rabbitmq server running, so a disabled by default. - // this dummy test makes sure that the whole test class doesn't fail because of missing tests - assert(true) - } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala new file mode 100644 index 0000000000..b0ca3f8ffe --- /dev/null +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcProtobufTest.scala @@ -0,0 +1,35 @@ +package se.scalablesolutions.akka.amqp.test + +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +import org.scalatest.matchers.MustMatchers +import org.scalatest.junit.JUnitSuite +import se.scalablesolutions.akka.amqp.AMQP +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol +import org.junit.Test +import se.scalablesolutions.akka.amqp.rpc.RPC + +class AMQPRpcProtobufTest extends JUnitSuite with MustMatchers { + + @Test + def consumerMessage = if (AMQPTest.enabled) { + + val connection = AMQP.newConnection() + + RPC.startProtobufServer(connection, "protoservice", requestHandler) + + val protobufClient = RPC.startProtobufClient[AddressProtocol, AddressProtocol](connection, "protoservice") + + val request = AddressProtocol.newBuilder.setHostname("testhost").setPort(4321).build + + protobufClient.callService(request) match { + case Some(response) => assert(response.getHostname == request.getHostname.reverse) + case None => fail("no response") + } + } + + def requestHandler(request: AddressProtocol): AddressProtocol = { + AddressProtocol.newBuilder.setHostname(request.getHostname.reverse).setPort(request.getPort).build + } +} \ No newline at end of file diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala similarity index 87% rename from akka-amqp/src/test/scala/AMQPTest.scala rename to akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index 5ff9157bc5..8cad27ba60 100644 --- a/akka-amqp/src/test/scala/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -5,5 +5,5 @@ package se.scalablesolutions.akka.amqp.test object AMQPTest { - def enabled = false + def enabled = true } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 02f72d7f0e..7e3477027f 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -41,15 +41,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { - lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") - lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") +// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") +// lazy val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) - lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") - lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") - lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") - lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") +// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") +// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") +// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") +// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") +// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") +// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") } // ------------------------------------------------------------------------------------------------------------------- @@ -60,23 +60,26 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- import Repositories._ - lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) - lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) - lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) - // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) - lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) - lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) - lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) - lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) - lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) - lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) - lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) - lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) - lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) - lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) - lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) +// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) +// lazy val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) +// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) +// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) +// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) +// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) +// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) +// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) +// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) +// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) +// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) +// lazy val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) +// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) +// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) +// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! + val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository" + val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public" + val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots" // ------------------------------------------------------------------------------------------------------------------- // Versions // ------------------------------------------------------------------------------------------------------------------- @@ -358,6 +361,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with CodeFellowPlugin { val commons_io = Dependencies.commons_io val rabbit = Dependencies.rabbit + val protobuf = Dependencies.protobuf // testing val junit = Dependencies.junit