diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 00bf1bbf02..2ad30bcd55 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -159,9 +159,6 @@ object Actor extends ListenerManagement {
    */
   private[akka] lazy val remote: RemoteSupport = cluster.remoteService
 
-  // start up a cluster node to join the ZooKeeper cluster
-  //if (ClusterModule.isEnabled) cluster.start()
-
   /**
    * Creates an ActorRef out of the Actor with type T.
    *
@@ -382,10 +379,13 @@ object Actor extends ListenerManagement {
 
   private def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = {
     deploy match {
-      case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒
-
+      case Deploy(configAddress, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒
         ClusterModule.ensureEnabled()
-        if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
+
+        if (configAddress != address) throw new IllegalStateException(
+          "Deployment config for [" + address + "] is wrong [" + deploy + "]")
+        if (!Actor.remote.isRunning) throw new IllegalStateException(
+          "Remote server is not running")
 
         val isHomeNode = DeploymentConfig.isHomeNode(home)
         val replicas = DeploymentConfig.replicaValueFor(replication)
@@ -417,15 +417,20 @@ object Actor extends ListenerManagement {
           }
         }
 
-        if (isHomeNode) { // home node for clustered actor
+        val isStateful = state match {
+          case Stateless ⇒ false
+          case Stateful  ⇒ true
+        }
+
+        if (isStateful && isHomeNode) { // stateful actor's home node
           cluster
             .use(address, serializer)
             .getOrElse(throw new ConfigurationException(
               "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
 
         } else {
-          if (!cluster.isClustered(address)) {
-            cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
+          if (!cluster.isClustered(address)) { // add actor to cluster registry (if not already added)
+            cluster.store(factory().start(), replicas, false, serializer)
           }
 
           // remote node (not home node), check out as ClusterActorRef
@@ -651,14 +656,21 @@ trait Actor {
   private[akka] final def apply(msg: Any) = {
     if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
       throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
+
     val behaviorStack = self.hotswap
+
     msg match {
-      case l: AutoReceivedMessage ⇒ autoReceiveMessage(l)
-      case msg if behaviorStack.nonEmpty &&
-        behaviorStack.head.isDefinedAt(msg) ⇒ behaviorStack.head.apply(msg)
-      case msg if behaviorStack.isEmpty &&
-        processingBehavior.isDefinedAt(msg) ⇒ processingBehavior.apply(msg)
-      case unknown ⇒ unhandled(unknown) //This is the only line that differs from processingbehavior
+      case l: AutoReceivedMessage ⇒
+        autoReceiveMessage(l)
+
+      case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) ⇒
+        behaviorStack.head.apply(msg)
+
+      case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) ⇒
+        processingBehavior.apply(msg)
+
+      case unknown ⇒
+        unhandled(unknown) // This is the only case that differs from processingBehavior
     }
   }
 
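
The reshaped apply above dispatches in a fixed order: auto-received system messages first, then the hotswapped behavior stack, then the actor's own processingBehavior, and finally unhandled. A standalone sketch of that precedence (plain partial functions with illustrative names, not the real Akka types) makes the fall-through visible, in particular that a non-empty behavior stack shadows the default behavior even for messages it cannot handle:

// Standalone sketch of the dispatch precedence in Actor.apply (illustrative names only).
object DispatchSketch {
  type Behavior = PartialFunction[Any, Unit]

  def dispatch(msg: Any, behaviorStack: List[Behavior], default: Behavior)(unhandled: Any ⇒ Unit): Unit =
    msg match {
      case m if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(m) ⇒ behaviorStack.head(m)
      case m if behaviorStack.isEmpty && default.isDefinedAt(m)             ⇒ default(m)
      case unknown                                                          ⇒ unhandled(unknown)
    }

  def main(args: Array[String]): Unit = {
    val default: Behavior = { case s: String ⇒ println("default: " + s) }
    val hotswap: Behavior = { case i: Int ⇒ println("hotswap: " + i) }

    dispatch("hello", Nil, default)(m ⇒ println("unhandled: " + m))           // default: hello
    dispatch(42, List(hotswap), default)(m ⇒ println("unhandled: " + m))      // hotswap: 42
    dispatch("hello", List(hotswap), default)(m ⇒ println("unhandled: " + m)) // unhandled: hello
  }
}
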
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index f3c38887fb..383c6d9545 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -45,7 +45,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
    */
   def actorFor(address: String): Option[ActorRef] = {
     if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address))
-    //    else if (isClusterEnabled)                ClusterModule.node.use(address) // FIXME uncomment and fix
     else None
   }
 
@@ -135,9 +134,9 @@ class LocalActorRegistry(
   def shutdownAll() {
     foreach(_.stop)
     if (ClusterModule.isEnabled) Actor.remote.clear //FIXME: Should this be here?
-    actorsByAddress.clear
-    actorsByUuid.clear
-    typedActorsByUuid.clear
+    actorsByAddress.clear()
+    actorsByUuid.clear()
+    typedActorsByUuid.clear()
   }
 
   //============== ACTORS ==============
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 706cf29b6f..5943e28117 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -356,7 +356,7 @@ object Deployer {
     }
   }
 
-  private def throwDeploymentBoundException(deployment: Deploy): Nothing = {
+  private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = {
     val e = new DeploymentAlreadyBoundException(
       "Address [" + deployment.address +
         "] already bound to [" + deployment +
@@ -365,7 +365,7 @@ object Deployer {
     throw e
   }
 
-  private def thrownNoDeploymentBoundException(address: String): Nothing = {
+  private[akka] def thrownNoDeploymentBoundException(address: String): Nothing = {
     val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment")
     EventHandler.error(e, this, e.getMessage)
     throw e
@@ -392,8 +392,7 @@ object LocalDeployer {
 
   private[akka] def deploy(deployment: Deploy) {
     if (deployments.putIfAbsent(deployment.address, deployment) != deployment) {
-      // FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown)
-      // throwDeploymentBoundException(deployment)
+      //Deployer.throwDeploymentBoundException(deployment) // FIXME uncomment this and fix the issue with multiple deployments
     }
   }
 
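
A note on the guard in LocalDeployer.deploy above: ConcurrentHashMap.putIfAbsent returns the previously bound value, or null when the key was absent, so comparing its result against the deployment itself is also true for a first-time deploy. When the commented-out throwDeploymentBoundException call is restored, the check should be against null. A minimal standalone sketch of the deploy-once-or-throw idiom, with illustrative names rather than the real Deployer types:

import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch (not the real Deployer): putIfAbsent returns the previous
// value, or null when the key was free, so "already bound" means a non-null result.
object DeployOnceSketch {
  final case class Deployment(address: String, config: String)
  private val deployments = new ConcurrentHashMap[String, Deployment]()

  def deploy(d: Deployment): Unit = {
    val previous = deployments.putIfAbsent(d.address, d)
    if (previous ne null)
      throw new IllegalStateException("Address [" + d.address + "] already bound to [" + previous + "]")
  }
}
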
diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
index 2d7531a33f..fbcf0862f2 100644
--- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala
@@ -44,7 +44,7 @@ class SupervisorException private[akka] (message: String, cause: Throwable = nul
  * @author Jonas Bonér
  */
 object Supervisor {
-  def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start
+  def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start()
 }
 
 /**
@@ -108,7 +108,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
 
   def uuid = supervisor.uuid
 
-  def start: Supervisor = {
+  def start(): Supervisor = {
     this
   }
 
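
With start() now taking empty parentheses, typical construction still goes through the apply factory, which calls newInstance.start() itself. A hedged usage sketch built only from constructs that appear in this diff (SupervisorConfig, OneForOneStrategy, Supervise, Permanent); workerRef and the numeric retry/time-window arguments are placeholders:

// Illustrative usage; workerRef and the numeric arguments are placeholders.
val supervisor = Supervisor(
  SupervisorConfig(
    OneForOneStrategy(List(classOf[Exception]), 5, 10000), // trap Exception, at most 5 restarts within 10 s
    Supervise(workerRef, Permanent) :: Nil))
// Supervisor.apply already invokes newInstance.start(), so no separate start() call is needed.
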
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index a75d65ddd9..af530c3068 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -166,7 +166,7 @@ object ReflectiveAccess {
                         classloader: ClassLoader = loader): Either[Exception, T] = try {
     assert(params ne null)
     assert(args ne null)
-    getClassFor(fqn) match {
+    getClassFor(fqn, classloader) match {
       case Right(value) ⇒
         val ctor = value.getDeclaredConstructor(params: _*)
         ctor.setAccessible(true)
@@ -180,7 +180,7 @@ object ReflectiveAccess {
 
   //Obtains a reference to fqn.MODULE$
   def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
-    getClassFor(fqn) match {
+    getClassFor(fqn, classloader) match {
       case Right(value) ⇒
         val instance = value.getDeclaredField("MODULE$")
         instance.setAccessible(true)
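
Both fixes above thread the caller-supplied classloader into getClassFor instead of silently falling back to the default loader. A standalone sketch of the same pattern (illustrative code, not the real akka.util.ReflectiveAccess API):

// Sketch: honour the supplied ClassLoader all the way down instead of using a default one.
object ReflectiveSketch {
  def getClassFor[T](fqn: String, classloader: ClassLoader): Either[Exception, Class[T]] =
    try Right(classloader.loadClass(fqn).asInstanceOf[Class[T]])
    catch { case e: ClassNotFoundException ⇒ Left(e) }

  def getObjectFor[T](fqn: String, classloader: ClassLoader): Either[Exception, T] =
    getClassFor[T](fqn, classloader) match { // the bug was calling this without 'classloader'
      case Right(clazz) ⇒
        try {
          val moduleField = clazz.getDeclaredField("MODULE$") // Scala objects expose their instance here
          moduleField.setAccessible(true)
          Right(moduleField.get(null).asInstanceOf[T])
        } catch { case e: Exception ⇒ Left(e) }
      case Left(e) ⇒ Left(e)
    }
}
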
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 1403295a49..37bff63270 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -33,7 +33,8 @@ import akka.event.EventHandler
 import akka.dispatch.{ Dispatchers, Future }
 import akka.remoteinterface._
 import akka.routing.RouterType
-import akka.config.Config
+import akka.config.{ Config, Supervision }
+import Supervision._
 import Config._
 import akka.serialization.{ Format, Serializers, Serializer, Compression }
 import Compression.LZF
@@ -225,7 +226,7 @@ object Cluster {
   /**
    * Creates a new AkkaZkClient.
    */
-  def newZkClient: AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
+  def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
 
   def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
 
@@ -282,8 +283,18 @@ class DefaultClusterNode private[akka] (
       case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule()
       case _                                         ⇒ //ignore other
     }
-  }, "akka.cluster.remoteClientLifeCycleListener").start()
+  }, "akka.cluster.RemoteClientLifeCycleListener").start()
+
   lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
+
+  lazy val remoteDaemonSupervisor = Supervisor(
+    SupervisorConfig(
+      OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
+      Supervise(
+        remoteDaemon,
+        Permanent)
+        :: Nil))
+
   lazy val remoteService: RemoteSupport = {
     val remote = new akka.remote.netty.NettyRemoteSupport
     remote.start(nodeAddress.hostname, nodeAddress.port)
@@ -291,6 +302,7 @@ class DefaultClusterNode private[akka] (
     remote.addListener(remoteClientLifeCycleListener)
     remote
   }
+
   lazy val remoteServerAddress: InetSocketAddress = remoteService.address
 
   // static nodes
@@ -688,12 +700,12 @@ class DefaultClusterNode private[akka] (
           // FIXME switch to ReplicatedActorRef here
           // val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
           val actor = fromBinary[T](bytes, remoteServerAddress)(format)
-          remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID
+          //          remoteService.register(UUID_PREFIX + uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
           actor.start()
           actor.asInstanceOf[LocalActorRef]
         case Right(exception) ⇒ throw exception
       }
-    } headOption // FIXME should not be an array at all coming here
+    } headOption // FIXME should not be an array at all coming here but an Option[ActorRef]
   } else None
 
   /**
@@ -1086,9 +1098,9 @@ class DefaultClusterNode private[akka] (
         .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
     EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
     createRootClusterNode()
-    val isLeader = joinLeaderElection
+    val isLeader = joinLeaderElection()
     if (isLeader) createNodeStructureIfNeeded()
-    registerListeners
+    registerListeners()
     joinMembershipNode()
     joinActorsAtAddressNode()
     fetchMembershipChildrenNodes()
@@ -1125,7 +1137,8 @@ class DefaultClusterNode private[akka] (
 
     if (numberOfReplicas < replicationFactor) {
       throw new IllegalArgumentException(
-        "Replication factor [" + replicationFactor + "] is greater than the number of available nodes [" + numberOfReplicas + "]")
+        "Replication factor [" + replicationFactor +
+          "] is greater than the number of available nodes [" + numberOfReplicas + "]")
     } else if (numberOfReplicas == replicationFactor) {
       replicas = replicas ++ replicaConnectionsAsArray
     } else {
@@ -1164,7 +1177,7 @@ class DefaultClusterNode private[akka] (
     } catch {
       case e: ZkNodeExistsException ⇒
        val error = new ClusterException("Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in use by another node")
-        EventHandler.error(error, this, "")
+        EventHandler.error(error, this, error.toString)
         throw error
     }
   }
@@ -1173,7 +1186,7 @@ class DefaultClusterNode private[akka] (
     ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
   }
 
-  private[cluster] def joinLeaderElection: Boolean = {
+  private[cluster] def joinLeaderElection(): Boolean = {
     EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
     leaderLock.lock
   }
@@ -1217,7 +1230,7 @@ class DefaultClusterNode private[akka] (
             homeAddress.setAccessible(true)
             homeAddress.set(actor, Some(remoteServerAddress))
 
-            remoteService.register(uuid, actor)
+            remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
           }
         }
 
@@ -1302,7 +1315,7 @@ class DefaultClusterNode private[akka] (
     }
   }
 
-  private def registerListeners = {
+  private def registerListeners() = {
     zkClient.subscribeStateChanges(stateListener)
     zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
   }
@@ -1456,8 +1469,8 @@ trait ErrorHandler {
 object RemoteClusterDaemon {
   val ADDRESS = "akka-cluster-daemon".intern
 
-  // FIXME configure functionServerDispatcher to what?
-  val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build
+  // FIXME configure computeGridDispatcher to what?
+  val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
 }
 
 // FIXME supervise RemoteClusterDaemon
@@ -1472,6 +1485,10 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
 
   self.dispatcher = Dispatchers.newPinnedDispatcher(self)
 
+  override def preRestart(reason: Throwable) {
+    EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s], restarting...".format(reason))
+  }
+
   def receive: Receive = {
     case message: RemoteDaemonMessageProtocol ⇒
       EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message))
@@ -1528,7 +1545,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
 
         case FUNCTION_FUN0_UNIT ⇒
           actorOf(new Actor() {
-            self.dispatcher = functionServerDispatcher
+            self.dispatcher = computeGridDispatcher
 
             def receive = {
               case f: Function0[Unit] ⇒ try {
@@ -1541,7 +1558,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
 
         case FUNCTION_FUN0_ANY ⇒
           actorOf(new Actor() {
-            self.dispatcher = functionServerDispatcher
+            self.dispatcher = computeGridDispatcher
 
             def receive = {
               case f: Function0[Any] ⇒ try {
@@ -1554,7 +1571,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
 
         case FUNCTION_FUN1_ARG_UNIT ⇒
           actorOf(new Actor() {
-            self.dispatcher = functionServerDispatcher
+            self.dispatcher = computeGridDispatcher
 
             def receive = {
               case (fun: Function[Any, Unit], param: Any) ⇒ try {
@@ -1567,7 +1584,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
 
         case FUNCTION_FUN1_ARG_ANY ⇒
           actorOf(new Actor() {
-            self.dispatcher = functionServerDispatcher
+            self.dispatcher = computeGridDispatcher
 
             def receive = {
               case (fun: Function[Any, Unit], param: Any) ⇒ try {
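
The FUNCTION_* handlers above all follow the same compute-grid shape: spawn a short-lived actor on the shared computeGridDispatcher, let it apply the shipped function, and reply when a result is expected. A hedged sketch of that shape; it reuses constructs visible in this diff (actorOf, self.dispatcher, Dispatchers.newDispatcher(...).build), while self.reply and self.stop are assumptions about the 1.x-era ActorRef API:

// Hedged sketch of the compute-grid pattern; self.reply/self.stop are assumed API.
val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build

def runOnGrid(f: Function0[Any]): Unit = {
  val worker = actorOf(new Actor() {
    self.dispatcher = computeGridDispatcher // run on the shared compute-grid dispatcher
    def receive = {
      case fun: Function0[_] ⇒
        try self.reply(fun()) finally self.stop() // apply the shipped function, then stop
    }
  }).start()
  worker ! f
}
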
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index f2385f19fe..16a3f872b1 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -1,9 +1,11 @@
-####################
-# Akka Config File #
-####################
+##############################
+# Akka Reference Config File #
+##############################
 
-# This file has all the default settings, so all these could be removed with no visible effect.
+# This reference config file contains all the default settings.
+# All these could be removed with no visible effect.
 # Modify as needed.
+# This file is imported by the 'akka.conf' file. Make your edits and overrides there.
 
 akka {
   version = "2.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
@@ -132,19 +134,58 @@ akka {
     session-timeout = 60
     connection-timeout = 60
     use-compression = off
-    remote-daemon-ack-timeout = 30
-    exclude-ref-node-in-replica-set = on # should a replica be instantiated on the same node as the
+    remote-daemon-ack-timeout = 30       # Timeout for ACK of cluster operations, like checking out an actor, etc.
+    exclude-ref-node-in-replica-set = on # Should a replica be instantiated on the same node as the
                                          #     cluster reference to the actor
-                                         #     default: on
+                                         #     Default: on
 
     replication {
-      digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
-      password = "secret"
+      digest-type = "MAC"                # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
+      password = "secret"                # FIXME: should this be stored in the clear in this file?
       ensemble-size = 3
       quorum-size = 2
     }
   }
 
+  remote {
+
+    # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
+    secure-cookie = ""
+
+    compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
+    zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
+
+    layer = "akka.remote.netty.NettyRemoteSupport"
+
+    server {
+      # FIXME remove hostname/port
+      hostname = "localhost"        # The hostname or IP that clients should connect to
+      port = 2552                   # The port clients should connect to. Default is 2552 (AKKA)
+      message-frame-size = 1048576  # Increase this if you want to be able to send messages with large payloads
+      connection-timeout = 1
+      require-cookie = off          # Should the remote server require that its peers share the same secure-cookie (defined in the 'remote' section)?
+      untrusted-mode = off          # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
+      backlog = 4096                # Sets the size of the connection backlog
+      execution-pool-keepalive = 60 # Length in akka.time-unit how long core threads will be kept alive if idling
+      execution-pool-size      = 16 # Size of the core pool of the remote execution unit
+      max-channel-memory-size  = 0  # Maximum channel size, 0 for off
+      max-total-memory-size    = 0  # Maximum total size of all channels, 0 for off
+    }
+
+    client {
+      buffering {
+        retry-message-send-on-failure = on
+        capacity = -1                      # If negative (or zero) then an unbounded mailbox is used (default)
+                                           # If positive then a bounded mailbox is used and the capacity is set using the property
+      }
+      reconnect-delay = 5
+      read-timeout = 10
+      message-frame-size = 1048576
+      reap-futures-delay = 5
+      reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
+    }
+  }
+
   stm {
     fair             = on     # Should global transactions be fair or non-fair (non fair yield better performance)
     max-retries      = 1000
@@ -199,42 +240,4 @@ akka {
     expired-header-name = "Async-Timeout"   # the name of the response header to use when an async request expires
     expired-header-value = "expired"        # the value of the response header to use when an async request expires
   }
-
-  remote {
-
-    # secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
-    secure-cookie = ""
-
-    compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
-    zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
-
-    layer = "akka.remote.netty.NettyRemoteSupport"
-
-    server {
-      hostname = "localhost"        # The hostname or IP that clients should connect to
-      port = 2552                   # The port clients should connect to. Default is 2552 (AKKA)
-      message-frame-size = 1048576  # Increase this if you want to be able to send messages with large payloads
-      connection-timeout = 1
-      require-cookie = off          # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
-      untrusted-mode = off          # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
-      backlog = 4096                # Sets the size of the connection backlog
-      execution-pool-keepalive = 60 # Length in akka.time-unit how long core threads will be kept alive if idling
-      execution-pool-size      = 16 # Size of the core pool of the remote execution unit
-      max-channel-memory-size  = 0  # Maximum channel size, 0 for off
-      max-total-memory-size    = 0  # Maximum total size of all channels, 0 for off
-    }
-
-    client {
-      buffering {
-        retry-message-send-on-failure = on
-        capacity = -1                      # If negative (or zero) then an unbounded mailbox is used (default)
-                                           # If positive then a bounded mailbox is used and the capacity is set using the property
-      }
-      reconnect-delay = 5
-      read-timeout = 10
-      message-frame-size = 1048576
-      reap-futures-delay = 5
-      reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
-    }
-  }
 }