ticket 1025

This commit is contained in:
Peter Veentjer 2011-07-15 09:55:45 +03:00
parent 5678692e66
commit 966f7d9297
2 changed files with 319 additions and 280 deletions

View file

@ -6,15 +6,14 @@ package akka.cluster
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event._
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener}
import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
import java.util.{ List JList }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap }
import java.util.{List JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.net.InetSocketAddress
import javax.management.StandardMBean
@ -30,17 +29,17 @@ import Status._
import DeploymentConfig._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.dispatch.{Dispatchers, Future}
import akka.remoteinterface._
import akka.routing.RouterType
import akka.config.{ Config, Supervision }
import akka.config.{Config, Supervision}
import Supervision._
import Config._
import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization }
import akka.serialization.{Serialization, Serializer, ActorSerialization}
import ActorSerialization._
import Compression.LZF
import akka.serialization.Compression.LZF
import akka.cluster.zookeeper._
import ChangeListener._
@ -50,6 +49,7 @@ import RemoteDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import java.util.concurrent.{CopyOnWriteArrayList, Callable, ConcurrentHashMap}
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
@ -84,7 +84,7 @@ trait ClusterNodeMBean {
def getMemberNodes: Array[String]
def getNodeAddres():NodeAddress
def getNodeAddres(): NodeAddress
def getLeaderLockName: String
@ -112,31 +112,31 @@ trait ClusterNodeMBean {
def getConfigElementKeys: Array[String]
def getMemberShipPathFor(node:String):String
def getMemberShipPathFor(node: String): String
def getConfigurationPathFor(key:String):String
def getConfigurationPathFor(key: String): String
def getActorAddresstoNodesPathFor(actorAddress:String):String
def getActorAddresstoNodesPathFor(actorAddress: String): String
def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String):String
def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String): String
def getNodeToUuidsPathFor(node:String):String
def getNodeToUuidsPathFor(node: String): String
def getNodeToUuidsPathFor(node:String, uuid:UUID):String
def getNodeToUuidsPathFor(node: String, uuid: UUID): String
def getActorAddressRegistryPathFor(actorAddress:String):String
def getActorAddressRegistryPathFor(actorAddress: String): String
def getActorAddressRegistrySerializerPathFor(actorAddress:String):String
def getActorAddressRegistrySerializerPathFor(actorAddress: String): String
def getActorAddressRegistryUuidPathFor(actorAddress:String):String
def getActorAddressRegistryUuidPathFor(actorAddress: String): String
def getActorUuidRegistryNodePathFor(uuid: UUID):String
def getActorUuidRegistryNodePathFor(uuid: UUID): String
def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID):String
def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID): String
def getActorAddressToUuidsPathFor(actorAddress: String):String
def getActorAddressToUuidsPathFor(actorAddress: String): String
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID):String
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String
}
/**
@ -181,17 +181,17 @@ object Cluster {
private def nodename: String = properties.get("akka.cluster.nodename") match {
case Some(uberride) uberride
case None Config.nodename
case None Config.nodename
}
private def hostname: String = properties.get("akka.cluster.hostname") match {
case Some(uberride) uberride
case None Config.hostname
case None Config.hostname
}
private def port: Int = properties.get("akka.cluster.port") match {
case Some(uberride) uberride.toInt
case None Config.remoteServerPort
case None Config.remoteServerPort
}
val defaultZooKeeperSerializer = new SerializableSerializer
@ -329,12 +329,12 @@ object Cluster {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DefaultClusterNode private[akka] (
val nodeAddress: NodeAddress,
val hostname: String = Config.hostname,
val port: Int = Config.remoteServerPort,
val zkServerAddresses: String,
val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
class DefaultClusterNode private[akka](
val nodeAddress: NodeAddress,
val hostname: String = Config.hostname,
val port: Int = Config.remoteServerPort,
val zkServerAddresses: String,
val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
self
if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
@ -349,7 +349,7 @@ class DefaultClusterNode private[akka] (
def receive = {
case RemoteClientError(cause, client, address) client.shutdownClientModule()
case RemoteClientDisconnected(client, address) client.shutdownClientModule()
case _ //ignore other
case _ //ignore other
}
}, "akka.cluster.RemoteClientLifeCycleListener").start()
@ -373,6 +373,8 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
val isConnected = new Switch(false)
// static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@ -445,39 +447,65 @@ class DefaultClusterNode private[akka] (
// Node
// =======================================
def isRunning: Boolean = isConnected.isOn
def start(): ClusterNode = {
if (isConnected.compareAndSet(false, true)) {
isConnected.switchOn {
initializeNode()
}
this
}
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createZooKeeperPathStructureIfNeeded()
registerListeners()
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
def shutdown() {
if (isConnected.compareAndSet(true, false)) {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
locallyCachedMembershipNodes.clear()
nodeConnections.toList.foreach({
case (_, (address, _))
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
remoteService.shutdown() // shutdown server
remoteClientLifeCycleListener.stop()
remoteDaemon.stop()
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
nodeConnections.clear()
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
isConnected.switchOff {
shutdownNode()
}
}
private def shutdownNode() {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
locallyCachedMembershipNodes.clear()
nodeConnections.toList.foreach({
case (_, (address, _))
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
remoteService.shutdown() // shutdown server
remoteClientLifeCycleListener.stop()
remoteDaemon.stop()
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
nodeConnections.clear()
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
}
def disconnect(): ClusterNode = {
zkClient.unsubscribeAll()
zkClient.close()
@ -668,12 +696,12 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(
actorAddress: String,
actorFactory: () ActorRef,
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.get) {
actorAddress: String,
actorFactory: () ActorRef,
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
val serializerClassName = serializer.getClass.getName
@ -704,7 +732,7 @@ class DefaultClusterNode private[akka] (
}
}
}) match {
case Left(path) path
case Left(path) path
case Right(exception) actorAddressRegistryPath
}
}
@ -749,7 +777,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid clustered or not?
*/
def isClustered(actorAddress: String): Boolean = if (isConnected.get) {
def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressRegistryPathFor(actorAddress))
} else false
@ -761,7 +789,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) {
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
} else false
@ -775,7 +803,7 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) {
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
@ -791,7 +819,7 @@ class DefaultClusterNode private[akka] (
val actorFactory =
Serialization.deserialize(actorFactoryBytes, classOf[() LocalActorRef], None) match {
case Left(error) throw error
case Left(error) throw error
case Right(instance) instance.asInstanceOf[() LocalActorRef]
}
@ -860,7 +888,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
if (isConnected.get) {
if (isConnected.isOn) {
val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
@ -871,11 +899,12 @@ class DefaultClusterNode private[akka] (
val command = builder.build
nodes foreach { node
nodeConnections.get(node) foreach {
case (_, connection)
sendCommandToNode(connection, command, async = false)
}
nodes foreach {
node
nodeConnections.get(node) foreach {
case (_, connection)
sendCommandToNode(connection, command, async = false)
}
}
}
}
@ -908,15 +937,16 @@ class DefaultClusterNode private[akka] (
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
if (isConnected.get) {
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
uuidsForActorAddress(actorAddress) foreach { uuid
EventHandler.debug(this,
"Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
uuidsForActorAddress(actorAddress) foreach {
uuid
EventHandler.debug(this,
"Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
}
}
}
@ -925,7 +955,7 @@ class DefaultClusterNode private[akka] (
* Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
*/
private[akka] def releaseActorOnAllNodes(actorAddress: String) {
if (isConnected.get) {
if (isConnected.isOn) {
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
@ -934,10 +964,11 @@ class DefaultClusterNode private[akka] (
.setActorAddress(actorAddress)
.build
nodesForActorsInUseWithAddress(actorAddress) foreach { node
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
nodesForActorsInUseWithAddress(actorAddress) foreach {
node
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
}
}
}
@ -945,14 +976,16 @@ class DefaultClusterNode private[akka] (
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) {
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach { case (_, address) clusterActorRefs.put(address, actorRef) }
addresses foreach {
case (_, address) clusterActorRefs.put(address, actorRef)
}
actorRef.start()
} else throw new ClusterException("Not connected to cluster")
@ -970,7 +1003,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
} else Array.empty[UUID]
@ -982,7 +1015,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor id for the actor with a specific UUID.
*/
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) {
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
try {
Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
} catch {
@ -999,7 +1032,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor UUIDs for actor ID.
*/
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
try {
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1012,7 +1045,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the node names of all actors in use with UUID.
*/
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) {
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
try {
zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
} catch {
@ -1023,7 +1056,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) {
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1036,7 +1069,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) {
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
val uuids =
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
@ -1059,7 +1092,8 @@ class DefaultClusterNode private[akka] (
case e: ZkNoNodeException throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
}
ReflectiveAccess.getClassFor(serializerClassName) match { // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
ReflectiveAccess.getClassFor(serializerClassName) match {
// FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
case Right(clazz) clazz.newInstance.asInstanceOf[Serializer]
case Left(error)
EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
@ -1183,7 +1217,7 @@ class DefaultClusterNode private[akka] (
}
}
}) match {
case Left(_) /* do nothing */
case Left(_) /* do nothing */
case Right(exception) throw exception
}
}
@ -1242,44 +1276,35 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def membershipPathFor(node: String): String = "%s/%s".format(MEMBERSHIP_PATH, node)
private[cluster] def configurationPathFor(key: String): String = "%s/%s".format(CONFIGURATION_PATH, key)
private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_NODES_TO_PATH, actorAddress)
private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String = "%s/%s".format(actorAddressToNodesPathFor(actorAddress), nodeName)
private[cluster] def nodeToUuidsPathFor(node: String): String = "%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node)
private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String = "%s/%s/%s".format(NODE_TO_ACTOR_UUIDS_PATH, node, uuid)
private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_REGISTRY_PATH, actorAddress)
private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "serializer")
private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String = "%s/%s".format(actorAddressRegistryPathFor(actorAddress), "uuid")
private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String = "%s/%s".format(ACTOR_UUID_REGISTRY_PATH, uuid)
private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "node")
private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "address")
private[cluster] def actorUuidRegistryRemoteAddressPathFor(uuid: UUID): String = "%s/%s".format(actorUuidRegistryPathFor(uuid), "remote-address")
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_PATH, actorAddress.replace('.', '_'))
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid)
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createZooKeeperPathStructureIfNeeded()
registerListeners()
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
/**
* Returns a random set with node names of size 'replicationFactor'.
@ -1295,7 +1320,8 @@ class DefaultClusterNode private[akka] (
"] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]")
val preferredNodes =
if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
if (actorAddress.isDefined) {
// use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
case Deploy(_, _, Clustered(nodes, _, _))
nodes map (node DeploymentConfig.nodeNameFor(node)) take replicationFactor
@ -1350,13 +1376,16 @@ class DefaultClusterNode private[akka] (
* @returns a Map with the remote socket addresses to of disconnected node connections
*/
private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
newlyConnectedMembershipNodes: Traversable[String],
newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
newlyDisconnectedMembershipNodes foreach { node
disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) address }))
newlyDisconnectedMembershipNodes foreach {
node
disconnectedConnections += (node -> (nodeConnections(node) match {
case (address, _) address
}))
}
if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
@ -1365,17 +1394,20 @@ class DefaultClusterNode private[akka] (
newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
// add connections newly arrived nodes
newlyConnectedMembershipNodes foreach { node
if (!nodeConnections.contains(node)) { // only connect to each replica once
newlyConnectedMembershipNodes foreach {
node
if (!nodeConnections.contains(node)) {
// only connect to each replica once
remoteSocketAddressForNode(node) foreach { address
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
remoteSocketAddressForNode(node) foreach {
address
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
nodeConnections.put(node, (address, clusterDaemon))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
nodeConnections.put(node, (address, clusterDaemon))
}
}
}
}
} finally {
connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
@ -1422,84 +1454,87 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def migrateActorsOnFailedNodes(
failedNodes: List[String],
currentClusterNodes: List[String],
oldClusterNodes: List[String],
disconnectedConnections: Map[String, InetSocketAddress]) {
failedNodes: List[String],
currentClusterNodes: List[String],
oldClusterNodes: List[String],
disconnectedConnections: Map[String, InetSocketAddress]) {
failedNodes.foreach { failedNodeName
failedNodes.foreach {
failedNodeName
val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName)
val failedNodeAddress = NodeAddress(nodeAddress.clusterName, failedNodeName)
val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName)
val myIndex = oldClusterNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
val failedNodeIndex = oldClusterNodes.indexWhere(_ == failedNodeName)
// Migrate to the successor of the failed node (using a sorted circular list of the node names)
if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail
(failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor?
// Migrate to the successor of the failed node (using a sorted circular list of the node names)
if ((failedNodeIndex == 0 && myIndex == oldClusterNodes.size - 1) || // No leftmost successor exists, check the tail
(failedNodeIndex == myIndex + 1)) {
// Am I the leftmost successor?
// Takes the lead of migrating the actors. Not all to this node.
// All to this node except if the actor already resides here, then pick another node it is not already on.
// Takes the lead of migrating the actors. Not all to this node.
// All to this node except if the actor already resides here, then pick another node it is not already on.
// Yes I am the node to migrate the actor to (can only be one in the cluster)
val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList
// Yes I am the node to migrate the actor to (can only be one in the cluster)
val actorUuidsForFailedNode = zkClient.getChildren(nodeToUuidsPathFor(failedNodeName)).toList
actorUuidsForFailedNode.foreach { uuidAsString
EventHandler.debug(this,
"Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
.format(failedNodeName, uuidAsString, nodeAddress.nodeName))
actorUuidsForFailedNode.foreach {
uuidAsString
EventHandler.debug(this,
"Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
.format(failedNodeName, uuidAsString, nodeAddress.nodeName))
val uuid = uuidFrom(uuidAsString)
val actorAddress = actorAddressForUuid(uuid).getOrElse(
throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]"))
val uuid = uuidFrom(uuidAsString)
val actorAddress = actorAddressForUuid(uuid).getOrElse(
throw new IllegalStateException("No actor address found for UUID [" + uuidAsString + "]"))
val migrateToNodeAddress =
if (isInUseOnNode(actorAddress)) {
// already in use on this node, pick another node to instantiate the actor on
val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress)
val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet
val migrateToNodeAddress =
if (isInUseOnNode(actorAddress)) {
// already in use on this node, pick another node to instantiate the actor on
val replicaNodesForActor = nodesForActorsInUseWithAddress(actorAddress)
val nodesAvailableForMigration = (currentClusterNodes.toSet diff failedNodes.toSet) diff replicaNodesForActor.toSet
if (nodesAvailableForMigration.isEmpty) throw new ClusterException(
"Can not migrate actor to new node since there are not any available nodes left. " +
"(However, the actor already has >1 replica in cluster, so we are ok)")
if (nodesAvailableForMigration.isEmpty) throw new ClusterException(
"Can not migrate actor to new node since there are not any available nodes left. " +
"(However, the actor already has >1 replica in cluster, so we are ok)")
NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head)
} else {
// actor is not in use on this node, migrate it here
nodeAddress
}
NodeAddress(nodeAddress.clusterName, nodesAvailableForMigration.head)
} else {
// actor is not in use on this node, migrate it here
nodeAddress
}
// if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
val replicateFromUuid =
if (isReplicated(actorAddress)) Some(uuid)
else None
// if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
val replicateFromUuid =
if (isReplicated(actorAddress)) Some(uuid)
else None
migrateWithoutCheckingThatActorResidesOnItsHomeNode(
failedNodeAddress,
migrateToNodeAddress,
actorAddress,
replicateFromUuid)
migrateWithoutCheckingThatActorResidesOnItsHomeNode(
failedNodeAddress,
migrateToNodeAddress,
actorAddress,
replicateFromUuid)
}
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
val from = disconnectedConnections(failedNodeName)
val to = remoteServerAddress
Serialization.serialize((from, to)) match {
case Left(error) throw error
case Right(bytes)
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(bytes))
.build
// FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
nodeConnections.values foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
}
}
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
val from = disconnectedConnections(failedNodeName)
val to = remoteServerAddress
Serialization.serialize((from, to)) match {
case Left(error) throw error
case Right(bytes)
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(bytes))
.build
// FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
nodeConnections.values foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
}
}
}
}
@ -1507,7 +1542,7 @@ class DefaultClusterNode private[akka] (
* Used when the ephemeral "home" node is already gone, so we can't check if it is available.
*/
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to))
if (!isInUseOnNode(actorAddress, to)) {
@ -1533,16 +1568,17 @@ class DefaultClusterNode private[akka] (
EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
}
basePaths.foreach { path
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e
val error = new ClusterException(e.toString)
EventHandler.error(error, this)
throw error
}
basePaths.foreach {
path
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e
val error = new ClusterException(e.toString)
EventHandler.error(error, this)
throw error
}
}
}
@ -1578,7 +1614,7 @@ class DefaultClusterNode private[akka] (
override def resign() = self.resign()
override def isConnected = self.isConnected.get
override def isConnected = self.isConnected.isOn
override def getNodeAddres = self.nodeAddress
@ -1620,27 +1656,27 @@ class DefaultClusterNode private[akka] (
override def getConfigElementKeys = self.getConfigElementKeys.toArray
override def getMemberShipPathFor(node:String) = self.membershipPathFor(node)
override def getMemberShipPathFor(node: String) = self.membershipPathFor(node)
override def getConfigurationPathFor(key:String) = self.configurationPathFor(key)
override def getConfigurationPathFor(key: String) = self.configurationPathFor(key)
override def getActorAddresstoNodesPathFor(actorAddress:String) = self.actorAddressToNodesPathFor(actorAddress)
override def getActorAddresstoNodesPathFor(actorAddress: String) = self.actorAddressToNodesPathFor(actorAddress)
override def getActorAddressToNodesPathForWithNodeName(actorAddress:String, nodeName:String) = self.actorAddressToNodesPathFor(actorAddress, nodeName)
override def getActorAddressToNodesPathForWithNodeName(actorAddress: String, nodeName: String) = self.actorAddressToNodesPathFor(actorAddress, nodeName)
override def getNodeToUuidsPathFor(node:String) = self.nodeToUuidsPathFor(node)
override def getNodeToUuidsPathFor(node: String) = self.nodeToUuidsPathFor(node)
override def getNodeToUuidsPathFor(node:String, uuid:UUID) = self.nodeToUuidsPathFor(node, uuid)
override def getNodeToUuidsPathFor(node: String, uuid: UUID) = self.nodeToUuidsPathFor(node, uuid)
override def getActorAddressRegistryPathFor(actorAddress:String) = self.actorAddressRegistryPathFor(actorAddress)
override def getActorAddressRegistryPathFor(actorAddress: String) = self.actorAddressRegistryPathFor(actorAddress)
override def getActorAddressRegistrySerializerPathFor(actorAddress:String) = self.actorAddressRegistrySerializerPathFor(actorAddress)
override def getActorAddressRegistrySerializerPathFor(actorAddress: String) = self.actorAddressRegistrySerializerPathFor(actorAddress)
override def getActorAddressRegistryUuidPathFor(actorAddress:String) = self.actorAddressRegistryUuidPathFor(actorAddress)
override def getActorAddressRegistryUuidPathFor(actorAddress: String) = self.actorAddressRegistryUuidPathFor(actorAddress)
override def getActorUuidRegistryNodePathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid)
override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID)= self.actorUuidRegistryNodePathFor(uuid)
override def getActorUuidRegistryRemoteAddressPathFor(uuid: UUID) = self.actorUuidRegistryNodePathFor(uuid)
override def getActorAddressToUuidsPathFor(actorAddress: String) = self.actorAddressToUuidsPathFor(actorAddress)
@ -1770,81 +1806,85 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
try {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer
cluster.use(actorAddress, serializer) foreach { newActorRef
cluster.remoteService.register(actorAddress, newActorRef)
cluster.serializerForActor(actorAddress) foreach {
serializer
cluster.use(actorAddress, serializer) foreach {
newActorRef
cluster.remoteService.register(actorAddress, newActorRef)
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
import akka.remote.protocol.RemoteProtocol._
import akka.remote.MessageSerializer
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
import akka.remote.protocol.RemoteProtocol._
import akka.remote.MessageSerializer
val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
val deployment = Deployer.deploymentFor(actorAddress)
val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
throw new IllegalStateException(
"Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
val deployment = Deployer.deploymentFor(actorAddress)
val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
throw new IllegalStateException(
"Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
try {
// get the transaction log for the actor UUID
val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
try {
// get the transaction log for the actor UUID
val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
// get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
// get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
// deserialize and restore actor snapshot
val actorRefToUseForReplay =
snapshotAsBytes match {
// deserialize and restore actor snapshot
val actorRefToUseForReplay =
snapshotAsBytes match {
// we have a new actor ref - the snapshot
case Some(bytes)
// stop the new actor ref and use the snapshot instead
cluster.remoteService.unregister(actorAddress)
// we have a new actor ref - the snapshot
case Some(bytes)
// stop the new actor ref and use the snapshot instead
cluster.remoteService.unregister(actorAddress)
// deserialize the snapshot actor ref and register it as remote actor
val uncompressedBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
// deserialize the snapshot actor ref and register it as remote actor
val uncompressedBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
cluster.remoteService.register(actorAddress, snapshotActorRef)
val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
cluster.remoteService.register(actorAddress, snapshotActorRef)
// FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
//newActorRef.stop()
// FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
//newActorRef.stop()
snapshotActorRef
snapshotActorRef
// we have no snapshot - use the new actor ref
case None
newActorRef
// we have no snapshot - use the new actor ref
case None
newActorRef
}
// deserialize the messages
val messages: Vector[AnyRef] = entriesAsBytes map {
bytes
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach {
message
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
// FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
actorRefToUseForReplay ! message
}
} catch {
case e: Throwable
EventHandler.error(e, this, e.toString)
throw e
}
// deserialize the messages
val messages: Vector[AnyRef] = entriesAsBytes map { bytes
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach { message
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
// FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
actorRefToUseForReplay ! message
}
} catch {
case e: Throwable
EventHandler.error(e, this, e.toString)
throw e
}
}
}
}
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
@ -1859,8 +1899,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case RELEASE
if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address
cluster.release(address)
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach {
address
cluster.release(address)
}
} else if (message.hasActorAddress) {
cluster release message.getActorAddress
@ -1870,15 +1911,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
.format(message))
}
case START cluster.start()
case START cluster.start()
case STOP cluster.shutdown()
case STOP cluster.shutdown()
case DISCONNECT cluster.disconnect()
case RECONNECT cluster.reconnect()
case RECONNECT cluster.reconnect()
case RESIGN cluster.resign()
case RESIGN cluster.resign()
case FAIL_OVER_CONNECTIONS
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
@ -1942,7 +1983,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) throw error
case Left(error) throw error
case Right(instance) instance.asInstanceOf[T]
}
}