- Finshed up java api for RPC

- Made case objects 'java compatible' via getInstance function
- Added RPC examples in java examplesession
This commit is contained in:
momania 2010-10-08 17:40:53 +02:00
parent 65e96e71c5
commit 31e74bfa53
14 changed files with 348 additions and 104 deletions

View file

@ -1,16 +1,22 @@
package se.scalablesolutions.akka.amqp; package se.scalablesolutions.akka.amqp;
import org.multiverse.api.latches.StandardLatch;
import scala.Option;
import se.scalablesolutions.akka.actor.ActorRef; import se.scalablesolutions.akka.actor.ActorRef;
import se.scalablesolutions.akka.actor.ActorRegistry; import se.scalablesolutions.akka.actor.ActorRegistry;
import se.scalablesolutions.akka.actor.UntypedActor; import se.scalablesolutions.akka.actor.UntypedActor;
import se.scalablesolutions.akka.actor.UntypedActorFactory; import se.scalablesolutions.akka.actor.UntypedActorFactory;
import se.scalablesolutions.akka.amqp.rpc.RPC;
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol; import se.scalablesolutions.akka.remote.protocol.RemoteProtocol;
import se.scalablesolutions.akka.util.Function;
import se.scalablesolutions.akka.util.Procedure; import se.scalablesolutions.akka.util.Procedure;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@SuppressWarnings({"unchecked"})
public class ExampleSessionJava { public class ExampleSessionJava {
public static void main(String... args) { public static void main(String... args) {
@ -30,6 +36,11 @@ public class ExampleSessionJava {
printTopic("EASY PROTOBUF PRODUCER AND CONSUMER"); printTopic("EASY PROTOBUF PRODUCER AND CONSUMER");
easyProtobufProducerConsumer(); easyProtobufProducerConsumer();
printTopic("EASY STRING RPC");
easyStringRpc();
printTopic("EASY PROTOBUF RPC");
easyProtobufRpc();
// postStop everything the amqp tree except the main AMQP supervisor // postStop everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped // all connections/consumers/producers will be stopped
@ -57,7 +68,7 @@ public class ExampleSessionJava {
// defaults to amqp://guest:guest@localhost:5672/ // defaults to amqp://guest:guest@localhost:5672/
ActorRef connection = AMQP.newConnection(); ActorRef connection = AMQP.newConnection();
AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", new ExchangeType.Direct()); AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", Direct.getInstance());
ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class); ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class);
@ -86,7 +97,7 @@ public class ExampleSessionJava {
}); });
channelCallback.start(); channelCallback.start();
AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", new ExchangeType.Direct()); AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", Direct.getInstance());
AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback); AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback);
ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class); ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class);
@ -133,7 +144,7 @@ public class ExampleSessionJava {
ActorRef connection = AMQP.newConnection(); ActorRef connection = AMQP.newConnection();
String exchangeName = "easy.protobuf"; String exchangeName = "easy.protobuf";
Procedure<RemoteProtocol.AddressProtocol> procedure = new Procedure<RemoteProtocol.AddressProtocol>() { Procedure<RemoteProtocol.AddressProtocol> procedure = new Procedure<RemoteProtocol.AddressProtocol>() {
public void apply(RemoteProtocol.AddressProtocol message) { public void apply(RemoteProtocol.AddressProtocol message) {
System.out.println("### >> Received message: " + message); System.out.println("### >> Received message: " + message);
@ -146,6 +157,66 @@ public class ExampleSessionJava {
producerClient.send(RemoteProtocol.AddressProtocol.newBuilder().setHostname("akkarocks.com").setPort(1234).build()); producerClient.send(RemoteProtocol.AddressProtocol.newBuilder().setHostname("akkarocks.com").setPort(1234).build());
} }
public void easyStringRpc() {
ActorRef connection = AMQP.newConnection();
String exchangeName = "easy.stringrpc";
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// queueName = <routingKey>.in
RPC.newStringRpcServer(connection, exchangeName, new Function<String, String>() {
public String apply(String request) {
System.out.println("### >> Got request: " + request);
return "Response to: '" + request + "'";
}
});
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
RPC.RpcClient<String, String> stringRpcClient = RPC.newStringRpcClient(connection, exchangeName);
Option<String> response = stringRpcClient.call("AMQP Rocks!");
System.out.println("### >> Got response: " + response);
final StandardLatch standardLatch = new StandardLatch();
stringRpcClient.callAsync("AMQP is dead easy", new Procedure<String>() {
public void apply(String request) {
System.out.println("### >> This is handled async: " + request);
standardLatch.open();
}
});
try {
standardLatch.tryAwait(2, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
}
public void easyProtobufRpc() {
ActorRef connection = AMQP.newConnection();
String exchangeName = "easy.protobuf.rpc";
RPC.newProtobufRpcServer(connection, exchangeName, new Function<RemoteProtocol.AddressProtocol, RemoteProtocol.AddressProtocol>() {
public RemoteProtocol.AddressProtocol apply(RemoteProtocol.AddressProtocol request) {
return RemoteProtocol.AddressProtocol.newBuilder().setHostname(request.getHostname()).setPort(request.getPort()).build();
}
}, RemoteProtocol.AddressProtocol.class);
RPC.RpcClient<RemoteProtocol.AddressProtocol, RemoteProtocol.AddressProtocol> protobufRpcClient =
RPC.newProtobufRpcClient(connection, exchangeName, RemoteProtocol.AddressProtocol.class);
scala.Option<RemoteProtocol.AddressProtocol> response =
protobufRpcClient.call(RemoteProtocol.AddressProtocol.newBuilder().setHostname("localhost").setPort(4321).build());
System.out.println("### >> Got response: " + response);
}
} }
class DummyActor extends UntypedActor { class DummyActor extends UntypedActor {
@ -163,11 +234,11 @@ class ChannelCallbackActor extends UntypedActor {
} }
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
if (Started.class.isAssignableFrom(message.getClass())) { if (Started.getInstance().getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Channel callback: Started"); System.out.println("### >> Channel callback: Started");
channelCountdown.countDown(); channelCountdown.countDown();
} else if (Restarting.class.isAssignableFrom(message.getClass())) { } else if (Restarting.getInstance().getClass().isAssignableFrom(message.getClass())) {
} else if (Stopped.class.isAssignableFrom(message.getClass())) { } else if (Stopped.getInstance().getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Channel callback: Stopped"); System.out.println("### >> Channel callback: Stopped");
} else throw new IllegalArgumentException("Unknown message: " + message); } else throw new IllegalArgumentException("Unknown message: " + message);
} }
@ -176,10 +247,10 @@ class ChannelCallbackActor extends UntypedActor {
class ConnectionCallbackActor extends UntypedActor { class ConnectionCallbackActor extends UntypedActor {
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
if (Connected.class.isAssignableFrom(message.getClass())) { if (Connected.getInstance().getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Connection callback: Connected!"); System.out.println("### >> Connection callback: Connected!");
} else if (Reconnecting.class.isAssignableFrom(message.getClass())) { } else if (Reconnecting.getInstance().getClass().isAssignableFrom(message.getClass())) {
} else if (Disconnected.class.isAssignableFrom(message.getClass())) { } else if (Disconnected.getInstance().getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Connection callback: Disconnected!"); System.out.println("### >> Connection callback: Disconnected!");
} else throw new IllegalArgumentException("Unknown message: " + message); } else throw new IllegalArgumentException("Unknown message: " + message);
} }

View file

@ -74,8 +74,12 @@ object AMQP {
* Declaration type used for either exchange or queue declaration * Declaration type used for either exchange or queue declaration
*/ */
sealed trait Declaration sealed trait Declaration
case object NoActionDeclaration extends Declaration case object NoActionDeclaration extends Declaration {
case object PassiveDeclaration extends Declaration def getInstance() = this // Needed for Java API usage
}
case object PassiveDeclaration extends Declaration {
def getInstance() = this // Needed for Java API usage
}
case class ActiveDeclaration(durable: Boolean = false, autoDelete: Boolean = true, exclusive: Boolean = false) extends Declaration { case class ActiveDeclaration(durable: Boolean = false, autoDelete: Boolean = true, exclusive: Boolean = false) extends Declaration {
// Needed for Java API usage // Needed for Java API usage
@ -90,13 +94,13 @@ object AMQP {
*/ */
case class ExchangeParameters( case class ExchangeParameters(
exchangeName: String, exchangeName: String,
exchangeType: ExchangeType = ExchangeType.Topic(), exchangeType: ExchangeType = Topic,
exchangeDeclaration: Declaration = ActiveDeclaration(), exchangeDeclaration: Declaration = ActiveDeclaration(),
configurationArguments: Map[String, AnyRef] = Map.empty) { configurationArguments: Map[String, AnyRef] = Map.empty) {
// Needed for Java API usage // Needed for Java API usage
def this(exchangeName: String) = def this(exchangeName: String) =
this (exchangeName, ExchangeType.Topic(), ActiveDeclaration(), Map.empty) this (exchangeName, Topic, ActiveDeclaration(), Map.empty)
// Needed for Java API usage // Needed for Java API usage
def this(exchangeName: String, exchangeType: ExchangeType) = def this(exchangeName: String, exchangeType: ExchangeType) =

View file

@ -44,18 +44,30 @@ case class Delivery(
// connection messages // connection messages
case object Connect extends AMQPMessage case object Connect extends AMQPMessage
case class Connected() extends AMQPMessage // Needed for Java API usage case object Connected extends AMQPMessage {
case class Reconnecting() extends AMQPMessage // Needed for Java API usage def getInstance() = this // Needed for Java API usage
case class Disconnected() extends AMQPMessage // Needed for Java API usage }
case object Reconnecting extends AMQPMessage {
def getInstance() = this // Needed for Java API usage
}
case object Disconnected extends AMQPMessage {
def getInstance() = this // Needed for Java API usage
}
case object ChannelRequest extends InternalAMQPMessage case object ChannelRequest extends InternalAMQPMessage
// channel messages // channel messages
case object Start extends AMQPMessage case object Start extends AMQPMessage
case class Started() extends AMQPMessage // Needed for Java API usage case object Started extends AMQPMessage {
case class Restarting() extends AMQPMessage // Needed for Java API usage def getInstance() = this // Needed for Java API usage
case class Stopped() extends AMQPMessage // Needed for Java API usage }
case object Restarting extends AMQPMessage {
def getInstance() = this // Needed for Java API usage
}
case object Stopped extends AMQPMessage {
def getInstance() = this // Needed for Java API usage
}
// delivery messages // delivery messages
case class Acknowledge(deliveryTag: Long) extends AMQPMessage case class Acknowledge(deliveryTag: Long) extends AMQPMessage

View file

@ -12,7 +12,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.lang.String import java.lang.String
import se.scalablesolutions.akka.amqp.AMQP._ import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol
import se.scalablesolutions.akka.amqp.ExchangeType._
object ExampleSession { object ExampleSession {
@ -68,7 +67,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/ // defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection() val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct()) val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -83,7 +82,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/ // defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection() val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout()) val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -102,7 +101,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/ // defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection() val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic()) val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -136,7 +135,7 @@ object ExampleSession {
case Restarting => // not used, sent when channel or connection fails and initiates a restart case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped") case Stopped => log.info("Channel callback: Stopped")
} }
val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct()) val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor {
@ -202,7 +201,7 @@ object ExampleSession {
def requestHandler(request: String) = 3 def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer, val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer,
requestHandler, queueName = Some("rpc.in.key.queue")) requestHandler _, queueName = Some("rpc.in.key.queue"))
/** Client */ /** Client */

View file

@ -5,17 +5,19 @@
package se.scalablesolutions.akka.amqp package se.scalablesolutions.akka.amqp
sealed trait ExchangeType sealed trait ExchangeType
object ExchangeType { case object Direct extends ExchangeType {
case class Direct() extends ExchangeType { def getInstance() = this // Needed for Java API usage
override def toString = "direct" override def toString = "direct"
} }
case class Topic() extends ExchangeType { case object Topic extends ExchangeType {
override def toString = "topic" def getInstance() = this // Needed for Java API usage
} override def toString = "topic"
case class Fanout() extends ExchangeType { }
override def toString = "fanout" case object Fanout extends ExchangeType {
} def getInstance() = this // Needed for Java API usage
case class Match() extends ExchangeType { override def toString = "fanout"
override def toString = "match" }
} case object Match extends ExchangeType {
def getInstance() = this // Needed for Java API usage
override def toString = "match"
} }

View file

@ -46,14 +46,14 @@ abstract private[amqp] class FaultTolerantChannelActor(
} else { } else {
// channel error // channel error
log.error(cause, "%s self restarting because of channel shutdown", toString) log.error(cause, "%s self restarting because of channel shutdown", toString)
notifyCallback(Restarting()) notifyCallback(Restarting)
self ! Start self ! Start
} }
} }
case Failure(cause) => case Failure(cause) =>
log.error(cause, "%s self restarting because of channel failure", toString) log.error(cause, "%s self restarting because of channel failure", toString)
closeChannel closeChannel
notifyCallback(Restarting()) notifyCallback(Restarting)
self ! Start self ! Start
} }
@ -81,7 +81,7 @@ abstract private[amqp] class FaultTolerantChannelActor(
setupChannel(ch) setupChannel(ch)
channel = Some(ch) channel = Some(ch)
notifyCallback(Started()) notifyCallback(Started)
log.info("Channel setup for %s", toString) log.info("Channel setup for %s", toString)
} }
@ -89,7 +89,7 @@ abstract private[amqp] class FaultTolerantChannelActor(
channel.foreach { channel.foreach {
ch => ch =>
if (ch.isOpen) ch.close if (ch.isOpen) ch.close
notifyCallback(Stopped()) notifyCallback(Stopped)
log.info("%s channel closed", toString) log.info("%s channel closed", toString)
} }
channel = None channel = None
@ -100,7 +100,7 @@ abstract private[amqp] class FaultTolerantChannelActor(
} }
override def preRestart(reason: Throwable) = { override def preRestart(reason: Throwable) = {
notifyCallback(Restarting()) notifyCallback(Restarting)
closeChannel closeChannel
} }

View file

@ -72,7 +72,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id) log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id)
log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size) log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size)
self.linkedActorsAsList.foreach(_ ! conn.createChannel) self.linkedActorsAsList.foreach(_ ! conn.createChannel)
notifyCallback(Connected()) notifyCallback(Connected)
} }
} catch { } catch {
case e: Exception => case e: Exception =>
@ -81,7 +81,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
, connectionParameters.initReconnectDelay, self.id) , connectionParameters.initReconnectDelay, self.id)
reconnectionTimer.schedule(new TimerTask() { reconnectionTimer.schedule(new TimerTask() {
override def run = { override def run = {
notifyCallback(Reconnecting()) notifyCallback(Reconnecting)
self ! Connect self ! Connect
} }
}, connectionParameters.initReconnectDelay) }, connectionParameters.initReconnectDelay)
@ -92,7 +92,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
try { try {
connection.foreach(_.close) connection.foreach(_.close)
log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id) log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id)
notifyCallback(Disconnected()) notifyCallback(Disconnected)
} catch { } catch {
case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id) case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id)
case _ => () case _ => ()
@ -114,7 +114,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
override def preRestart(reason: Throwable) = disconnect override def preRestart(reason: Throwable) = disconnect
override def postRestart(reason: Throwable) = { override def postRestart(reason: Throwable) = {
notifyCallback(Reconnecting()) notifyCallback(Reconnecting)
connect connect
} }
} }

View file

@ -5,21 +5,80 @@ import com.google.protobuf.Message
import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import Actor._ import Actor._
import se.scalablesolutions.akka.amqp._ import se.scalablesolutions.akka.amqp._
import se.scalablesolutions.akka.util.Procedure
import reflect.Manifest
object RPC { object RPC {
// Needed for Java API usage
def newRpcClient[O, I](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I]): ActorRef = {
newRpcClient(connection, exchangeName, routingKey, serializer, None)
}
// Needed for Java API usage
def newRpcClient[O, I](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: ChannelParameters): ActorRef = {
newRpcClient(connection, exchangeName, routingKey, serializer, Some(channelParameters))
}
def newRpcClient[O, I](connection: ActorRef, def newRpcClient[O, I](connection: ActorRef,
exchangeName: String, exchangeName: String,
routingKey: String, routingKey: String,
serializer: RpcClientSerializer[O, I], serializer: RpcClientSerializer[O, I],
channelParameters: Option[ChannelParameters] = None): ActorRef = { channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](
ExchangeParameters(exchangeName), routingKey, serializer, channelParameters)) ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), routingKey, serializer, channelParameters))
connection.startLink(rpcActor) connection.startLink(rpcActor)
rpcActor ! Start rpcActor ! Start
rpcActor rpcActor
} }
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: se.scalablesolutions.akka.util.Function[I,O]): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _)
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
queueName: String): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName))
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
channelParameters: ChannelParameters): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters))
}
// Needed for Java API usage
def newRpcServer[I, O](connection: ActorRef,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
queueName: String,
channelParameters: ChannelParameters): RpcServerHandle = {
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters))
}
def newRpcServer[I, O](connection: ActorRef, def newRpcServer[I, O](connection: ActorRef,
exchangeName: String, exchangeName: String,
routingKey: String, routingKey: String,
@ -52,10 +111,28 @@ object RPC {
* RPC convenience * RPC convenience
*/ */
class RpcClient[O, I](client: ActorRef){ class RpcClient[O, I](client: ActorRef){
// Needed for Java API usage
def call(request: O): Option[I] = {
call(request, 5000)
}
def call(request: O, timeout: Long = 5000): Option[I] = { def call(request: O, timeout: Long = 5000): Option[I] = {
(client.!!(request, timeout)).as[I] (client.!!(request, timeout)).as[I]
} }
// Needed for Java API usage
def callAsync(request: O, responseHandler: Procedure[I]): Unit = {
callAsync(request, 5000, responseHandler)
}
// Needed for Java API usage
def callAsync(request: O, timeout: Long, responseHandler: Procedure[I]): Unit = {
callAsync(request, timeout){
case Some(response) => responseHandler.apply(response)
}
}
def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = { def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = {
spawn { spawn {
val result = call(request, timeout) val result = call(request, timeout)
@ -65,14 +142,49 @@ object RPC {
def stop = client.stop def stop = client.stop
} }
// Needed for Java API usage
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
resultClazz: Class[I]): RpcServerHandle = {
implicit val manifest = Manifest.classType[I](resultClazz)
newProtobufRpcServer(connection, exchangeName, requestHandler.apply _)
}
// Needed for Java API usage
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
routingKey: String,
resultClazz: Class[I]): RpcServerHandle = {
implicit val manifest = Manifest.classType[I](resultClazz)
newProtobufRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey))
}
// Needed for Java API usage
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
routingKey: String,
queueName: String,
resultClazz: Class[I]): RpcServerHandle = {
implicit val manifest = Manifest.classType[I](resultClazz)
newProtobufRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName))
}
def newProtobufRpcServer[I <: Message, O <: Message]( def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef, connection: ActorRef,
exchangeName: String, exchangeName: String,
requestHandler: I => O, requestHandler: I => O,
routingKey: Option[String] = None, routingKey: Option[String] = None,
queueName: Option[String] = None, queueName: Option[String] = None)(implicit manifest: Manifest[I]): RpcServerHandle = {
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = {
val serializer = new RpcServerSerializer[I, O]( val serializer = new RpcServerSerializer[I, O](
new FromBinary[I] { new FromBinary[I] {
@ -83,16 +195,34 @@ object RPC {
def toBinary(t: O) = t.toByteArray def toBinary(t: O) = t.toByteArray
}) })
startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer)
}
// Needed for Java API usage
def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef,
exchangeName: String,
resultClazz: Class[I]): RpcClient[O, I] = {
implicit val manifest = Manifest.classType[I](resultClazz)
newProtobufRpcClient(connection, exchangeName, None)
}
// Needed for Java API usage
def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef,
exchangeName: String,
routingKey: String,
resultClazz: Class[I]): RpcClient[O, I] = {
implicit val manifest = Manifest.classType[I](resultClazz)
newProtobufRpcClient(connection, exchangeName, Some(routingKey))
} }
def newProtobufRpcClient[O <: Message, I <: Message]( def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef, connection: ActorRef,
exchangeName: String, exchangeName: String,
routingKey: Option[String] = None, routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = {
val serializer = new RpcClientSerializer[O, I]( val serializer = new RpcClientSerializer[O, I](
@ -104,16 +234,38 @@ object RPC {
} }
}) })
startClient(connection, exchangeName, routingKey, durable, autoDelete, passive, serializer) startClient(connection, exchangeName, routingKey, serializer)
}
// Needed for Java API usage
def newStringRpcServer(connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[String,String]): RpcServerHandle = {
newStringRpcServer(connection, exchangeName, requestHandler.apply _)
}
// Needed for Java API usage
def newStringRpcServer(connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[String,String],
routingKey: String): RpcServerHandle = {
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey))
}
// Needed for Java API usage
def newStringRpcServer(connection: ActorRef,
exchangeName: String,
requestHandler: se.scalablesolutions.akka.util.Function[String,String],
routingKey: String,
queueName: String): RpcServerHandle = {
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName))
} }
def newStringRpcServer(connection: ActorRef, def newStringRpcServer(connection: ActorRef,
exchangeName: String, exchangeName: String,
requestHandler: String => String, requestHandler: String => String,
routingKey: Option[String] = None, routingKey: Option[String] = None,
queueName: Option[String] = None, queueName: Option[String] = None): RpcServerHandle = {
durable: Boolean = false,
autoDelete: Boolean = true): RpcServerHandle = {
val serializer = new RpcServerSerializer[String, String]( val serializer = new RpcServerSerializer[String, String](
new FromBinary[String] { new FromBinary[String] {
@ -124,15 +276,25 @@ object RPC {
def toBinary(t: String) = t.getBytes def toBinary(t: String) = t.getBytes
}) })
startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer)
}
// Needed for Java API usage
def newStringRpcClient(connection: ActorRef,
exchange: String): RpcClient[String, String] = {
newStringRpcClient(connection, exchange, None)
}
// Needed for Java API usage
def newStringRpcClient(connection: ActorRef,
exchange: String,
routingKey: String): RpcClient[String, String] = {
newStringRpcClient(connection, exchange, Some(routingKey))
} }
def newStringRpcClient(connection: ActorRef, def newStringRpcClient(connection: ActorRef,
exchange: String, exchange: String,
routingKey: Option[String] = None, routingKey: Option[String] = None): RpcClient[String, String] = {
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): RpcClient[String, String] = {
val serializer = new RpcClientSerializer[String, String]( val serializer = new RpcClientSerializer[String, String](
@ -144,15 +306,12 @@ object RPC {
} }
}) })
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) startClient(connection, exchange, routingKey, serializer)
} }
private def startClient[O, I](connection: ActorRef, private def startClient[O, I](connection: ActorRef,
exchangeName: String, exchangeName: String,
routingKey: Option[String] = None, routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true,
serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
@ -166,8 +325,6 @@ object RPC {
requestHandler: I => O, requestHandler: I => O,
routingKey: Option[String] = None, routingKey: Option[String] = None,
queueName: Option[String] = None, queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
serializer: RpcServerSerializer[I, O]): RpcServerHandle = { serializer: RpcServerSerializer[I, O]): RpcServerHandle = {
val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) val rKey = routingKey.getOrElse("%s.request".format(exchangeName))

View file

@ -18,31 +18,26 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers {
@Test @Test
def consumerMessage = AMQPTest.withCleanEndState { def consumerMessage = AMQPTest.withCleanEndState {
val connection = AMQP.newConnection() val connection = AMQP.newConnection()
try { val countDown = new CountDownLatch(2)
val channelCallback = actor {
val countDown = new CountDownLatch(2) case Started => countDown.countDown
val channelCallback = actor { case Restarting => ()
case Started => countDown.countDown case Stopped => ()
case Restarting => ()
case Stopped => ()
}
val exchangeParameters = ExchangeParameters("text_exchange")
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters)))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} finally {
connection.stop
} }
val exchangeParameters = ExchangeParameters("text_exchange")
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val payloadLatch = new StandardLatch
val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor {
case Delivery(payload, _, _, _, _) => payloadLatch.open
}, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection,
ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters)))
countDown.await(2, TimeUnit.SECONDS) must be (true)
producer ! Message("some_payload".getBytes, "non.interesting.routing.key")
payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)
} }
} }

View file

@ -29,7 +29,7 @@ class AMQPProtobufProducerConsumerTestIntegration extends JUnitSuite with MustMa
assert(response.getHostname == request.getHostname.reverse) assert(response.getHostname == request.getHostname.reverse)
responseLatch.open responseLatch.open
} }
AMQP.newProtobufConsumer(connection, responseHandler, None, Some("proto.reply.key")) AMQP.newProtobufConsumer(connection, responseHandler _, None, Some("proto.reply.key"))
val producer = AMQP.newProtobufProducer[AddressProtocol](connection, Some("protoexchange")) val producer = AMQP.newProtobufProducer[AddressProtocol](connection, Some("protoexchange"))
producer.send(request, Some("proto.reply.key")) producer.send(request, Some("proto.reply.key"))

View file

@ -42,7 +42,7 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers {
def requestHandler(request: String) = 3 def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, "rpc.routing", rpcServerSerializer, val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, "rpc.routing", rpcServerSerializer,
requestHandler, channelParameters = Some(channelParameters)) requestHandler _, channelParameters = Some(channelParameters))
val rpcClientSerializer = new RpcClientSerializer[String, Int]( val rpcClientSerializer = new RpcClientSerializer[String, Int](
new ToBinary[String] { new ToBinary[String] {

View file

@ -18,7 +18,7 @@ class AMQPRpcStringTestIntegration extends JUnitSuite with MustMatchers {
val connection = AMQP.newConnection() val connection = AMQP.newConnection()
RPC.newStringRpcServer(connection, "stringservice", requestHandler) RPC.newStringRpcServer(connection, "stringservice", requestHandler _)
val protobufClient = RPC.newStringRpcClient(connection, "stringservice") val protobufClient = RPC.newStringRpcClient(connection, "stringservice")

View file

@ -20,7 +20,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc
val responseLatch = new StandardLatch val responseLatch = new StandardLatch
RPC.newStringRpcServer(connection, "stringexchange", requestHandler) RPC.newStringRpcServer(connection, "stringexchange", requestHandler _)
val request = "somemessage" val request = "somemessage"
@ -29,7 +29,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc
assert(response == request.reverse) assert(response == request.reverse)
responseLatch.open responseLatch.open
} }
AMQP.newStringConsumer(connection, responseHandler, None, Some("string.reply.key")) AMQP.newStringConsumer(connection, responseHandler _, None, Some("string.reply.key"))
val producer = AMQP.newStringProducer(connection, Some("stringexchange")) val producer = AMQP.newStringProducer(connection, Some("stringexchange"))
producer.send(request, Some("string.reply.key")) producer.send(request, Some("string.reply.key"))

View file

@ -10,9 +10,13 @@ object AMQPTest {
def withCleanEndState(action: => Unit) { def withCleanEndState(action: => Unit) {
try { try {
action try {
} finally { action
AMQP.shutdownAll } finally {
AMQP.shutdownAll
}
} catch {
case e => println(e)
} }
} }
} }