Reformatting

This commit is contained in:
Jonas Bonér 2010-08-10 21:41:04 +02:00
parent 2090761cb1
commit 8c8a2b03c1
9 changed files with 119 additions and 98 deletions

View file

@ -19,41 +19,43 @@ import se.scalablesolutions.akka.util.Logging
*/
object AMQP {
case class ConnectionParameters(
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
host: String = ConnectionFactory.DEFAULT_HOST,
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
username: String = ConnectionFactory.DEFAULT_USER,
password: String = ConnectionFactory.DEFAULT_PASS,
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
initReconnectDelay: Long = 5000,
connectionCallback: Option[ActorRef] = None)
case class ChannelParameters(
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
shutdownListener: Option[ShutdownListener] = None,
channelCallback: Option[ActorRef] = None)
case class ExchangeParameters(
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
exchangeName: String,
exchangeType: ExchangeType,
exchangeDurable: Boolean = false,
exchangeAutoDelete: Boolean = true,
exchangePassive: Boolean = false,
configurationArguments: Map[String, AnyRef] = Map())
case class ProducerParameters(exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ProducerParameters(
exchangeParameters: ExchangeParameters,
producerId: Option[String] = None,
returnListener: Option[ReturnListener] = None,
channelParameters: Option[ChannelParameters] = None)
case class ConsumerParameters(exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
case class ConsumerParameters(
exchangeParameters: ExchangeParameters,
routingKey: String,
deliveryHandler: ActorRef,
queueName: Option[String] = None,
queueDurable: Boolean = false,
queueAutoDelete: Boolean = true,
queuePassive: Boolean = false,
queueExclusive: Boolean = false,
selfAcknowledging: Boolean = true,
channelParameters: Option[ChannelParameters] = None) {
if (queueDurable && queueName.isEmpty) {
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
}
@ -80,24 +82,26 @@ object AMQP {
consumer
}
def newRpcClient[O,I](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[O,I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
def newRpcClient[O,I](
connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[O,I],
channelParameters: Option[ChannelParameters] = None): ActorRef = {
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
connection.startLink(rpcActor)
rpcActor ! Start
rpcActor
}
def newRpcServer[I,O](connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I,O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
def newRpcServer[I,O](
connection: ActorRef,
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcServerSerializer[I,O],
requestHandler: I => O,
queueName: Option[String] = None,
channelParameters: Option[ChannelParameters] = None) = {
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer

View file

@ -11,19 +11,19 @@ import com.rabbitmq.client.ShutdownSignalException
sealed trait AMQPMessage
sealed trait InternalAMQPMessage extends AMQPMessage
case class Message(payload: Array[Byte],
routingKey: String,
mandatory: Boolean = false,
immediate: Boolean = false,
properties: Option[BasicProperties] = None) extends AMQPMessage
case class Delivery(payload: Array[Byte],
routingKey: String,
deliveryTag: Long,
properties: BasicProperties,
sender: Option[ActorRef]) extends AMQPMessage
case class Message(
payload: Array[Byte],
routingKey: String,
mandatory: Boolean = false,
immediate: Boolean = false,
properties: Option[BasicProperties] = None) extends AMQPMessage
case class Delivery(
payload: Array[Byte],
routingKey: String,
deliveryTag: Long,
properties: BasicProperties,
sender: Option[ActorRef]) extends AMQPMessage
// connection messages
case object Connect extends AMQPMessage
@ -51,10 +51,10 @@ private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) exte
private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
private[akka] class MessageNotDeliveredException(
val message: String,
val replyCode: Int,
val replyText: String,
val exchange: String,
val routingKey: String,
val properties: BasicProperties,
val body: Array[Byte]) extends RuntimeException(message)
val message: String,
val replyCode: Int,
val replyText: String,
val exchange: String,
val routingKey: String,
val properties: BasicProperties,
val body: Array[Byte]) extends RuntimeException(message)

View file

@ -13,7 +13,8 @@ import com.rabbitmq.client.AMQP.BasicProperties
import java.lang.Throwable
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
extends FaultTolerantChannelActor(
consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
import consumerParameters._
import exchangeParameters._
@ -37,7 +38,9 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
if (queuePassive) {
ch.queueDeclarePassive(name)
} else {
ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments))
ch.queueDeclare(
name, queueDurable, queueExclusive, queueAutoDelete,
JavaConversions.asMap(configurationArguments))
}
case None =>
log.debug("Declaring new generated queue for %s", toString)
@ -85,7 +88,6 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
throw new IllegalArgumentException(errorMessage)
}
override def preRestart(reason: Throwable) = {
listenerTag = None
super.preRestart(reason)
@ -97,11 +99,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
super.shutdown
}
override def toString(): String =
override def toString =
"AMQP.Consumer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
}

View file

@ -60,7 +60,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
}
private def connect = if (connection.isEmpty || !connection.get.isOpen) {
try {
connection = Some(connectionFactory.newConnection)
connection.foreach {
@ -118,5 +117,4 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
notifyCallback(Reconnecting)
connect
}
}

View file

@ -8,7 +8,8 @@ import com.rabbitmq.client._
import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters
private[amqp] class ProducerActor(producerParameters: ProducerParameters)
extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) {
extends FaultTolerantChannelActor(
producerParameters.exchangeParameters, producerParameters.channelParameters) {
import producerParameters._
import exchangeParameters._
@ -32,29 +33,29 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters)
case Some(listener) => ch.setReturnListener(listener)
case None => ch.setReturnListener(new ReturnListener() {
def handleBasicReturn(
replyCode: Int,
replyText: String,
exchange: String,
routingKey: String,
properties: com.rabbitmq.client.AMQP.BasicProperties,
body: Array[Byte]) = {
replyCode: Int,
replyText: String,
exchange: String,
routingKey: String,
properties: com.rabbitmq.client.AMQP.BasicProperties,
body: Array[Byte]) = {
throw new MessageNotDeliveredException(
"Could not deliver message [" + body +
"] with reply code [" + replyCode +
"] with reply text [" + replyText +
"] and routing key [" + routingKey +
"] to exchange [" + exchange + "]",
"] with reply code [" + replyCode +
"] with reply text [" + replyText +
"] and routing key [" + routingKey +
"] to exchange [" + exchange + "]",
replyCode, replyText, exchange, routingKey, properties, body)
}
})
}
}
override def toString(): String =
override def toString =
"AMQP.Poducer[id= "+ self.id +
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
", exchange=" + exchangeName +
", exchangeType=" + exchangeType +
", durable=" + exchangeDurable +
", autoDelete=" + exchangeAutoDelete + "]"
}

View file

@ -10,10 +10,12 @@ import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameter
import com.rabbitmq.client.{Channel, RpcClient}
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[I,O],
channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
class RpcClientActor[I,O](
exchangeParameters: ExchangeParameters,
routingKey: String,
serializer: RpcClientSerializer[I,O],
channelParameters: Option[ChannelParameters] = None)
extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
import exchangeParameters._

View file

@ -8,7 +8,10 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor}
import com.rabbitmq.client.AMQP.BasicProperties
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor {
class RpcServerActor[I,O](
producer: ActorRef,
serializer: RpcServerSerializer[I,O],
requestHandler: I => O) extends Actor {
log.info("%s started", this)
@ -29,6 +32,5 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,
case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag)
}
override def toString(): String =
"AMQP.RpcServer[]"
override def toString = "AMQP.RpcServer[]"
}

View file

@ -42,6 +42,8 @@ import se.scalablesolutions.akka.config.Config.config
object Dispatchers {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorRef) = {
if (isShutdown) init
@ -50,8 +52,18 @@ object Dispatchers {
}
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
* <p/>
* Can be beneficial to use the <code>HawtDispatcher.pin(self)</code> to "pin" an actor to a specific thread.
* <p/>
* See the ScalaDoc for the {@link se.scalablesolutions.akka.dispatch.HawtDispatcher} for details.
*/
def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>

View file

@ -66,13 +66,13 @@ akka {
#You can either use java command-line options or use the settings below
#key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12
#key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12
#key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12
#key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS
#key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS
#trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks
#trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore
#trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS
#trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks
#trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore
#trust-store-pass = "$PASS" #Same as -Djavax.net.ssl.trustStorePassword=$PASS
#This can be useful for debugging
debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl