From c16bd740731c19950e3d8588321dc58f2f169b0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?=
Date: Fri, 25 May 2012 17:27:24 +0200
Subject: [PATCH 1/5] Added more logging to Cluster's who-to-gossip-to selection process.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jonas Bonér
---
 akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 76e3356143..e7d672d051 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -795,6 +795,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
    * @return 'true' if it gossiped to a "deputy" member.
    */
   private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
+    log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", "))
     if (addresses.isEmpty) false
     else {
       val peers = addresses filter (_ != remoteAddress) // filter out myself

From d99b1cd7f00d0d091b91195226369fb0e798c1bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?=
Date: Fri, 25 May 2012 17:29:29 +0200
Subject: [PATCH 2/5] Rewritten old in-memory ClientDowningSpec into multi-node specs.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Split it up into two different specs:
- ClientDowningNodeThatIsUnreachableMultiJvmSpec
- ClientDowningNodeThatIsUpMultiJvmSpec

Signed-off-by: Jonas Bonér
---
 ...ientDowningNodeThatIsUnreachableSpec.scala | 111 ++++++++++++++
 .../ClientDowningNodeThatIsUpSpec.scala | 108 +++++++++++++
 .../akka/cluster/ClientDowningSpec.scala | 145 ------------------
 3 files changed, 219 insertions(+), 145 deletions(-)
 create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala
 create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala
 delete mode 100644 akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala

diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala
new file mode 100644
index 0000000000..8e02420050
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address + +object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + val waitForConvergence = 20 seconds + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka { + #loglevel = "DEBUG" + #stdout-loglevel = "DEBUG" + cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off + } + } + """))) +} + +class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec + +class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ + + override def initialParticipants = 4 + + def node = Cluster(system) + + def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address))), waitForConvergence) + } + + "Client of a 4 node cluster" must { + + "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { + runOn(first) { + node.self + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + + // kill 'third' node + testConductor.shutdown(third, 0) + testConductor.removeNode(third) + + // mark 'third' node as DOWN + node.down(thirdAddress) + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + node.latestGossip.members.exists(_.address == thirdAddress) must be(false) + testConductor.enter("await-completion") + } + + runOn(second) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + + runOn(third) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + } + + runOn(fourth) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala new file mode 100644 index 0000000000..52d37a4ed3 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address + +object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + val waitForConvergence = 20 seconds + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka { + #loglevel = "DEBUG" + #stdout-loglevel = "DEBUG" + cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off + } + } + """))) +} + +class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec + +class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import ClientDowningNodeThatIsUpMultiJvmSpec._ + + override def initialParticipants = 4 + + def node = Cluster(system) + + def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port))), waitForConvergence) + } + + "Client of a 4 node cluster" must { + + "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { + runOn(first) { + node.self + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + + // mark 'third' node as DOWN + testConductor.removeNode(third) + node.down(thirdAddress) + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + node.latestGossip.members.exists(_.address == thirdAddress) must be(false) + testConductor.enter("await-completion") + } + + runOn(second) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + + runOn(third) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + } + + runOn(fourth) { + node.join(node(first).address) + + assertMemberRing(nrOfMembers = 4) + testConductor.enter("all-up") + + val thirdAddress = node(third).address + testConductor.enter("down-third-node") + + assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + testConductor.enter("await-completion") + } + } + } +} diff --git 
a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala deleted file mode 100644 index 0e7b0ed330..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/ClientDowningSpec.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import com.typesafe.config._ - -import java.net.InetSocketAddress - -class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with ImplicitSender { - val portPrefix = 1 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - var node4: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - var system4: ActorSystemImpl = _ - - try { - "Client of a 4 node cluster" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Cluster(system1) - val fd1 = node1.failureDetector - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Cluster(system2) - val fd2 = node2.failureDetector - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] - node3 = Cluster(system3) - val fd3 = node3.failureDetector - val address3 = node3.remoteAddress - - // ======= NODE 4 ======== - system4 = ActorSystem("system4", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d553 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider] - node4 = Cluster(system4) - val fd4 = node4.failureDetector - val address4 = node4.remoteAddress - - "be able to DOWN a node that is UP" taggedAs LongRunningTest in { - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: node4 :: Nil) - - node3.shutdown() - system3.shutdown() - - // client marks node3 as DOWN - node1.down(address3) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: node4 :: Nil) - - node1.latestGossip.members.size must be(3) - node1.latestGossip.members.exists(_.address == address3) 
must be(false) - } - - "be able to DOWN a node that is UNREACHABLE" taggedAs LongRunningTest in { - node4.shutdown() - system4.shutdown() - - // clien marks node4 as DOWN - node2.down(address4) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: Nil) - - node1.latestGossip.members.size must be(2) - node1.latestGossip.members.exists(_.address == address4) must be(false) - node1.latestGossip.members.exists(_.address == address3) must be(false) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - - if (node4 ne null) node4.shutdown() - if (system4 ne null) system4.shutdown() - } -} From ea99a1f315e716da9782ec86b10c96800fc75196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 25 May 2012 17:43:03 +0200 Subject: [PATCH 3/5] Simplified config and removed old too-long timeout. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...ientDowningNodeThatIsUnreachableSpec.scala | 22 +++++++------------ .../ClientDowningNodeThatIsUpSpec.scala | 22 +++++++------------ 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 8e02420050..95510a701d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -17,18 +17,12 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val waitForConvergence = 20 seconds - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka { - #loglevel = "DEBUG" - #stdout-loglevel = "DEBUG" - cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } + akka.cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off } """))) } @@ -46,9 +40,9 @@ class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowning def node = Cluster(system) def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address))), waitForConvergence) + awaitCond(node.latestGossip.members.size == nrOfMembers) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address)))) } "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala 
index 52d37a4ed3..b92a45f2e4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -17,18 +17,12 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val waitForConvergence = 20 seconds - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka { - #loglevel = "DEBUG" - #stdout-loglevel = "DEBUG" - cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } + akka.cluster { + gossip-frequency = 100 ms + leader-actions-frequency = 100 ms + periodic-tasks-initial-delay = 300 ms + auto-down = off } """))) } @@ -46,9 +40,9 @@ class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatI def node = Cluster(system) def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers, waitForConvergence) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up), waitForConvergence) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port))), waitForConvergence) + awaitCond(node.latestGossip.members.size == nrOfMembers) + awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port)))) } "Client of a 4 node cluster" must { From 50806243903f6242878a6d7206a92d92016068f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 27 May 2012 21:22:30 +0200 Subject: [PATCH 4/5] Incorporated feedback - switched to MultiNodeClusterSpec etc. --- ...ientDowningNodeThatIsUnreachableSpec.scala | 60 ++++++------------- .../ClientDowningNodeThatIsUpSpec.scala | 60 ++++++------------- .../akka/cluster/MultiNodeClusterSpec.scala | 14 ++++- 3 files changed, 47 insertions(+), 87 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 95510a701d..a80c0a3caa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ import akka.actor.Address object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { @@ -17,14 +16,9 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } - """))) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec @@ -32,25 +26,20 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeT class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +class ClientDowningNodeThatIsUnreachableSpec + extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 - def node = Cluster(system) - - def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address == address)))) - } - "Client of a 4 node cluster" must { "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { runOn(first) { - node.self - assertMemberRing(nrOfMembers = 4) + cluster.self + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address @@ -60,44 +49,31 @@ class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowning testConductor.removeNode(third) // mark 'third' node as DOWN - node.down(thirdAddress) + cluster.down(thirdAddress) testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) - node.latestGossip.members.exists(_.address == thirdAddress) must be(false) - testConductor.enter("await-completion") - } - - runOn(second) { - node.join(node(first).address) - - assertMemberRing(nrOfMembers = 4) - testConductor.enter("all-up") - - val thirdAddress = node(third).address - testConductor.enter("down-third-node") - - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) testConductor.enter("await-completion") } runOn(third) { - node.join(node(first).address) + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } - runOn(fourth) { - node.join(node(first).address) + runOn(second, fourth) { + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) testConductor.enter("await-completion") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index b92a45f2e4..adfc7aa514 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ import akka.actor.Address object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { @@ -17,14 +16,9 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 100 ms - leader-actions-frequency = 100 ms - periodic-tasks-initial-delay = 300 ms - auto-down = off - } - """))) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")). + withFallback(MultiNodeClusterSpec.clusterConfig)) } class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec @@ -32,69 +26,51 @@ class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSp class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +class ClientDowningNodeThatIsUpSpec + extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { import ClientDowningNodeThatIsUpMultiJvmSpec._ override def initialParticipants = 4 - def node = Cluster(system) - - def assertMemberRing(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { - awaitCond(node.latestGossip.members.size == nrOfMembers) - awaitCond(node.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(canNotBePartOfRing forall (address => !(node.latestGossip.members exists (_.address.port == address.port)))) - } - "Client of a 4 node cluster" must { "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { runOn(first) { - node.self - assertMemberRing(nrOfMembers = 4) + cluster.self + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address // mark 'third' node as DOWN testConductor.removeNode(third) - node.down(thirdAddress) + cluster.down(thirdAddress) testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) - node.latestGossip.members.exists(_.address == thirdAddress) must be(false) - testConductor.enter("await-completion") - } - - runOn(second) { - node.join(node(first).address) - - assertMemberRing(nrOfMembers = 4) - testConductor.enter("all-up") - - val thirdAddress = node(third).address - testConductor.enter("down-third-node") - - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) testConductor.enter("await-completion") } runOn(third) { - node.join(node(first).address) + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } - runOn(fourth) { - node.join(node(first).address) + runOn(second, fourth) { + cluster.join(node(first).address) - assertMemberRing(nrOfMembers = 4) + 
awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") val thirdAddress = node(third).address testConductor.enter("down-third-node") - assertMemberRing(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) + awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) testConductor.enter("await-completion") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..cadbb7b298 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -51,8 +51,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ } /** - * Wait until the expected number of members has status Up - * and convergence has been reached. + * Wait until the expected number of members has status Up and convergence has been reached. */ def awaitUpConvergence(numberOfMembers: Int): Unit = { awaitCond(cluster.latestGossip.members.size == numberOfMembers) @@ -60,4 +59,13 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ awaitCond(cluster.convergence.isDefined, 10 seconds) } -} \ No newline at end of file + /** + * Wait until the expected number of members has status Up and convergence has been reached. + * Also asserts that nodes in the 'canNotBePartOfRing' are *not* part of the cluster ring. + */ + def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = { + awaitCond(cluster.latestGossip.members.size == nrOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(canNotBePartOfRing forall (address => !(cluster.latestGossip.members exists (_.address.port == address.port)))) + } +} From 4ec49f6ac1d8a63bae380262d4bc9e175073da9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 27 May 2012 21:55:33 +0200 Subject: [PATCH 5/5] Fixed indeterministic ordering bug in test --- .../cluster/ClientDowningNodeThatIsUnreachableSpec.scala | 6 +++--- .../scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index a80c0a3caa..3a4148e3f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -40,9 +40,9 @@ class ClientDowningNodeThatIsUnreachableSpec runOn(first) { cluster.self awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") // kill 'third' node testConductor.shutdown(third, 0) @@ -66,11 +66,11 @@ class ClientDowningNodeThatIsUnreachableSpec runOn(second, fourth) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") + testConductor.enter("down-third-node") awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index adfc7aa514..0f48951305 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -40,9 +40,9 @@ class ClientDowningNodeThatIsUpSpec runOn(first) { cluster.self awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") // mark 'third' node as DOWN testConductor.removeNode(third) @@ -56,18 +56,17 @@ class ClientDowningNodeThatIsUpSpec runOn(third) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) testConductor.enter("all-up") } runOn(second, fourth) { cluster.join(node(first).address) - awaitUpConvergence(nrOfMembers = 4) - testConductor.enter("all-up") val thirdAddress = node(third).address + testConductor.enter("all-up") + testConductor.enter("down-third-node") awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
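For reference, the end state that patches 2-5 converge on is easier to read outside the diffs. The following is a condensed, illustrative sketch (not part of any patch) of ClientDowningNodeThatIsUnreachableSpec after the whole series is applied, showing only the 'first' role, i.e. the node acting as the downing client. The MultiJvmNode1-4 subclasses, the other role blocks and the exact final import list are elided, and the ordering/startup comments are editorial, not from the patch.

package akka.cluster

import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Address

object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  // auto-down stays off so the test has to down the unreachable node explicitly
  commonConfig(debugConfig(on = false).
    withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")).
    withFallback(MultiNodeClusterSpec.clusterConfig))
}

class ClientDowningNodeThatIsUnreachableSpec
  extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
  with MultiNodeClusterSpec
  with ImplicitSender with BeforeAndAfter {
  import ClientDowningNodeThatIsUnreachableMultiJvmSpec._

  override def initialParticipants = 4

  "Client of a 4 node cluster" must {

    "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in {
      runOn(first) {
        cluster.self // touch the cluster extension on the first node
        awaitUpConvergence(nrOfMembers = 4)

        // look up the address before 'third' can be shut down (the ordering fix from patch 5)
        val thirdAddress = node(third).address
        testConductor.enter("all-up")

        // kill 'third' node
        testConductor.shutdown(third, 0)
        testConductor.removeNode(third)

        // mark 'third' node as DOWN
        cluster.down(thirdAddress)
        testConductor.enter("down-third-node")

        awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
        cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
        testConductor.enter("await-completion")
      }

      // second, third and fourth join 'first' and step through the same barriers
      // ("all-up", "down-third-node", "await-completion"); their blocks are elided here
    }
  }
}

The same choreography applies to ClientDowningNodeThatIsUpSpec, except that the third node is downed while still healthy, so there is no testConductor.shutdown call.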
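Patch 4 moves the per-spec ring assertions into the shared MultiNodeClusterSpec trait. Below is a minimal sketch of the two convergence helpers as the diff leaves them; the rest of the trait is elided, and the 'cluster' accessor shown here is an assumption (its definition lies outside this patch series, though the old in-memory spec obtained the extension the same way via Cluster(system)).

package akka.cluster

import akka.actor.Address
import akka.remote.testkit.MultiNodeSpec
import akka.util.duration._

trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒

  // assumed accessor; the diff only shows 'cluster' being used, not defined
  def cluster: Cluster = Cluster(system)

  /**
   * Wait until the expected number of members has status Up and convergence has been reached.
   */
  def awaitUpConvergence(numberOfMembers: Int): Unit = {
    awaitCond(cluster.latestGossip.members.size == numberOfMembers)
    awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
    awaitCond(cluster.convergence.isDefined, 10 seconds)
  }

  /**
   * Wait until the expected number of members has status Up and convergence has been reached.
   * Also asserts that the nodes in 'canNotBePartOfRing' are *not* part of the cluster ring.
   */
  def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = {
    awaitCond(cluster.latestGossip.members.size == nrOfMembers)
    awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
    awaitCond(canNotBePartOfRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address.port == address.port))))
  }
}

Note two details carried over verbatim from the diff: the new overload checks exclusion by comparing address ports only, and unlike the single-argument variant it does not wait for cluster.convergence.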