From b569869b61aa64cd4d0027131697d1a336c03a34 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 12:10:17 +0200 Subject: [PATCH 1/4] Use better sort order of members. See #2133 --- .../src/main/scala/akka/cluster/Cluster.scala | 12 +++++++- .../test/scala/akka/cluster/MemberSpec.scala | 30 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 76e3356143..55f9967596 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -100,7 +100,17 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess object Member { import MemberStatus._ - implicit val ordering = Ordering.fromLessThan[Member](_.address.toString < _.address.toString) + implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ + if (a.protocol < b.protocol) true + else if (a.system < b.system) true + else if (a.host.getOrElse("") < b.host.getOrElse("")) true + else if (a.port.getOrElse(0) < b.port.getOrElse(0)) true + else false + } + + implicit val ordering: Ordering[Member] = new Ordering[Member] { + def compare(x: Member, y: Member) = addressOrdering.compare(x.address, y.address) + } def apply(address: Address, status: MemberStatus): Member = new Member(address, status) diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala new file mode 100644 index 0000000000..a75ead0149 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import scala.util.Random + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class MemberSpec extends WordSpec with MustMatchers { + + "Member" must { + + "be sorted by address correctly" in { + import Member.ordering + val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up) + val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up) + val m3 = Member(Address("cluster", "sys1", "host1", 10000), MemberStatus.Up) + val m4 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up) + val m5 = Member(Address("cluster", "sys2", "host2", 10000), MemberStatus.Up) + + val expected = IndexedSeq(m1, m2, m3, m4, m5) + val shuffled = Random.shuffle(expected) + shuffled.sorted must be(expected) + } + } +} From 829783f359ffb6b1a093d3b4dcb4a5bc07b9ea91 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 12:10:37 +0200 Subject: [PATCH 2/4] Remove port awareness in asserts of leader and members. See #2133 * Extracted common parts to MultiNodeClusterSpec --- .../akka/cluster/JoinTwoClustersSpec.scala | 27 +++----- .../MembershipChangeListenerSpec.scala | 13 +--- .../akka/cluster/MultiNodeClusterSpec.scala | 63 +++++++++++++++++++ .../akka/cluster/NodeMembershipSpec.scala | 26 ++------ .../scala/akka/cluster/NodeStartupSpec.scala | 12 +--- 5 files changed, 80 insertions(+), 61 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9ed003944f..c15af5e651 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -18,13 +18,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -35,13 +29,11 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } @@ -50,12 +42,6 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm val b1Address = node(b1).address val c1Address = node(c1).address - def awaitUpConvergence(numberOfMembers: Int): Unit = { - awaitCond(cluster.latestGossip.members.size == numberOfMembers) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence.isDefined) - } - "Three different clusters (A, B and C)" must { "be able to 'elect' a single leader after joining (A -> B)" in { @@ -72,7 +58,9 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 2) - cluster.isLeader must be(ifNode(a1, b1, c1)(true)(false)) + assertLeader(a1, a2) + assertLeader(b1, b2) + assertLeader(c1, c2) runOn(b2) { cluster.join(a1Address) @@ -82,7 +70,8 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 4) } - cluster.isLeader must be(ifNode(a1, c1)(true)(false)) + assertLeader(a1, a2, b1, b2) + assertLeader(c1, c2) } @@ -94,7 +83,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 6) - cluster.isLeader must be(ifNode(a1)(true)(false)) + assertLeader(a1, a2, b1, b2, c1, c2) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index c5ae2f11e7..844d2803a1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -16,13 +16,7 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -30,13 +24,12 @@ class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import MembershipChangeListenerMultiJvmSpec._ override def initialParticipants = 3 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala new file mode 100644 index 0000000000..873d819dbb --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import akka.actor.Address +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeSpec +import akka.util.duration._ + +object MultiNodeClusterSpec { + def clusterConfig: Config = ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms + } + akka.test { + single-expect-default = 5 s + } + """) +} + +trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ + + def cluster: Cluster = Cluster(system) + + /** + * Assert that the member addresses match the expected addresses in the + * sort order used by the cluster. + */ + def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = { + import Member.addressOrdering + val members = gotMembers.toIndexedSeq + members.size must be(expectedAddresses.length) + expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address must be(a) } + } + + /** + * Assert that the cluster has elected the correct leader + * out of all nodes in the cluster. First + * member in the cluster ring is expected leader. + */ + def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { + nodesInCluster.length must not be (0) + import Member.addressOrdering + val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) + } + + /** + * Wait until the expected number of members has status Up + * and convergence has been reached. + */ + def awaitUpConvergence(numberOfMembers: Int): Unit = { + awaitCond(cluster.latestGossip.members.size == numberOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.convergence.isDefined, 10 seconds) + } + +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index a8af644fe0..b5dc5d4d42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -14,13 +14,7 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -28,13 +22,11 @@ class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import NodeMembershipMultiJvmSpec._ override def initialParticipants = 3 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } @@ -50,11 +42,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp runOn(first, second) { cluster.join(firstAddress) awaitCond(cluster.latestGossip.members.size == 2) - val members = cluster.latestGossip.members.toIndexedSeq - members.size must be(2) - val sortedAddresses = IndexedSeq(firstAddress, secondAddress).sortBy(_.toString) - members(0).address must be(sortedAddresses(0)) - members(1).address must be(sortedAddresses(1)) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) awaitCond { cluster.latestGossip.members.forall(_.status == MemberStatus.Up) } @@ -69,14 +57,8 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp cluster.join(firstAddress) } - // runOn all awaitCond(cluster.latestGossip.members.size == 3) - val members = cluster.latestGossip.members.toIndexedSeq - members.size must be(3) - val sortedAddresses = IndexedSeq(firstAddress, secondAddress, thirdAddress).sortBy(_.toString) - members(0).address must be(sortedAddresses(0)) - members(1).address must be(sortedAddresses(1)) - members(2).address must be(sortedAddresses(2)) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) awaitCond { cluster.latestGossip.members.forall(_.status == MemberStatus.Up) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index f2206f8d89..55a0b15f63 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -13,26 +13,18 @@ object NodeStartupMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } class NodeStartupMultiJvmNode1 extends NodeStartupSpec class NodeStartupMultiJvmNode2 extends NodeStartupSpec -abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import NodeStartupMultiJvmSpec._ override def initialParticipants = 2 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } From 34915063cda510ba06baba744af1c3ee2bfcaf22 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 12:51:37 +0200 Subject: [PATCH 3/4] Correct sort. See #2133 --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 8 ++++---- akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 55f9967596..259f487358 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -101,10 +101,10 @@ object Member { import MemberStatus._ implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ - if (a.protocol < b.protocol) true - else if (a.system < b.system) true - else if (a.host.getOrElse("") < b.host.getOrElse("")) true - else if (a.port.getOrElse(0) < b.port.getOrElse(0)) true + if (a.protocol != b.protocol) a.protocol.compareTo(b.protocol) < 0 + else if (a.system != b.system) a.system.compareTo(b.system) < 0 + else if (a.host.getOrElse("") != b.host.getOrElse("")) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 + else if (a.port.getOrElse(0) != b.port.getOrElse(0)) a.port.getOrElse(0) < b.port.getOrElse(0) else false } diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala index a75ead0149..ba1037b8bc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala @@ -21,8 +21,9 @@ class MemberSpec extends WordSpec with MustMatchers { val m3 = Member(Address("cluster", "sys1", "host1", 10000), MemberStatus.Up) val m4 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up) val m5 = Member(Address("cluster", "sys2", "host2", 10000), MemberStatus.Up) + val m6 = Member(Address("cluster", "sys2", "host3", 8000), MemberStatus.Up) - val expected = IndexedSeq(m1, m2, m3, m4, m5) + val expected = IndexedSeq(m1, m2, m3, m4, m5, m6) val shuffled = Random.shuffle(expected) shuffled.sorted must be(expected) } From a2cd84e0470cd6e1076d652238cd5a7bcc80a6e4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 12:59:14 +0200 Subject: [PATCH 4/4] Sort on host and port only. See #2133 --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 9 +++++---- .../src/test/scala/akka/cluster/MemberSpec.scala | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 259f487358..d55882205c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -100,11 +100,12 @@ class Member(val address: Address, val status: MemberStatus) extends ClusterMess object Member { import MemberStatus._ + /** + * Sort Address by host and port + */ implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ - if (a.protocol != b.protocol) a.protocol.compareTo(b.protocol) < 0 - else if (a.system != b.system) a.system.compareTo(b.system) < 0 - else if (a.host.getOrElse("") != b.host.getOrElse("")) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 - else if (a.port.getOrElse(0) != b.port.getOrElse(0)) a.port.getOrElse(0) < b.port.getOrElse(0) + if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 + else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0) else false } diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala index ba1037b8bc..050407577e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala @@ -16,14 +16,14 @@ class MemberSpec extends WordSpec with MustMatchers { "be sorted by address correctly" in { import Member.ordering + // sorting should be done on host and port, only val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up) val m2 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up) - val m3 = Member(Address("cluster", "sys1", "host1", 10000), MemberStatus.Up) - val m4 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up) - val m5 = Member(Address("cluster", "sys2", "host2", 10000), MemberStatus.Up) - val m6 = Member(Address("cluster", "sys2", "host3", 8000), MemberStatus.Up) + val m3 = Member(Address("cluster", "sys2", "host2", 8000), MemberStatus.Up) + val m4 = Member(Address("cluster", "sys2", "host2", 9000), MemberStatus.Up) + val m5 = Member(Address("cluster", "sys1", "host2", 10000), MemberStatus.Up) - val expected = IndexedSeq(m1, m2, m3, m4, m5, m6) + val expected = IndexedSeq(m1, m2, m3, m4, m5) val shuffled = Random.shuffle(expected) shuffled.sorted must be(expected) }