initial take on java examples

This commit is contained in:
momania 2010-09-24 12:13:29 +02:00
parent 489db0074f
commit c6470b0e02
10 changed files with 221 additions and 48 deletions

View file

@ -0,0 +1,10 @@
package se.scalablesolutions.akka.amqp;
public final class ChannelCallbacks {
public static final AMQPMessage STARTED = Started$.MODULE$;
public static final AMQPMessage RESTARTING = Restarting$.MODULE$;
public static final AMQPMessage STOPPED = Stopped$.MODULE$;
private ChannelCallbacks() {}
}

View file

@ -0,0 +1,10 @@
package se.scalablesolutions.akka.amqp;
public final class ConnectionCallbacks {
public static final AMQPMessage CONNECTED = Connected$.MODULE$;
public static final AMQPMessage RECONNECTING = Reconnecting$.MODULE$;
public static final AMQPMessage DISCONNECTED = Disconnected$.MODULE$;
private ConnectionCallbacks() {}
}

View file

@ -0,0 +1,142 @@
package se.scalablesolutions.akka.amqp;
import se.scalablesolutions.akka.actor.ActorRef;
import se.scalablesolutions.akka.actor.ActorRegistry;
import se.scalablesolutions.akka.actor.UntypedActor;
import se.scalablesolutions.akka.actor.UntypedActorFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ExampleSessionJava {
public static void main(String... args) {
new ExampleSessionJava();
}
public ExampleSessionJava() {
printTopic("DIRECT");
direct();
printTopic("CALLBACK");
callback();
printTopic("Happy hAkking :-)");
// postStop everything the amqp tree except the main AMQP supervisor
// all connections/consumers/producers will be stopped
AMQP.shutdownAll();
ActorRegistry.shutdownAll();
System.exit(0);
}
private void printTopic(String topic) {
System.out.println("");
System.out.println("==== " + topic + " ===");
System.out.println("");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ignore) {
}
}
private void direct() {
// defaults to amqp://guest:guest@localhost:5672/
ActorRef connection = AMQP.newConnection();
AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_direct_exchange", ExchangeTypes.DIRECT);
ActorRef deliveryHandler = UntypedActor.actorOf(DirectDeliveryHandlerActor.class);
AMQP.ConsumerParameters consumerParameters = new AMQP.ConsumerParameters("some.routing", deliveryHandler, exchangeParameters);
ActorRef consumer = AMQP.newConsumer(connection, consumerParameters);
ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters));
producer.sendOneWay(new Message("@jonas_boner: You sucked!!".getBytes(), "some.routing"));
}
private void callback() {
final CountDownLatch channelCountdown = new CountDownLatch(2);
ActorRef connectionCallback = UntypedActor.actorOf(ConnectionCallbackActor.class);
connectionCallback.start();
AMQP.ConnectionParameters connectionParameters = new AMQP.ConnectionParameters(connectionCallback);
ActorRef connection = AMQP.newConnection(connectionParameters);
ActorRef channelCallback = UntypedActor.actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new ChannelCallbackActor(channelCountdown);
}
});
channelCallback.start();
AMQP.ExchangeParameters exchangeParameters = new AMQP.ExchangeParameters("my_callback_exchange", ExchangeTypes.DIRECT);
AMQP.ChannelParameters channelParameters = new AMQP.ChannelParameters(channelCallback);
ActorRef dummyHandler = UntypedActor.actorOf(DummyActor.class);
AMQP.ConsumerParameters consumerParameters = new AMQP.ConsumerParameters("callback.routing", dummyHandler, exchangeParameters, channelParameters);
ActorRef consumer = AMQP.newConsumer(connection, consumerParameters);
ActorRef producer = AMQP.newProducer(connection, new AMQP.ProducerParameters(exchangeParameters));
// Wait until both channels (producer & consumer) are started before stopping the connection
try {
channelCountdown.await(2, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
connection.stop();
}
}
class DummyActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
// not used
}
}
class ChannelCallbackActor extends UntypedActor {
private final CountDownLatch channelCountdown;
public ChannelCallbackActor(CountDownLatch channelCountdown) {
this.channelCountdown = channelCountdown;
}
public void onReceive(Object message) throws Exception {
if (ChannelCallbacks.STARTED.getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Channel callback: Started");
channelCountdown.countDown();
} else if (ChannelCallbacks.RESTARTING.getClass().isAssignableFrom(message.getClass())) {
} else if (ChannelCallbacks.STOPPED.getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Channel callback: Stopped");
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}
class ConnectionCallbackActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (ConnectionCallbacks.CONNECTED.getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Connection callback: Connected!");
} else if (ConnectionCallbacks.RECONNECTING.getClass().isAssignableFrom(message.getClass())) {
} else if (ConnectionCallbacks.DISCONNECTED.getClass().isAssignableFrom(message.getClass())) {
System.out.println("### >> Connection callback: Disconnected!");
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}
class DirectDeliveryHandlerActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (Delivery.class.isAssignableFrom(message.getClass())) {
Delivery delivery = (Delivery) message;
System.out.println("### >> @george_bush received message from: " + new String(delivery.payload()));
} else throw new IllegalArgumentException("Unknown message: " + message);
}
}

View file

@ -0,0 +1,11 @@
package se.scalablesolutions.akka.amqp;
// Needed for Java API usage
public final class ExchangeTypes {
public static final ExchangeType DIRECT = Direct$.MODULE$;
public static final ExchangeType FANOUT = Fanout$.MODULE$;
public static final ExchangeType TOPIC = Topic$.MODULE$;
public static final ExchangeType MATCH = Match$.MODULE$;
private ExchangeTypes() {}
}

View file

@ -88,13 +88,13 @@ object AMQP {
*/
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType = ExchangeType.Topic,
exchangeType: ExchangeType = Topic,
exchangeDeclaration: Declaration = ActiveDeclaration(),
configurationArguments: Map[String, AnyRef] = Map.empty) {
// Needed for Java API usage
def this(exchangeName: String) =
this (exchangeName, ExchangeType.Topic, ActiveDeclaration(), Map.empty)
this (exchangeName, Topic, ActiveDeclaration(), Map.empty)
// Needed for Java API usage
def this(exchangeName: String, exchangeType: ExchangeType) =
@ -186,6 +186,10 @@ object AMQP {
def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters) =
this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, channelParameters: ChannelParameters) =
this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), true, Some(channelParameters))
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, selfAcknowledging: Boolean) =
this (routingKey, deliveryHandler, None, Some(exchangeParameters), ActiveDeclaration(), selfAcknowledging, None)
@ -269,26 +273,6 @@ object AMQP {
new ProducerClient(producerRef, rKey, toBinary)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef): ProducerClient[String] = {
newStringProducer(connection, None, None, None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), None, None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), Some(routingKey), None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String, producerId: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId))
}
def newStringConsumer(connection: ActorRef,
handler: String => Unit,
exchangeName: Option[String],
@ -310,6 +294,7 @@ object AMQP {
newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters))
}
def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef,
exchangeName: Option[String],
routingKey: Option[String] = None,
@ -350,6 +335,7 @@ object AMQP {
newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters))
}
/**
* Main supervisor
*/

View file

@ -18,7 +18,23 @@ case class Message(
routingKey: String,
mandatory: Boolean = false,
immediate: Boolean = false,
properties: Option[BasicProperties] = None) extends AMQPMessage
properties: Option[BasicProperties] = None) extends AMQPMessage {
// Needed for Java API usage
def this(payload: Array[Byte], routingKey: String) = this(payload, routingKey, false, false, None)
// Needed for Java API usage
def this(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean) =
this(payload, routingKey, mandatory, immediate, None)
// Needed for Java API usage
def this(payload: Array[Byte], routingKey: String, properties: BasicProperties) =
this(payload, routingKey, false, false, Some(properties))
// Needed for Java API usage
def this(payload: Array[Byte], routingKey: String, mandatory: Boolean, immediate: Boolean, properties: BasicProperties) =
this(payload, routingKey, mandatory, immediate, Some(properties))
}
case class Delivery(
payload: Array[Byte],
@ -52,8 +68,8 @@ class RejectionException(deliveryTag: Long) extends RuntimeException
// internal messages
private[akka] case class Failure(cause: Throwable) extends InternalAMQPMessage
private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
case class ConnectionShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
private[akka] class MessageNotDeliveredException(
val message: String,

View file

@ -67,7 +67,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -82,7 +82,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout)
val exchangeParameters = ExchangeParameters("my_fanout_exchange", Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -101,7 +101,7 @@ object ExampleSession {
// defaults to amqp://guest:guest@localhost:5672/
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic)
val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
@ -135,7 +135,7 @@ object ExampleSession {
case Restarting => // not used, sent when channel or connection fails and initiates a restart
case Stopped => log.info("Channel callback: Stopped")
}
val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct)
val exchangeParameters = ExchangeParameters("my_callback_exchange", Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor {
@ -163,7 +163,7 @@ object ExampleSession {
// send by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
val producer = AMQP.newStringProducer(connection, exchangeName)
val producer = AMQP.newStringProducer(connection, Some(exchangeName))
producer.send("This shit is easy!")
}

View file

@ -5,7 +5,6 @@
package se.scalablesolutions.akka.amqp
sealed trait ExchangeType
object ExchangeType {
case object Direct extends ExchangeType {
override def toString = "direct"
}
@ -18,4 +17,3 @@ object ExchangeType {
case object Match extends ExchangeType {
override def toString = "match"
}
}

View file

@ -31,7 +31,7 @@ class AMQPStringProducerConsumerTestIntegration extends JUnitSuite with MustMatc
}
AMQP.newStringConsumer(connection, responseHandler, None, Some("string.reply.key"))
val producer = AMQP.newStringProducer(connection, "stringexchange")
val producer = AMQP.newStringProducer(connection, Some("stringexchange"))
producer.send(request, Some("string.reply.key"))
responseLatch.tryAwait(2, TimeUnit.SECONDS) must be (true)