Rewrote RandomFailoverMultiJvm and RoundRobinFailoverMultiJvm to use polling instead of Thread.sleep.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
66bb47db6a
commit
fe1d32b605
11 changed files with 56 additions and 41 deletions
|
|
@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING"
|
|||
akka.actor.deployment.service-hello.router = "random"
|
||||
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"]
|
||||
akka.actor.deployment.service-hello.clustered.replication-factor = 2
|
||||
akka.actor.timeout = 30
|
||||
akka.actor.timeout = 30
|
||||
akka.cluster.session-timeout = 10
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING"
|
|||
akka.actor.deployment.service-hello.router = "random"
|
||||
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"]
|
||||
akka.actor.deployment.service-hello.clustered.replication-factor = 2
|
||||
akka.actor.timeout = 30
|
||||
akka.actor.timeout = 30
|
||||
akka.cluster.session-timeout = 10
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
|
||||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -4,4 +4,5 @@ akka.event-handler-level = "WARNING"
|
|||
akka.actor.deployment.service-hello.router = "random"
|
||||
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1", "node:node3"]
|
||||
akka.actor.deployment.service-hello.clustered.replication-factor = 2
|
||||
akka.actor.timeout = 30
|
||||
akka.actor.timeout = 30
|
||||
akka.cluster.session-timeout = 10
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993
|
||||
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ import akka.config.Config
|
|||
import akka.cluster._
|
||||
import akka.actor.{ ActorRef, Actor }
|
||||
import akka.event.EventHandler
|
||||
import akka.util.duration._
|
||||
import akka.util.{ Duration, Timer }
|
||||
import akka.testkit.{ EventFilter, TestEvent }
|
||||
import java.util.{ Collections, Set ⇒ JSet }
|
||||
import java.net.ConnectException
|
||||
|
|
@ -17,17 +19,8 @@ object RandomFailoverMultiJvmSpec {
|
|||
class SomeActor extends Actor with Serializable {
|
||||
|
||||
def receive = {
|
||||
case "identify" ⇒ {
|
||||
case "identify" ⇒
|
||||
self.reply(Config.nodename)
|
||||
}
|
||||
case "shutdown" ⇒ {
|
||||
new Thread() {
|
||||
override def run() {
|
||||
Thread.sleep(2000)
|
||||
Cluster.node.shutdown()
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -44,23 +37,28 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
val ignoreExceptions = Seq(
|
||||
EventFilter[NotYetConnectedException],
|
||||
EventFilter[ConnectException],
|
||||
EventFilter[ClusterException])
|
||||
EventFilter[ClusterException],
|
||||
EventFilter[java.nio.channels.ClosedChannelException])
|
||||
|
||||
var oldFoundConnections: JSet[String] = null
|
||||
var actor: ActorRef = null
|
||||
|
||||
barrier("node-start", NrOfNodes) {
|
||||
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
|
||||
Cluster.node
|
||||
Cluster.node.start()
|
||||
}
|
||||
|
||||
barrier("actor-creation", NrOfNodes) {
|
||||
actor = Actor.actorOf[SomeActor]("service-hello")
|
||||
actor.isInstanceOf[ClusterActorRef] must be(true)
|
||||
}
|
||||
|
||||
// val actor2 = Actor.registry.local.actorFor("service-hello")
|
||||
// .getOrElse(fail("Actor should have been in the local actor registry"))
|
||||
val timer = Timer(30.seconds, true)
|
||||
while (timer.isTicking &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node1") &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node3")) {}
|
||||
|
||||
barrier("actor-usage", NrOfNodes) {
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(true)
|
||||
oldFoundConnections = identifyConnections(actor)
|
||||
|
||||
|
|
@ -68,9 +66,12 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
oldFoundConnections.size() must be(2)
|
||||
}
|
||||
|
||||
Thread.sleep(5000) // wait for fail-over from node3
|
||||
|
||||
barrier("verify-fail-over", NrOfNodes - 1) {
|
||||
val timer = Timer(30.seconds, true)
|
||||
while (timer.isTicking &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node1") &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node2")) {}
|
||||
|
||||
val newFoundConnections = identifyConnections(actor)
|
||||
|
||||
//it still must be 2 since a different node should have been used to failover to
|
||||
|
|
@ -89,7 +90,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
|
||||
def identifyConnections(actor: ActorRef): JSet[String] = {
|
||||
val set = new java.util.HashSet[String]
|
||||
for (i ← 0 until NrOfNodes * 2) {
|
||||
for (i ← 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random
|
||||
val value = (actor ? "identify").get.asInstanceOf[String]
|
||||
set.add(value)
|
||||
}
|
||||
|
|
@ -104,10 +105,13 @@ class RandomFailoverMultiJvmNode2 extends ClusterTestNode {
|
|||
"___" must {
|
||||
"___" in {
|
||||
barrier("node-start", NrOfNodes) {
|
||||
Cluster.node
|
||||
Cluster.node.start()
|
||||
}
|
||||
|
||||
barrier("actor-creation", NrOfNodes).await()
|
||||
barrier("actor-usage", NrOfNodes).await()
|
||||
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(false)
|
||||
|
||||
Thread.sleep(5000) // wait for fail-over from node3
|
||||
|
||||
|
|
@ -125,10 +129,13 @@ class RandomFailoverMultiJvmNode3 extends ClusterTestNode {
|
|||
"___" must {
|
||||
"___" in {
|
||||
barrier("node-start", NrOfNodes) {
|
||||
Cluster.node
|
||||
Cluster.node.start()
|
||||
}
|
||||
|
||||
barrier("actor-creation", NrOfNodes).await()
|
||||
barrier("actor-usage", NrOfNodes).await()
|
||||
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(true)
|
||||
|
||||
Cluster.node.shutdown()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
|
||||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993
|
||||
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 -Dakka.event.force-sync=true
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ import akka.cluster._
|
|||
import akka.actor.{ ActorRef, Actor }
|
||||
import akka.event.EventHandler
|
||||
import akka.testkit.{ EventFilter, TestEvent }
|
||||
import akka.util.duration._
|
||||
import akka.util.{ Duration, Timer }
|
||||
import java.util.{ Collections, Set ⇒ JSet }
|
||||
import java.net.ConnectException
|
||||
import java.nio.channels.NotYetConnectedException
|
||||
|
|
@ -18,17 +20,8 @@ object RoundRobinFailoverMultiJvmSpec {
|
|||
class SomeActor extends Actor with Serializable {
|
||||
|
||||
def receive = {
|
||||
case "identify" ⇒ {
|
||||
case "identify" ⇒
|
||||
self.reply(Config.nodename)
|
||||
}
|
||||
case "shutdown" ⇒ {
|
||||
new Thread() {
|
||||
override def run() {
|
||||
Thread.sleep(2000)
|
||||
Cluster.node.shutdown()
|
||||
}
|
||||
}.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -58,10 +51,15 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
barrier("actor-creation", NrOfNodes) {
|
||||
actor = Actor.actorOf[SomeActor]("service-hello")
|
||||
actor.isInstanceOf[ClusterActorRef] must be(true)
|
||||
}
|
||||
|
||||
// val actor2 = Actor.registry.local.actorFor("service-hello")
|
||||
// .getOrElse(fail("Actor should have been in the local actor registry"))
|
||||
val timer = Timer(30.seconds, true)
|
||||
while (timer.isTicking &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node1") &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node3")) {}
|
||||
//Thread.sleep(5000) // wait for all actors to start up on other nodes
|
||||
|
||||
barrier("actor-usage", NrOfNodes) {
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(true)
|
||||
oldFoundConnections = identifyConnections(actor)
|
||||
|
||||
|
|
@ -72,6 +70,11 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
Thread.sleep(5000) // wait for fail-over from node3
|
||||
|
||||
barrier("verify-fail-over", NrOfNodes - 1) {
|
||||
val timer = Timer(30.seconds, true)
|
||||
while (timer.isTicking &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node1") &&
|
||||
!Cluster.node.isInUseOnNode("service-hello", "node2")) {}
|
||||
|
||||
val newFoundConnections = identifyConnections(actor)
|
||||
|
||||
//it still must be 2 since a different node should have been used to failover to
|
||||
|
|
@ -90,7 +93,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
|
|||
|
||||
def identifyConnections(actor: ActorRef): JSet[String] = {
|
||||
val set = new java.util.HashSet[String]
|
||||
for (i ← 0 until NrOfNodes * 2) {
|
||||
for (i ← 0 until 100) {
|
||||
val value = (actor ? "identify").get.asInstanceOf[String]
|
||||
set.add(value)
|
||||
}
|
||||
|
|
@ -109,6 +112,7 @@ class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
|
|||
}
|
||||
|
||||
barrier("actor-creation", NrOfNodes).await()
|
||||
barrier("actor-usage", NrOfNodes).await()
|
||||
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(false)
|
||||
|
||||
|
|
@ -130,6 +134,7 @@ class RoundRobinFailoverMultiJvmNode3 extends ClusterTestNode {
|
|||
}
|
||||
|
||||
barrier("actor-creation", NrOfNodes).await()
|
||||
barrier("actor-usage", NrOfNodes).await()
|
||||
|
||||
Cluster.node.isInUseOnNode("service-hello") must be(true)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue