diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java index 6dd8c3dac3..19b12d4c64 100644 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java @@ -1,16 +1,22 @@ package se.scalablesolutions.akka.amqp; +import org.multiverse.api.latches.StandardLatch; +import scala.Option; import se.scalablesolutions.akka.actor.ActorRef; import se.scalablesolutions.akka.actor.ActorRegistry; import se.scalablesolutions.akka.actor.UntypedActor; import se.scalablesolutions.akka.actor.UntypedActorFactory; +import se.scalablesolutions.akka.amqp.rpc.RPC; import se.scalablesolutions.akka.remote.protocol.RemoteProtocol; + +import se.scalablesolutions.akka.util.Function; import se.scalablesolutions.akka.util.Procedure; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +@SuppressWarnings({"unchecked"}) public class ExampleSessionJava { public static void main(String... args) { @@ -30,6 +36,11 @@ public class ExampleSessionJava { printTopic("EASY PROTOBUF PRODUCER AND CONSUMER"); easyProtobufProducerConsumer(); + printTopic("EASY STRING RPC"); + easyStringRpc(); + + printTopic("EASY PROTOBUF RPC"); + easyProtobufRpc(); // postStop everything the amqp tree except the main AMQP supervisor // all connections/consumers/producers will be stopped @@ -57,7 +68,7 @@ public class ExampleSessionJava { // defaults to amqp://guest:guest@localhost:5672/ ActorRef connection = AMQP.newConnection(); - AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", new ExchangeType.Direct()); + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", Direct.getInstance()); ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class); @@ -86,7 +97,7 @@ public class ExampleSessionJava { }); channelCallback.start(); - AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", new ExchangeType.Direct()); + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", Direct.getInstance()); AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback); ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class); @@ -133,7 +144,7 @@ public class ExampleSessionJava { ActorRef connection = AMQP.newConnection(); String exchangeName = "easy.protobuf"; - + Procedure procedure = new Procedure() { public void apply(RemoteProtocol.AddressProtocol message) { System.out.println("### >> Received message: " + message); @@ -146,6 +157,66 @@ public class ExampleSessionJava { producerClient.send(RemoteProtocol.AddressProtocol.newBuilder().setHostname("akkarocks.com").setPort(1234).build()); } + + public void easyStringRpc() { + + ActorRef connection = AMQP.newConnection(); + + String exchangeName = "easy.stringrpc"; + + // listen by default to: + // exchange = exchangeName + // routingKey = .request + // queueName = .in + RPC.newStringRpcServer(connection, exchangeName, new Function() { + public String apply(String request) { + System.out.println("### >> Got request: " + request); + return "Response to: '" + request + "'"; + } + }); + + // send by default to: + // exchange = exchangeName + // routingKey = .request + RPC.RpcClient stringRpcClient = RPC.newStringRpcClient(connection, exchangeName); + + Option response = stringRpcClient.call("AMQP Rocks!"); + System.out.println("### >> Got response: " + response); + + final StandardLatch standardLatch = new StandardLatch(); + stringRpcClient.callAsync("AMQP is dead easy", new Procedure() { + public void apply(String request) { + System.out.println("### >> This is handled async: " + request); + standardLatch.open(); + } + }); + try { + standardLatch.tryAwait(2, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + } + } + + + public void easyProtobufRpc() { + + ActorRef connection = AMQP.newConnection(); + + String exchangeName = "easy.protobuf.rpc"; + + RPC.newProtobufRpcServer(connection, exchangeName, new Function() { + public RemoteProtocol.AddressProtocol apply(RemoteProtocol.AddressProtocol request) { + return RemoteProtocol.AddressProtocol.newBuilder().setHostname(request.getHostname()).setPort(request.getPort()).build(); + } + }, RemoteProtocol.AddressProtocol.class); + + RPC.RpcClient protobufRpcClient = + RPC.newProtobufRpcClient(connection, exchangeName, RemoteProtocol.AddressProtocol.class); + + scala.Option response = + protobufRpcClient.call(RemoteProtocol.AddressProtocol.newBuilder().setHostname("localhost").setPort(4321).build()); + + System.out.println("### >> Got response: " + response); + } } class DummyActor extends UntypedActor { @@ -163,11 +234,11 @@ class ChannelCallbackActor extends UntypedActor { } public void onReceive(Object message) throws Exception { - if (Started.class.isAssignableFrom(message.getClass())) { + if (Started.getInstance().getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Started"); channelCountdown.countDown(); - } else if (Restarting.class.isAssignableFrom(message.getClass())) { - } else if (Stopped.class.isAssignableFrom(message.getClass())) { + } else if (Restarting.getInstance().getClass().isAssignableFrom(message.getClass())) { + } else if (Stopped.getInstance().getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Stopped"); } else throw new IllegalArgumentException("Unknown message: " + message); } @@ -176,10 +247,10 @@ class ChannelCallbackActor extends UntypedActor { class ConnectionCallbackActor extends UntypedActor { public void onReceive(Object message) throws Exception { - if (Connected.class.isAssignableFrom(message.getClass())) { + if (Connected.getInstance().getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Connected!"); - } else if (Reconnecting.class.isAssignableFrom(message.getClass())) { - } else if (Disconnected.class.isAssignableFrom(message.getClass())) { + } else if (Reconnecting.getInstance().getClass().isAssignableFrom(message.getClass())) { + } else if (Disconnected.getInstance().getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Disconnected!"); } else throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 2373219a30..5a56502de8 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -74,8 +74,12 @@ object AMQP { * Declaration type used for either exchange or queue declaration */ sealed trait Declaration - case object NoActionDeclaration extends Declaration - case object PassiveDeclaration extends Declaration + case object NoActionDeclaration extends Declaration { + def getInstance() = this // Needed for Java API usage + } + case object PassiveDeclaration extends Declaration { + def getInstance() = this // Needed for Java API usage + } case class ActiveDeclaration(durable: Boolean = false, autoDelete: Boolean = true, exclusive: Boolean = false) extends Declaration { // Needed for Java API usage @@ -90,13 +94,13 @@ object AMQP { */ case class ExchangeParameters( exchangeName: String, - exchangeType: ExchangeType = ExchangeType.Topic(), + exchangeType: ExchangeType = Topic, exchangeDeclaration: Declaration = ActiveDeclaration(), configurationArguments: Map[String, AnyRef] = Map.empty) { // Needed for Java API usage def this(exchangeName: String) = - this (exchangeName, ExchangeType.Topic(), ActiveDeclaration(), Map.empty) + this (exchangeName, Topic, ActiveDeclaration(), Map.empty) // Needed for Java API usage def this(exchangeName: String, exchangeType: ExchangeType) = diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala index 26fab46799..7f1ef053de 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala @@ -44,18 +44,30 @@ case class Delivery( // connection messages case object Connect extends AMQPMessage -case class Connected() extends AMQPMessage // Needed for Java API usage -case class Reconnecting() extends AMQPMessage // Needed for Java API usage -case class Disconnected() extends AMQPMessage // Needed for Java API usage +case object Connected extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} +case object Reconnecting extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} +case object Disconnected extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} case object ChannelRequest extends InternalAMQPMessage // channel messages case object Start extends AMQPMessage -case class Started() extends AMQPMessage // Needed for Java API usage -case class Restarting() extends AMQPMessage // Needed for Java API usage -case class Stopped() extends AMQPMessage // Needed for Java API usage +case object Started extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} +case object Restarting extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} +case object Stopped extends AMQPMessage { + def getInstance() = this // Needed for Java API usage +} // delivery messages case class Acknowledge(deliveryTag: Long) extends AMQPMessage diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index b0fc0dd59f..00756aa959 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -12,7 +12,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.lang.String import se.scalablesolutions.akka.amqp.AMQP._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol -import se.scalablesolutions.akka.amqp.ExchangeType._ object ExampleSession { @@ -68,7 +67,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct()) + val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -83,7 +82,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout()) + val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -102,7 +101,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic()) + val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -136,7 +135,7 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct()) + val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { @@ -202,7 +201,7 @@ object ExampleSession { def requestHandler(request: String) = 3 val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer, - requestHandler, queueName = Some("rpc.in.key.queue")) + requestHandler _, queueName = Some("rpc.in.key.queue")) /** Client */ diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala index 38a0073c30..2c35a017e4 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala @@ -5,17 +5,19 @@ package se.scalablesolutions.akka.amqp sealed trait ExchangeType -object ExchangeType { - case class Direct() extends ExchangeType { - override def toString = "direct" - } - case class Topic() extends ExchangeType { - override def toString = "topic" - } - case class Fanout() extends ExchangeType { - override def toString = "fanout" - } - case class Match() extends ExchangeType { - override def toString = "match" - } +case object Direct extends ExchangeType { + def getInstance() = this // Needed for Java API usage + override def toString = "direct" +} +case object Topic extends ExchangeType { + def getInstance() = this // Needed for Java API usage + override def toString = "topic" +} +case object Fanout extends ExchangeType { + def getInstance() = this // Needed for Java API usage + override def toString = "fanout" +} +case object Match extends ExchangeType { + def getInstance() = this // Needed for Java API usage + override def toString = "match" } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala index d2333f8b7c..6617c62a44 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala @@ -46,14 +46,14 @@ abstract private[amqp] class FaultTolerantChannelActor( } else { // channel error log.error(cause, "%s self restarting because of channel shutdown", toString) - notifyCallback(Restarting()) + notifyCallback(Restarting) self ! Start } } case Failure(cause) => log.error(cause, "%s self restarting because of channel failure", toString) closeChannel - notifyCallback(Restarting()) + notifyCallback(Restarting) self ! Start } @@ -81,7 +81,7 @@ abstract private[amqp] class FaultTolerantChannelActor( setupChannel(ch) channel = Some(ch) - notifyCallback(Started()) + notifyCallback(Started) log.info("Channel setup for %s", toString) } @@ -89,7 +89,7 @@ abstract private[amqp] class FaultTolerantChannelActor( channel.foreach { ch => if (ch.isOpen) ch.close - notifyCallback(Stopped()) + notifyCallback(Stopped) log.info("%s channel closed", toString) } channel = None @@ -100,7 +100,7 @@ abstract private[amqp] class FaultTolerantChannelActor( } override def preRestart(reason: Throwable) = { - notifyCallback(Restarting()) + notifyCallback(Restarting) closeChannel } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 07c6f14954..72a897dccf 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -72,7 +72,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id) log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size) self.linkedActorsAsList.foreach(_ ! conn.createChannel) - notifyCallback(Connected()) + notifyCallback(Connected) } } catch { case e: Exception => @@ -81,7 +81,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio , connectionParameters.initReconnectDelay, self.id) reconnectionTimer.schedule(new TimerTask() { override def run = { - notifyCallback(Reconnecting()) + notifyCallback(Reconnecting) self ! Connect } }, connectionParameters.initReconnectDelay) @@ -92,7 +92,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio try { connection.foreach(_.close) log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id) - notifyCallback(Disconnected()) + notifyCallback(Disconnected) } catch { case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id) case _ => () @@ -114,7 +114,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio override def preRestart(reason: Throwable) = disconnect override def postRestart(reason: Throwable) = { - notifyCallback(Reconnecting()) + notifyCallback(Reconnecting) connect } } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 394315a68b..87800c7742 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -5,21 +5,80 @@ import com.google.protobuf.Message import se.scalablesolutions.akka.actor.{Actor, ActorRef} import Actor._ import se.scalablesolutions.akka.amqp._ +import se.scalablesolutions.akka.util.Procedure +import reflect.Manifest object RPC { + // Needed for Java API usage + def newRpcClient[O, I](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcClientSerializer[O, I]): ActorRef = { + newRpcClient(connection, exchangeName, routingKey, serializer, None) + } + + // Needed for Java API usage + def newRpcClient[O, I](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcClientSerializer[O, I], + channelParameters: ChannelParameters): ActorRef = { + newRpcClient(connection, exchangeName, routingKey, serializer, Some(channelParameters)) + } + def newRpcClient[O, I](connection: ActorRef, exchangeName: String, routingKey: String, serializer: RpcClientSerializer[O, I], channelParameters: Option[ChannelParameters] = None): ActorRef = { val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( - ExchangeParameters(exchangeName), routingKey, serializer, channelParameters)) + ExchangeParameters(exchangeName, exchangeDeclaration = PassiveDeclaration), routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } + // Needed for Java API usage + def newRpcServer[I, O](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: se.scalablesolutions.akka.util.Function[I,O]): RpcServerHandle = { + newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _) + } + + // Needed for Java API usage + def newRpcServer[I, O](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + queueName: String): RpcServerHandle = { + newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName)) + } + + // Needed for Java API usage + def newRpcServer[I, O](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + channelParameters: ChannelParameters): RpcServerHandle = { + newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters)) + } + + // Needed for Java API usage + def newRpcServer[I, O](connection: ActorRef, + exchangeName: String, + routingKey: String, + serializer: RpcServerSerializer[I, O], + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + queueName: String, + channelParameters: ChannelParameters): RpcServerHandle = { + newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters)) + } + def newRpcServer[I, O](connection: ActorRef, exchangeName: String, routingKey: String, @@ -52,10 +111,28 @@ object RPC { * RPC convenience */ class RpcClient[O, I](client: ActorRef){ + + // Needed for Java API usage + def call(request: O): Option[I] = { + call(request, 5000) + } + def call(request: O, timeout: Long = 5000): Option[I] = { (client.!!(request, timeout)).as[I] } + // Needed for Java API usage + def callAsync(request: O, responseHandler: Procedure[I]): Unit = { + callAsync(request, 5000, responseHandler) + } + + // Needed for Java API usage + def callAsync(request: O, timeout: Long, responseHandler: Procedure[I]): Unit = { + callAsync(request, timeout){ + case Some(response) => responseHandler.apply(response) + } + } + def callAsync(request: O, timeout: Long = 5000)(responseHandler: PartialFunction[Option[I],Unit]) = { spawn { val result = call(request, timeout) @@ -65,14 +142,49 @@ object RPC { def stop = client.stop } + + // Needed for Java API usage + def newProtobufRpcServer[I <: Message, O <: Message]( + connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + resultClazz: Class[I]): RpcServerHandle = { + + implicit val manifest = Manifest.classType[I](resultClazz) + newProtobufRpcServer(connection, exchangeName, requestHandler.apply _) + } + + // Needed for Java API usage + def newProtobufRpcServer[I <: Message, O <: Message]( + connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + routingKey: String, + resultClazz: Class[I]): RpcServerHandle = { + + implicit val manifest = Manifest.classType[I](resultClazz) + newProtobufRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey)) + } + + // Needed for Java API usage + def newProtobufRpcServer[I <: Message, O <: Message]( + connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[I,O], + routingKey: String, + queueName: String, + resultClazz: Class[I]): RpcServerHandle = { + + implicit val manifest = Manifest.classType[I](resultClazz) + newProtobufRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName)) + } + def newProtobufRpcServer[I <: Message, O <: Message]( connection: ActorRef, exchangeName: String, requestHandler: I => O, routingKey: Option[String] = None, - queueName: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true)(implicit manifest: Manifest[I]): RpcServerHandle = { + queueName: Option[String] = None)(implicit manifest: Manifest[I]): RpcServerHandle = { val serializer = new RpcServerSerializer[I, O]( new FromBinary[I] { @@ -83,16 +195,34 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer) + } + + // Needed for Java API usage + def newProtobufRpcClient[O <: Message, I <: Message]( + connection: ActorRef, + exchangeName: String, + resultClazz: Class[I]): RpcClient[O, I] = { + + implicit val manifest = Manifest.classType[I](resultClazz) + newProtobufRpcClient(connection, exchangeName, None) + } + + // Needed for Java API usage + def newProtobufRpcClient[O <: Message, I <: Message]( + connection: ActorRef, + exchangeName: String, + routingKey: String, + resultClazz: Class[I]): RpcClient[O, I] = { + + implicit val manifest = Manifest.classType[I](resultClazz) + newProtobufRpcClient(connection, exchangeName, Some(routingKey)) } def newProtobufRpcClient[O <: Message, I <: Message]( connection: ActorRef, exchangeName: String, - routingKey: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, - passive: Boolean = true)(implicit manifest: Manifest[I]): RpcClient[O, I] = { + routingKey: Option[String] = None)(implicit manifest: Manifest[I]): RpcClient[O, I] = { val serializer = new RpcClientSerializer[O, I]( @@ -104,16 +234,38 @@ object RPC { } }) - startClient(connection, exchangeName, routingKey, durable, autoDelete, passive, serializer) + startClient(connection, exchangeName, routingKey, serializer) + } + + // Needed for Java API usage + def newStringRpcServer(connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[String,String]): RpcServerHandle = { + newStringRpcServer(connection, exchangeName, requestHandler.apply _) + } + + // Needed for Java API usage + def newStringRpcServer(connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[String,String], + routingKey: String): RpcServerHandle = { + newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey)) + } + + // Needed for Java API usage + def newStringRpcServer(connection: ActorRef, + exchangeName: String, + requestHandler: se.scalablesolutions.akka.util.Function[String,String], + routingKey: String, + queueName: String): RpcServerHandle = { + newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName)) } def newStringRpcServer(connection: ActorRef, exchangeName: String, requestHandler: String => String, routingKey: Option[String] = None, - queueName: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true): RpcServerHandle = { + queueName: Option[String] = None): RpcServerHandle = { val serializer = new RpcServerSerializer[String, String]( new FromBinary[String] { @@ -124,15 +276,25 @@ object RPC { def toBinary(t: String) = t.getBytes }) - startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + startServer(connection, exchangeName, requestHandler, routingKey, queueName, serializer) + } + + // Needed for Java API usage + def newStringRpcClient(connection: ActorRef, + exchange: String): RpcClient[String, String] = { + newStringRpcClient(connection, exchange, None) + } + + // Needed for Java API usage + def newStringRpcClient(connection: ActorRef, + exchange: String, + routingKey: String): RpcClient[String, String] = { + newStringRpcClient(connection, exchange, Some(routingKey)) } def newStringRpcClient(connection: ActorRef, exchange: String, - routingKey: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, - passive: Boolean = true): RpcClient[String, String] = { + routingKey: Option[String] = None): RpcClient[String, String] = { val serializer = new RpcClientSerializer[String, String]( @@ -144,15 +306,12 @@ object RPC { } }) - startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + startClient(connection, exchange, routingKey, serializer) } private def startClient[O, I](connection: ActorRef, exchangeName: String, routingKey: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, - passive: Boolean = true, serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) @@ -166,8 +325,6 @@ object RPC { requestHandler: I => O, routingKey: Option[String] = None, queueName: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, serializer: RpcServerSerializer[I, O]): RpcServerHandle = { val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala index 0fed1951db..baa6b4e551 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPConsumerMessageTestIntegration.scala @@ -18,31 +18,26 @@ class AMQPConsumerMessageTestIntegration extends JUnitSuite with MustMatchers { @Test def consumerMessage = AMQPTest.withCleanEndState { val connection = AMQP.newConnection() - try { - - val countDown = new CountDownLatch(2) - val channelCallback = actor { - case Started => countDown.countDown - case Restarting => () - case Stopped => () - } - - val exchangeParameters = ExchangeParameters("text_exchange") - val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - - val payloadLatch = new StandardLatch - val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor { - case Delivery(payload, _, _, _, _) => payloadLatch.open - }, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) - - val producer = AMQP.newProducer(connection, - ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters))) - - countDown.await(2, TimeUnit.SECONDS) must be (true) - producer ! Message("some_payload".getBytes, "non.interesting.routing.key") - payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) - } finally { - connection.stop + val countDown = new CountDownLatch(2) + val channelCallback = actor { + case Started => countDown.countDown + case Restarting => () + case Stopped => () } + + val exchangeParameters = ExchangeParameters("text_exchange") + val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + + val payloadLatch = new StandardLatch + val consumer = AMQP.newConsumer(connection, ConsumerParameters("non.interesting.routing.key", actor { + case Delivery(payload, _, _, _, _) => payloadLatch.open + }, exchangeParameters = Some(exchangeParameters), channelParameters = Some(channelParameters))) + + val producer = AMQP.newProducer(connection, + ProducerParameters(Some(exchangeParameters), channelParameters = Some(channelParameters))) + + countDown.await(2, TimeUnit.SECONDS) must be (true) + producer ! Message("some_payload".getBytes, "non.interesting.routing.key") + payloadLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) } } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTestIntegration.scala index d6d719d241..1502d1e40b 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPProtobufProducerConsumerTestIntegration.scala @@ -29,7 +29,7 @@ class AMQPProtobufProducerConsumerTestIntegration extends JUnitSuite with MustMa assert(response.getHostname == request.getHostname.reverse) responseLatch.open } - AMQP.newProtobufConsumer(connection, responseHandler, None, Some("proto.reply.key")) + AMQP.newProtobufConsumer(connection, responseHandler _, None, Some("proto.reply.key")) val producer = AMQP.newProtobufProducer[AddressProtocol](connection, Some("protoexchange")) producer.send(request, Some("proto.reply.key")) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala index d322b243c8..b4f2a49939 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcClientServerTestIntegration.scala @@ -42,7 +42,7 @@ class AMQPRpcClientServerTestIntegration extends JUnitSuite with MustMatchers { def requestHandler(request: String) = 3 val rpcServer = RPC.newRpcServer[String, Int](connection, exchangeName, "rpc.routing", rpcServerSerializer, - requestHandler, channelParameters = Some(channelParameters)) + requestHandler _, channelParameters = Some(channelParameters)) val rpcClientSerializer = new RpcClientSerializer[String, Int]( new ToBinary[String] { diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTestIntegration.scala index 964cd94adb..fb36af74ab 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPRpcStringTestIntegration.scala @@ -18,7 +18,7 @@ class AMQPRpcStringTestIntegration extends JUnitSuite with MustMatchers { val connection = AMQP.newConnection() - RPC.newStringRpcServer(connection, "stringservice", requestHandler) + RPC.newStringRpcServer(connection, "stringservice", requestHandler _) val protobufClient = RPC.newStringRpcClient(connection, "stringservice") diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala index 6fa7ccefd2..a9de971815 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala @@ -20,7 +20,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc val responseLatch = new StandardLatch - RPC.newStringRpcServer(connection, "stringexchange", requestHandler) + RPC.newStringRpcServer(connection, "stringexchange", requestHandler _) val request = "somemessage" @@ -29,7 +29,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc assert(response == request.reverse) responseLatch.open } - AMQP.newStringConsumer(connection, responseHandler, None, Some("string.reply.key")) + AMQP.newStringConsumer(connection, responseHandler _, None, Some("string.reply.key")) val producer = AMQP.newStringProducer(connection, Some("stringexchange")) producer.send(request, Some("string.reply.key")) diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala index ec1b6aa2f4..2a35df0a77 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPTest.scala @@ -10,9 +10,13 @@ object AMQPTest { def withCleanEndState(action: => Unit) { try { - action - } finally { - AMQP.shutdownAll + try { + action + } finally { + AMQP.shutdownAll + } + } catch { + case e => println(e) } } }