From 7a5c95e44d4024d34295f9329f19992ab4adff01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 26 Jun 2011 15:04:11 +0200 Subject: [PATCH] Added tests for automatic actor migration when node is shut down. Updated to modified version of ZkClient (0.3, forked and fixed to allow interrupting connection retry). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../scala/akka/cluster/ClusterInterface.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../election/LeaderElectionMultiJvmSpec.scala | 2 +- .../MigrationAutomaticMultiJvmNode1.conf | 1 + .../MigrationAutomaticMultiJvmNode1.opts | 1 + .../MigrationAutomaticMultiJvmNode2.conf | 1 + .../MigrationAutomaticMultiJvmNode2.opts | 1 + .../MigrationAutomaticMultiJvmNode3.conf | 1 + .../MigrationAutomaticMultiJvmNode3.opts | 1 + .../MigrationAutomaticMultiJvmSpec.scala | 132 ++++++++++++++++++ .../sample/ClusteredPingPongSample.scala | 2 +- .../sample/PingPongMultiJvmExample.scala | 4 +- project/build/AkkaProject.scala | 2 +- 13 files changed, 145 insertions(+), 7 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index aa22b25ff5..2254fbd956 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -497,7 +497,7 @@ trait ClusterNode { private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) - private[cluster] def automaticMigrationFromFailedNodes](currentSetOfClusterNodes: List[String]) + private[cluster] def automaticMigrationFromFailedNodes(currentSetOfClusterNodes: List[String]) private[cluster] def membershipPathFor(node: String): String diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 50a1e52203..d0eb2983c4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1354,7 +1354,7 @@ class DefaultClusterNode private[akka] ( } // FIXME makes use of automaticMigrationFromFailedNodes method, why is it not used? - private[cluster] def automaticMigrationFromFailedNodes](currentSetOfClusterNodes: List[String]) { + private[cluster] def automaticMigrationFromFailedNodes(currentSetOfClusterNodes: List[String]) { connectToAllNewlyArrivedMembershipNodesInCluster() findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName ⇒ diff --git a/akka-cluster/src/test/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala index 92678883e8..b87dcd07ac 100644 --- a/akka-cluster/src/test/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala @@ -73,4 +73,4 @@ class LeaderElectionMultiJvmNode2 extends WordSpec with MustMatchers { } } } -*/ \ No newline at end of file +*/ \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf new file mode 100644 index 0000000000..480c30c09d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf @@ -0,0 +1 @@ +akka.event-handler-level = "DEBUG" diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts new file mode 100644 index 0000000000..202496ad31 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala new file mode 100644 index 0000000000..98b22c38a2 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.api.migration.automatic + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +import akka.actor._ +import Actor._ +import akka.cluster._ +import ChangeListener._ +import Cluster._ +import DeploymentConfig._ +import akka.config.Config +import akka.serialization.Serialization + +import java.util.concurrent._ + +object MigrationAutomaticMultiJvmSpec { + var NrOfNodes = 2 + + class HelloWorld extends Actor with Serializable { + def receive = { + case "Hello" ⇒ + self.reply("World from node [" + Config.nodename + "]") + } + } +} + +class MigrationAutomaticMultiJvmNode1 extends WordSpec with MustMatchers { + import MigrationAutomaticMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node2", NrOfNodes) { + } + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("store-actor-in-node1", NrOfNodes) { + val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s) + node.store(actorOf[HelloWorld]("hello-world"), 1, serializer) + } + + node.shutdown() + } + } +} + +class MigrationAutomaticMultiJvmNode2 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import MigrationAutomaticMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node2", NrOfNodes) { + node.start() + } + + barrier("start-node1", NrOfNodes) { + } + + barrier("store-actor-in-node1", NrOfNodes) { + } + + Thread.sleep(2000) // wait for fail-over + + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + + node.shutdown() + } + + } + + override def beforeAll() = { + startLocalCluster() + } + + override def afterAll() = { + shutdownLocalCluster() + } +} +/* +class MigrationAutomaticMultiJvmNode3 extends WordSpec with MustMatchers { + import MigrationAutomaticMultiJvmSpec._ + + "A cluster" must { + + "be able to migrate an actor from one node to another" in { + + barrier("start-node-1", NrOfNodes) { + } + + barrier("start-node-2", NrOfNodes) { + node.start() + } + + barrier("store-1-in-node-1", NrOfNodes) { + } + + barrier("use-1-in-node-2", NrOfNodes) { + val actorOrOption = node.use("hello-world") + if (actorOrOption.isEmpty) fail("Actor could not be retrieved") + + val actorRef = actorOrOption.get + actorRef.address must be("hello-world") + + (actorRef ? "Hello").as[String].get must be("World from node [node2]") + } + + barrier("migrate-from-node2-to-node1", NrOfNodes) { + node.migrate(NodeAddress(node.nodeAddress.clusterName, "node1"), "hello-world") + Thread.sleep(2000) + } + + barrier("check-actor-is-moved-to-node1", NrOfNodes) { + } + } + } +} +*/ \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/sample/ClusteredPingPongSample.scala b/akka-cluster/src/test/scala/akka/cluster/sample/ClusteredPingPongSample.scala index 5b53e8bcc6..d44212899b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sample/ClusteredPingPongSample.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sample/ClusteredPingPongSample.scala @@ -131,4 +131,4 @@ object ClusteredPingPongSample { Cluster.shutdownLocalCluster() } } -*/ \ No newline at end of file +*/ \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/sample/PingPongMultiJvmExample.scala b/akka-cluster/src/test/scala/akka/cluster/sample/PingPongMultiJvmExample.scala index aed854a30f..f97724bf67 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sample/PingPongMultiJvmExample.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sample/PingPongMultiJvmExample.scala @@ -9,7 +9,7 @@ import akka.cluster._ import akka.actor._ import akka.util.duration._ -object PingPong { +object PingPongMultiJvmExample { val PING_ADDRESS = "ping" val PONG_ADDRESS = "pong" @@ -224,4 +224,4 @@ class PongNode(number: Int) { node.stop } } -*/ \ No newline at end of file +*/ \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index ceb24c849b..8dc37fac60 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val log4j = "log4j" % "log4j" % "1.2.15" //ApacheV2 lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % ZOOKEEPER_VERSION //ApacheV2 lazy val zookeeper_lock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % ZOOKEEPER_VERSION //ApacheV2 - lazy val zkClient = "zkclient" % "zkclient" % "0.2" //ApacheV2 + lazy val zkClient = "zkclient" % "zkclient" % "0.3" //ApacheV2 // Test lazy val multiverse_test = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "test" //ApacheV2