Fixed problems with actor migration in cluster and added tests for explicit actor migration through API

This commit is contained in:
Jonas Bonér 2011-06-26 09:49:03 +02:00
parent 8e4bcb304a
commit a0abd5ef57
14 changed files with 227 additions and 278 deletions

View file

@ -164,14 +164,14 @@ object Cluster {
/**
* The node address.
*/
val nodeAddress = NodeAddress(name, nodename, hostname, port)
val nodeAddress = NodeAddress(name, nodename)
/**
* The reference to the running ClusterNode.
*/
val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultSerializer)
}
/**
@ -273,13 +273,17 @@ object Cluster {
*/
class DefaultClusterNode private[akka] (
val nodeAddress: NodeAddress,
val hostname: String = Config.hostname,
val port: Int = Config.remoteServerPort,
val zkServerAddresses: String,
val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
self
if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
if (port < 1) throw new NullPointerException("Port can not be negative")
if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'")
val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
val clusterJmxObjectName = JMX.nameFor(hostname, "monitoring", "cluster")
import Cluster._
@ -303,7 +307,7 @@ class DefaultClusterNode private[akka] (
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(nodeAddress.hostname, nodeAddress.port)
remote.start(hostname, port)
remote.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
remote.addListener(remoteClientLifeCycleListener)
remote
@ -336,7 +340,7 @@ class DefaultClusterNode private[akka] (
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
private[akka] val replicaConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] =
private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] =
new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
// zookeeper listeners
@ -388,7 +392,7 @@ class DefaultClusterNode private[akka] (
locallyCachedMembershipNodes.clear()
locallyCheckedOutActors.clear()
replicaConnections.toList.foreach({
nodeConnections.toList.foreach({
case (_, (address, _))
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
@ -401,7 +405,7 @@ class DefaultClusterNode private[akka] (
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
replicaConnections.clear()
nodeConnections.clear()
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
@ -670,23 +674,7 @@ class DefaultClusterNode private[akka] (
.setActorUuid(uuidToUuidProtocol(uuid))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection
(connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
case Some(Success)
EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
throw cause
case None
val error = new ClusterException(
"Operation to instantiate replicas throughout the cluster timed out")
EventHandler.error(error, this, error.toString)
throw error
}
}
nodeConnectionsForReplicationFactor(replicationFactor) foreach { connection sendCommandToReplica(connection, command, async = false) }
this
} else throw new ClusterException("Not connected to cluster")
@ -804,30 +792,36 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
connectToAllNewlyArrivedMembershipNodesInCluster()
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
membershipNodes foreach { node
replicaConnections.get(node) foreach {
case (_, connection) connection ! command
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToReplica(connection, command, async = false)
}
}
}
}
/**
* Using (checking out) specific UUID on a specefic node.
* Using (checking out) specific UUID on a specific node.
*/
def useActorOnNode(node: String, uuid: UUID) {
isConnected ifOn {
replicaConnections.get(node) foreach {
connectToAllNewlyArrivedMembershipNodesInCluster()
nodeConnections.get(node) foreach {
case (_, connection)
connection ! RemoteDaemonMessageProtocol.newBuilder
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
sendCommandToReplica(connection, command, async = false)
}
}
}
@ -866,14 +860,17 @@ class DefaultClusterNode private[akka] (
isConnected ifOn {
EventHandler.debug(this,
"Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid))
connectToAllNewlyArrivedMembershipNodesInCluster()
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
nodesForActorsInUseWithUuid(uuid) foreach { node
replicaConnections.get(node) foreach {
case (_, connection)
connection ! command
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToReplica(connection, command, async = true)
}
}
}
@ -1084,7 +1081,7 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
}
}
@ -1100,7 +1097,7 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
@ -1117,7 +1114,7 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
replicaConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
}
}
@ -1134,7 +1131,7 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = replicaConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
@ -1192,12 +1189,37 @@ class DefaultClusterNode private[akka] (
}
}
/**
* Returns a list with all config element keys.
*/
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_PATH).toList.toArray.asInstanceOf[Array[String]]
// =======================================
// Private
// =======================================
private def sendCommandToReplica(connection: ActorRef, command: RemoteDaemonMessageProtocol, async: Boolean = true) {
if (async) {
connection ! (command, remoteDaemonAckTimeout)
} else {
(connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
case Some(Success)
EventHandler.debug(this, "Replica for [%s] successfully created".format(connection.address))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
throw cause
case None
val error = new ClusterException(
"Operation to instantiate replicas throughout the cluster timed out")
EventHandler.error(error, this, error.toString)
throw error
}
}
}
private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_PATH, node)
private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_PATH, key)
@ -1232,32 +1254,18 @@ class DefaultClusterNode private[akka] (
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
.format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createRootClusterNode()
val isLeader = joinLeaderElection()
if (isLeader) createNodeStructureIfNeeded()
registerListeners()
joinMembershipPath()
joinActorsAtAddressPath()
joinCluster()
createActorsAtAddressPath()
fetchMembershipNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
private[cluster] def addressForNode(node: String): Option[InetSocketAddress] = {
try {
val address = zkClient.readData(membershipPathFor(node)).asInstanceOf[String]
val tokenizer = new java.util.StringTokenizer(address, ":")
tokenizer.nextToken // cluster name
tokenizer.nextToken // node name
val hostname = tokenizer.nextToken // hostname
val port = tokenizer.nextToken.toInt // port
Some(new InetSocketAddress(hostname, port))
} catch {
case e: ZkNoNodeException None
}
}
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] =
uuidsForActorAddress(actorAddress) filter (_ ne null)
@ -1265,14 +1273,14 @@ class DefaultClusterNode private[akka] (
* Returns a random set with replica connections of size 'replicationFactor'.
* Default replicationFactor is 0, which returns the empty set.
*/
private def replicaConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
var replicas = HashSet.empty[ActorRef]
if (replicationFactor < 1) return replicas
connectToAllMembershipNodesInCluster()
connectToAllNewlyArrivedMembershipNodesInCluster()
val numberOfReplicas = replicaConnections.size
val replicaConnectionsAsArray = replicaConnections.toList map {
val numberOfReplicas = nodeConnections.size
val nodeConnectionsAsArray = nodeConnections.toList map {
case (node, (address, actorRef)) actorRef
} // the ActorRefs
@ -1281,12 +1289,12 @@ class DefaultClusterNode private[akka] (
"Replication factor [" + replicationFactor +
"] is greater than the number of available nodes [" + numberOfReplicas + "]")
} else if (numberOfReplicas == replicationFactor) {
replicas = replicas ++ replicaConnectionsAsArray
replicas = replicas ++ nodeConnectionsAsArray
} else {
val random = new java.util.Random(System.currentTimeMillis)
while (replicas.size < replicationFactor) {
val index = random.nextInt(numberOfReplicas)
replicas = replicas + replicaConnectionsAsArray(index)
replicas = replicas + nodeConnectionsAsArray(index)
}
}
replicas
@ -1295,29 +1303,29 @@ class DefaultClusterNode private[akka] (
/**
* Connect to all available replicas unless already connected).
*/
private def connectToAllMembershipNodesInCluster() {
private def connectToAllNewlyArrivedMembershipNodesInCluster() {
membershipNodes foreach { node
if ((node != Config.nodename)) { // no replica on the "home" node of the ref
if (!replicaConnections.contains(node)) { // only connect to each replica once
val addressOption = addressForNode(node)
if (!nodeConnections.contains(node)) { // only connect to each replica once
val addressOption = remoteSocketAddressForNode(node)
if (addressOption.isDefined) {
val address = addressOption.get
EventHandler.debug(this,
"Connecting to replica with nodename [%s] and address [%s]".format(node, address))
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
replicaConnections.put(node, (address, clusterDaemon))
nodeConnections.put(node, (address, clusterDaemon))
}
}
}
}
}
private[cluster] def joinMembershipPath() {
private[cluster] def joinCluster() {
nodeNameToAddress += (nodeAddress.nodeName -> remoteServerAddress)
try {
EventHandler.info(this,
"Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath))
zkClient.createEphemeral(membershipNodePath, nodeAddress.toString)
zkClient.createEphemeral(membershipNodePath, remoteServerAddress)
} catch {
case e: ZkNodeExistsException
val error = new ClusterException("Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in by another node")
@ -1326,10 +1334,6 @@ class DefaultClusterNode private[akka] (
}
}
private[cluster] def joinActorsAtAddressPath() {
ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
}
private[cluster] def joinLeaderElection(): Boolean = {
EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
try {
@ -1339,12 +1343,22 @@ class DefaultClusterNode private[akka] (
}
}
private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress = {
zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress]
}
private[cluster] def createActorsAtAddressPath() {
ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
}
private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) {
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}
private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) = {
findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName
// FIXME makes use of automaticMigrationFromFailedNodes method, why is it not used?
private[cluster] def automaticMigrationFromFailedNodes() {
connectToAllNewlyArrivedMembershipNodesInCluster()
findFailedNodes(membershipNodes.toList).foreach { failedNodeName
val allNodes = locallyCachedMembershipNodes.toList
val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
@ -1396,9 +1410,8 @@ class DefaultClusterNode private[akka] (
.setPayload(ByteString.copyFrom(bytes))
.build
membershipNodes foreach { node
replicaConnections.get(node) foreach {
case (_, connection)
connection ! command
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToReplica(connection, command, async = true)
}
}
}
@ -1412,6 +1425,8 @@ class DefaultClusterNode private[akka] (
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
from: NodeAddress, to: NodeAddress, actorAddress: String) {
EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to))
actorUuidsForActorAddress(actorAddress) map { uuid
val actorAddressOption = actorAddressForUuid(uuid)
if (actorAddressOption.isDefined) {
@ -1420,9 +1435,8 @@ class DefaultClusterNode private[akka] (
if (!isInUseOnNode(actorAddress, to)) {
release(actorAddress)
val newAddress = new InetSocketAddress(to.hostname, to.port)
ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteSocketAddressForNode(to.nodeName))))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
@ -1505,9 +1519,9 @@ class DefaultClusterNode private[akka] (
override def isConnected = self.isConnected.isOn
override def getRemoteServerHostname = self.nodeAddress.hostname
override def getRemoteServerHostname = self.hostname
override def getRemoteServerPort = self.nodeAddress.port
override def getRemoteServerPort = self.port
override def getNodeName = self.nodeAddress.nodeName
@ -1563,7 +1577,7 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
"MembershipChildListener at [%s] has children [%s]"
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach { name
self.addressForNode(name) foreach (address self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map
self.remoteSocketAddressForNode(name) foreach (address self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map
self.publish(NodeConnected(name))
}