diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java new file mode 100644 index 0000000000..36f54fd825 --- /dev/null +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java @@ -0,0 +1,10 @@ +package se.scalablesolutions.akka.amqp; + +public final class ChannelCallbacks { + + public static final AMQPMessage STARTED = Started$.MODULE$; + public static final AMQPMessage RESTARTING = Restarting$.MODULE$; + public static final AMQPMessage STOPPED = Stopped$.MODULE$; + + private ChannelCallbacks() {} +} diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java new file mode 100644 index 0000000000..1314330928 --- /dev/null +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java @@ -0,0 +1,10 @@ +package se.scalablesolutions.akka.amqp; + +public final class ConnectionCallbacks { + + public static final AMQPMessage CONNECTED = Connected$.MODULE$; + public static final AMQPMessage RECONNECTING = Reconnecting$.MODULE$; + public static final AMQPMessage DISCONNECTED = Disconnected$.MODULE$; + + private ConnectionCallbacks() {} +} diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java new file mode 100644 index 0000000000..ad0d7c3841 --- /dev/null +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java @@ -0,0 +1,142 @@ +package se.scalablesolutions.akka.amqp; + +import se.scalablesolutions.akka.actor.ActorRef; +import se.scalablesolutions.akka.actor.ActorRegistry; +import se.scalablesolutions.akka.actor.UntypedActor; +import se.scalablesolutions.akka.actor.UntypedActorFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ExampleSessionJava { + + public static void main(String... args) { + new ExampleSessionJava(); + } + + public ExampleSessionJava() { + printTopic("DIRECT"); + direct(); + + printTopic("CALLBACK"); + callback(); + + printTopic("Happy hAkking :-)"); + + // postStop everything the amqp tree except the main AMQP supervisor + // all connections/consumers/producers will be stopped + AMQP.shutdownAll(); + + ActorRegistry.shutdownAll(); + System.exit(0); + } + + private void printTopic(String topic) { + + System.out.println(""); + System.out.println("==== " + topic + " ==="); + System.out.println(""); + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException ignore) { + } + } + + private void direct() { + // defaults to amqp://guest:guest@localhost:5672/ + ActorRef connection = AMQP.newConnection(); + + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", ExchangeTypes.DIRECT); + + ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class); + + AMQP.ConsumerParameters consumerParameters = new AMQP.ConsumerParameters("some.routing", deliveryHandler, exchangeParameters); + ActorRef consumer = AMQP.newConsumer(connection, consumerParameters); + + + ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters)); + producer.sendOneWay(new Message("@jonas_boner: You sucked!!".getBytes(), "some.routing")); + } + + private void callback() { + + final CountDownLatch channelCountdown = new CountDownLatch(2); + + ActorRef connectionCallback = UntypedActor.actorOf(ConnectionCallbackActor.class); + connectionCallback.start(); + + AMQP.ConnectionParameters connectionParameters = new AMQP.ConnectionParameters(connectionCallback); + ActorRef connection = AMQP.newConnection(connectionParameters); + + ActorRef channelCallback = UntypedActor.actorOf(new UntypedActorFactory() { + public UntypedActor create() { + return new ChannelCallbackActor(channelCountdown); + } + }); + channelCallback.start(); + + AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", ExchangeTypes.DIRECT); + AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback); + + ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class); + AMQP.ConsumerParameters consumerParameters = new AMQP.ConsumerParameters("callback.routing", dummyHandler, exchangeParameters, channelParameters); + + ActorRef consumer = AMQP.newConsumer(connection, consumerParameters); + + ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters)); + + // Wait until both channels (producer & consumer) are started before stopping the connection + try { + channelCountdown.await(2, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + } + connection.stop(); + } +} + +class DummyActor extends UntypedActor { + public void onReceive(Object message) throws Exception { + // not used + } +} + +class ChannelCallbackActor extends UntypedActor { + + private final CountDownLatch channelCountdown; + + public ChannelCallbackActor(CountDownLatch channelCountdown) { + this.channelCountdown = channelCountdown; + } + + public void onReceive(Object message) throws Exception { + if (ChannelCallbacks.STARTED.getClass().isAssignableFrom(message.getClass())) { + System.out.println("### >> Channel callback: Started"); + channelCountdown.countDown(); + } else if (ChannelCallbacks.RESTARTING.getClass().isAssignableFrom(message.getClass())) { + } else if (ChannelCallbacks.STOPPED.getClass().isAssignableFrom(message.getClass())) { + System.out.println("### >> Channel callback: Stopped"); + } else throw new IllegalArgumentException("Unknown message: " + message); + } +} + +class ConnectionCallbackActor extends UntypedActor { + + public void onReceive(Object message) throws Exception { + if (ConnectionCallbacks.CONNECTED.getClass().isAssignableFrom(message.getClass())) { + System.out.println("### >> Connection callback: Connected!"); + } else if (ConnectionCallbacks.RECONNECTING.getClass().isAssignableFrom(message.getClass())) { + } else if (ConnectionCallbacks.DISCONNECTED.getClass().isAssignableFrom(message.getClass())) { + System.out.println("### >> Connection callback: Disconnected!"); + } else throw new IllegalArgumentException("Unknown message: " + message); + } +} + +class DirectDeliveryHandlerActor extends UntypedActor { + + public void onReceive(Object message) throws Exception { + if (Delivery.class.isAssignableFrom(message.getClass())) { + Delivery delivery = (Delivery) message; + System.out.println("### >> @george_bush received message from: " + new String(delivery.payload())); + } else throw new IllegalArgumentException("Unknown message: " + message); + } +} diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java new file mode 100644 index 0000000000..9a032554fe --- /dev/null +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExchangeTypes.java @@ -0,0 +1,11 @@ +package se.scalablesolutions.akka.amqp; + +// Needed for Java API usage +public final class ExchangeTypes { + public static final ExchangeType DIRECT = Direct$.MODULE$; + public static final ExchangeType FANOUT = Fanout$.MODULE$; + public static final ExchangeType TOPIC = Topic$.MODULE$; + public static final ExchangeType MATCH = Match$.MODULE$; + + private ExchangeTypes() {} +} diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index 5b2fe6501f..1856f63f57 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -88,13 +88,13 @@ object AMQP { */ case class ExchangeParameters( exchangeName: String, - exchangeType: ExchangeType = ExchangeType.Topic, + exchangeType: ExchangeType = Topic, exchangeDeclaration: Declaration = ActiveDeclaration(), configurationArguments: Map[String, AnyRef] = Map.empty) { // Needed for Java API usage def this(exchangeName: String) = - this (exchangeName, ExchangeType.Topic, ActiveDeclaration(), Map.empty) + this (exchangeName, Topic, ActiveDeclaration(), Map.empty) // Needed for Java API usage def this(exchangeName: String, exchangeType: ExchangeType) = @@ -186,6 +186,10 @@ object AMQP { def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters) = this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), true, None) + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, channelParameters: ChannelParameters) = + this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), true, Some(channelParameters)) + // Needed for Java API usage def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, selfAcknowledging: Boolean) = this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), selfAcknowledging, None) @@ -269,26 +273,6 @@ object AMQP { new ProducerClient(producerRef, rKey, toBinary) } - // Needed for Java API usage - def newStringProducer(connection: ActorRef): ProducerClient[String] = { - newStringProducer(connection, None, None, None) - } - // Needed for Java API usage - def newStringProducer(connection: ActorRef, exchangeName: String): ProducerClient[String] = { - newStringProducer(connection, Some(exchangeName), None, None) - } - - // Needed for Java API usage - def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String): ProducerClient[String] = { - newStringProducer(connection, Some(exchangeName), Some(routingKey), None) - } - - // Needed for Java API usage - def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String, producerId: String): ProducerClient[String] = { - newStringProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId)) - } - - def newStringConsumer(connection: ActorRef, handler: String => Unit, exchangeName: Option[String], @@ -310,6 +294,7 @@ object AMQP { newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters)) } + def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, exchangeName: Option[String], routingKey: Option[String] = None, @@ -350,6 +335,7 @@ object AMQP { newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters)) } + /** * Main supervisor */ diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala index 2b53a73d08..a99b1afc67 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala @@ -18,7 +18,23 @@ case class Message( routingKey: String, mandatory: Boolean = false, immediate: Boolean = false, - properties: Option[BasicProperties] = None) extends AMQPMessage + properties: Option[BasicProperties] = None) extends AMQPMessage { + + // Needed for Java API usage + def this(payload: Array[Byte], routingKey: String) = this(payload, routingKey, false, false, None) + + // Needed for Java API usage + def this(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean) = + this(payload, routingKey, mandatory, immediate, None) + + // Needed for Java API usage + def this(payload: Array[Byte], routingKey: String, properties: BasicProperties) = + this(payload, routingKey, false, false, Some(properties)) + + // Needed for Java API usage + def this(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: BasicProperties) = + this(payload, routingKey, mandatory, immediate, Some(properties)) +} case class Delivery( payload: Array[Byte], @@ -52,8 +68,8 @@ class RejectionException(deliveryTag: Long) extends RuntimeException // internal messages private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage -private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage -private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage +case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage +case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage private[akka] class MessageNotDeliveredException( val message: String, diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index 8468d75919..1fa39d1882 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -67,7 +67,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) + val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct) val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -82,7 +82,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout) + val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -101,7 +101,7 @@ object ExampleSession { // defaults to amqp://guest:guest@localhost:5672/ val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic) + val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) @@ -135,7 +135,7 @@ object ExampleSession { case Restarting => // not used, sent when channel or connection fails and initiates a restart case Stopped => log.info("Channel callback: Stopped") } - val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct) + val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { @@ -163,7 +163,7 @@ object ExampleSession { // send by default to: // exchange = exchangeName // routingKey = .request - val producer = AMQP.newStringProducer(connection, exchangeName) + val producer = AMQP.newStringProducer(connection, Some(exchangeName)) producer.send("This shit is easy!") } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala index 1fa6e0543f..332a681ad4 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExchangeType.scala @@ -5,17 +5,15 @@ package se.scalablesolutions.akka.amqp sealed trait ExchangeType -object ExchangeType { - case object Direct extends ExchangeType { - override def toString = "direct" - } - case object Topic extends ExchangeType { - override def toString = "topic" - } - case object Fanout extends ExchangeType { - override def toString = "fanout" - } - case object Match extends ExchangeType { - override def toString = "match" - } +case object Direct extends ExchangeType { + override def toString = "direct" +} +case object Topic extends ExchangeType { + override def toString = "topic" +} +case object Fanout extends ExchangeType { + override def toString = "fanout" +} +case object Match extends ExchangeType { + override def toString = "match" } diff --git a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala index 41f698821c..6fa7ccefd2 100644 --- a/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala +++ b/akka-amqp/src/test/scala/se/scalablesolutions/akka/amqp/test/AMQPStringProducerConsumerTestIntegration.scala @@ -31,7 +31,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc } AMQP.newStringConsumer(connection, responseHandler, None, Some("string.reply.key")) - val producer = AMQP.newStringProducer(connection, "stringexchange") + val producer = AMQP.newStringProducer(connection, Some("stringexchange")) producer.send(request, Some("string.reply.key")) responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 3c4786ef15..bc9d7f3a99 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -434,12 +434,12 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaAMQPProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val commons_io = Dependencies.commons_io val rabbit = Dependencies.rabbit - val protobuf = Dependencies.protobuf + val protobuf = Dependencies.protobuf // testing - val junit = Dependencies.junit - val multiverse = Dependencies.multiverse - val scalatest = Dependencies.scalatest + val junit = Dependencies.junit + val multiverse = Dependencies.multiverse + val scalatest = Dependencies.scalatest override def testOptions = createTestFilter( _.endsWith("Test") ) }