diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c16a34a2ca..935df0acce 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -180,7 +180,7 @@ case class GossipOverview( */ case class Gossip( overview: GossipOverview = GossipOverview(), - members: SortedSet[Member], // sorted set of members with their status, sorted by name + members: SortedSet[Member], // sorted set of members with their status, sorted by address meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]], version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message @@ -214,12 +214,8 @@ case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. group all members by Address => Vector[Member] - var membersGroupedByAddress = Map.empty[Address, Vector[Member]] - (this.members ++ that.members) foreach { m ⇒ - val ms = membersGroupedByAddress.get(m.address).getOrElse(Vector.empty[Member]) - membersGroupedByAddress += (m.address -> (ms :+ m)) - } + // 2. group all members by Address => Seq[Member] + val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address) // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups val mergedMembers = @@ -252,10 +248,9 @@ case class Gossip( * Manages routing of the different cluster commands. * Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message. */ -final class ClusterCommandDaemon extends Actor { +private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor { import ClusterAction._ - val cluster = Cluster(context.system) val log = Logging(context.system, this) def receive = { @@ -273,9 +268,8 @@ final class ClusterCommandDaemon extends Actor { * Pooled and routed with N number of configurable instances. * Concurrent access to Cluster. */ -final class ClusterGossipDaemon extends Actor { +private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor { val log = Logging(context.system, this) - val cluster = Cluster(context.system) def receive = { case GossipEnvelope(sender, gossip) ⇒ cluster.receive(sender, gossip) @@ -287,13 +281,13 @@ final class ClusterGossipDaemon extends Actor { /** * Supervisor managing the different Cluster daemons. */ -final class ClusterDaemonSupervisor extends Actor { +private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor { val log = Logging(context.system, this) - val cluster = Cluster(context.system) - private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands") + private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands") private val gossip = context.actorOf( - Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip") + Props(new ClusterGossipDaemon(cluster)).withRouter( + RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip") def receive = Actor.emptyBehavior @@ -396,7 +390,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // create superisor for daemons under path "/system/cluster" private val clusterDaemons = { - val createChild = CreateChild(Props[ClusterDaemonSupervisor], "cluster") + val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster") Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match { case a: ActorRef ⇒ a case e: Exception ⇒ throw e @@ -794,9 +788,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } /** + * INTERNAL API + * * Gossips latest gossip to an address. */ - private def gossipTo(address: Address): Unit = { + protected def gossipTo(address: Address): Unit = { val connection = clusterGossipConnectionFor(address) log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection) connection ! GossipEnvelope(self, latestGossip) @@ -805,23 +801,43 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ /** * Gossips latest gossip to a random member in the set of members passed in as argument. * - * @return 'true' if it gossiped to a "deputy" member. + * @return the used [[akka.actor.Address] if any */ - private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = { + private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = { log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) - if (addresses.isEmpty) false - else { - val peers = addresses filter (_ != selfAddress) // filter out myself - val peer = selectRandomNode(peers) - gossipTo(peer) - deputyNodes exists (peer == _) + val peers = addresses filterNot (_ == selfAddress) // filter out myself + val peer = selectRandomNode(peers) + peer foreach gossipTo + peer + } + + /** + * INTERNAL API + */ + protected[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = + (membersSize + unreachableSize) match { + case 0 ⇒ 0.0 + case sum ⇒ unreachableSize.toDouble / sum + } + + /** + * INTERNAL API + */ + protected[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = { + if (nrOfDeputyNodes > membersSize) 1.0 + else if (nrOfDeputyNodes == 0) 0.0 + else (membersSize + unreachableSize) match { + case 0 ⇒ 0.0 + case sum ⇒ (nrOfDeputyNodes + unreachableSize).toDouble / sum } } /** + * INTERNAL API + * * Initates a new round of gossip. */ - private def gossip(): Unit = { + private[akka] def gossip(): Unit = { val localState = state.get if (isSingletonCluster(localState)) { @@ -833,38 +849,42 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress) val localGossip = localState.latestGossip - val localMembers = localGossip.members + // important to not accidentally use `map` of the SortedSet, since the original order is not preserved + val localMembers = localGossip.members.toIndexedSeq val localMembersSize = localMembers.size + val localMemberAddresses = localMembers map { _.address } - val localUnreachableMembers = localGossip.overview.unreachable + val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val localUnreachableSize = localUnreachableMembers.size // 1. gossip to alive members - val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address }) + val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) // 2. gossip to unreachable members if (localUnreachableSize > 0) { - val probability: Double = localUnreachableSize / (localMembersSize + 1) - if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) + val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) + if (ThreadLocalRandom.current.nextDouble() < probability) + gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) } // 3. gossip to a deputy nodes for facilitating partition healing - val deputies = deputyNodes - if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) { - if (localMembersSize == 0) gossipToRandomNodeOf(deputies) - else { - val probability = 1.0 / localMembersSize + localUnreachableSize - if (ThreadLocalRandom.current.nextDouble() <= probability) gossipToRandomNodeOf(deputies) - } + val deputies = deputyNodes(localMemberAddresses) + val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) + if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { + val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) + if (ThreadLocalRandom.current.nextDouble() < probability) + gossipToRandomNodeOf(deputies) } } } /** + * INTERNAL API + * * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. */ @tailrec - final private def reapUnreachableMembers(): Unit = { + final private[akka] def reapUnreachableMembers(): Unit = { val localState = state.get if (!isSingletonCluster(localState) && isAvailable(localState)) { @@ -905,10 +925,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ } /** + * INTERNAL API + * * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. */ @tailrec - final private def leaderActions(): Unit = { + final private[akka] def leaderActions(): Unit = { val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members @@ -1082,11 +1104,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip") /** - * Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group. + * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. */ - private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take NrOfDeputyNodes filter (_ != selfAddress) + private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = + addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) - private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size) + /** + * INTERNAL API + */ + protected def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = + if (addresses.isEmpty) None + else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 113064e13c..7f7d60fcdc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,7 @@ object MultiNodeClusterSpec { leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms + nr-of-deputy-nodes = 2 } akka.test { single-expect-default = 5 s diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala new file mode 100644 index 0000000000..fdc3095f74 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -0,0 +1,232 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit.AkkaSpec +import akka.util.duration._ +import akka.util.Duration +import akka.actor.ExtendedActorSystem +import akka.actor.Address +import java.util.concurrent.atomic.AtomicInteger +import org.scalatest.BeforeAndAfter + +object ClusterSpec { + val config = """ + akka.cluster { + auto-down = off + nr-of-deputy-nodes = 3 + periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks + } + akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.remote.netty.port = 0 + akka.loglevel = DEBUG + """ + + case class GossipTo(address: Address) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { + import ClusterSpec._ + + val deterministicRandom = new AtomicInteger + + val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) { + + override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { + if (addresses.isEmpty) None + else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) + } + + override def gossipTo(address: Address): Unit = { + if (address == self.address) { + super.gossipTo(address) + } + // represent the gossip with a message to be used in asserts + testActor ! GossipTo(address) + } + + @volatile + var _gossipToUnreachableProbablity = 0.0 + + override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = { + if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize) + else _gossipToUnreachableProbablity + } + + @volatile + var _gossipToDeputyProbablity = 0.0 + + override def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, deputySize: Int): Double = { + if (_gossipToDeputyProbablity < 0.0) super.gossipToDeputyProbablity(membersSize, unreachableSize, deputySize) + else _gossipToDeputyProbablity + } + + @volatile + var _unavailable: Set[Address] = Set.empty + + override val failureDetector = new AccrualFailureDetector( + system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) { + + override def isAvailable(connection: Address): Boolean = { + if (_unavailable.contains(connection)) false + else super.isAvailable(connection) + } + } + + } + + val selfAddress = cluster.self.address + val addresses = IndexedSeq( + selfAddress, + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4), + Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5)) + + def memberStatus(address: Address): Option[MemberStatus] = + cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } + + before { + cluster._gossipToUnreachableProbablity = 0.0 + cluster._gossipToDeputyProbablity = 0.0 + cluster._unavailable = Set.empty + deterministicRandom.set(0) + } + + "A Cluster" must { + + "initially be singleton cluster and reach convergence after first gossip" in { + cluster.isSingletonCluster must be(true) + cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) + memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) + cluster.convergence.isDefined must be(false) + cluster.gossip() + expectMsg(GossipTo(selfAddress)) + awaitCond(cluster.convergence.isDefined) + memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) + cluster.leaderActions() + memberStatus(selfAddress) must be(Some(MemberStatus.Up)) + } + + "accept a joining node" in { + cluster.joining(addresses(1)) + cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) + memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) + // FIXME why is it still convergence immediately after joining? + //cluster.convergence.isDefined must be(false) + } + + "accept a few more joining nodes" in { + for (a ← addresses.drop(2)) { + cluster.joining(a) + memberStatus(a) must be(Some(MemberStatus.Joining)) + } + cluster.latestGossip.members.map(_.address) must be(addresses.toSet) + } + + "order members by host and port" in { + // note the importance of using toSeq before map, otherwise it will not preserve the order + cluster.latestGossip.members.toSeq.map(_.address) must be(addresses.toSeq) + } + + "gossip to random live node" in { + cluster.latestGossip.members + cluster.gossip() + cluster.gossip() + cluster.gossip() + cluster.gossip() + + expectMsg(GossipTo(addresses(1))) + expectMsg(GossipTo(addresses(2))) + expectMsg(GossipTo(addresses(3))) + expectMsg(GossipTo(addresses(4))) + + expectNoMsg(1 second) + } + + "use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in { + cluster._gossipToUnreachableProbablity = -1.0 // use real impl + cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1)) + cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2)) + cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9)) + cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0) + cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0) + cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001)) + cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001)) + } + + "use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in { + cluster._gossipToDeputyProbablity = -1.0 // use real impl + cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2)) + cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(10, 2, 2)) + cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(10, 2, 3)) + cluster.gossipToDeputyProbablity(10, 5, 5) must be < (cluster.gossipToDeputyProbablity(10, 9, 5)) + cluster.gossipToDeputyProbablity(0, 10, 0) must be <= (1.0) + cluster.gossipToDeputyProbablity(1, 10, 1) must be <= (1.0) + cluster.gossipToDeputyProbablity(10, 0, 0) must be(0.0 plusOrMinus (0.0001)) + cluster.gossipToDeputyProbablity(0, 0, 0) must be(0.0 plusOrMinus (0.0001)) + cluster.gossipToDeputyProbablity(4, 0, 4) must be(1.0 plusOrMinus (0.0001)) + cluster.gossipToDeputyProbablity(3, 7, 4) must be(1.0 plusOrMinus (0.0001)) + } + + "gossip to duputy node" in { + cluster._gossipToDeputyProbablity = 1.0 // always + + // we have configured 2 deputy nodes + cluster.gossip() // 1 is deputy + cluster.gossip() // 2 is deputy + cluster.gossip() // 3 is deputy + cluster.gossip() // 4 is not deputy, and therefore a deputy is also used + + expectMsg(GossipTo(addresses(1))) + expectMsg(GossipTo(addresses(2))) + expectMsg(GossipTo(addresses(3))) + expectMsg(GossipTo(addresses(4))) + // and the extra gossip to deputy + expectMsgAnyOf(GossipTo(addresses(1)), GossipTo(addresses(2)), GossipTo(addresses(3))) + + expectNoMsg(1 second) + + } + + "gossip to random unreachable node" in { + val dead = Set(addresses(1)) + cluster._unavailable = dead + cluster._gossipToUnreachableProbablity = 1.0 // always + + cluster.reapUnreachableMembers() + cluster.latestGossip.overview.unreachable.map(_.address) must be(dead) + + cluster.gossip() + + expectMsg(GossipTo(addresses(2))) // first available + expectMsg(GossipTo(addresses(1))) // the unavailable + + expectNoMsg(1 second) + } + + "gossip to random deputy node if number of live nodes is less than number of deputy nodes" in { + cluster._gossipToDeputyProbablity = -1.0 // real impl + // 0 and 2 still alive + val dead = Set(addresses(1), addresses(3), addresses(4), addresses(5)) + cluster._unavailable = dead + + cluster.reapUnreachableMembers() + cluster.latestGossip.overview.unreachable.map(_.address) must be(dead) + + for (n ← 1 to 20) { + cluster.gossip() + expectMsg(GossipTo(addresses(2))) // the only available + // and always to one of the 3 deputies + expectMsgAnyOf(GossipTo(addresses(1)), GossipTo(addresses(2)), GossipTo(addresses(3))) + } + + expectNoMsg(1 second) + + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala new file mode 100644 index 0000000000..77cd0c52ba --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import scala.collection.immutable.SortedSet + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class GossipSpec extends WordSpec with MustMatchers { + + "A Gossip" must { + + "merge members by status priority" in { + import MemberStatus._ + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val a2 = Member(Address("akka", "sys", "a", 2552), Joining) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val b2 = Member(Address("akka", "sys", "b", 2552), Removed) + val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) + val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + + val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)) + val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)) + + val merged1 = g1 merge g2 + merged1.members must be(SortedSet(a1, b2, c1, d2)) + merged1.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + val merged2 = g2 merge g1 + merged2.members must be(SortedSet(a1, b2, c1, d2)) + merged2.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala index 050407577e..bc1f70ae86 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberSpec.scala @@ -8,6 +8,7 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.actor.Address import scala.util.Random +import scala.collection.immutable.SortedSet @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class MemberSpec extends WordSpec with MustMatchers { @@ -26,6 +27,19 @@ class MemberSpec extends WordSpec with MustMatchers { val expected = IndexedSeq(m1, m2, m3, m4, m5) val shuffled = Random.shuffle(expected) shuffled.sorted must be(expected) + (SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected) + } + + "have stable equals and hashCode" in { + val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining) + val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up) + val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up) + + m1 must be(m2) + m1.hashCode must be(m2.hashCode) + + m3 must not be (m2) + m3 must not be (m1) } } }