diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 3963042f22..690a69841f 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -122,8 +122,6 @@ object NodeAddress { trait ClusterNode { import ChangeListener._ - val isConnected = new AtomicBoolean(false) - private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() def membershipNodes: Array[String] @@ -136,7 +134,7 @@ trait ClusterNode { def remoteServerAddress: InetSocketAddress - def isRunning: Boolean = isConnected.get + def isRunning: Boolean def start(): ClusterNode diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 57253b2572..152de0368f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -6,15 +6,14 @@ package akka.cluster import org.apache.zookeeper._ import org.apache.zookeeper.Watcher.Event._ import org.apache.zookeeper.data.Stat -import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener } +import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener} import org.I0Itec.zkclient._ import org.I0Itec.zkclient.serialize._ import org.I0Itec.zkclient.exception._ -import java.util.{ List ⇒ JList } -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } -import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap } +import java.util.{List ⇒ JList} +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.net.InetSocketAddress import javax.management.StandardMBean @@ -30,17 +29,17 @@ import Status._ import DeploymentConfig._ import akka.event.EventHandler -import akka.dispatch.{ Dispatchers, Future } +import akka.dispatch.{Dispatchers, Future} import akka.remoteinterface._ import akka.routing.RouterType -import akka.config.{ Config, Supervision } +import akka.config.{Config, Supervision} import Supervision._ import Config._ -import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization } +import akka.serialization.{Serialization, Serializer, ActorSerialization} import ActorSerialization._ -import Compression.LZF +import akka.serialization.Compression.LZF import akka.cluster.zookeeper._ import ChangeListener._ @@ -50,6 +49,7 @@ import RemoteDaemonMessageType._ import com.eaio.uuid.UUID import com.google.protobuf.ByteString +import java.util.concurrent.{CopyOnWriteArrayList, Callable, ConcurrentHashMap} // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down @@ -84,7 +84,7 @@ trait ClusterNodeMBean { def getMemberNodes: Array[String] - def getNodeAddres():NodeAddress + def getNodeAddres(): NodeAddress def getLeaderLockName: String @@ -112,31 +112,31 @@ trait ClusterNodeMBean { def getConfigElementKeys: Array[String] - def getMemberShipPathFor(node:String):String + def getMemberShipPathFor(node: String): String - def getConfigurationPathFor(key:String):String + def getConfigurationPathFor(key: String): String - def getActorAddresstoNodesPathFor(actorAddress:String):String + def getActorAddresstoNodesPathFor(actorAddress: String): String - def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String):String + def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String): String - def getNodeToUuidsPathFor(node:String):String + def getNodeToUuidsPathFor(node: String): String - def getNodeToUuidsPathFor(node:String, uuid:UUID):String + def getNodeToUuidsPathFor(node: String, uuid: UUID): String - def getActorAddressRegistryPathFor(actorAddress:String):String + def getActorAddressRegistryPathFor(actorAddress: String): String - def getActorAddressRegistrySerializerPathFor(actorAddress:String):String + def getActorAddressRegistrySerializerPathFor(actorAddress: String): String - def getActorAddressRegistryUuidPathFor(actorAddress:String):String + def getActorAddressRegistryUuidPathFor(actorAddress: String): String - def getActorUuidRegistryNodePathFor(uuid: UUID):String + def getActorUuidRegistryNodePathFor(uuid: UUID): String - def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID):String + def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID): String - def getActorAddressToUuidsPathFor(actorAddress: String):String + def getActorAddressToUuidsPathFor(actorAddress: String): String - def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID):String + def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String } /** @@ -181,17 +181,17 @@ object Cluster { private def nodename: String = properties.get("akka.cluster.nodename") match { case Some(uberride) ⇒ uberride - case None ⇒ Config.nodename + case None ⇒ Config.nodename } private def hostname: String = properties.get("akka.cluster.hostname") match { case Some(uberride) ⇒ uberride - case None ⇒ Config.hostname + case None ⇒ Config.hostname } private def port: Int = properties.get("akka.cluster.port") match { case Some(uberride) ⇒ uberride.toInt - case None ⇒ Config.remoteServerPort + case None ⇒ Config.remoteServerPort } val defaultZooKeeperSerializer = new SerializableSerializer @@ -329,12 +329,12 @@ object Cluster { * * @author Jonas Bonér */ -class DefaultClusterNode private[akka] ( - val nodeAddress: NodeAddress, - val hostname: String = Config.hostname, - val port: Int = Config.remoteServerPort, - val zkServerAddresses: String, - val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { +class DefaultClusterNode private[akka]( + val nodeAddress: NodeAddress, + val hostname: String = Config.hostname, + val port: Int = Config.remoteServerPort, + val zkServerAddresses: String, + val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { self ⇒ if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") @@ -349,7 +349,7 @@ class DefaultClusterNode private[akka] ( def receive = { case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() - case _ ⇒ //ignore other + case _ ⇒ //ignore other } }, "akka.cluster.RemoteClientLifeCycleListener").start() @@ -373,6 +373,8 @@ class DefaultClusterNode private[akka] ( lazy val remoteServerAddress: InetSocketAddress = remoteService.address + val isConnected = new Switch(false) + // static nodes val CLUSTER_PATH = "/" + nodeAddress.clusterName val MEMBERSHIP_PATH = CLUSTER_PATH + "/members" @@ -445,39 +447,65 @@ class DefaultClusterNode private[akka] ( // Node // ======================================= + def isRunning: Boolean = isConnected.isOn + def start(): ClusterNode = { - if (isConnected.compareAndSet(false, true)) { + isConnected.switchOn { initializeNode() } + this } + private[cluster] def initializeNode() { + EventHandler.info(this, + ("\nCreating cluster node with" + + "\n\tcluster name = [%s]" + + "\n\tnode name = [%s]" + + "\n\tport = [%s]" + + "\n\tzookeeper server addresses = [%s]" + + "\n\tserializer = [%s]") + .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer)) + EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) + createZooKeeperPathStructureIfNeeded() + registerListeners() + joinCluster() + joinLeaderElection() + fetchMembershipNodes() + EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) + } + + def shutdown() { - if (isConnected.compareAndSet(true, false)) { - ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) - - locallyCachedMembershipNodes.clear() - - nodeConnections.toList.foreach({ - case (_, (address, _)) ⇒ - Actor.remote.shutdownClientConnection(address) // shut down client connections - }) - - remoteService.shutdown() // shutdown server - - remoteClientLifeCycleListener.stop() - remoteDaemon.stop() - - // for monitoring remote listener - registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) - - nodeConnections.clear() - - disconnect() - EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + isConnected.switchOff { + shutdownNode() } } + private def shutdownNode() { + ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) + + locallyCachedMembershipNodes.clear() + + nodeConnections.toList.foreach({ + case (_, (address, _)) ⇒ + Actor.remote.shutdownClientConnection(address) // shut down client connections + }) + + remoteService.shutdown() // shutdown server + + remoteClientLifeCycleListener.stop() + remoteDaemon.stop() + + // for monitoring remote listener + registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) + + nodeConnections.clear() + + disconnect() + EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + } + def disconnect(): ClusterNode = { zkClient.unsubscribeAll() zkClient.close() @@ -668,12 +696,12 @@ class DefaultClusterNode private[akka] ( * available durable store. */ def store( - actorAddress: String, - actorFactory: () ⇒ ActorRef, - replicationFactor: Int, - replicationScheme: ReplicationScheme, - serializeMailbox: Boolean, - serializer: Serializer): ClusterNode = if (isConnected.get) { + actorAddress: String, + actorFactory: () ⇒ ActorRef, + replicationFactor: Int, + replicationScheme: ReplicationScheme, + serializeMailbox: Boolean, + serializer: Serializer): ClusterNode = if (isConnected.isOn) { val serializerClassName = serializer.getClass.getName @@ -704,7 +732,7 @@ class DefaultClusterNode private[akka] ( } } }) match { - case Left(path) ⇒ path + case Left(path) ⇒ path case Right(exception) ⇒ actorAddressRegistryPath } } @@ -749,7 +777,7 @@ class DefaultClusterNode private[akka] ( /** * Is the actor with uuid clustered or not? */ - def isClustered(actorAddress: String): Boolean = if (isConnected.get) { + def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) { zkClient.exists(actorAddressRegistryPathFor(actorAddress)) } else false @@ -761,7 +789,7 @@ class DefaultClusterNode private[akka] ( /** * Is the actor with uuid in use or not? */ - def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) { + def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) { zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName)) } else false @@ -775,7 +803,7 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) { + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) { val nodeName = nodeAddress.nodeName ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName))) @@ -791,7 +819,7 @@ class DefaultClusterNode private[akka] ( val actorFactory = Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match { - case Left(error) ⇒ throw error + case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef] } @@ -860,7 +888,7 @@ class DefaultClusterNode private[akka] ( EventHandler.debug(this, "Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress)) - if (isConnected.get) { + if (isConnected.isOn) { val builder = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) @@ -871,11 +899,12 @@ class DefaultClusterNode private[akka] ( val command = builder.build - nodes foreach { node ⇒ - nodeConnections.get(node) foreach { - case (_, connection) ⇒ - sendCommandToNode(connection, command, async = false) - } + nodes foreach { + node ⇒ + nodeConnections.get(node) foreach { + case (_, connection) ⇒ + sendCommandToNode(connection, command, async = false) + } } } } @@ -908,15 +937,16 @@ class DefaultClusterNode private[akka] ( // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method? - if (isConnected.get) { + if (isConnected.isOn) { ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName))) - uuidsForActorAddress(actorAddress) foreach { uuid ⇒ - EventHandler.debug(this, - "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid)) + uuidsForActorAddress(actorAddress) foreach { + uuid ⇒ + EventHandler.debug(this, + "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid)) - ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid))) - ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid))) + ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid))) + ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid))) } } } @@ -925,7 +955,7 @@ class DefaultClusterNode private[akka] ( * Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'. */ private[akka] def releaseActorOnAllNodes(actorAddress: String) { - if (isConnected.get) { + if (isConnected.isOn) { EventHandler.debug(this, "Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress)) @@ -934,10 +964,11 @@ class DefaultClusterNode private[akka] ( .setActorAddress(actorAddress) .build - nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒ - nodeConnections.get(node) foreach { - case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) - } + nodesForActorsInUseWithAddress(actorAddress) foreach { + node ⇒ + nodeConnections.get(node) foreach { + case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) + } } } } @@ -945,14 +976,16 @@ class DefaultClusterNode private[akka] ( /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) { + def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) { val addresses = addressesForActor(actorAddress) EventHandler.debug(this, "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]" .format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t"))) val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT) - addresses foreach { case (_, address) ⇒ clusterActorRefs.put(address, actorRef) } + addresses foreach { + case (_, address) ⇒ clusterActorRefs.put(address, actorRef) + } actorRef.start() } else throw new ClusterException("Not connected to cluster") @@ -970,7 +1003,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors registered in this cluster. */ - private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) { zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] } else Array.empty[UUID] @@ -982,7 +1015,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor id for the actor with a specific UUID. */ - private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) { + private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) { try { Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String]) } catch { @@ -999,7 +1032,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the actor UUIDs for actor ID. */ - private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map { case c: CharSequence ⇒ new UUID(c) @@ -1012,7 +1045,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the node names of all actors in use with UUID. */ - private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) { + private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) { try { zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]] } catch { @@ -1023,7 +1056,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the UUIDs of all actors in use registered on a specific node. */ - private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) { + private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) { try { zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map { case c: CharSequence ⇒ new UUID(c) @@ -1036,7 +1069,7 @@ class DefaultClusterNode private[akka] ( /** * Returns the addresses of all actors in use registered on a specific node. */ - def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) { + def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) { val uuids = try { zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map { @@ -1059,7 +1092,8 @@ class DefaultClusterNode private[akka] ( case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) } - ReflectiveAccess.getClassFor(serializerClassName) match { // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. + ReflectiveAccess.getClassFor(serializerClassName) match { + // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer] case Left(error) ⇒ EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString)) @@ -1183,7 +1217,7 @@ class DefaultClusterNode private[akka] ( } } }) match { - case Left(_) ⇒ /* do nothing */ + case Left(_) ⇒ /* do nothing */ case Right(exception) ⇒ throw exception } } @@ -1242,44 +1276,35 @@ class DefaultClusterNode private[akka] ( } private[cluster] def membershipPathFor(node: String): String = "%s/%s".format(MEMBERSHIP_PATH, node) + private[cluster] def configurationPathFor(key: String): String = "%s/%s".format(CONFIGURATION_PATH, key) private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_NODES_TO_PATH, actorAddress) + private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String = "%s/%s".format(actorAddressToNodesPathFor(actorAddress), nodeName) private[cluster] def nodeToUuidsPathFor(node: String): String = "%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node) + private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String = "%s/%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node, uuid) private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_REGISTRY_PATH, actorAddress) + private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "serializer") + private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "uuid") private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String = "%s/%s".format(ACTOR_UUID_REGISTRY_PATH, uuid) + private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "node") + private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "address") private[cluster] def actorUuidRegistryRemoteAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "remote-address") private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_PATH, actorAddress.replace('.', '_')) + private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid) - private[cluster] def initializeNode() { - EventHandler.info(this, - ("\nCreating cluster node with" + - "\n\tcluster name = [%s]" + - "\n\tnode name = [%s]" + - "\n\tport = [%s]" + - "\n\tzookeeper server addresses = [%s]" + - "\n\tserializer = [%s]") - .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer)) - EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) - createZooKeeperPathStructureIfNeeded() - registerListeners() - joinCluster() - joinLeaderElection() - fetchMembershipNodes() - EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) - } /** * Returns a random set with node names of size 'replicationFactor'. @@ -1295,7 +1320,8 @@ class DefaultClusterNode private[akka] ( "] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]") val preferredNodes = - if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor + if (actorAddress.isDefined) { + // use 'preferred-nodes' in deployment config for the actor Deployer.deploymentFor(actorAddress.get) match { case Deploy(_, _, Clustered(nodes, _, _)) ⇒ nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor @@ -1350,13 +1376,16 @@ class DefaultClusterNode private[akka] ( * @returns a Map with the remote socket addresses to of disconnected node connections */ private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster( - newlyConnectedMembershipNodes: Traversable[String], - newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { + newlyConnectedMembershipNodes: Traversable[String], + newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { // cache the disconnected connections in a map, needed for fail-over of these connections later var disconnectedConnections = Map.empty[String, InetSocketAddress] - newlyDisconnectedMembershipNodes foreach { node ⇒ - disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) ⇒ address })) + newlyDisconnectedMembershipNodes foreach { + node ⇒ + disconnectedConnections += (node -> (nodeConnections(node) match { + case (address, _) ⇒ address + })) } if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) { @@ -1365,17 +1394,20 @@ class DefaultClusterNode private[akka] ( newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_)) // add connections newly arrived nodes - newlyConnectedMembershipNodes foreach { node ⇒ - if (!nodeConnections.contains(node)) { // only connect to each replica once + newlyConnectedMembershipNodes foreach { + node ⇒ + if (!nodeConnections.contains(node)) { + // only connect to each replica once - remoteSocketAddressForNode(node) foreach { address ⇒ - EventHandler.debug(this, - "Setting up connection to node with nodename [%s] and address [%s]".format(node, address)) + remoteSocketAddressForNode(node) foreach { + address ⇒ + EventHandler.debug(this, + "Setting up connection to node with nodename [%s] and address [%s]".format(node, address)) - val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start() - nodeConnections.put(node, (address, clusterDaemon)) + val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start() + nodeConnections.put(node, (address, clusterDaemon)) + } } - } } } finally { connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false) @@ -1422,84 +1454,87 @@ class DefaultClusterNode private[akka] ( } private[cluster] def migrateActorsOnFailedNodes( - failedNodes: List[String], - currentClusterNodes: List[String], - oldClusterNodes: List[String], - disconnectedConnections: Map[String, InetSocketAddress]) { + failedNodes: List[String], + currentClusterNodes: List[String], + oldClusterNodes: List[String], + disconnectedConnections: Map[String, InetSocketAddress]) { - failedNodes.foreach { failedNodeName ⇒ + failedNodes.foreach { + failedNodeName ⇒ - val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName) + val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName) - val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) - val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName) + val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) + val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName) - // Migrate to the successor of the failed node (using a sorted circular list of the node names) - if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail - (failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor? + // Migrate to the successor of the failed node (using a sorted circular list of the node names) + if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail + (failedNodeIndex == myIndex + 1)) { + // Am I the leftmost successor? - // Takes the lead of migrating the actors. Not all to this node. - // All to this node except if the actor already resides here, then pick another node it is not already on. + // Takes the lead of migrating the actors. Not all to this node. + // All to this node except if the actor already resides here, then pick another node it is not already on. - // Yes I am the node to migrate the actor to (can only be one in the cluster) - val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList + // Yes I am the node to migrate the actor to (can only be one in the cluster) + val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList - actorUuidsForFailedNode.foreach { uuidAsString ⇒ - EventHandler.debug(this, - "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" - .format(failedNodeName, uuidAsString, nodeAddress.nodeName)) + actorUuidsForFailedNode.foreach { + uuidAsString ⇒ + EventHandler.debug(this, + "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]" + .format(failedNodeName, uuidAsString, nodeAddress.nodeName)) - val uuid = uuidFrom(uuidAsString) - val actorAddress = actorAddressForUuid(uuid).getOrElse( - throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]")) + val uuid = uuidFrom(uuidAsString) + val actorAddress = actorAddressForUuid(uuid).getOrElse( + throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]")) - val migrateToNodeAddress = - if (isInUseOnNode(actorAddress)) { - // already in use on this node, pick another node to instantiate the actor on - val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress) - val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet + val migrateToNodeAddress = + if (isInUseOnNode(actorAddress)) { + // already in use on this node, pick another node to instantiate the actor on + val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress) + val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet - if (nodesAvailableForMigration.isEmpty) throw new ClusterException( - "Can not migrate actor to new node since there are not any available nodes left. " + - "(However, the actor already has >1 replica in cluster, so we are ok)") + if (nodesAvailableForMigration.isEmpty) throw new ClusterException( + "Can not migrate actor to new node since there are not any available nodes left. " + + "(However, the actor already has >1 replica in cluster, so we are ok)") - NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head) - } else { - // actor is not in use on this node, migrate it here - nodeAddress - } + NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head) + } else { + // actor is not in use on this node, migrate it here + nodeAddress + } - // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.) - val replicateFromUuid = - if (isReplicated(actorAddress)) Some(uuid) - else None + // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.) + val replicateFromUuid = + if (isReplicated(actorAddress)) Some(uuid) + else None - migrateWithoutCheckingThatActorResidesOnItsHomeNode( - failedNodeAddress, - migrateToNodeAddress, - actorAddress, - replicateFromUuid) + migrateWithoutCheckingThatActorResidesOnItsHomeNode( + failedNodeAddress, + migrateToNodeAddress, + actorAddress, + replicateFromUuid) + } + + // notify all available nodes that they should fail-over all connections from 'from' to 'to' + val from = disconnectedConnections(failedNodeName) + val to = remoteServerAddress + + Serialization.serialize((from, to)) match { + case Left(error) ⇒ throw error + case Right(bytes) ⇒ + + val command = RemoteDaemonMessageProtocol.newBuilder + .setMessageType(FAIL_OVER_CONNECTIONS) + .setPayload(ByteString.copyFrom(bytes)) + .build + + // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed? + nodeConnections.values foreach { + case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) + } + } } - - // notify all available nodes that they should fail-over all connections from 'from' to 'to' - val from = disconnectedConnections(failedNodeName) - val to = remoteServerAddress - - Serialization.serialize((from, to)) match { - case Left(error) ⇒ throw error - case Right(bytes) ⇒ - - val command = RemoteDaemonMessageProtocol.newBuilder - .setMessageType(FAIL_OVER_CONNECTIONS) - .setPayload(ByteString.copyFrom(bytes)) - .build - - // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed? - nodeConnections.values foreach { - case (_, connection) ⇒ sendCommandToNode(connection, command, async = true) - } - } - } } } @@ -1507,7 +1542,7 @@ class DefaultClusterNode private[akka] ( * Used when the ephemeral "home" node is already gone, so we can't check if it is available. */ private def migrateWithoutCheckingThatActorResidesOnItsHomeNode( - from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) { + from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) { EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to)) if (!isInUseOnNode(actorAddress, to)) { @@ -1533,16 +1568,17 @@ class DefaultClusterNode private[akka] ( EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH)) } - basePaths.foreach { path ⇒ - try { - ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) - EventHandler.debug(this, "Created node [%s]".format(path)) - } catch { - case e ⇒ - val error = new ClusterException(e.toString) - EventHandler.error(error, this) - throw error - } + basePaths.foreach { + path ⇒ + try { + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) + EventHandler.debug(this, "Created node [%s]".format(path)) + } catch { + case e ⇒ + val error = new ClusterException(e.toString) + EventHandler.error(error, this) + throw error + } } } @@ -1578,7 +1614,7 @@ class DefaultClusterNode private[akka] ( override def resign() = self.resign() - override def isConnected = self.isConnected.get + override def isConnected = self.isConnected.isOn override def getNodeAddres = self.nodeAddress @@ -1620,27 +1656,27 @@ class DefaultClusterNode private[akka] ( override def getConfigElementKeys = self.getConfigElementKeys.toArray - override def getMemberShipPathFor(node:String) = self.membershipPathFor(node) + override def getMemberShipPathFor(node: String) = self.membershipPathFor(node) - override def getConfigurationPathFor(key:String) = self.configurationPathFor(key) + override def getConfigurationPathFor(key: String) = self.configurationPathFor(key) - override def getActorAddresstoNodesPathFor(actorAddress:String) = self.actorAddressToNodesPathFor(actorAddress) + override def getActorAddresstoNodesPathFor(actorAddress: String) = self.actorAddressToNodesPathFor(actorAddress) - override def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String) = self.actorAddressToNodesPathFor(actorAddress, nodeName) + override def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String) = self.actorAddressToNodesPathFor(actorAddress, nodeName) - override def getNodeToUuidsPathFor(node:String) = self.nodeToUuidsPathFor(node) + override def getNodeToUuidsPathFor(node: String) = self.nodeToUuidsPathFor(node) - override def getNodeToUuidsPathFor(node:String, uuid:UUID) = self.nodeToUuidsPathFor(node, uuid) + override def getNodeToUuidsPathFor(node: String, uuid: UUID) = self.nodeToUuidsPathFor(node, uuid) - override def getActorAddressRegistryPathFor(actorAddress:String) = self.actorAddressRegistryPathFor(actorAddress) + override def getActorAddressRegistryPathFor(actorAddress: String) = self.actorAddressRegistryPathFor(actorAddress) - override def getActorAddressRegistrySerializerPathFor(actorAddress:String) = self.actorAddressRegistrySerializerPathFor(actorAddress) + override def getActorAddressRegistrySerializerPathFor(actorAddress: String) = self.actorAddressRegistrySerializerPathFor(actorAddress) - override def getActorAddressRegistryUuidPathFor(actorAddress:String) = self.actorAddressRegistryUuidPathFor(actorAddress) + override def getActorAddressRegistryUuidPathFor(actorAddress: String) = self.actorAddressRegistryUuidPathFor(actorAddress) override def getActorUuidRegistryNodePathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid) - override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID)= self.actorUuidRegistryNodePathFor(uuid) + override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid) override def getActorAddressToUuidsPathFor(actorAddress: String) = self.actorAddressToUuidsPathFor(actorAddress) @@ -1770,81 +1806,85 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { try { if (message.hasActorAddress) { val actorAddress = message.getActorAddress - cluster.serializerForActor(actorAddress) foreach { serializer ⇒ - cluster.use(actorAddress, serializer) foreach { newActorRef ⇒ - cluster.remoteService.register(actorAddress, newActorRef) + cluster.serializerForActor(actorAddress) foreach { + serializer ⇒ + cluster.use(actorAddress, serializer) foreach { + newActorRef ⇒ + cluster.remoteService.register(actorAddress, newActorRef) - if (message.hasReplicateActorFromUuid) { - // replication is used - fetch the messages and replay them - import akka.remote.protocol.RemoteProtocol._ - import akka.remote.MessageSerializer + if (message.hasReplicateActorFromUuid) { + // replication is used - fetch the messages and replay them + import akka.remote.protocol.RemoteProtocol._ + import akka.remote.MessageSerializer - val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid) - val deployment = Deployer.deploymentFor(actorAddress) - val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse( - throw new IllegalStateException( - "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme")) - val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme) + val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid) + val deployment = Deployer.deploymentFor(actorAddress) + val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse( + throw new IllegalStateException( + "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme")) + val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme) - try { - // get the transaction log for the actor UUID - val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) + try { + // get the transaction log for the actor UUID + val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) - // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) - val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries + // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) + val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries - // deserialize and restore actor snapshot - val actorRefToUseForReplay = - snapshotAsBytes match { + // deserialize and restore actor snapshot + val actorRefToUseForReplay = + snapshotAsBytes match { - // we have a new actor ref - the snapshot - case Some(bytes) ⇒ - // stop the new actor ref and use the snapshot instead - cluster.remoteService.unregister(actorAddress) + // we have a new actor ref - the snapshot + case Some(bytes) ⇒ + // stop the new actor ref and use the snapshot instead + cluster.remoteService.unregister(actorAddress) - // deserialize the snapshot actor ref and register it as remote actor - val uncompressedBytes = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) - else bytes + // deserialize the snapshot actor ref and register it as remote actor + val uncompressedBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes - val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() - cluster.remoteService.register(actorAddress, snapshotActorRef) + val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() + cluster.remoteService.register(actorAddress, snapshotActorRef) - // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) - //newActorRef.stop() + // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) + //newActorRef.stop() - snapshotActorRef + snapshotActorRef - // we have no snapshot - use the new actor ref - case None ⇒ - newActorRef + // we have no snapshot - use the new actor ref + case None ⇒ + newActorRef + } + + // deserialize the messages + val messages: Vector[AnyRef] = entriesAsBytes map { + bytes ⇒ + val messageBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes + MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) + } + + EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) + + // replay all messages + messages foreach { + message ⇒ + EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) + + // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? + actorRefToUseForReplay ! message + } + + } catch { + case e: Throwable ⇒ + EventHandler.error(e, this, e.toString) + throw e } - - // deserialize the messages - val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒ - val messageBytes = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) - else bytes - MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) } - - EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) - - // replay all messages - messages foreach { message ⇒ - EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) - - // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? - actorRefToUseForReplay ! message - } - - } catch { - case e: Throwable ⇒ - EventHandler.error(e, this, e.toString) - throw e - } } - } } } else { EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message)) @@ -1859,8 +1899,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { case RELEASE ⇒ if (message.hasActorUuid) { - cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒ - cluster.release(address) + cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { + address ⇒ + cluster.release(address) } } else if (message.hasActorAddress) { cluster release message.getActorAddress @@ -1870,15 +1911,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { .format(message)) } - case START ⇒ cluster.start() + case START ⇒ cluster.start() - case STOP ⇒ cluster.shutdown() + case STOP ⇒ cluster.shutdown() case DISCONNECT ⇒ cluster.disconnect() - case RECONNECT ⇒ cluster.reconnect() + case RECONNECT ⇒ cluster.reconnect() - case RESIGN ⇒ cluster.resign() + case RESIGN ⇒ cluster.resign() case FAIL_OVER_CONNECTIONS ⇒ val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)]) @@ -1942,7 +1983,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { - case Left(error) ⇒ throw error + case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[T] } }