Unit tests of Cluster, see 2163

* ClusterSpec
- Test gossiping rules for deputies and unreachable
- Fix strange/wrong probabilites for gossip to unreachable and deputy nodes
- Fix lost order of Members when using map (without .toSeq) on the members SortedSet

* MemberSpec
- Test equals, hashCode

* GossipSpec
- Test member merge by status prio
- Fix bug in member merge (groupBy was wrong)
This commit is contained in:
Patrik Nordwall 2012-06-05 22:16:15 +02:00
parent 60c11cab7b
commit bc289df018
5 changed files with 360 additions and 43 deletions

View file

@ -180,7 +180,7 @@ case class GossipOverview(
*/
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member], // sorted set of members with their status, sorted by name
members: SortedSet[Member], // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]],
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
@ -214,12 +214,8 @@ case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. group all members by Address => Vector[Member]
var membersGroupedByAddress = Map.empty[Address, Vector[Member]]
(this.members ++ that.members) foreach { m
val ms = membersGroupedByAddress.get(m.address).getOrElse(Vector.empty[Member])
membersGroupedByAddress += (m.address -> (ms :+ m))
}
// 2. group all members by Address => Seq[Member]
val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address)
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers =
@ -252,10 +248,9 @@ case class Gossip(
* Manages routing of the different cluster commands.
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
*/
final class ClusterCommandDaemon extends Actor {
private[akka] final class ClusterCommandDaemon(cluster: Cluster) extends Actor {
import ClusterAction._
val cluster = Cluster(context.system)
val log = Logging(context.system, this)
def receive = {
@ -273,9 +268,8 @@ final class ClusterCommandDaemon extends Actor {
* Pooled and routed with N number of configurable instances.
* Concurrent access to Cluster.
*/
final class ClusterGossipDaemon extends Actor {
private[akka] final class ClusterGossipDaemon(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
val cluster = Cluster(context.system)
def receive = {
case GossipEnvelope(sender, gossip) cluster.receive(sender, gossip)
@ -287,13 +281,13 @@ final class ClusterGossipDaemon extends Actor {
/**
* Supervisor managing the different Cluster daemons.
*/
final class ClusterDaemonSupervisor extends Actor {
private[akka] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor {
val log = Logging(context.system, this)
val cluster = Cluster(context.system)
private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")
private val commands = context.actorOf(Props(new ClusterCommandDaemon(cluster)), "commands")
private val gossip = context.actorOf(
Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip")
Props(new ClusterGossipDaemon(cluster)).withRouter(
RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip")
def receive = Actor.emptyBehavior
@ -396,7 +390,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
val createChild = CreateChild(Props[ClusterDaemonSupervisor], "cluster")
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)), "cluster")
Await.result(system.systemGuardian ? createChild, defaultTimeout.duration) match {
case a: ActorRef a
case e: Exception throw e
@ -794,9 +788,11 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
/**
* INTERNAL API
*
* Gossips latest gossip to an address.
*/
private def gossipTo(address: Address): Unit = {
protected def gossipTo(address: Address): Unit = {
val connection = clusterGossipConnectionFor(address)
log.debug("Cluster Node [{}] - Gossiping to [{}]", selfAddress, connection)
connection ! GossipEnvelope(self, latestGossip)
@ -805,23 +801,43 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
/**
* Gossips latest gossip to a random member in the set of members passed in as argument.
*
* @return 'true' if it gossiped to a "deputy" member.
* @return the used [[akka.actor.Address] if any
*/
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
if (addresses.isEmpty) false
else {
val peers = addresses filter (_ != selfAddress) // filter out myself
val peer = selectRandomNode(peers)
gossipTo(peer)
deputyNodes exists (peer == _)
val peers = addresses filterNot (_ == selfAddress) // filter out myself
val peer = selectRandomNode(peers)
peer foreach gossipTo
peer
}
/**
* INTERNAL API
*/
protected[akka] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
(membersSize + unreachableSize) match {
case 0 0.0
case sum unreachableSize.toDouble / sum
}
/**
* INTERNAL API
*/
protected[akka] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
if (nrOfDeputyNodes > membersSize) 1.0
else if (nrOfDeputyNodes == 0) 0.0
else (membersSize + unreachableSize) match {
case 0 0.0
case sum (nrOfDeputyNodes + unreachableSize).toDouble / sum
}
}
/**
* INTERNAL API
*
* Initates a new round of gossip.
*/
private def gossip(): Unit = {
private[akka] def gossip(): Unit = {
val localState = state.get
if (isSingletonCluster(localState)) {
@ -833,38 +849,42 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
val localGossip = localState.latestGossip
val localMembers = localGossip.members
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// 1. gossip to alive members
val gossipedToDeputy = gossipToRandomNodeOf(localMembers map { _.address })
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
// 2. gossip to unreachable members
if (localUnreachableSize > 0) {
val probability: Double = localUnreachableSize / (localMembersSize + 1)
if (ThreadLocalRandom.current.nextDouble() < probability) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
}
// 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes
if ((!gossipedToDeputy || localMembersSize < 1) && deputies.nonEmpty) {
if (localMembersSize == 0) gossipToRandomNodeOf(deputies)
else {
val probability = 1.0 / localMembersSize + localUnreachableSize
if (ThreadLocalRandom.current.nextDouble() <= probability) gossipToRandomNodeOf(deputies)
}
val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies)
}
}
}
/**
* INTERNAL API
*
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
@tailrec
final private def reapUnreachableMembers(): Unit = {
final private[akka] def reapUnreachableMembers(): Unit = {
val localState = state.get
if (!isSingletonCluster(localState) && isAvailable(localState)) {
@ -905,10 +925,12 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
}
/**
* INTERNAL API
*
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
@tailrec
final private def leaderActions(): Unit = {
final private[akka] def leaderActions(): Unit = {
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
@ -1082,11 +1104,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private def clusterGossipConnectionFor(address: Address): ActorRef = system.actorFor(RootActorPath(address) / "system" / "cluster" / "gossip")
/**
* Gets an Iterable with the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
* Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
*/
private def deputyNodes: Iterable[Address] = state.get.latestGossip.members.toIterable map (_.address) drop 1 take NrOfDeputyNodes filter (_ != selfAddress)
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
private def selectRandomNode(addresses: Iterable[Address]): Address = addresses.toSeq(ThreadLocalRandom.current nextInt addresses.size)
/**
* INTERNAL API
*/
protected def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
private def isSingletonCluster(currentState: State): Boolean = currentState.latestGossip.members.size == 1

View file

@ -20,6 +20,7 @@ object MultiNodeClusterSpec {
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms
nr-of-deputy-nodes = 2
}
akka.test {
single-expect-default = 5 s

View file

@ -0,0 +1,232 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.AkkaSpec
import akka.util.duration._
import akka.util.Duration
import akka.actor.ExtendedActorSystem
import akka.actor.Address
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.BeforeAndAfter
object ClusterSpec {
val config = """
akka.cluster {
auto-down = off
nr-of-deputy-nodes = 3
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.port = 0
akka.loglevel = DEBUG
"""
case class GossipTo(address: Address)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
import ClusterSpec._
val deterministicRandom = new AtomicInteger
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) {
override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = {
if (addresses.isEmpty) None
else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size))
}
override def gossipTo(address: Address): Unit = {
if (address == self.address) {
super.gossipTo(address)
}
// represent the gossip with a message to be used in asserts
testActor ! GossipTo(address)
}
@volatile
var _gossipToUnreachableProbablity = 0.0
override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = {
if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize)
else _gossipToUnreachableProbablity
}
@volatile
var _gossipToDeputyProbablity = 0.0
override def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, deputySize: Int): Double = {
if (_gossipToDeputyProbablity < 0.0) super.gossipToDeputyProbablity(membersSize, unreachableSize, deputySize)
else _gossipToDeputyProbablity
}
@volatile
var _unavailable: Set[Address] = Set.empty
override val failureDetector = new AccrualFailureDetector(
system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) {
override def isAvailable(connection: Address): Boolean = {
if (_unavailable.contains(connection)) false
else super.isAvailable(connection)
}
}
}
val selfAddress = cluster.self.address
val addresses = IndexedSeq(
selfAddress,
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
def memberStatus(address: Address): Option[MemberStatus] =
cluster.latestGossip.members.collectFirst { case m if m.address == address m.status }
before {
cluster._gossipToUnreachableProbablity = 0.0
cluster._gossipToDeputyProbablity = 0.0
cluster._unavailable = Set.empty
deterministicRandom.set(0)
}
"A Cluster" must {
"initially be singleton cluster and reach convergence after first gossip" in {
cluster.isSingletonCluster must be(true)
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
memberStatus(selfAddress) must be(Some(MemberStatus.Joining))
cluster.convergence.isDefined must be(false)
cluster.gossip()
expectMsg(GossipTo(selfAddress))
awaitCond(cluster.convergence.isDefined)
memberStatus(selfAddress) must be(Some(MemberStatus.Joining))
cluster.leaderActions()
memberStatus(selfAddress) must be(Some(MemberStatus.Up))
}
"accept a joining node" in {
cluster.joining(addresses(1))
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1)))
memberStatus(addresses(1)) must be(Some(MemberStatus.Joining))
// FIXME why is it still convergence immediately after joining?
//cluster.convergence.isDefined must be(false)
}
"accept a few more joining nodes" in {
for (a addresses.drop(2)) {
cluster.joining(a)
memberStatus(a) must be(Some(MemberStatus.Joining))
}
cluster.latestGossip.members.map(_.address) must be(addresses.toSet)
}
"order members by host and port" in {
// note the importance of using toSeq before map, otherwise it will not preserve the order
cluster.latestGossip.members.toSeq.map(_.address) must be(addresses.toSeq)
}
"gossip to random live node" in {
cluster.latestGossip.members
cluster.gossip()
cluster.gossip()
cluster.gossip()
cluster.gossip()
expectMsg(GossipTo(addresses(1)))
expectMsg(GossipTo(addresses(2)))
expectMsg(GossipTo(addresses(3)))
expectMsg(GossipTo(addresses(4)))
expectNoMsg(1 second)
}
"use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in {
cluster._gossipToUnreachableProbablity = -1.0 // use real impl
cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1))
cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2))
cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9))
cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0)
cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0)
cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001))
cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001))
}
"use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in {
cluster._gossipToDeputyProbablity = -1.0 // use real impl
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2))
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(10, 2, 2))
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(10, 2, 3))
cluster.gossipToDeputyProbablity(10, 5, 5) must be < (cluster.gossipToDeputyProbablity(10, 9, 5))
cluster.gossipToDeputyProbablity(0, 10, 0) must be <= (1.0)
cluster.gossipToDeputyProbablity(1, 10, 1) must be <= (1.0)
cluster.gossipToDeputyProbablity(10, 0, 0) must be(0.0 plusOrMinus (0.0001))
cluster.gossipToDeputyProbablity(0, 0, 0) must be(0.0 plusOrMinus (0.0001))
cluster.gossipToDeputyProbablity(4, 0, 4) must be(1.0 plusOrMinus (0.0001))
cluster.gossipToDeputyProbablity(3, 7, 4) must be(1.0 plusOrMinus (0.0001))
}
"gossip to duputy node" in {
cluster._gossipToDeputyProbablity = 1.0 // always
// we have configured 2 deputy nodes
cluster.gossip() // 1 is deputy
cluster.gossip() // 2 is deputy
cluster.gossip() // 3 is deputy
cluster.gossip() // 4 is not deputy, and therefore a deputy is also used
expectMsg(GossipTo(addresses(1)))
expectMsg(GossipTo(addresses(2)))
expectMsg(GossipTo(addresses(3)))
expectMsg(GossipTo(addresses(4)))
// and the extra gossip to deputy
expectMsgAnyOf(GossipTo(addresses(1)), GossipTo(addresses(2)), GossipTo(addresses(3)))
expectNoMsg(1 second)
}
"gossip to random unreachable node" in {
val dead = Set(addresses(1))
cluster._unavailable = dead
cluster._gossipToUnreachableProbablity = 1.0 // always
cluster.reapUnreachableMembers()
cluster.latestGossip.overview.unreachable.map(_.address) must be(dead)
cluster.gossip()
expectMsg(GossipTo(addresses(2))) // first available
expectMsg(GossipTo(addresses(1))) // the unavailable
expectNoMsg(1 second)
}
"gossip to random deputy node if number of live nodes is less than number of deputy nodes" in {
cluster._gossipToDeputyProbablity = -1.0 // real impl
// 0 and 2 still alive
val dead = Set(addresses(1), addresses(3), addresses(4), addresses(5))
cluster._unavailable = dead
cluster.reapUnreachableMembers()
cluster.latestGossip.overview.unreachable.map(_.address) must be(dead)
for (n 1 to 20) {
cluster.gossip()
expectMsg(GossipTo(addresses(2))) // the only available
// and always to one of the 3 deputies
expectMsgAnyOf(GossipTo(addresses(1)), GossipTo(addresses(2)), GossipTo(addresses(3)))
}
expectNoMsg(1 second)
}
}
}

View file

@ -0,0 +1,42 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import scala.collection.immutable.SortedSet
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class GossipSpec extends WordSpec with MustMatchers {
"A Gossip" must {
"merge members by status priority" in {
import MemberStatus._
val a1 = Member(Address("akka", "sys", "a", 2552), Up)
val a2 = Member(Address("akka", "sys", "a", 2552), Joining)
val b1 = Member(Address("akka", "sys", "b", 2552), Up)
val b2 = Member(Address("akka", "sys", "b", 2552), Removed)
val c1 = Member(Address("akka", "sys", "c", 2552), Leaving)
val c2 = Member(Address("akka", "sys", "c", 2552), Up)
val d1 = Member(Address("akka", "sys", "d", 2552), Leaving)
val d2 = Member(Address("akka", "sys", "d", 2552), Removed)
val g1 = Gossip(members = SortedSet(a1, b1, c1, d1))
val g2 = Gossip(members = SortedSet(a2, b2, c2, d2))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a1, b2, c1, d2))
merged1.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1, b2, c1, d2))
merged2.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
}
}
}

View file

@ -8,6 +8,7 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import scala.util.Random
import scala.collection.immutable.SortedSet
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MemberSpec extends WordSpec with MustMatchers {
@ -26,6 +27,19 @@ class MemberSpec extends WordSpec with MustMatchers {
val expected = IndexedSeq(m1, m2, m3, m4, m5)
val shuffled = Random.shuffle(expected)
shuffled.sorted must be(expected)
(SortedSet.empty[Member] ++ shuffled).toIndexedSeq must be(expected)
}
"have stable equals and hashCode" in {
val m1 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Joining)
val m2 = Member(Address("akka", "sys1", "host1", 9000), MemberStatus.Up)
val m3 = Member(Address("akka", "sys1", "host1", 10000), MemberStatus.Up)
m1 must be(m2)
m1.hashCode must be(m2.hashCode)
m3 must not be (m2)
m3 must not be (m1)
}
}
}