diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java index ad0d7c3841..a878bb6b05 100644 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java @@ -5,9 +5,15 @@ import se.scalablesolutions.akka.actor.ActorRegistry; import se.scalablesolutions.akka.actor.UntypedActor; import se.scalablesolutions.akka.actor.UntypedActorFactory; +import se.scalablesolutions.akka.remote.protocol.RemoteProtocol; +import se.scalablesolutions.akka.util.Procedure; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static se.scalablesolutions.akka.amqp.ChannelCallbacks.*; +import static se.scalablesolutions.akka.amqp.ConnectionCallbacks.*; + public class ExampleSessionJava { public static void main(String... args) { @@ -21,13 +27,21 @@ public class ExampleSessionJava { printTopic("CALLBACK"); callback(); - printTopic("Happy hAkking :-)"); + printTopic("EASY STRING PRODUCER AND CONSUMER"); + easyStringProducerConsumer(); + + printTopic("EASY PROTOBUF PRODUCER AND CONSUMER"); + easyProtobufProducerConsumer(); + // postStop everything the amqp tree except the main AMQP supervisor // all connections/consumers/producers will be stopped AMQP.shutdownAll(); ActorRegistry.shutdownAll(); + + printTopic("Happy hAkking :-)"); + System.exit(0); } @@ -46,7 +60,7 @@ public class ExampleSessionJava { // defaults to amqp://guest:guest@localhost:5672/ ActorRef connection = AMQP.newConnection(); - AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", ExchangeTypes.DIRECT); + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", new ExchangeType.Direct()); ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class); @@ -75,7 +89,7 @@ public class ExampleSessionJava { }); channelCallback.start(); - AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", ExchangeTypes.DIRECT); + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", new ExchangeType.Direct()); AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback); ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class); @@ -83,7 +97,7 @@ public class ExampleSessionJava { ActorRef consumer = AMQP.newConsumer(connection, consumerParameters); - ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters)); + ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters, channelParameters)); // Wait until both channels (producer & consumer) are started before stopping the connection try { @@ -92,6 +106,49 @@ public class ExampleSessionJava { } connection.stop(); } + + public void easyStringProducerConsumer() { + ActorRef connection = AMQP.newConnection(); + + String exchangeName = "easy.string"; + + // listen by default to: + // exchange = optional exchangeName + // routingKey = provided routingKey or .request + // queueName = .in + Procedure procedure = new Procedure() { + public void apply(String message) { + System.out.println("### >> Received message: " + message); + } + }; + AMQP.newStringConsumer(connection, procedure, exchangeName); + + // send by default to: + // exchange = exchangeName + // routingKey = .request + AMQP.ProducerClient producer = AMQP.newStringProducer(connection, exchangeName); + + producer.send("This shit is easy!"); + } + + public void easyProtobufProducerConsumer() { + + ActorRef connection = AMQP.newConnection(); + + String exchangeName = "easy.protobuf"; + + Procedure procedure = new Procedure() { + public void apply(RemoteProtocol.AddressProtocol message) { + System.out.println("### >> Received message: " + message); + } + }; + + AMQP.newProtobufConsumer(connection, procedure, exchangeName, RemoteProtocol.AddressProtocol.class); + + AMQP.ProducerClient producerClient = AMQP.newProtobufProducer(connection, exchangeName); + + producerClient.send(RemoteProtocol.AddressProtocol.newBuilder().setHostname("akkarocks.com").setPort(1234).build()); + } } class DummyActor extends UntypedActor { @@ -109,11 +166,11 @@ class ChannelCallbackActor extends UntypedActor { } public void onReceive(Object message) throws Exception { - if (ChannelCallbacks.STARTED.getClass().isAssignableFrom(message.getClass())) { + if (STARTED.getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Started"); channelCountdown.countDown(); - } else if (ChannelCallbacks.RESTARTING.getClass().isAssignableFrom(message.getClass())) { - } else if (ChannelCallbacks.STOPPED.getClass().isAssignableFrom(message.getClass())) { + } else if (RESTARTING.getClass().isAssignableFrom(message.getClass())) { + } else if (STOPPED.getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Stopped"); } else throw new IllegalArgumentException("Unknown message: " + message); } @@ -122,10 +179,10 @@ class ChannelCallbackActor extends UntypedActor { class ConnectionCallbackActor extends UntypedActor { public void onReceive(Object message) throws Exception { - if (ConnectionCallbacks.CONNECTED.getClass().isAssignableFrom(message.getClass())) { + if (CONNECTED.getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Connected!"); - } else if (ConnectionCallbacks.RECONNECTING.getClass().isAssignableFrom(message.getClass())) { - } else if (ConnectionCallbacks.DISCONNECTED.getClass().isAssignableFrom(message.getClass())) { + } else if (RECONNECTING.getClass().isAssignableFrom(message.getClass())) { + } else if (DISCONNECTED.getClass().isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Disconnected!"); } else throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java deleted file mode 100644 index 9a032554fe..0000000000 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java +++ /dev/null @@ -1,11 +0,0 @@ -package se.scalablesolutions.akka.amqp; - -// Needed for Java API usage -public final class ExchangeTypes { - public static final ExchangeType DIRECT = Direct$.MODULE$; - public static final ExchangeType FANOUT = Fanout$.MODULE$; - public static final ExchangeType TOPIC = Topic$.MODULE$; - public static final ExchangeType MATCH = Match$.MODULE$; - - private ExchangeTypes() {} -} diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 1856f63f57..2373219a30 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -11,6 +11,8 @@ import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import ConnectionFactory._ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.{String, IllegalArgumentException} +import reflect.Manifest +import se.scalablesolutions.akka.util.Procedure /** * AMQP Actor API. Implements Connection, Producer and Consumer materialized as Actors. @@ -88,13 +90,13 @@ object AMQP { */ case class ExchangeParameters( exchangeName: String, - exchangeType: ExchangeType = Topic, + exchangeType: ExchangeType = ExchangeType.Topic(), exchangeDeclaration: Declaration = ActiveDeclaration(), configurationArguments: Map[String, AnyRef] = Map.empty) { // Needed for Java API usage def this(exchangeName: String) = - this (exchangeName, Topic, ActiveDeclaration(), Map.empty) + this (exchangeName, ExchangeType.Topic(), ActiveDeclaration(), Map.empty) // Needed for Java API usage def this(exchangeName: String, exchangeType: ExchangeType) = @@ -113,8 +115,7 @@ object AMQP { producerId: Option[String] = None, returnListener: Option[ReturnListener] = None, channelParameters: Option[ChannelParameters] = None) { - - def this() = this(None, None, None, None) + def this() = this (None, None, None, None) // Needed for Java API usage def this(exchangeParameters: ExchangeParameters) = this (Some(exchangeParameters), None, None, None) @@ -147,7 +148,6 @@ object AMQP { queueDeclaration: Declaration = ActiveDeclaration(), selfAcknowledging: Boolean = true, channelParameters: Option[ChannelParameters] = None) { - if (queueName.isEmpty) { queueDeclaration match { case ActiveDeclaration(true, _, _) => @@ -210,7 +210,7 @@ object AMQP { def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration, selfAcknowledging: Boolean, channelParameters: ChannelParameters) = this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, selfAcknowledging, Some(channelParameters)) - // How about that for some overloading... huh? :P (yes, I know, there are still posibilities left...sue me!) + // How about that for some overloading... huh? :P (yes, I know, there are still possibilities left...sue me!) // Who said java is easy :( } @@ -246,6 +246,15 @@ object AMQP { * Convenience */ class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) { + // Needed for Java API usage + def send(request: O): Unit = { + send(request, None) + } + // Needed for Java API usage + def send(request: O, replyTo: String): Unit = { + send(request, Some(replyTo)) + } + def send(request: O, replyTo: Option[String] = None) = { val basicProperties = new BasicProperties basicProperties.setReplyTo(replyTo.getOrElse(null)) @@ -255,6 +264,27 @@ object AMQP { def stop() = client.stop } + // Needed for Java API usage + def newStringProducer(connection: ActorRef, + exchangeName: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName)) + } + + // Needed for Java API usage + def newStringProducer(connection: ActorRef, + exchangeName: String, + routingKey: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName), Some(routingKey)) + } + + // Needed for Java API usage + def newStringProducer(connection: ActorRef, + exchangeName: String, + routingKey: String, + producerId: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId)) + } + def newStringProducer(connection: ActorRef, exchangeName: Option[String], routingKey: Option[String] = None, @@ -273,6 +303,30 @@ object AMQP { new ProducerClient(producerRef, rKey, toBinary) } + // Needed for Java API usage + def newStringConsumer(connection: ActorRef, + handler: Procedure[String], + exchangeName: String): ActorRef = { + newStringConsumer(connection, handler.apply _, Some(exchangeName)) + } + + // Needed for Java API usage + def newStringConsumer(connection: ActorRef, + handler: Procedure[String], + exchangeName: String, + routingKey: String): ActorRef = { + newStringConsumer(connection, handler.apply _, Some(exchangeName), Some(routingKey)) + } + + // Needed for Java API usage + def newStringConsumer(connection: ActorRef, + handler: Procedure[String], + exchangeName: String, + routingKey: String, + queueName: String): ActorRef = { + newStringConsumer(connection, handler.apply _, Some(exchangeName), Some(routingKey), Some(queueName)) + } + def newStringConsumer(connection: ActorRef, handler: String => Unit, exchangeName: Option[String], @@ -295,6 +349,27 @@ object AMQP { } + // Needed for Java API usage + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, + exchangeName: String): ProducerClient[O] = { + newProtobufProducer(connection, Some(exchangeName)) + } + + // Needed for Java API usage + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, + exchangeName: String, + routingKey: String): ProducerClient[O] = { + newProtobufProducer(connection, Some(exchangeName), Some(routingKey)) + } + + // Needed for Java API usage + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, + exchangeName: String, + routingKey: String, + producerId: String): ProducerClient[O] = { + newProtobufProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId)) + } + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, exchangeName: Option[String], routingKey: Option[String] = None, @@ -312,6 +387,36 @@ object AMQP { }) } + // Needed for Java API usage + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, + handler: Procedure[I], + exchangeName: String, + clazz: Class[I]): ActorRef = { + implicit val manifest = Manifest.classType[I](clazz) + newProtobufConsumer[I](connection, handler.apply _, Some(exchangeName)) + } + + // Needed for Java API usage + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, + handler: Procedure[I], + exchangeName: String, + routingKey: String, + clazz: Class[I]): ActorRef = { + implicit val manifest = Manifest.classType[I](clazz) + newProtobufConsumer[I](connection, handler.apply _, Some(exchangeName), Some(routingKey)) + } + + // Needed for Java API usage + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, + handler: Procedure[I], + exchangeName: String, + routingKey: String, + queueName: String, + clazz: Class[I]): ActorRef = { + implicit val manifest = Manifest.classType[I](clazz) + newProtobufConsumer[I](connection, handler.apply _, Some(exchangeName), Some(routingKey), Some(queueName)) + } + def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, handler: I => Unit, exchangeName: Option[String], @@ -335,7 +440,7 @@ object AMQP { newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters)) } - + /** * Main supervisor */ diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 1fa39d1882..ed97b359eb 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -12,6 +12,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.lang.String import se.scalablesolutions.akka.amqp.AMQP._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol +import se.scalablesolutions.akka.util.Procedure object ExampleSession { @@ -67,7 +68,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) + val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct()) val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -82,7 +83,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) + val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout()) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -101,7 +102,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) + val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic()) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -135,7 +136,7 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct) + val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct()) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { @@ -177,7 +178,7 @@ object ExampleSession { log.info("Received "+message) } - AMQP.newProtobufConsumer(connection, protobufMessageHandler, Some(exchangeName)) + AMQP.newProtobufConsumer(connection, protobufMessageHandler _, Some(exchangeName)) val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, Some(exchangeName)) producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala index 332a681ad4..38a0073c30 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala @@ -5,15 +5,17 @@ package se.scalablesolutions.akka.amqp sealed trait ExchangeType -case object Direct extends ExchangeType { - override def toString = "direct" -} -case object Topic extends ExchangeType { - override def toString = "topic" -} -case object Fanout extends ExchangeType { - override def toString = "fanout" -} -case object Match extends ExchangeType { - override def toString = "match" +object ExchangeType { + case class Direct() extends ExchangeType { + override def toString = "direct" + } + case class Topic() extends ExchangeType { + override def toString = "topic" + } + case class Fanout() extends ExchangeType { + override def toString = "fanout" + } + case class Match() extends ExchangeType { + override def toString = "match" + } }