diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 0bfbd93063..b1e08ae752 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -95,13 +95,15 @@ object AMQP { exchangeParameters: ExchangeParameters, routingKey: String, serializer: RpcServerSerializer[I,O], - requestHandler: PartialFunction[I, O], + requestHandler: I => O, + queueName: Option[String] = None, channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer , channelParameters = channelParameters - , selfAcknowledging = false)) + , selfAcknowledging = false + , queueName = queueName)) } diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 97571e2783..4fa1358a29 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -144,10 +144,10 @@ object ExampleSession { } val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, { - case "rpc_request" => 3 - case _ => error("unknown request") - }) + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + requestHandler, queueName = Some("rpc.in.key.queue")) /** Client */ diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index c64ef9058e..99d74d9b56 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor { +class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { log.info("%s started", this) diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index dcaec4cd06..c585675098 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -11,11 +11,9 @@ import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.actor.Actor._ import org.scalatest.matchers.MustMatchers import java.util.concurrent.{CountDownLatch, TimeUnit} -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.amqp.AMQP._ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { - @Test def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() @@ -32,31 +30,31 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val serverFromBinary = new FromBinary[String] { - def fromBinary(bytes: Array[Byte]) = new String(bytes) - } - val serverToBinary = new ToBinary[Int] { - def toBinary(t: Int) = Array(t.toByte) - } - val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary) - val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, { - case "some_payload" => 3 - case _ => error("unknown request") - }, channelParameters = Some(channelParameters)) + val rpcServerSerializer = new RpcServerSerializer[String, Int]( + new FromBinary[String] { + def fromBinary(bytes: Array[Byte]) = new String(bytes) + }, new ToBinary[Int] { + def toBinary(t: Int) = Array(t.toByte) + }) - val clientToBinary = new ToBinary[String] { - def toBinary(t: String) = t.getBytes - } - val clientFromBinary = new FromBinary[Int] { - def fromBinary(bytes: Array[Byte]) = bytes.head.toInt - } - val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, + def requestHandler(request: String) = 3 + + val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, + requestHandler, channelParameters = Some(channelParameters)) + + val rpcClientSerializer = new RpcClientSerializer[String, Int]( + new ToBinary[String] { + def toBinary(t: String) = t.getBytes + }, new FromBinary[Int] { + def fromBinary(bytes: Array[Byte]) = bytes.head.toInt + }) + + val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer, channelParameters = Some(channelParameters)) - countDown.await(2, TimeUnit.SECONDS) must be (true) + countDown.await(2, TimeUnit.SECONDS) must be(true) val response = rpcClient !! "some_payload" - response must be (Some(3)) + response must be(Some(3)) } finally { connection.stop } diff --git a/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala index e9ed8d4fa5..7ff46ab910 100644 --- a/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/remote/ClientInitiatedRemoteActorSpec.scala @@ -9,54 +9,57 @@ import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.actor.{ActorRef, Actor} import Actor._ -case class Send(actor: Actor) +object ClientInitiatedRemoteActorSpec { + case class Send(actor: Actor) -object RemoteActorSpecActorUnidirectional { - val latch = new CountDownLatch(1) -} -class RemoteActorSpecActorUnidirectional extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - - def receive = { - case "OneWay" => - RemoteActorSpecActorUnidirectional.latch.countDown + object RemoteActorSpecActorUnidirectional { + val latch = new CountDownLatch(1) } -} + class RemoteActorSpecActorUnidirectional extends Actor { + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) -class RemoteActorSpecActorBidirectional extends Actor { - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") + def receive = { + case "OneWay" => + RemoteActorSpecActorUnidirectional.latch.countDown + } } -} -class SendOneWayAndReplyReceiverActor extends Actor { - def receive = { - case "Hello" => - self.reply("World") + class RemoteActorSpecActorBidirectional extends Actor { + def receive = { + case "Hello" => + self.reply("World") + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } } -} -object SendOneWayAndReplySenderActor { - val latch = new CountDownLatch(1) -} -class SendOneWayAndReplySenderActor extends Actor { - var state: Option[AnyRef] = None - var sendTo: ActorRef = _ - var latch: CountDownLatch = _ + class SendOneWayAndReplyReceiverActor extends Actor { + def receive = { + case "Hello" => + self.reply("World") + } + } - def sendOff = sendTo ! "Hello" + object SendOneWayAndReplySenderActor { + val latch = new CountDownLatch(1) + } + class SendOneWayAndReplySenderActor extends Actor { + var state: Option[AnyRef] = None + var sendTo: ActorRef = _ + var latch: CountDownLatch = _ - def receive = { - case msg: AnyRef => - state = Some(msg) - SendOneWayAndReplySenderActor.latch.countDown + def sendOff = sendTo ! "Hello" + + def receive = { + case msg: AnyRef => + state = Some(msg) + SendOneWayAndReplySenderActor.latch.countDown + } } } class ClientInitiatedRemoteActorSpec extends JUnitSuite { + import ClientInitiatedRemoteActorSpec._ se.scalablesolutions.akka.config.Config.config val HOSTNAME = "localhost"