Merge remote-tracking branch 'akka/master' into osgi

This commit is contained in:
Gert Vanthienen 2012-06-27 14:31:41 +02:00
commit be4119a2bd
6 changed files with 156 additions and 18 deletions

View file

@ -22,7 +22,9 @@ akka {
# If seed-nodes is empty it will join itself and become a single node cluster.
auto-join = on
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Using auto-down implies that two separate clusters will automatically be formed in case of
# network partition.
auto-down = on
# the number of gossip daemon actors

View file

@ -80,12 +80,12 @@ class AccrualFailureDetector(
settings: ClusterSettings) =
this(
system,
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
settings.HeartbeatInterval,
AccrualFailureDetector.realClock)
threshold = settings.FailureDetectorThreshold,
maxSampleSize = settings.FailureDetectorMaxSampleSize,
minStdDeviation = settings.FailureDetectorMinStdDeviation,
acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause,
firstHeartbeatEstimate = settings.HeartbeatInterval,
clock = AccrualFailureDetector.realClock)
private val log = Logging(system, "FailureDetector")

View file

@ -27,6 +27,7 @@ import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
import scala.collection.GenTraversableOnce
/**
* Interface for membership change listener.
@ -179,6 +180,20 @@ object Member {
case (Joining, Joining) m1
case (Up, Up) m1
}
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
// SortedSet + and ++ operators replaces existing element
// Use these :+ and :++ operators for the Gossip members
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
implicit def :+(elem: Member): SortedSet[Member] = {
if (sortedSet.contains(elem)) sortedSet
else sortedSet + elem
}
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
sortedSet ++ (elems.toSet diff sortedSet)
}
}
/**
@ -226,6 +241,7 @@ case class GossipOverview(
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
@ -300,7 +316,7 @@ case class Gossip(
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
else this copy (members = members :+ member)
}
/**
@ -329,7 +345,7 @@ case class Gossip(
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode

View file

@ -198,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
}
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)
nodesInCluster.sorted.head
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.actor.Address
import akka.remote.testconductor.Direction
object SplitBrainMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down = on
failure-detector.threshold = 4
}""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy
abstract class SplitBrainSpec
extends MultiNodeSpec(SplitBrainMultiJvmSpec)
with MultiNodeClusterSpec {
import SplitBrainMultiJvmSpec._
val side1 = IndexedSeq(first, second)
val side2 = IndexedSeq(third, fourth, fifth)
"A cluster of 5 members" must {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third, fourth, fifth)
enterBarrier("after-1")
}
"detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in {
val thirdAddress = address(third)
enterBarrier("before-split")
runOn(first) {
// split the cluster in two parts (first, second) / (third, fourth, fifth)
for (role1 side1; role2 side2) {
testConductor.blackhole(role1, role2, Direction.Both).await
}
}
enterBarrier("after-split")
runOn(side1.last) {
for (role side2) markNodeAsUnavailable(role)
}
runOn(side2.last) {
for (role side1) markNodeAsUnavailable(role)
}
runOn(side1: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds)
}
runOn(side2: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds)
}
enterBarrier("after-2")
}
"auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in {
runOn(side1: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address)
awaitUpConvergence(side1.size, side2 map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address)
awaitUpConvergence(side2.size, side1 map address)
assertLeader(side2: _*)
}
enterBarrier("after-3")
}
}
}

View file

@ -99,12 +99,14 @@ abstract class TransitionSpec
"start nodes as singleton clusters" taggedAs LongRunningTest in {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
runOn(first) {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("after-1")
}
@ -244,13 +246,20 @@ abstract class TransitionSpec
}
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fifth) {
startClusterNode()
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("fifth-started")
runOn(fourth) {
cluster.join(fifth)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
}
testConductor.enter("fourth-joined")
enterBarrier("fourth-joined")
fifth gossipTo fourth
fourth gossipTo fifth