Fixed broken support for automatic migration of actors residing on crashed node.

Also hardened the test for automatic migration of actors.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-06-27 18:57:48 +02:00
parent fefb902350
commit 82391e728c
5 changed files with 98 additions and 36 deletions

View file

@ -60,6 +60,8 @@ import com.google.protobuf.ByteString
/**
* JMX MBean for the cluster service.
*
* FIXME revisit the methods in this MBean interface, they are not up to date with new cluster API
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ClusterNodeMBean {
@ -665,6 +667,9 @@ class DefaultClusterNode private[akka] (
// create ADDRESS -> UUIDs registry
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address)))
ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid)))
// create NODE NAME -> UUID registry
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
}
import RemoteClusterDaemon._
@ -1197,7 +1202,7 @@ class DefaultClusterNode private[akka] (
private def sendCommandToReplica(connection: ActorRef, command: RemoteDaemonMessageProtocol, async: Boolean = true) {
if (async) {
connection ! (command, remoteDaemonAckTimeout)
connection ! command
} else {
(connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
@ -1300,8 +1305,8 @@ class DefaultClusterNode private[akka] (
/**
* Connect to all available replicas unless already connected).
*/
private def connectToAllNewlyArrivedMembershipNodesInCluster() {
membershipNodes foreach { node
private def connectToAllNewlyArrivedMembershipNodesInCluster(currentSetOfClusterNodes: Traversable[String] = membershipNodes) {
currentSetOfClusterNodes foreach { node
if ((node != Config.nodename)) { // no replica on the "home" node of the ref
if (!nodeConnections.contains(node)) { // only connect to each replica once
val addressOption = remoteSocketAddressForNode(node)
@ -1340,8 +1345,12 @@ class DefaultClusterNode private[akka] (
}
}
private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress = {
zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress]
private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress] = {
try {
Some(zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress])
} catch {
case e: ZkNoNodeException None
}
}
private[cluster] def createActorsAtAddressPath() {
@ -1352,10 +1361,12 @@ class DefaultClusterNode private[akka] (
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}
// FIXME makes use of automaticMigrationFromFailedNodes method, why is it not used?
private[cluster] def automaticMigrationFromFailedNodes(currentSetOfClusterNodes: List[String]) {
connectToAllNewlyArrivedMembershipNodesInCluster()
findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName
private[cluster] def migrateActorsOnFailedNodes(currentSetOfClusterNodes: List[String]) {
connectToAllNewlyArrivedMembershipNodesInCluster(currentSetOfClusterNodes)
val failedNodes = findFailedNodes(currentSetOfClusterNodes)
failedNodes.foreach { failedNodeName
val allNodes = locallyCachedMembershipNodes.toList
val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
@ -1363,11 +1374,11 @@ class DefaultClusterNode private[akka] (
// Migrate to the successor of the failed node (using a sorted circular list of the node names)
if ((failedNodeIndex == 0 && myIndex == locallyCachedMembershipNodes.size - 1) || // No leftmost successor exists, check the tail
(failedNodeIndex == myIndex + 1)) {
// Am I the leftmost successor?
(failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor?
// Yes I am the node to migrate the actor to (can only be one in the cluster)
val actorUuidsForFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName))
EventHandler.debug(this,
"Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]"
.format(failedNodeName, nodeAddress.nodeName, actorUuidsForFailedNode))
@ -1380,33 +1391,29 @@ class DefaultClusterNode private[akka] (
val actorAddressOption = actorAddressForUuid(uuidFrom(uuid))
if (actorAddressOption.isDefined) {
val actorAddress = actorAddressOption.get
migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
val serializer: Serializer = serializerForActor(actorAddress)
use(actorAddress, serializer) foreach { actor
// FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
//actor.homeAddress = remoteServerAddress
val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress))
remoteService.register(actorAddress, actor)
}
use(actorAddress, serializerForActor(actorAddress)) foreach (actor remoteService.register(actorAddress, actor))
}
}
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
val from = nodeNameToAddress(failedNodeName)
val to = remoteServerAddress
Serialization.serialize((from, to)) match {
case Left(error) throw error
case Right(bytes)
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FAIL_OVER_CONNECTIONS)
.setPayload(ByteString.copyFrom(bytes))
.build
membershipNodes foreach { node
// FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
currentSetOfClusterNodes foreach { node
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToReplica(connection, command, async = true)
}
@ -1433,7 +1440,9 @@ class DefaultClusterNode private[akka] (
release(actorAddress)
ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteSocketAddressForNode(to.nodeName))))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid,
remoteSocketAddressForNode(to.nodeName).getOrElse(throw new ClusterException("No remote address registered for [" + to.nodeName + "]")))))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
@ -1448,19 +1457,19 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def findFailedNodes(nodes: List[String]): List[String] =
(locallyCachedMembershipNodes diff Set(nodes: _*)).toList
(locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList
private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String] =
(Set(nodes: _*) diff locallyCachedMembershipNodes).toList
(Set(nodes: _*) diff locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]]).toList
private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String] =
(locallyCachedMembershipNodes diff Set(nodes: _*)).toList
(locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList
private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String] =
(Set(nodes: _*) diff locallyCachedMembershipNodes).toList
(Set(nodes: _*) diff locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]]).toList
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] =
(locallyCachedMembershipNodes diff Set(nodes: _*)).toList
(locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList
private def createRootClusterNode() {
ignore[ZkNodeExistsException] {
@ -1573,6 +1582,9 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
if (!childList.isEmpty) EventHandler.debug(this,
"MembershipChildListener at [%s] has children [%s]"
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.migrateActorsOnFailedNodes(currentChilds.toList)
self.findNewlyConnectedMembershipNodes(childList) foreach { name
self.remoteSocketAddressForNode(name) foreach (address self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map
self.publish(NodeConnected(name))
@ -1772,6 +1784,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
}
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serialization.serialize(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T]
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[T]
}
}
}