diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf index 81c84d4084..e0993d6d9c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.conf @@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"] akka.actor.deployment.service-hello.clustered.replication-factor = 2 -akka.actor.timeout = 30 \ No newline at end of file +akka.actor.timeout = 30 +akka.cluster.session-timeout = 10 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts index a88c260d8c..d739389a77 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode1.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf index 81c84d4084..e0993d6d9c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.conf @@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"] akka.actor.deployment.service-hello.clustered.replication-factor = 2 -akka.actor.timeout = 30 \ No newline at end of file +akka.actor.timeout = 30 +akka.cluster.session-timeout = 10 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts index f1e01f253d..e32c0130f6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode2.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf index 81c84d4084..e0993d6d9c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.conf @@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING" akka.actor.deployment.service-hello.router = "random" akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"] akka.actor.deployment.service-hello.clustered.replication-factor = 2 -akka.actor.timeout = 30 \ No newline at end of file +akka.actor.timeout = 30 +akka.cluster.session-timeout = 10 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts index 202496ad31..eba2597cdb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmNode3.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 +-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala index c428761371..fd2e5a0058 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/random/failover/RandomFailoverMultiJvmSpec.scala @@ -4,6 +4,8 @@ import akka.config.Config import akka.cluster._ import akka.actor.{ ActorRef, Actor } import akka.event.EventHandler +import akka.util.duration._ +import akka.util.{ Duration, Timer } import akka.testkit.{ EventFilter, TestEvent } import java.util.{ Collections, Set ⇒ JSet } import java.net.ConnectException @@ -17,17 +19,8 @@ object RandomFailoverMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ { + case "identify" ⇒ self.reply(Config.nodename) - } - case "shutdown" ⇒ { - new Thread() { - override def run() { - Thread.sleep(2000) - Cluster.node.shutdown() - } - }.start() - } } } @@ -44,23 +37,28 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode { val ignoreExceptions = Seq( EventFilter[NotYetConnectedException], EventFilter[ConnectException], - EventFilter[ClusterException]) + EventFilter[ClusterException], + EventFilter[java.nio.channels.ClosedChannelException]) var oldFoundConnections: JSet[String] = null var actor: ActorRef = null barrier("node-start", NrOfNodes) { EventHandler.notify(TestEvent.Mute(ignoreExceptions)) - Cluster.node + Cluster.node.start() } barrier("actor-creation", NrOfNodes) { actor = Actor.actorOf[SomeActor]("service-hello") actor.isInstanceOf[ClusterActorRef] must be(true) + } - // val actor2 = Actor.registry.local.actorFor("service-hello") - // .getOrElse(fail("Actor should have been in the local actor registry")) + val timer = Timer(30.seconds, true) + while (timer.isTicking && + !Cluster.node.isInUseOnNode("service-hello", "node1") && + !Cluster.node.isInUseOnNode("service-hello", "node3")) {} + barrier("actor-usage", NrOfNodes) { Cluster.node.isInUseOnNode("service-hello") must be(true) oldFoundConnections = identifyConnections(actor) @@ -68,9 +66,12 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode { oldFoundConnections.size() must be(2) } - Thread.sleep(5000) // wait for fail-over from node3 - barrier("verify-fail-over", NrOfNodes - 1) { + val timer = Timer(30.seconds, true) + while (timer.isTicking && + !Cluster.node.isInUseOnNode("service-hello", "node1") && + !Cluster.node.isInUseOnNode("service-hello", "node2")) {} + val newFoundConnections = identifyConnections(actor) //it still must be 2 since a different node should have been used to failover to @@ -89,7 +90,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] - for (i ← 0 until NrOfNodes * 2) { + for (i ← 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random val value = (actor ? "identify").get.asInstanceOf[String] set.add(value) } @@ -104,10 +105,13 @@ class RandomFailoverMultiJvmNode2 extends ClusterTestNode { "___" must { "___" in { barrier("node-start", NrOfNodes) { - Cluster.node + Cluster.node.start() } barrier("actor-creation", NrOfNodes).await() + barrier("actor-usage", NrOfNodes).await() + + Cluster.node.isInUseOnNode("service-hello") must be(false) Thread.sleep(5000) // wait for fail-over from node3 @@ -125,10 +129,13 @@ class RandomFailoverMultiJvmNode3 extends ClusterTestNode { "___" must { "___" in { barrier("node-start", NrOfNodes) { - Cluster.node + Cluster.node.start() } barrier("actor-creation", NrOfNodes).await() + barrier("actor-usage", NrOfNodes).await() + + Cluster.node.isInUseOnNode("service-hello") must be(true) Cluster.node.shutdown() } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode1.opts index a88c260d8c..d739389a77 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode1.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode1.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode2.opts index f1e01f253d..e32c0130f6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode2.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode2.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode3.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode3.opts index 202496ad31..eba2597cdb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode3.opts +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmNode3.opts @@ -1 +1 @@ --Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 +-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 -Dakka.event.force-sync=true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala index 0ba244c0d2..a3bb299381 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/roundrobin/failover/RoundRobinFailoverMultiJvmSpec.scala @@ -5,6 +5,8 @@ import akka.cluster._ import akka.actor.{ ActorRef, Actor } import akka.event.EventHandler import akka.testkit.{ EventFilter, TestEvent } +import akka.util.duration._ +import akka.util.{ Duration, Timer } import java.util.{ Collections, Set ⇒ JSet } import java.net.ConnectException import java.nio.channels.NotYetConnectedException @@ -18,17 +20,8 @@ object RoundRobinFailoverMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ { + case "identify" ⇒ self.reply(Config.nodename) - } - case "shutdown" ⇒ { - new Thread() { - override def run() { - Thread.sleep(2000) - Cluster.node.shutdown() - } - }.start() - } } } @@ -58,10 +51,15 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode { barrier("actor-creation", NrOfNodes) { actor = Actor.actorOf[SomeActor]("service-hello") actor.isInstanceOf[ClusterActorRef] must be(true) + } - // val actor2 = Actor.registry.local.actorFor("service-hello") - // .getOrElse(fail("Actor should have been in the local actor registry")) + val timer = Timer(30.seconds, true) + while (timer.isTicking && + !Cluster.node.isInUseOnNode("service-hello", "node1") && + !Cluster.node.isInUseOnNode("service-hello", "node3")) {} + //Thread.sleep(5000) // wait for all actors to start up on other nodes + barrier("actor-usage", NrOfNodes) { Cluster.node.isInUseOnNode("service-hello") must be(true) oldFoundConnections = identifyConnections(actor) @@ -72,6 +70,11 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode { Thread.sleep(5000) // wait for fail-over from node3 barrier("verify-fail-over", NrOfNodes - 1) { + val timer = Timer(30.seconds, true) + while (timer.isTicking && + !Cluster.node.isInUseOnNode("service-hello", "node1") && + !Cluster.node.isInUseOnNode("service-hello", "node2")) {} + val newFoundConnections = identifyConnections(actor) //it still must be 2 since a different node should have been used to failover to @@ -90,7 +93,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode { def identifyConnections(actor: ActorRef): JSet[String] = { val set = new java.util.HashSet[String] - for (i ← 0 until NrOfNodes * 2) { + for (i ← 0 until 100) { val value = (actor ? "identify").get.asInstanceOf[String] set.add(value) } @@ -109,6 +112,7 @@ class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode { } barrier("actor-creation", NrOfNodes).await() + barrier("actor-usage", NrOfNodes).await() Cluster.node.isInUseOnNode("service-hello") must be(false) @@ -130,6 +134,7 @@ class RoundRobinFailoverMultiJvmNode3 extends ClusterTestNode { } barrier("actor-creation", NrOfNodes).await() + barrier("actor-usage", NrOfNodes).await() Cluster.node.isInUseOnNode("service-hello") must be(true)