Merge branch 'master' into ticket337

This commit is contained in:
Viktor Klang 2010-08-06 16:05:27 +02:00
commit afe820e2aa
5 changed files with 68 additions and 65 deletions

View file

@ -95,13 +95,15 @@ object AMQP {
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I,O],
requestHandler: PartialFunction[I, O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
, channelParameters = channelParameters
, selfAcknowledging = false))
, selfAcknowledging = false
, queueName = queueName))
}

View file

@ -144,10 +144,10 @@ object ExampleSession {
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, {
case "rpc_request" => 3
case _ => error("unknown request")
})
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
requestHandler, queueName = Some("rpc.in.key.queue"))
/** Client */

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: PartialFunction[I, O]) extends Actor {
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor {
log.info("%s started", this)

View file

@ -11,11 +11,9 @@ import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{CountDownLatch, TimeUnit}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.amqp.AMQP._
class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging {
@Test
def consumerMessage = if (AMQPTest.enabled) {
val connection = AMQP.newConnection()
@ -32,31 +30,31 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging
val channelParameters = ChannelParameters(channelCallback
= Some(channelCallback))
val serverFromBinary = new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}
val serverToBinary = new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
}
val rpcServerSerializer = new RpcServerSerializer[String, Int](serverFromBinary, serverToBinary)
val rpcServer = AMQP.newRpcServer[String,Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer, {
case "some_payload" => 3
case _ => error("unknown request")
}, channelParameters = Some(channelParameters))
val rpcServerSerializer = new RpcServerSerializer[String, Int](
new FromBinary[String] {
def fromBinary(bytes: Array[Byte]) = new String(bytes)
}, new ToBinary[Int] {
def toBinary(t: Int) = Array(t.toByte)
})
val clientToBinary = new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}
val clientFromBinary = new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = AMQP.newRpcClient[String,Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
def requestHandler(request: String) = 3
val rpcServer = AMQP.newRpcServer[String, Int](connection, exchangeParameters, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] {
def toBinary(t: String) = t.getBytes
}, new FromBinary[Int] {
def fromBinary(bytes: Array[Byte]) = bytes.head.toInt
})
val rpcClient = AMQP.newRpcClient[String, Int](connection, exchangeParameters, "rpc.routing", rpcClientSerializer,
channelParameters = Some(channelParameters))
countDown.await(2, TimeUnit.SECONDS) must be (true)
countDown.await(2, TimeUnit.SECONDS) must be(true)
val response = rpcClient !! "some_payload"
response must be (Some(3))
response must be(Some(3))
} finally {
connection.stop
}

View file

@ -9,54 +9,57 @@ import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import Actor._
case class Send(actor: Actor)
object ClientInitiatedRemoteActorSpec {
case class Send(actor: Actor)
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case "OneWay" =>
RemoteActorSpecActorUnidirectional.latch.countDown
object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1)
}
}
class RemoteActorSpecActorUnidirectional extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
def receive = {
case "OneWay" =>
RemoteActorSpecActorUnidirectional.latch.countDown
}
}
}
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
class RemoteActorSpecActorBidirectional extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
}
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
class SendOneWayAndReplyReceiverActor extends Actor {
def receive = {
case "Hello" =>
self.reply("World")
}
}
def sendOff = sendTo ! "Hello"
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None
var sendTo: ActorRef = _
var latch: CountDownLatch = _
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown
def sendOff = sendTo ! "Hello"
def receive = {
case msg: AnyRef =>
state = Some(msg)
SendOneWayAndReplySenderActor.latch.countDown
}
}
}
class ClientInitiatedRemoteActorSpec extends JUnitSuite {
import ClientInitiatedRemoteActorSpec._
se.scalablesolutions.akka.config.Config.config
val HOSTNAME = "localhost"