- Adding java api to AMQP module

- Reorg of params, especially declaration attributes and exhange name/params
This commit is contained in:
momania 2010-09-23 17:07:51 +02:00
parent 476e810833
commit 84785b33eb
9 changed files with 372 additions and 196 deletions

View file

@ -8,6 +8,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy
import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory}
import ConnectionFactory._
import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.{String, IllegalArgumentException}
@ -19,56 +20,208 @@ import java.lang.{String, IllegalArgumentException}
* @author Irmo Manie
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
/**
* Parameters used to make the connection to the amqp broker. Uses the rabbitmq defaults.
*/
case class ConnectionParameters(
host: String = DEFAULT_HOST,
port: Int = DEFAULT_AMQP_PORT,
username: String = DEFAULT_USER,
password: String = DEFAULT_PASS,
virtualHost: String = DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None) {
// Needed for Java API usage
def this() = this (DEFAULT_HOST, DEFAULT_AMQP_PORT, DEFAULT_USER, DEFAULT_PASS, DEFAULT_VHOST, 5000, None)
// Needed for Java API usage
def this(host: String, port: Int, username: String, password: String, virtualHost: String) =
this (host, port, username, password, virtualHost, 5000, None)
// Needed for Java API usage
def this(host: String, port: Int, username: String, password: String, virtualHost: String, initReconnectDelay: Long, connectionCallback: ActorRef) =
this (host, port, username, password, virtualHost, initReconnectDelay, Some(connectionCallback))
// Needed for Java API usage
def this(connectionCallback: ActorRef) =
this (DEFAULT_HOST, DEFAULT_AMQP_PORT, DEFAULT_USER, DEFAULT_PASS, DEFAULT_VHOST, 5000, Some(connectionCallback))
}
/**
* Additional parameters for the channel
*/
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
channelCallback: Option[ActorRef] = None) {
// Needed for Java API usage
def this() = this (None, None)
// Needed for Java API usage
def this(channelCallback: ActorRef) = this (None, Some(channelCallback))
// Needed for Java API usage
def this(shutdownListener: ShutdownListener, channelCallback: ActorRef) =
this (Some(shutdownListener), Some(channelCallback))
}
/**
* Declaration type used for either exchange or queue declaration
*/
sealed trait Declaration
case object NoActionDeclaration extends Declaration
case object PassiveDeclaration extends Declaration
case class ActiveDeclaration(durable: Boolean = false, autoDelete: Boolean = true, exclusive: Boolean = false) extends Declaration {
// Needed for Java API usage
def this() = this (false, true, false)
// Needed for Java API usage
def this(durable: Boolean, autoDelete: Boolean) = this (durable, autoDelete, false)
}
/**
* Exchange specific parameters
*/
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
exchangeType: ExchangeType = ExchangeType.Topic,
exchangeDeclaration: Declaration = new ActiveDeclaration(),
configurationArguments: Map[String, AnyRef] = Map.empty()) {
// Needed for Java API usage
def this(exchangeName: String) =
this (exchangeName, ExchangeType.Topic, new ActiveDeclaration(), Map.empty())
// Needed for Java API usage
def this(exchangeName: String, exchangeType: ExchangeType) =
this (exchangeName, exchangeType, new ActiveDeclaration(), Map.empty())
// Needed for Java API usage
def this(exchangeName: String, exchangeType: ExchangeType, exchangeDeclaration: Declaration) =
this (exchangeName, exchangeType, exchangeDeclaration, Map.empty())
}
/**
* Producer specific parameters
*/
case class ProducerParameters(
exchangeParameters: ExchangeParameters,
exchangeParameters: Option[ExchangeParameters] = None,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
channelParameters: Option[ChannelParameters] = None) {
def this() = this(None, None, None, None)
// Needed for Java API usage
def this(exchangeParameters: ExchangeParameters) = this (Some(exchangeParameters), None, None, None)
// Needed for Java API usage
def this(exchangeParameters: ExchangeParameters, producerId: String) =
this (Some(exchangeParameters), Some(producerId), None, None)
// Needed for Java API usage
def this(exchangeParameters: ExchangeParameters, returnListener: ReturnListener) =
this (Some(exchangeParameters), None, Some(returnListener), None)
// Needed for Java API usage
def this(exchangeParameters: ExchangeParameters, channelParameters: ChannelParameters) =
this (Some(exchangeParameters), None, None, Some(channelParameters))
// Needed for Java API usage
def this(exchangeParameters: ExchangeParameters, producerId: String, returnListener: ReturnListener, channelParameters: ChannelParameters) =
this (Some(exchangeParameters), Some(producerId), Some(returnListener), Some(channelParameters))
}
/**
* Consumer specific parameters
*/
case class ConsumerParameters(
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
exchangeParameters: Option[ExchangeParameters],
queueDeclaration: Declaration = new ActiveDeclaration(),
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
if (queueName.isEmpty) {
queueDeclaration match {
case ActiveDeclaration(true, _, _) =>
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
case PassiveDeclaration =>
throw new IllegalArgumentException("A queue name is required when requesting passive declaration.")
case NoActionDeclaration => () // ignore
}
}
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef) =
this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, channelParameters: ChannelParameters) =
this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), true, Some(channelParameters))
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, selfAcknowledging: Boolean) =
this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), selfAcknowledging, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, selfAcknowledging: Boolean, channelParameters: ChannelParameters) =
this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), selfAcknowledging, Some(channelParameters))
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String) =
this (routingKey, deliveryHandler, Some(queueName), None, new ActiveDeclaration(), true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, queueDeclaration: Declaration, selfAcknowledging: Boolean, channelParameters: ChannelParameters) =
this (routingKey, deliveryHandler, Some(queueName), None, queueDeclaration, selfAcknowledging, Some(channelParameters))
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters) =
this (routingKey, deliveryHandler, None, Some(exchangeParameters), new ActiveDeclaration(), true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, selfAcknowledging: Boolean) =
this (routingKey, deliveryHandler, None, Some(exchangeParameters), new ActiveDeclaration(), selfAcknowledging, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters) =
this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), new ActiveDeclaration(), true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration) =
this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, true, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration, selfAcknowledging: Boolean) =
this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, selfAcknowledging, None)
// Needed for Java API usage
def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration, selfAcknowledging: Boolean, channelParameters: ChannelParameters) =
this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, selfAcknowledging, Some(channelParameters))
// How about that for some overloading... huh? :P (yes, I know, there are still posibilities left...sue me!)
// Who said java is easy :(
}
def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = {
def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters()): ActorRef = {
val connection = actorOf(new FaultTolerantConnectionActor(connectionParameters))
supervisor.startLink(connection)
connection ! Connect
connection
}
// Needed for Java API usage
def newConnection(): ActorRef = {
newConnection(new ConnectionParameters())
}
def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = {
val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters))
connection.startLink(producer)
@ -86,7 +239,7 @@ object AMQP {
}
/**
* Convenience
* Convenience
*/
class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) {
def send(request: O, replyTo: Option[String] = None) = {
@ -95,20 +248,19 @@ object AMQP {
client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties))
}
def stop = client.stop
def stop() = client.stop
}
def newStringProducer(connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[String] = {
exchangeName: Option[String],
routingKey: Option[String] = None,
producerId: Option[String] = None): ProducerClient[String] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
if (exchangeName.isEmpty && routingKey.isEmpty) {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
val toBinary = new ToBinary[String] {
@ -117,37 +269,57 @@ object AMQP {
new ProducerClient(producerRef, rKey, toBinary)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef): ProducerClient[String] = {
newStringProducer(connection, None, None, None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), None, None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), Some(routingKey), None)
}
// Needed for Java API usage
def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String, producerId: String): ProducerClient[String] = {
newStringProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId))
}
def newStringConsumer(connection: ActorRef,
exchange: String,
handler: String => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true): ActorRef = {
handler: String => Unit,
exchangeName: Option[String],
routingKey: Option[String] = None,
queueName: Option[String] = None): ActorRef = {
if (exchangeName.isEmpty && routingKey.isEmpty) {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => handler.apply(new String(payload))
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters))
}
def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
routingKey: Option[String] = None,
producerId: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true): ProducerClient[O] = {
exchangeName: Option[String],
routingKey: Option[String] = None,
producerId: Option[String] = None): ProducerClient[O] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
if (exchangeName.isEmpty && routingKey.isEmpty) {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId))
new ProducerClient(producerRef, rKey, new ToBinary[O] {
@ -156,12 +328,14 @@ object AMQP {
}
def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef,
exchange: String,
handler: I => Unit,
routingKey: Option[String] = None,
queueName: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = {
handler: I => Unit,
exchangeName: Option[String],
routingKey: Option[String] = None,
queueName: Option[String] = None)(implicit manifest: Manifest[I]): ActorRef = {
if (exchangeName.isEmpty && routingKey.isEmpty) {
throw new IllegalArgumentException("Either exchange name or routing key is mandatory")
}
val deliveryHandler = actor {
case Delivery(payload, _, _, _, _) => {
@ -169,22 +343,20 @@ object AMQP {
}
}
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name)))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get))
val qName = queueName.getOrElse("%s.in".format(rKey))
newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete))
newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters))
}
/**
* Main supervisor
*/
class AMQPSupervisorActor extends Actor {
import self._
faultHandler = Some(OneForOneStrategy(5, 5000))
faultHandler = Some(OneForOneStrategy(None, None)) // never die
trapExit = List(classOf[Throwable])
def receive = {
@ -194,7 +366,7 @@ object AMQP {
private val supervisor = actorOf(new AMQPSupervisorActor).start
def shutdownAll = {
def shutdownAll() = {
supervisor.shutdownLinkedActors
}

View file

@ -6,20 +6,16 @@ package se.scalablesolutions.akka.amqp
import collection.JavaConversions
import se.scalablesolutions.akka.amqp.AMQP.ConsumerParameters
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.AkkaException
import com.rabbitmq.client.AMQP.Queue.DeclareOk
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer}
import se.scalablesolutions.akka.amqp.AMQP.{NoActionDeclaration, ActiveDeclaration, PassiveDeclaration, ConsumerParameters}
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
extends FaultTolerantChannelActor(
consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
extends FaultTolerantChannelActor(
consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
import consumerParameters._
import exchangeParameters._
var listenerTag: Option[String] = None
@ -34,15 +30,21 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
protected def setupChannel(ch: Channel) = {
val queueDeclare: DeclareOk = {
val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = {
queueName match {
case Some(name) =>
log.debug("Declaring new queue [%s] for %s", name, toString)
if (queuePassive) ch.queueDeclarePassive(name)
else {
ch.queueDeclare(
name, queueDurable, queueExclusive, queueAutoDelete,
JavaConversions.asMap(configurationArguments))
queueDeclaration match {
case PassiveDeclaration =>
log.debug("Passively declaring new queue [%s] for %s", name, toString)
ch.queueDeclarePassive(name)
case ActiveDeclaration(durable, autoDelete, exclusive) =>
log.debug("Actively declaring new queue [%s] for %s", name, toString)
val configurationArguments = exchangeParameters match {
case Some(params) => params.configurationArguments
case _ => Map.empty()
}
ch.queueDeclare(name, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments))
case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(name, 0, 0) // do nothing here
}
case None =>
log.debug("Declaring new generated queue for %s", toString)
@ -50,8 +52,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
}
}
log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString)
ch.queueBind(queueDeclare.getQueue, exchangeName, routingKey)
exchangeParameters.foreach {
params =>
log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString)
ch.queueBind(queueDeclare.getQueue, params.exchangeName, routingKey)
}
val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging {
override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) {
@ -77,11 +82,12 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = {
log.debug("Acking message with delivery tag [%s]", deliveryTag)
channel.foreach{ch =>
ch.basicAck(deliveryTag, false)
if (remoteAcknowledgement) {
deliveryHandler ! Acknowledged(deliveryTag)
}
channel.foreach {
ch =>
ch.basicAck(deliveryTag, false)
if (remoteAcknowledgement) {
deliveryHandler ! Acknowledged(deliveryTag)
}
}
}
@ -90,10 +96,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
// FIXME: when rabbitmq 1.9 arrives, basicReject should be available on the API and implemented instead of this
log.warning("Consumer is rejecting delivery with tag [%s] - " +
"for now this means we have to self terminate and kill the channel - see you in a second.")
channel.foreach{ch =>
if (remoteAcknowledgement) {
deliveryHandler ! Rejected(deliveryTag)
}
channel.foreach {
ch =>
if (remoteAcknowledgement) {
deliveryHandler ! Rejected(deliveryTag)
}
}
throw new RejectionException(deliveryTag)
}
@ -115,10 +122,8 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
}
override def toString =
"AMQP.Consumer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
"AMQP.Consumer[id= " + self.id +
", exchangeParameters=" + exchangeParameters +
", queueDeclaration=" + queueDeclaration + "]"
}

View file

@ -69,11 +69,11 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct)
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "some.routing", actor {
val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
}, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing")
}
@ -84,15 +84,15 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor {
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
}, None, Some(exchangeParameters)))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor {
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
}, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")
}
@ -103,15 +103,15 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic)
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor {
val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor {
case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload))
}))
}, None, Some(exchangeParameters)))
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor {
val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor {
case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload))
}))
}, None, Some(exchangeParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush")
producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama")
}
@ -138,11 +138,11 @@ object ExampleSession {
val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct)
val channelParameters = ChannelParameters(channelCallback = Some(channelCallback))
val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor {
val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor {
case _ => () // not used
}, channelParameters = Some(channelParameters)))
}, None, Some(exchangeParameters), channelParameters = Some(channelParameters)))
val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters))
val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
// Wait until both channels (producer & consumer) are started before stopping the connection
channelCountdown.await(2, TimeUnit.SECONDS)
@ -155,10 +155,10 @@ object ExampleSession {
val exchangeName = "easy.string"
// listen by default to:
// exchange = exchangeName
// routingKey = <exchange>.request
// exchange = optional exchangeName
// routingKey = provided routingKey or <exchangeName>.request
// queueName = <routingKey>.in
AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message))
AMQP.newStringConsumer(connection, message => println("Received message: "+message), Some(exchangeName))
// send by default to:
// exchange = exchangeName
@ -177,9 +177,9 @@ object ExampleSession {
log.info("Received "+message)
}
AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler)
AMQP.newProtobufConsumer(connection, protobufMessageHandler, Some(exchangeName))
val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName)
val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, Some(exchangeName))
producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build)
}
@ -187,7 +187,7 @@ object ExampleSession {
val connection = AMQP.newConnection()
val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic)
val exchangeName = "my_rpc_exchange"
/** Server */
val serverFromBinary = new FromBinary[String] {
@ -200,7 +200,7 @@ object ExampleSession {
def requestHandler(request: String) = 3
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer,
val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer,
requestHandler, queueName = Some("rpc.in.key.queue"))
@ -213,7 +213,7 @@ object ExampleSession {
}
val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary)
val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer)
val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeName, "rpc.in.key", rpcClientSerializer)
val response = (rpcClient !! "rpc_request")
log.info("Response: " + response)

View file

@ -10,13 +10,10 @@ import se.scalablesolutions.akka.actor.Actor
import Actor._
import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener}
import scala.PartialFunction
import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters}
import se.scalablesolutions.akka.amqp.AMQP._
abstract private[amqp] class FaultTolerantChannelActor(
exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor {
import exchangeParameters._
exchangeParameters: Option[ExchangeParameters], channelParameters: Option[ChannelParameters]) extends Actor {
protected[amqp] var channel: Option[Channel] = None
log.info("%s is started", toString)
@ -64,12 +61,16 @@ abstract private[amqp] class FaultTolerantChannelActor(
protected def setupChannel(ch: Channel)
private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) {
if (exchangeName != "") {
if (exchangePassive) {
ch.exchangeDeclarePassive(exchangeName)
} else {
ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments))
}
exchangeParameters.foreach {
params =>
import params._
exchangeDeclaration match {
case PassiveDeclaration => ch.exchangeDeclarePassive(exchangeName)
case ActiveDeclaration(durable, autoDelete, _) =>
ch.exchangeDeclare(exchangeName, exchangeType.toString, durable, autoDelete, JavaConversions.asMap(configurationArguments))
case NoActionDeclaration => // ignore
}
}
ch.addShutdownListener(new ShutdownListener {
def shutdownCompleted(cause: ShutdownSignalException) = {

View file

@ -19,7 +19,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
self.lifeCycle = Some(LifeCycle(Permanent))
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.faultHandler = Some(OneForOneStrategy(None, None)) // never die
val reconnectionTimer = new Timer("%s-timer".format(self.id))

View file

@ -7,14 +7,14 @@ package se.scalablesolutions.akka.amqp
import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters
import se.scalablesolutions.akka.AkkaException
private[amqp] class ProducerActor(producerParameters: ProducerParameters)
extends FaultTolerantChannelActor(
producerParameters.exchangeParameters, producerParameters.channelParameters) {
import producerParameters._
import exchangeParameters._
val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName))
producerId.foreach(id => self.id = id)
@ -22,7 +22,7 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters)
case message@Message(payload, routingKey, mandatory, immediate, properties) if channel.isDefined => {
log.debug("Sending message [%s]", message)
channel.foreach(_.basicPublish(exchangeName, routingKey, mandatory, immediate, properties.getOrElse(null), payload))
channel.foreach(_.basicPublish(exchangeName.getOrElse(null), routingKey, mandatory, immediate, properties.getOrElse(null), payload))
}
case message@Message(payload, routingKey, mandatory, immediate, properties) => {
log.warning("Unable to send message [%s]", message)
@ -55,9 +55,6 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters)
override def toString =
"AMQP.Poducer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
", exchangeParameters=" + exchangeParameters + "]"
}

View file

@ -9,29 +9,30 @@ import se.scalablesolutions.akka.amqp._
object RPC {
def newRpcClient[O, I](connection: ActorRef,
exchangeParameters: ExchangeParameters,
exchangeName: String,
routingKey: String,
serializer: RpcClientSerializer[O, I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I](
exchangeParameters, routingKey, serializer, channelParameters))
ExchangeParameters(exchangeName), routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer[I, O](connection: ActorRef,
exchangeParameters: ExchangeParameters,
exchangeName: String,
routingKey: String,
serializer: RpcServerSerializer[I, O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None): RpcServerHandle = {
val producer = newProducer(connection, ProducerParameters(
ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val producer = newProducer(connection, ProducerParameters(channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer,
channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName))
val consumer = newConsumer(connection, ConsumerParameters(routingKey, rpcServer,
exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters,
selfAcknowledging = false, queueName = queueName))
RpcServerHandle(producer, consumer)
}
@ -66,7 +67,7 @@ object RPC {
def newProtobufRpcServer[I <: Message, O <: Message](
connection: ActorRef,
exchange: String,
exchangeName: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
@ -82,12 +83,12 @@ object RPC {
def toBinary(t: O) = t.toByteArray
})
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def newProtobufRpcClient[O <: Message, I <: Message](
connection: ActorRef,
exchange: String,
exchangeName: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
@ -103,11 +104,11 @@ object RPC {
}
})
startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer)
startClient(connection, exchangeName, routingKey, durable, autoDelete, passive, serializer)
}
def newStringRpcServer(connection: ActorRef,
exchange: String,
exchangeName: String,
requestHandler: String => String,
routingKey: Option[String] = None,
queueName: Option[String] = None,
@ -123,7 +124,7 @@ object RPC {
def toBinary(t: String) = t.getBytes
})
startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer)
}
def newStringRpcClient(connection: ActorRef,
@ -147,23 +148,21 @@ object RPC {
}
private def startClient[O, I](connection: ActorRef,
exchange: String,
exchangeName: String,
routingKey: Option[String] = None,
durable: Boolean = false,
autoDelete: Boolean = true,
passive: Boolean = true,
serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val client = newRpcClient(connection, exchangeParameters, rKey, serializer)
val client = newRpcClient(connection, exchangeName, rKey, serializer)
new RpcClient(client)
}
private def startServer[I, O](connection: ActorRef,
exchange: String,
exchangeName: String,
requestHandler: I => O,
routingKey: Option[String] = None,
queueName: Option[String] = None,
@ -171,12 +170,10 @@ object RPC {
autoDelete: Boolean = true,
serializer: RpcServerSerializer[I, O]): RpcServerHandle = {
val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic,
exchangeDurable = durable, exchangeAutoDelete = autoDelete)
val rKey = routingKey.getOrElse("%s.request".format(exchange))
val rKey = routingKey.getOrElse("%s.request".format(exchangeName))
val qName = queueName.getOrElse("%s.in".format(rKey))
newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName))
newRpcServer(connection, exchangeName, rKey, serializer, requestHandler, Some(qName))
}
}

View file

@ -13,7 +13,7 @@ class RpcClientActor[I,O](
routingKey: String,
serializer: RpcClientSerializer[I,O],
channelParameters: Option[ChannelParameters] = None)
extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
extends FaultTolerantChannelActor(Some(exchangeParameters), channelParameters) {
import exchangeParameters._

View file

@ -40,19 +40,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// -------------------------------------------------------------------------------------------------------------------
object Repositories {
lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
lazy val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases")
lazy val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots")
lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
// lazy val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases")
// lazy val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots")
// lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases")
lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/")
// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
// lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases")
// lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/")
}
// -------------------------------------------------------------------------------------------------------------------
@ -63,27 +63,31 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// -------------------------------------------------------------------------------------------------------------------
import Repositories._
lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
// lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo)
lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo)
lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo)
// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
// lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository)
// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo)
// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
// lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
// lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
// lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
// lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo)
// lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo)
// lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository"
val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public"
val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots"
// -------------------------------------------------------------------------------------------------------------------
// Versions