diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index fa28f0a4f3..84c783ed37 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -504,7 +504,7 @@ trait ClusterNode { private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) - private[cluster] def automaticMigrationFromFailedNodes(currentSetOfClusterNodes: List[String]) + private[cluster] def migrateActorsOnFailedNodes(currentNodes: List[String]) private[cluster] def membershipPathFor(node: String): String @@ -530,7 +530,7 @@ trait ClusterNode { private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String - private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress + private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress] private[cluster] def createActorsAtAddressPath() } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1ef271f52b..d9450c4ba8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -60,6 +60,8 @@ import com.google.protobuf.ByteString /** * JMX MBean for the cluster service. * + * FIXME revisit the methods in this MBean interface, they are not up to date with new cluster API + * * @author Jonas Bonér */ trait ClusterNodeMBean { @@ -665,6 +667,9 @@ class DefaultClusterNode private[akka] ( // create ADDRESS -> UUIDs registry ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address))) ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid))) + + // create NODE NAME -> UUID registry + ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) } import RemoteClusterDaemon._ @@ -1197,7 +1202,7 @@ class DefaultClusterNode private[akka] ( private def sendCommandToReplica(connection: ActorRef, command: RemoteDaemonMessageProtocol, async: Boolean = true) { if (async) { - connection ! (command, remoteDaemonAckTimeout) + connection ! command } else { (connection ? (command, remoteDaemonAckTimeout)).as[Status] match { @@ -1300,8 +1305,8 @@ class DefaultClusterNode private[akka] ( /** * Connect to all available replicas unless already connected). */ - private def connectToAllNewlyArrivedMembershipNodesInCluster() { - membershipNodes foreach { node ⇒ + private def connectToAllNewlyArrivedMembershipNodesInCluster(currentSetOfClusterNodes: Traversable[String] = membershipNodes) { + currentSetOfClusterNodes foreach { node ⇒ if ((node != Config.nodename)) { // no replica on the "home" node of the ref if (!nodeConnections.contains(node)) { // only connect to each replica once val addressOption = remoteSocketAddressForNode(node) @@ -1340,8 +1345,12 @@ class DefaultClusterNode private[akka] ( } } - private[cluster] def remoteSocketAddressForNode(node: String): InetSocketAddress = { - zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress] + private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress] = { + try { + Some(zkClient.readData(membershipPathFor(node), new Stat).asInstanceOf[InetSocketAddress]) + } catch { + case e: ZkNoNodeException ⇒ None + } } private[cluster] def createActorsAtAddressPath() { @@ -1352,10 +1361,12 @@ class DefaultClusterNode private[akka] ( clusterActorRefs.values(from) foreach (_.failOver(from, to)) } - // FIXME makes use of automaticMigrationFromFailedNodes method, why is it not used? - private[cluster] def automaticMigrationFromFailedNodes(currentSetOfClusterNodes: List[String]) { - connectToAllNewlyArrivedMembershipNodesInCluster() - findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName ⇒ + private[cluster] def migrateActorsOnFailedNodes(currentSetOfClusterNodes: List[String]) { + connectToAllNewlyArrivedMembershipNodesInCluster(currentSetOfClusterNodes) + + val failedNodes = findFailedNodes(currentSetOfClusterNodes) + + failedNodes.foreach { failedNodeName ⇒ val allNodes = locallyCachedMembershipNodes.toList val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName)) @@ -1363,11 +1374,11 @@ class DefaultClusterNode private[akka] ( // Migrate to the successor of the failed node (using a sorted circular list of the node names) if ((failedNodeIndex == 0 && myIndex == locallyCachedMembershipNodes.size - 1) || // No leftmost successor exists, check the tail - (failedNodeIndex == myIndex + 1)) { - // Am I the leftmost successor? + (failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor? // Yes I am the node to migrate the actor to (can only be one in the cluster) val actorUuidsForFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName)) + EventHandler.debug(this, "Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]" .format(failedNodeName, nodeAddress.nodeName, actorUuidsForFailedNode)) @@ -1380,33 +1391,29 @@ class DefaultClusterNode private[akka] ( val actorAddressOption = actorAddressForUuid(uuidFrom(uuid)) if (actorAddressOption.isDefined) { val actorAddress = actorAddressOption.get + migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress) - val serializer: Serializer = serializerForActor(actorAddress) - use(actorAddress, serializer) foreach { actor ⇒ - // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)' - //actor.homeAddress = remoteServerAddress - val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress") - homeAddress.setAccessible(true) - homeAddress.set(actor, Some(remoteServerAddress)) - - remoteService.register(actorAddress, actor) - } + use(actorAddress, serializerForActor(actorAddress)) foreach (actor ⇒ remoteService.register(actorAddress, actor)) } } // notify all available nodes that they should fail-over all connections from 'from' to 'to' val from = nodeNameToAddress(failedNodeName) val to = remoteServerAddress + Serialization.serialize((from, to)) match { case Left(error) ⇒ throw error case Right(bytes) ⇒ + val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FAIL_OVER_CONNECTIONS) .setPayload(ByteString.copyFrom(bytes)) .build - membershipNodes foreach { node ⇒ + + // FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed? + currentSetOfClusterNodes foreach { node ⇒ nodeConnections.get(node) foreach { case (_, connection) ⇒ sendCommandToReplica(connection, command, async = true) } @@ -1433,7 +1440,9 @@ class DefaultClusterNode private[akka] ( release(actorAddress) ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid))) - ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteSocketAddressForNode(to.nodeName)))) + ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, + remoteSocketAddressForNode(to.nodeName).getOrElse(throw new ClusterException("No remote address registered for [" + to.nodeName + "]"))))) + ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to))) ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid))) @@ -1448,19 +1457,19 @@ class DefaultClusterNode private[akka] ( } private[cluster] def findFailedNodes(nodes: List[String]): List[String] = - (locallyCachedMembershipNodes diff Set(nodes: _*)).toList + (locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String] = - (Set(nodes: _*) diff locallyCachedMembershipNodes).toList + (Set(nodes: _*) diff locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]]).toList private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String] = - (locallyCachedMembershipNodes diff Set(nodes: _*)).toList + (locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String] = - (Set(nodes: _*) diff locallyCachedMembershipNodes).toList + (Set(nodes: _*) diff locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]]).toList private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] = - (locallyCachedMembershipNodes diff Set(nodes: _*)).toList + (locallyCachedMembershipNodes.toArray.toSet.asInstanceOf[Set[String]] diff Set(nodes: _*)).toList private def createRootClusterNode() { ignore[ZkNodeExistsException] { @@ -1573,6 +1582,9 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E if (!childList.isEmpty) EventHandler.debug(this, "MembershipChildListener at [%s] has children [%s]" .format(self.nodeAddress.nodeName, childList.mkString(" "))) + + self.migrateActorsOnFailedNodes(currentChilds.toList) + self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒ self.remoteSocketAddressForNode(name) foreach (address ⇒ self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map self.publish(NodeConnected(name)) @@ -1772,6 +1784,9 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { } private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = { - Serialization.serialize(message.getPayload.toByteArray, Some(clazz)).asInstanceOf[T] + Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { + case Left(error) ⇒ throw error + case Right(instance) ⇒ instance.asInstanceOf[T] + } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts new file mode 100644 index 0000000000..202496ad31 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala index ee7796067d..5ab7b8726a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala @@ -20,7 +20,7 @@ import akka.serialization.Serialization import java.util.concurrent._ object MigrationAutomaticMultiJvmSpec { - var NrOfNodes = 2 + var NrOfNodes = 3 class HelloWorld extends Actor with Serializable { def receive = { @@ -37,6 +37,9 @@ class MigrationAutomaticMultiJvmNode1 extends WordSpec with MustMatchers { "be able to migrate an actor from one node to another" in { + barrier("start-node3", NrOfNodes) { + } + barrier("start-node2", NrOfNodes) { } @@ -57,10 +60,15 @@ class MigrationAutomaticMultiJvmNode1 extends WordSpec with MustMatchers { class MigrationAutomaticMultiJvmNode2 extends WordSpec with MustMatchers with BeforeAndAfterAll { import MigrationAutomaticMultiJvmSpec._ + var isFirstReplicaNode = false + "A cluster" must { "be able to migrate an actor from one node to another" in { + barrier("start-node3", NrOfNodes) { + } + barrier("start-node2", NrOfNodes) { node.start() } @@ -73,14 +81,51 @@ class MigrationAutomaticMultiJvmNode2 extends WordSpec with MustMatchers with Be Thread.sleep(2000) // wait for fail-over - node.isInUseOnNode("hello-world") must be(true) - val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) - actorRef.address must be("hello-world") - (actorRef ? "Hello").as[String].get must be("World from node [node2]") + barrier("check-fail-over", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } node.shutdown() } + } +} +class MigrationAutomaticMultiJvmNode3 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import MigrationAutomaticMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node3", NrOfNodes) { + node.start() + } + + barrier("start-node2", NrOfNodes) { + } + + barrier("start-node1", NrOfNodes) { + } + + barrier("store-actor-in-node1", NrOfNodes) { + } + + Thread.sleep(2000) // wait for fail-over + + barrier("check-fail-over", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node3]") + } + + node.shutdown() + } } override def beforeAll() = {