From 8c8a2b03c18a7073cc79efd99ba62c9dac893c6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 10 Aug 2010 21:41:04 +0200 Subject: [PATCH] Reformatting --- akka-amqp/src/main/scala/AMQP.scala | 86 ++++++++++--------- akka-amqp/src/main/scala/AMQPMessage.scala | 38 ++++---- akka-amqp/src/main/scala/ConsumerActor.scala | 18 ++-- .../scala/FaultTolerantConnectionActor.scala | 2 - akka-amqp/src/main/scala/ProducerActor.scala | 33 +++---- akka-amqp/src/main/scala/RpcClientActor.scala | 10 ++- akka-amqp/src/main/scala/RpcServerActor.scala | 8 +- .../src/main/scala/dispatch/Dispatchers.scala | 12 +++ config/akka-reference.conf | 10 +-- 9 files changed, 119 insertions(+), 98 deletions(-) diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index b1e08ae752..9b9c4bb206 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -19,41 +19,43 @@ import se.scalablesolutions.akka.util.Logging */ object AMQP { case class ConnectionParameters( - host: String = ConnectionFactory.DEFAULT_HOST, - port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, - username: String = ConnectionFactory.DEFAULT_USER, - password: String = ConnectionFactory.DEFAULT_PASS, - virtualHost: String = ConnectionFactory.DEFAULT_VHOST, - initReconnectDelay: Long = 5000, - connectionCallback: Option[ActorRef] = None) + host: String = ConnectionFactory.DEFAULT_HOST, + port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, + username: String = ConnectionFactory.DEFAULT_USER, + password: String = ConnectionFactory.DEFAULT_PASS, + virtualHost: String = ConnectionFactory.DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) case class ChannelParameters( - shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) + shutdownListener: Option[ShutdownListener] = None, + channelCallback: Option[ActorRef] = None) case class ExchangeParameters( - exchangeName: String, - exchangeType: ExchangeType, - exchangeDurable: Boolean = false, - exchangeAutoDelete: Boolean = true, - exchangePassive: Boolean = false, - configurationArguments: Map[String, AnyRef] = Map()) + exchangeName: String, + exchangeType: ExchangeType, + exchangeDurable: Boolean = false, + exchangeAutoDelete: Boolean = true, + exchangePassive: Boolean = false, + configurationArguments: Map[String, AnyRef] = Map()) - case class ProducerParameters(exchangeParameters: ExchangeParameters, - producerId: Option[String] = None, - returnListener: Option[ReturnListener] = None, - channelParameters: Option[ChannelParameters] = None) + case class ProducerParameters( + exchangeParameters: ExchangeParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None, + channelParameters: Option[ChannelParameters] = None) - case class ConsumerParameters(exchangeParameters: ExchangeParameters, - routingKey: String, - deliveryHandler: ActorRef, - queueName: Option[String] = None, - queueDurable: Boolean = false, - queueAutoDelete: Boolean = true, - queuePassive: Boolean = false, - queueExclusive: Boolean = false, - selfAcknowledging: Boolean = true, - channelParameters: Option[ChannelParameters] = None) { + case class ConsumerParameters( + exchangeParameters: ExchangeParameters, + routingKey: String, + deliveryHandler: ActorRef, + queueName: Option[String] = None, + queueDurable: Boolean = false, + queueAutoDelete: Boolean = true, + queuePassive: Boolean = false, + queueExclusive: Boolean = false, + selfAcknowledging: Boolean = true, + channelParameters: Option[ChannelParameters] = None) { if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -80,24 +82,26 @@ object AMQP { consumer } - def newRpcClient[O,I](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[O,I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { + def newRpcClient[O,I]( + connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[O,I], + channelParameters: Option[ChannelParameters] = None): ActorRef = { val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } - def newRpcServer[I,O](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcServerSerializer[I,O], - requestHandler: I => O, - queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { + def newRpcServer[I,O]( + connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I,O], + requestHandler: I => O, + queueName: Option[String] = None, + channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/AMQPMessage.scala index bf2461723f..92cd95906a 100644 --- a/akka-amqp/src/main/scala/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/AMQPMessage.scala @@ -11,19 +11,19 @@ import com.rabbitmq.client.ShutdownSignalException sealed trait AMQPMessage sealed trait InternalAMQPMessage extends AMQPMessage -case class Message(payload: Array[Byte], - routingKey: String, - mandatory: Boolean = false, - immediate: Boolean = false, - properties: Option[BasicProperties] = None) extends AMQPMessage - -case class Delivery(payload: Array[Byte], - routingKey: String, - deliveryTag: Long, - properties: BasicProperties, - sender: Option[ActorRef]) extends AMQPMessage - +case class Message( + payload: Array[Byte], + routingKey: String, + mandatory: Boolean = false, + immediate: Boolean = false, + properties: Option[BasicProperties] = None) extends AMQPMessage +case class Delivery( + payload: Array[Byte], + routingKey: String, + deliveryTag: Long, + properties: BasicProperties, + sender: Option[ActorRef]) extends AMQPMessage // connection messages case object Connect extends AMQPMessage @@ -51,10 +51,10 @@ private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) exte private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage private[akka] class MessageNotDeliveredException( - val message: String, - val replyCode: Int, - val replyText: String, - val exchange: String, - val routingKey: String, - val properties: BasicProperties, - val body: Array[Byte]) extends RuntimeException(message) + val message: String, + val replyCode: Int, + val replyText: String, + val exchange: String, + val routingKey: String, + val properties: BasicProperties, + val body: Array[Byte]) extends RuntimeException(message) diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/ConsumerActor.scala index d394e9d997..07dbd437ba 100644 --- a/akka-amqp/src/main/scala/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/ConsumerActor.scala @@ -13,7 +13,8 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.Throwable private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) - extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) { + extends FaultTolerantChannelActor( + consumerParameters.exchangeParameters, consumerParameters.channelParameters) { import consumerParameters._ import exchangeParameters._ @@ -37,7 +38,9 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) if (queuePassive) { ch.queueDeclarePassive(name) } else { - ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments)) + ch.queueDeclare( + name, queueDurable, queueExclusive, queueAutoDelete, + JavaConversions.asMap(configurationArguments)) } case None => log.debug("Declaring new generated queue for %s", toString) @@ -85,7 +88,6 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) throw new IllegalArgumentException(errorMessage) } - override def preRestart(reason: Throwable) = { listenerTag = None super.preRestart(reason) @@ -97,11 +99,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) super.shutdown } - override def toString(): String = + override def toString = "AMQP.Consumer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" } diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 5f0a49910e..97c3074700 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -60,7 +60,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio } private def connect = if (connection.isEmpty || !connection.get.isOpen) { - try { connection = Some(connectionFactory.newConnection) connection.foreach { @@ -118,5 +117,4 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio notifyCallback(Reconnecting) connect } - } diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/ProducerActor.scala index 48a6be0a94..1d125762ae 100644 --- a/akka-amqp/src/main/scala/ProducerActor.scala +++ b/akka-amqp/src/main/scala/ProducerActor.scala @@ -8,7 +8,8 @@ import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters private[amqp] class ProducerActor(producerParameters: ProducerParameters) - extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) { + extends FaultTolerantChannelActor( + producerParameters.exchangeParameters, producerParameters.channelParameters) { import producerParameters._ import exchangeParameters._ @@ -32,29 +33,29 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters) case Some(listener) => ch.setReturnListener(listener) case None => ch.setReturnListener(new ReturnListener() { def handleBasicReturn( - replyCode: Int, - replyText: String, - exchange: String, - routingKey: String, - properties: com.rabbitmq.client.AMQP.BasicProperties, - body: Array[Byte]) = { + replyCode: Int, + replyText: String, + exchange: String, + routingKey: String, + properties: com.rabbitmq.client.AMQP.BasicProperties, + body: Array[Byte]) = { throw new MessageNotDeliveredException( "Could not deliver message [" + body + - "] with reply code [" + replyCode + - "] with reply text [" + replyText + - "] and routing key [" + routingKey + - "] to exchange [" + exchange + "]", + "] with reply code [" + replyCode + + "] with reply text [" + replyText + + "] and routing key [" + routingKey + + "] to exchange [" + exchange + "]", replyCode, replyText, exchange, routingKey, properties, body) } }) } } - override def toString(): String = + override def toString = "AMQP.Poducer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" } diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 2935982a67..a3583d0ad7 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -10,10 +10,12 @@ import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameter import com.rabbitmq.client.{Channel, RpcClient} import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} -class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[I,O], - channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { +class RpcClientActor[I,O]( + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[I,O], + channelParameters: Option[ChannelParameters] = None) + extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { import exchangeParameters._ diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index 99d74d9b56..0d6a8d2a89 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -8,7 +8,10 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { +class RpcServerActor[I,O]( + producer: ActorRef, + serializer: RpcServerSerializer[I,O], + requestHandler: I => O) extends Actor { log.info("%s started", this) @@ -29,6 +32,5 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I, case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag) } - override def toString(): String = - "AMQP.RpcServer[]" + override def toString = "AMQP.RpcServer[]" } diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index e938e36e4e..e83d52da8b 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -42,6 +42,8 @@ import se.scalablesolutions.akka.config.Config.config object Dispatchers { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + object globalHawtDispatcher extends HawtDispatcher + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { override def register(actor: ActorRef) = { if (isShutdown) init @@ -50,8 +52,18 @@ object Dispatchers { } object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global") + object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global") + /** + * Creates an event-driven dispatcher based on the excellent HawtDispatch library. + *

+ * Can be beneficial to use the HawtDispatcher.pin(self) to "pin" an actor to a specific thread. + *

+ * See the ScalaDoc for the {@link se.scalablesolutions.akka.dispatch.HawtDispatcher} for details. + */ + def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate) + /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. *

diff --git a/config/akka-reference.conf b/config/akka-reference.conf index cda3f01d6d..8c45c38830 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -66,13 +66,13 @@ akka { #You can either use java command-line options or use the settings below - #key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12 + #key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12 #key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12 - #key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS + #key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS - #trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks - #trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore - #trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS + #trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks + #trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore + #trust-store-pass = "$PASS" #Same as -Djavax.net.ssl.trustStorePassword=$PASS #This can be useful for debugging debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl