diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index b1e08ae752..0d94c7a7dc 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -19,41 +19,43 @@ import se.scalablesolutions.akka.util.Logging */ object AMQP { case class ConnectionParameters( - host: String = ConnectionFactory.DEFAULT_HOST, - port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, - username: String = ConnectionFactory.DEFAULT_USER, - password: String = ConnectionFactory.DEFAULT_PASS, - virtualHost: String = ConnectionFactory.DEFAULT_VHOST, - initReconnectDelay: Long = 5000, - connectionCallback: Option[ActorRef] = None) + host: String = ConnectionFactory.DEFAULT_HOST, + port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, + username: String = ConnectionFactory.DEFAULT_USER, + password: String = ConnectionFactory.DEFAULT_PASS, + virtualHost: String = ConnectionFactory.DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) case class ChannelParameters( - shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) + shutdownListener: Option[ShutdownListener] = None, + channelCallback: Option[ActorRef] = None) case class ExchangeParameters( - exchangeName: String, - exchangeType: ExchangeType, - exchangeDurable: Boolean = false, - exchangeAutoDelete: Boolean = true, - exchangePassive: Boolean = false, - configurationArguments: Map[String, AnyRef] = Map()) + exchangeName: String, + exchangeType: ExchangeType, + exchangeDurable: Boolean = false, + exchangeAutoDelete: Boolean = true, + exchangePassive: Boolean = false, + configurationArguments: Map[String, AnyRef] = Map()) - case class ProducerParameters(exchangeParameters: ExchangeParameters, - producerId: Option[String] = None, - returnListener: Option[ReturnListener] = None, - channelParameters: Option[ChannelParameters] = None) + case class ProducerParameters( + exchangeParameters: ExchangeParameters, + producerId: Option[String] = None, + returnListener: Option[ReturnListener] = None, + channelParameters: Option[ChannelParameters] = None) - case class ConsumerParameters(exchangeParameters: ExchangeParameters, - routingKey: String, - deliveryHandler: ActorRef, - queueName: Option[String] = None, - queueDurable: Boolean = false, - queueAutoDelete: Boolean = true, - queuePassive: Boolean = false, - queueExclusive: Boolean = false, - selfAcknowledging: Boolean = true, - channelParameters: Option[ChannelParameters] = None) { + case class ConsumerParameters( + exchangeParameters: ExchangeParameters, + routingKey: String, + deliveryHandler: ActorRef, + queueName: Option[String] = None, + queueDurable: Boolean = false, + queueAutoDelete: Boolean = true, + queuePassive: Boolean = false, + queueExclusive: Boolean = false, + selfAcknowledging: Boolean = true, + channelParameters: Option[ChannelParameters] = None) { if (queueDurable && queueName.isEmpty) { throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") } @@ -74,30 +76,33 @@ object AMQP { def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) - consumer.startLink(consumerParameters.deliveryHandler) + val handler = consumerParameters.deliveryHandler + if (handler.supervisor.isEmpty) consumer.startLink(handler) connection.startLink(consumer) consumer ! Start consumer } - def newRpcClient[O,I](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[O,I], - channelParameters: Option[ChannelParameters] = None): ActorRef = { + def newRpcClient[O,I]( + connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[O,I], + channelParameters: Option[ChannelParameters] = None): ActorRef = { val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } - def newRpcServer[I,O](connection: ActorRef, - exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcServerSerializer[I,O], - requestHandler: I => O, - queueName: Option[String] = None, - channelParameters: Option[ChannelParameters] = None) = { + def newRpcServer[I,O]( + connection: ActorRef, + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcServerSerializer[I,O], + requestHandler: I => O, + queueName: Option[String] = None, + channelParameters: Option[ChannelParameters] = None) = { val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler)) val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer diff --git a/akka-amqp/src/main/scala/AMQPMessage.scala b/akka-amqp/src/main/scala/AMQPMessage.scala index bf2461723f..92cd95906a 100644 --- a/akka-amqp/src/main/scala/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/AMQPMessage.scala @@ -11,19 +11,19 @@ import com.rabbitmq.client.ShutdownSignalException sealed trait AMQPMessage sealed trait InternalAMQPMessage extends AMQPMessage -case class Message(payload: Array[Byte], - routingKey: String, - mandatory: Boolean = false, - immediate: Boolean = false, - properties: Option[BasicProperties] = None) extends AMQPMessage - -case class Delivery(payload: Array[Byte], - routingKey: String, - deliveryTag: Long, - properties: BasicProperties, - sender: Option[ActorRef]) extends AMQPMessage - +case class Message( + payload: Array[Byte], + routingKey: String, + mandatory: Boolean = false, + immediate: Boolean = false, + properties: Option[BasicProperties] = None) extends AMQPMessage +case class Delivery( + payload: Array[Byte], + routingKey: String, + deliveryTag: Long, + properties: BasicProperties, + sender: Option[ActorRef]) extends AMQPMessage // connection messages case object Connect extends AMQPMessage @@ -51,10 +51,10 @@ private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) exte private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage private[akka] class MessageNotDeliveredException( - val message: String, - val replyCode: Int, - val replyText: String, - val exchange: String, - val routingKey: String, - val properties: BasicProperties, - val body: Array[Byte]) extends RuntimeException(message) + val message: String, + val replyCode: Int, + val replyText: String, + val exchange: String, + val routingKey: String, + val properties: BasicProperties, + val body: Array[Byte]) extends RuntimeException(message) diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/ConsumerActor.scala index d394e9d997..90c8d7deec 100644 --- a/akka-amqp/src/main/scala/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/ConsumerActor.scala @@ -13,7 +13,8 @@ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.Throwable private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) - extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) { + extends FaultTolerantChannelActor( + consumerParameters.exchangeParameters, consumerParameters.channelParameters) { import consumerParameters._ import exchangeParameters._ @@ -34,10 +35,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) queueName match { case Some(name) => log.debug("Declaring new queue [%s] for %s", name, toString) - if (queuePassive) { - ch.queueDeclarePassive(name) - } else { - ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments)) + if (queuePassive) ch.queueDeclarePassive(name) + else { + ch.queueDeclare( + name, queueDurable, queueExclusive, queueAutoDelete, + JavaConversions.asMap(configurationArguments)) } case None => log.debug("Declaring new generated queue for %s", toString) @@ -85,7 +87,6 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) throw new IllegalArgumentException(errorMessage) } - override def preRestart(reason: Throwable) = { listenerTag = None super.preRestart(reason) @@ -97,11 +98,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) super.shutdown } - override def toString(): String = + override def toString = "AMQP.Consumer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" } diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 5f0a49910e..97c3074700 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -60,7 +60,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio } private def connect = if (connection.isEmpty || !connection.get.isOpen) { - try { connection = Some(connectionFactory.newConnection) connection.foreach { @@ -118,5 +117,4 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio notifyCallback(Reconnecting) connect } - } diff --git a/akka-amqp/src/main/scala/ProducerActor.scala b/akka-amqp/src/main/scala/ProducerActor.scala index 48a6be0a94..1d125762ae 100644 --- a/akka-amqp/src/main/scala/ProducerActor.scala +++ b/akka-amqp/src/main/scala/ProducerActor.scala @@ -8,7 +8,8 @@ import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters private[amqp] class ProducerActor(producerParameters: ProducerParameters) - extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) { + extends FaultTolerantChannelActor( + producerParameters.exchangeParameters, producerParameters.channelParameters) { import producerParameters._ import exchangeParameters._ @@ -32,29 +33,29 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters) case Some(listener) => ch.setReturnListener(listener) case None => ch.setReturnListener(new ReturnListener() { def handleBasicReturn( - replyCode: Int, - replyText: String, - exchange: String, - routingKey: String, - properties: com.rabbitmq.client.AMQP.BasicProperties, - body: Array[Byte]) = { + replyCode: Int, + replyText: String, + exchange: String, + routingKey: String, + properties: com.rabbitmq.client.AMQP.BasicProperties, + body: Array[Byte]) = { throw new MessageNotDeliveredException( "Could not deliver message [" + body + - "] with reply code [" + replyCode + - "] with reply text [" + replyText + - "] and routing key [" + routingKey + - "] to exchange [" + exchange + "]", + "] with reply code [" + replyCode + + "] with reply text [" + replyText + + "] and routing key [" + routingKey + + "] to exchange [" + exchange + "]", replyCode, replyText, exchange, routingKey, properties, body) } }) } } - override def toString(): String = + override def toString = "AMQP.Poducer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + ", exchange=" + exchangeName + + ", exchangeType=" + exchangeType + + ", durable=" + exchangeDurable + + ", autoDelete=" + exchangeAutoDelete + "]" } diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 2935982a67..0691e76884 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -10,10 +10,12 @@ import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameter import com.rabbitmq.client.{Channel, RpcClient} import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters} -class RpcClientActor[I,O](exchangeParameters: ExchangeParameters, - routingKey: String, - serializer: RpcClientSerializer[I,O], - channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { +class RpcClientActor[I,O]( + exchangeParameters: ExchangeParameters, + routingKey: String, + serializer: RpcClientSerializer[I,O], + channelParameters: Option[ChannelParameters] = None) + extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { import exchangeParameters._ diff --git a/akka-amqp/src/main/scala/RpcServerActor.scala b/akka-amqp/src/main/scala/RpcServerActor.scala index 99d74d9b56..309c7fa40c 100644 --- a/akka-amqp/src/main/scala/RpcServerActor.scala +++ b/akka-amqp/src/main/scala/RpcServerActor.scala @@ -8,7 +8,10 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor} import com.rabbitmq.client.AMQP.BasicProperties import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer -class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor { +class RpcServerActor[I,O]( + producer: ActorRef, + serializer: RpcServerSerializer[I,O], + requestHandler: I => O) extends Actor { log.info("%s started", this) @@ -29,6 +32,5 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I, case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag) } - override def toString(): String = - "AMQP.RpcServer[]" + override def toString = "AMQP.RpcServer[]" } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a5b0329d24..11451fb1a6 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -97,7 +97,7 @@ object Actor extends Logging { type Receive = PartialFunction[Any, Unit] private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) - + /** * Creates an ActorRef out of the Actor with type T. *
diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 333b21bd9e..c4ab911d86 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -203,7 +203,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
 
   protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg }
   protected[akka] def currentMessage = guard.withGuard { _currentMessage }
-  
+
   /** comparison only takes uuid into account
    */
   def compareTo(other: ActorRef) = this.uuid.compareTo(other.uuid)
@@ -1260,8 +1260,8 @@ class LocalActorRef private[akka](
  * @author Jonas Bonér
  */
 private[akka] case class RemoteActorRef private[akka] (
-//  uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
   uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader])
+  //  uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
   extends ActorRef {
   _uuid = uuuid
   timeout = _timeout
diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index 532ead6754..1e097f9034 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -49,13 +49,13 @@ object ActorRegistry extends ListenerManagement {
 
   /**
    * Invokes the function on all known actors until it returns Some
-   * Returns None if the function never returns Some 
+   * Returns None if the function never returns Some
    */
   def find[T](f: (ActorRef) => Option[T]) : Option[T] = {
     val elements = actorsByUUID.elements
     while (elements.hasMoreElements) {
       val result = f(elements.nextElement)
-      
+
       if(result.isDefined)
         return result
     }
@@ -134,7 +134,7 @@ object ActorRegistry extends ListenerManagement {
       newSet add actor
 
       val oldSet = actorsById.putIfAbsent(id,newSet)
-      
+
       //Parry for two simultaneous putIfAbsent(id,newSet)
       if(oldSet ne null)
         oldSet add actor
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
index 8fa68d189d..dfb8541396 100644
--- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -83,7 +83,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
     } else Thread.currentThread.getContextClassLoader)
   }
 
-  abstract override def onLoad = {  
+  abstract override def onLoad = {
     applicationLoader.foreach(_ => log.info("Creating /deploy class-loader"))
 
     super.onLoad
diff --git a/akka-core/src/main/scala/actor/UntypedActor.scala b/akka-core/src/main/scala/actor/UntypedActor.scala
index b0c4c2ff04..474c75e6de 100644
--- a/akka-core/src/main/scala/actor/UntypedActor.scala
+++ b/akka-core/src/main/scala/actor/UntypedActor.scala
@@ -14,7 +14,7 @@ import java.net.InetSocketAddress
 import scala.reflect.BeanProperty
 
 /**
- * Subclass this abstract class to create a MDB-style untyped actor. 
+ * Subclass this abstract class to create a MDB-style untyped actor.
  * 

* This class is meant to be used from Java. *

@@ -29,12 +29,12 @@ import scala.reflect.BeanProperty * // Reply to original sender of message using the 'replyUnsafe' method * getContext().replyUnsafe(msg + ":" + getContext().getUuid()); * - * } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) { + * } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) { * // Reply to original sender of message using the sender reference * // also passing along my own refererence (the context) - * getContext().getSender().get().sendOneWay(msg, context); + * getContext().getSender().get().sendOneWay(msg, context); * - * } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) { + * } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) { * // Reply to original sender of message using the sender future reference * getContext().getSenderFuture().get().completeWithResult(msg); * @@ -51,7 +51,7 @@ import scala.reflect.BeanProperty * } else throw new IllegalArgumentException("Unknown message: " + message); * } else throw new IllegalArgumentException("Unknown message: " + message); * } - * + * * public static void main(String[] args) { * UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class); * actor.start(); @@ -96,7 +96,7 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct /** * Factory object for creating and managing 'UntypedActor's. Meant to be used from Java. *

- * Example on how to create an actor: + * Example on how to create an actor: *

  *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
  *   actor.start();
@@ -111,11 +111,11 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
  * @author Jonas Bonér
  */
 object UntypedActor {
-  
+
   /**
    * Creates an ActorRef out of the Actor. Allows you to pass in the class for the Actor.
    * 

- * Example in Java: + * Example in Java: *

    *   ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
    *   actor.start();
@@ -134,13 +134,13 @@ object UntypedActor {
   }
 
   /**
-   * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the 
+   * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
    * UntypedActor instance directly, but only through its 'UntypedActorRef' wrapper reference.
    * 

- * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only + * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only * use this method when you need to pass in constructor arguments into the 'UntypedActor'. *

- * Example in Java: + * Example in Java: *

    *   ActorRef actor = UntypedActor.actorOf(new MyUntypedActor("service:name", 5));
    *   actor.start();
@@ -156,7 +156,7 @@ object UntypedActor {
 }
 
 /**
- * Use this class if you need to wrap an 'ActorRef' in the more Java-friendly 'UntypedActorRef'.  
+ * Use this class if you need to wrap an 'ActorRef' in the more Java-friendly 'UntypedActorRef'.
  *
  * @author Jonas Bonér
  */
@@ -170,7 +170,7 @@ object UntypedActorRef {
  * @author Jonas Bonér
  */
 class UntypedActorRef(val actorRef: ActorRef) {
-  
+
   /**
    * Returns the uuid for the actor.
    */
@@ -186,14 +186,14 @@ class UntypedActorRef(val actorRef: ActorRef) {
    */
   def setId(id: String) = actorRef.id = id
   def getId(): String = actorRef.id
-  
+
   /**
    * Defines the default timeout for '!!' and '!!!' invocations,
    * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
    */
   def setTimeout(timeout: Long) = actorRef.timeout = timeout
   def getTimeout(): Long = actorRef.timeout
-  
+
   /**
    * Defines the default timeout for an initial receive invocation.
    * When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
@@ -269,7 +269,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
    * Is defined if the message was sent with sent with 'sendRequestReply' or 'sendRequestReplyFuture', else None.
    */
   def getSenderFuture(): Option[CompletableFuture[Any]] = actorRef.senderFuture
-  
+
   /**
    * Starts up the actor and its message queue.
    */
@@ -285,7 +285,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
    * Shuts down the actor its dispatcher and message queue.
    */
   def stop(): Unit = actorRef.stop()
-  
+
   /**
    * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
    * 

@@ -309,12 +309,12 @@ class UntypedActorRef(val actorRef: ActorRef) { def sendOneWay(message: AnyRef, sender: UntypedActorRef) = if (sender eq null) actorRef.!(message)(None) else actorRef.!(message)(Some(sender.actorRef)) - + /** - * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from - * the default timeout in the Actor. + * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from + * the default timeout in the Actor. *

- * It waits on the reply either until it receives it or until the timeout expires + * It waits on the reply either until it receives it or until the timeout expires * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: @@ -324,19 +324,19 @@ class UntypedActorRef(val actorRef: ActorRef) { * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReply(message: AnyRef): AnyRef = + def sendRequestReply(message: AnyRef): AnyRef = actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorRef.actorClassName + - "]\n\twith timeout [" + actorRef.timeout + + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\twith timeout [" + actorRef.timeout + "]\n\ttimed out.")) .asInstanceOf[AnyRef] /** - * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from - * the default timeout in the Actor. + * Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from + * the default timeout in the Actor. *

- * It waits on the reply either until it receives it or until the timeout expires + * It waits on the reply either until it receives it or until the timeout expires * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: @@ -350,18 +350,18 @@ class UntypedActorRef(val actorRef: ActorRef) { val result = if (sender eq null) actorRef.!!(message)(None) else actorRef.!!(message)(Some(sender.actorRef)) result.getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorRef.actorClassName + - "]\n\tfrom [" + sender.actorRef.actorClassName + - "]\n\twith timeout [" + actorRef.timeout + + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\tfrom [" + sender.actorRef.actorClassName + + "]\n\twith timeout [" + actorRef.timeout + "]\n\ttimed out.")) .asInstanceOf[AnyRef] } - + /** * Sends a message asynchronously and waits on a future for a reply message under the hood. *

- * It waits on the reply either until it receives it or until the timeout expires + * It waits on the reply either until it receives it or until the timeout expires * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: @@ -371,18 +371,18 @@ class UntypedActorRef(val actorRef: ActorRef) { * If you are sending messages using sendRequestReply then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReply(message: AnyRef, timeout: Long): AnyRef = + def sendRequestReply(message: AnyRef, timeout: Long): AnyRef = actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorRef.actorClassName + - "]\n\twith timeout [" + timeout + - "]\n\ttimed out.")) + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) .asInstanceOf[AnyRef] - + /** * Sends a message asynchronously and waits on a future for a reply message under the hood. *

- * It waits on the reply either until it receives it or until the timeout expires + * It waits on the reply either until it receives it or until the timeout expires * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. *

* NOTE: @@ -396,17 +396,17 @@ class UntypedActorRef(val actorRef: ActorRef) { val result = if (sender eq null) actorRef.!!(message, timeout)(None) else actorRef.!!(message)(Some(sender.actorRef)) result.getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorRef.actorClassName + - "]\n\tfrom [" + sender.actorRef.actorClassName + - "]\n\twith timeout [" + timeout + - "]\n\ttimed out.")) + "Message [" + message + + "]\n\tsent to [" + actorRef.actorClassName + + "]\n\tfrom [" + sender.actorRef.actorClassName + + "]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) .asInstanceOf[AnyRef] } - + /** - * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from - * the default timeout in the Actor. + * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from + * the default timeout in the Actor. *

* NOTE: * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to @@ -416,10 +416,10 @@ class UntypedActorRef(val actorRef: ActorRef) { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef): Future[_] = actorRef.!!!(message)(None) - + /** - * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from - * the default timeout in the Actor. + * Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from + * the default timeout in the Actor. *

* NOTE: * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to @@ -428,10 +428,10 @@ class UntypedActorRef(val actorRef: ActorRef) { * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] = + def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] = if (sender eq null) actorRef.!!!(message)(None) else actorRef.!!!(message)(Some(sender.actorRef)) - + /** * Sends a message asynchronously returns a future holding the eventual reply message. *

@@ -443,7 +443,7 @@ class UntypedActorRef(val actorRef: ActorRef) { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReplyFuture(message: AnyRef, timeout: Long): Future[_] = actorRef.!!!(message, timeout)(None) - + /** * Sends a message asynchronously returns a future holding the eventual reply message. *

@@ -454,17 +454,17 @@ class UntypedActorRef(val actorRef: ActorRef) { * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] = + def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] = if (sender eq null) actorRef.!!!(message, timeout)(None) else actorRef.!!!(message)(Some(sender.actorRef)) - + /** * Forwards the message and passes the original sender actor as the sender. *

* Works with 'sendOneWay', 'sendRequestReply' and 'sendRequestReplyFuture'. */ - def forward(message: AnyRef, sender: UntypedActorRef): Unit = - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") + def forward(message: AnyRef, sender: UntypedActorRef): Unit = + if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") else actorRef.forward(message)(Some(sender.actorRef)) /** @@ -474,7 +474,7 @@ class UntypedActorRef(val actorRef: ActorRef) { * Throws an IllegalStateException if unable to determine what to reply to. */ def replyUnsafe(message: AnyRef): Unit = actorRef.reply(message) - + /** * Use getContext().replySafe(..) to reply with a message to the original sender of the message currently * being processed. @@ -492,7 +492,7 @@ class UntypedActorRef(val actorRef: ActorRef) { * Returns the class name for the Actor instance that is managed by the ActorRef. */ def getActorClassName(): String = actorRef.actorClassName - + /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ @@ -538,7 +538,7 @@ class UntypedActorRef(val actorRef: ActorRef) { * Set the home address and port for this actor. */ def setHomeAddress(address: InetSocketAddress): Unit = actorRef.homeAddress = address - + /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will * receive a notification if the linked actor has crashed. @@ -562,7 +562,7 @@ class UntypedActorRef(val actorRef: ActorRef) { /** * Atomically start, link and make an actor remote. */ - def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit = + def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit = actorRef.startLinkRemote(actor.actorRef, hostname, port) /** diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index e938e36e4e..0d2da205c6 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -42,6 +42,8 @@ import se.scalablesolutions.akka.config.Config.config object Dispatchers { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + object globalHawtDispatcher extends HawtDispatcher + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { override def register(actor: ActorRef) = { if (isShutdown) init @@ -50,8 +52,18 @@ object Dispatchers { } object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global") + object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global") + /** + * Creates an event-driven dispatcher based on the excellent HawtDispatch library. + *

+ * Can be beneficial to use the HawtDispatcher.pin(self) to "pin" an actor to a specific thread. + *

+ * See the ScalaDoc for the {@link se.scalablesolutions.akka.dispatch.HawtDispatcher} for details. + */ + def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate) + /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. *

diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 5abf431ef8..55755250e2 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -234,7 +234,7 @@ trait ThreadPoolBuilder { extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging { setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = + def uncaughtException(thread: Thread, cause: Throwable) = log.error(cause, "UNCAUGHT in thread [%s]", thread.getName) }) diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 7d6286f24c..643dbd2f33 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -25,7 +25,7 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start - abstract override def onLoad = { + abstract override def onLoad = { if (config.getBool("akka.remote.server.service", true)) { if (config.getBool("akka.remote.cluster.service", true)) Cluster.start(self.applicationLoader) log.info("Initializing Remote Actors Service...") diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index cc32e96998..e17c7f57ef 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -48,44 +48,48 @@ case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty v case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +class RemoteClientException private[akka](message: String) extends RuntimeException(message) + /** + * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. + * * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] - private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] + private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(className: String, hostname: String, port: Int): ActorRef = - actorFor(className, className, 5000L, hostname, port, None) + def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) - def actorFor(className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(className, className, 5000L, hostname, port, Some(loader)) + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) - def actorFor(uuid: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(uuid, className, 5000L, hostname, port, None) + def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, 5000L, hostname, port, None) - def actorFor(uuid: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(uuid, className, 5000L, hostname, port, Some(loader)) + def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(serviceId, className, 5000L, hostname, port, Some(loader)) - def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(className, className, timeout, hostname, port, None) + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) - def actorFor(className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(className, className, timeout, hostname, port, Some(loader)) + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) - def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, None) + def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, None) - private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, Some(loader)) + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) - private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, loader) + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, loader) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port), None) @@ -158,9 +162,11 @@ object RemoteClient extends Logging { } /** + * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. + * * @author Jonas Bonér */ -class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging { +class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false @@ -226,7 +232,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } } } else { - val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") + val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) throw exception } @@ -245,21 +251,22 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O /** * @author Jonas Bonér */ -class RemoteClientPipelineFactory(name: String, - futures: ConcurrentMap[Long, CompletableFuture[_]], - supervisors: ConcurrentMap[String, ActorRef], - bootstrap: ClientBootstrap, - remoteAddress: SocketAddress, - timer: HashedWheelTimer, - client: RemoteClient) extends ChannelPipelineFactory { +class RemoteClientPipelineFactory( + name: String, + futures: ConcurrentMap[Long, CompletableFuture[_]], + supervisors: ConcurrentMap[String, ActorRef], + bootstrap: ClientBootstrap, + remoteAddress: SocketAddress, + timer: HashedWheelTimer, + client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) - + val engine = RemoteServerSslContext.client.createSSLEngine() engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(true) - + val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) @@ -272,9 +279,7 @@ class RemoteClientPipelineFactory(name: String, } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) - new StaticChannelPipeline(stages: _*) } } @@ -283,13 +288,14 @@ class RemoteClientPipelineFactory(name: String, * @author Jonas Bonér */ @ChannelHandler.Sharable -class RemoteClientHandler(val name: String, - val futures: ConcurrentMap[Long, CompletableFuture[_]], - val supervisors: ConcurrentMap[String, ActorRef], - val bootstrap: ClientBootstrap, - val remoteAddress: SocketAddress, - val timer: HashedWheelTimer, - val client: RemoteClient) +class RemoteClientHandler( + val name: String, + val futures: ConcurrentMap[Long, CompletableFuture[_]], + val supervisors: ConcurrentMap[String, ActorRef], + val bootstrap: ClientBootstrap, + val remoteAddress: SocketAddress, + val timer: HashedWheelTimer, + val client: RemoteClient) extends SimpleChannelUpstreamHandler with Logging { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -318,13 +324,13 @@ class RemoteClientHandler(val name: String, val supervisedActor = supervisors.get(supervisorUuid) if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply)) + else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) } - future.completeWithException(null, parseException(reply)) + future.completeWithException(null, parseException(reply, client.loader)) } futures.remove(reply.getId) } else { - val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) + val exception = new RemoteClientException("Unknown message received in remote client handler: " + result) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) throw exception } @@ -358,24 +364,20 @@ class RemoteClientHandler(val name: String, log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } - if(RemoteServer.SECURE){ - val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - sslHandler.handshake().addListener( new ChannelFutureListener { - def operationComplete(future : ChannelFuture) : Unit = { - if(future.isSuccess) - connect - //else - //FIXME: What is the correct action here? - } - }) - } else { - connect - } + if (RemoteServer.SECURE) { + val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + sslHandler.handshake.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) connect + else throw new RemoteClientException("Could not establish SSL handshake") + } + }) + } else connect } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) + client.listeners.toArray.foreach(listener => + listener.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } @@ -385,9 +387,11 @@ class RemoteClientHandler(val name: String, event.getChannel.close } - private def parseException(reply: RemoteReplyProtocol): Throwable = { + private def parseException(reply: RemoteReplyProtocol, loader: Option[ClassLoader]): Throwable = { val exception = reply.getException - val exceptionClass = Class.forName(exception.getClassname) + val classname = exception.getClassname + val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) + else Class.forName(classname) exceptionClass .getConstructor(Array[Class[_]](classOf[String]): _*) .newInstance(exception.getMessage).asInstanceOf[Throwable] diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index a37ff7811e..89c0a6e437 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -76,34 +76,31 @@ object RemoteServer { } val SECURE = { - if(config.getBool("akka.remote.ssl.service",false)){ - + if (config.getBool("akka.remote.ssl.service",false)) { val properties = List( - ("key-store-type" ,"keyStoreType"), - ("key-store" ,"keyStore"), - ("key-store-pass" ,"keyStorePassword"), - ("trust-store-type","trustStoreType"), - ("trust-store" ,"trustStore"), - ("trust-store-pass","trustStorePassword") - ).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2)) - - //If property is not set, and we have a value from our akka.conf, use that value - for{ p <- properties if System.getProperty(p._2) eq null - c <- config.getString(p._1) - } System.setProperty(p._2,c) - - if(config.getBool("akka.remote.ssl.debug",false)) - System.setProperty("javax.net.debug","ssl") + ("key-store-type" , "keyStoreType"), + ("key-store" , "keyStore"), + ("key-store-pass" , "keyStorePassword"), + ("trust-store-type", "trustStoreType"), + ("trust-store" , "trustStore"), + ("trust-store-pass", "trustStorePassword") + ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) + // If property is not set, and we have a value from our akka.conf, use that value + for { + p <- properties if System.getProperty(p._2) eq null + c <- config.getString(p._1) + } System.setProperty(p._2, c) + + if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") true - } - else - false + } else false } object Address { def apply(hostname: String, port: Int) = new Address(hostname, port) } + class Address(val hostname: String, val port: Int) { override def hashCode: Int = { var result = HashCode.SEED @@ -120,7 +117,7 @@ object RemoteServer { } private class RemoteActorSet { - private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] } @@ -307,17 +304,14 @@ class RemoteServer extends Logging { object RemoteServerSslContext { import javax.net.ssl.SSLContext - val (client,server) = { + val (client, server) = { val protocol = "TLS" //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") //val store = KeyStore.getInstance("JKS") - val s = SSLContext.getInstance(protocol) s.init(null,null,null) - val c = SSLContext.getInstance(protocol) c.init(null,null,null) - (c,s) } } @@ -340,20 +334,18 @@ class RemoteServerPipelineFactory( engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(false) - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) - case _ => (join(),join()) + val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) } val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors) - val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) - new StaticChannelPipeline(stages: _*) } } @@ -379,24 +371,21 @@ class RemoteServerHandler( override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) { openChannels.add(ctx.getChannel) } - - override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) { - if(RemoteServer.SECURE) { + + override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { + if (RemoteServer.SECURE) { val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - + // Begin handshake. sslHandler.handshake().addListener( new ChannelFutureListener { - def operationComplete(future : ChannelFuture) : Unit = { - if(future.isSuccess) - openChannels.add(future.getChannel) - else - future.getChannel.close + def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) openChannels.add(future.getChannel) + else future.getChannel.close } }) } } - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -499,8 +488,8 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { - val name = actorInfo.getTarget val uuid = actorInfo.getUuid + val name = actorInfo.getTarget val timeout = actorInfo.getTimeout val actorRefOrNull = actors get uuid diff --git a/akka-core/src/test/scala/misc/SchedulerSpec.scala b/akka-core/src/test/scala/misc/SchedulerSpec.scala index e1219d708a..510b24dedc 100644 --- a/akka-core/src/test/scala/misc/SchedulerSpec.scala +++ b/akka-core/src/test/scala/misc/SchedulerSpec.scala @@ -44,7 +44,7 @@ class SchedulerSpec extends JUnitSuite { // should still be 1 left assert(countDownLatch.getCount == 1) } - + /** * ticket #372 */ @@ -59,19 +59,19 @@ class SchedulerSpec extends JUnitSuite { assert(ticks.await(10,TimeUnit.SECONDS)) assert(ActorRegistry.actors.length === numActors) } - + /** * ticket #372 */ @Test def schedulerShouldBeCancellable = withCleanEndState { object Ping val ticks = new CountDownLatch(1) - + val actor = actorOf(new Actor { def receive = { case Ping => ticks.countDown } }).start - (1 to 10).foreach { i => + (1 to 10).foreach { i => val future = Scheduler.scheduleOnce(actor,Ping,1,TimeUnit.SECONDS) future.cancel(true) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index cda3f01d6d..8c45c38830 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -66,13 +66,13 @@ akka { #You can either use java command-line options or use the settings below - #key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12 + #key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12 #key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12 - #key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS + #key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS - #trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks - #trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore - #trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS + #trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks + #trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore + #trust-store-pass = "$PASS" #Same as -Djavax.net.ssl.trustStorePassword=$PASS #This can be useful for debugging debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl diff --git a/config/akka.conf b/config/akka.conf index 71670837d3..84b9bfbbcf 100644 --- a/config/akka.conf +++ b/config/akka.conf @@ -3,10 +3,3 @@ include "akka-reference.conf" # In this file you can override any option defined in the 'akka-reference.conf' file. # Copy in all or parts of the 'akka-reference.conf' file and modify as you please. -# -# -# -# cluster = ["localhost:6379", "localhost:6380", "localhost:6381"] -# -# -# diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5d994c371b..eb8e1bb5ee 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -428,7 +428,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- // akka-persistence-common subproject // ------------------------------------------------------------------------------------------------------------------- - + class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val commons_pool = Dependencies.commons_pool val thrift = Dependencies.thrift @@ -747,4 +747,4 @@ trait DeployProject { self: BasicScalaProject => trait OSGiProject extends BNDPlugin { self: DefaultProject => override def bndExportPackage = Seq("se.scalablesolutions.akka.*;version=%s".format(projectVersion.value)) -} \ No newline at end of file +}