diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 029b0b221d..fa70b9a134 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -3,111 +3,73 @@ */ package akka.cluster -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec import akka.util.duration._ +import akka.testkit._ -import com.typesafe.config._ +object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") -import java.net.InetSocketAddress + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold=5")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} -class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 2 +class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec +class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec +class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ +abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import GossipingAccrualFailureDetectorMultiJvmSpec._ - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ + override def initialParticipants = 3 - try { - "A Gossip-driven Failure Detector" must { + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port=%d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Cluster(system1) - val fd1 = node1.failureDetector - val address1 = node1.remoteAddress + after { + testConductor.enter("after") + } - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port=%d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Cluster(system2) - val fd2 = node2.failureDetector - val address2 = node2.remoteAddress + "A Gossip-driven Failure Detector" must { - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port=%d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] - node3 = Cluster(system3) - val fd3 = node3.failureDetector - val address3 = node3.remoteAddress + "receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster.self + } + testConductor.enter("first-started") - "receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in { - println("Let the systems gossip for a while...") - Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds - fd1.isAvailable(address2) must be(true) - fd1.isAvailable(address3) must be(true) - fd2.isAvailable(address1) must be(true) - fd2.isAvailable(address3) must be(true) - fd3.isAvailable(address1) must be(true) - fd3.isAvailable(address2) must be(true) + cluster.join(firstAddress) + + log.info("Let the systems gossip for a while...") + 10.seconds.dilated.sleep // let them gossip + cluster.failureDetector.isAvailable(firstAddress) must be(true) + cluster.failureDetector.isAvailable(secondAddress) must be(true) + cluster.failureDetector.isAvailable(thirdAddress) must be(true) + } + + "mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { + runOn(first) { + testConductor.shutdown(third, 0) + testConductor.removeNode(third) } - "mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { - // shut down system3 - node3.shutdown() - system3.shutdown() - println("Give the remaning systems time to detect failure...") - Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3 - fd1.isAvailable(address2) must be(true) - fd1.isAvailable(address3) must be(false) - fd2.isAvailable(address1) must be(true) - fd2.isAvailable(address3) must be(false) + runOn(first, second) { + log.info("Give the remaning systems time to detect failure...") + 15.seconds.dilated.sleep // give them time to detect failure + cluster.failureDetector.isAvailable(firstAddress) must be(true) + cluster.failureDetector.isAvailable(secondAddress) must be(true) + cluster.failureDetector.isAvailable(thirdAddress) must be(false) } } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) } - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - } }