Merge branch 'master' of github.com:akka/akka

This commit is contained in:
phaller 2012-06-27 19:14:45 +02:00
commit 01f353ddb7
15 changed files with 262 additions and 63 deletions

View file

@ -26,6 +26,10 @@ object RoutingSpec {
router = round-robin
nr-of-instances = 3
}
/router2 {
router = round-robin
nr-of-instances = 3
}
/myrouter {
router = "akka.routing.RoutingSpec$MyRouter"
foo = bar
@ -129,7 +133,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
"use configured nr-of-instances when router is specified" in {
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router1")
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2")
Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3)
system.stop(router)
}

View file

@ -18,10 +18,13 @@ akka {
# how long to wait for one of the seed nodes to reply to initial join request
seed-node-timeout = 5s
# automatic join the seed-nodes at startup
# Automatically join the seed-nodes at startup.
# If seed-nodes is empty it will join itself and become a single node cluster.
auto-join = on
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Using auto-down implies that two separate clusters will automatically be formed in case of
# network partition.
auto-down = on
# the number of gossip daemon actors

View file

@ -80,12 +80,12 @@ class AccrualFailureDetector(
settings: ClusterSettings) =
this(
system,
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
settings.HeartbeatInterval,
AccrualFailureDetector.realClock)
threshold = settings.FailureDetectorThreshold,
maxSampleSize = settings.FailureDetectorMaxSampleSize,
minStdDeviation = settings.FailureDetectorMinStdDeviation,
acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause,
firstHeartbeatEstimate = settings.HeartbeatInterval,
clock = AccrualFailureDetector.realClock)
private val log = Logging(system, "FailureDetector")
@ -107,8 +107,7 @@ class AccrualFailureDetector(
private case class State(
version: Long = 0L,
history: Map[Address, HeartbeatHistory] = Map.empty,
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])
timestamps: Map[Address, Long] = Map.empty[Address, Long])
private val state = new AtomicReference[State](State())
@ -141,8 +140,7 @@ class AccrualFailureDetector(
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history + (connection -> newHistory),
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
explicitRemovals = oldState.explicitRemovals - connection)
timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -158,9 +156,7 @@ class AccrualFailureDetector(
val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection)
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timeDiff = clock() - oldTimestamp.get
@ -208,13 +204,24 @@ class AccrualFailureDetector(
if (oldState.history.contains(connection)) {
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history - connection,
timestamps = oldState.timestamps - connection,
explicitRemovals = oldState.explicitRemovals + connection)
timestamps = oldState.timestamps - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur
}
}
/**
 * Drops every recorded heartbeat history and timestamp.
 *
 * The state is swapped with compare-and-set; if the swap fails because the state
 * changed in between, the attempt is repeated against the fresh state.
 */
def reset(): Unit = {
  @tailrec
  def clearAll(): Unit = {
    val current = state.get
    val cleared = current.copy(
      version = current.version + 1,
      history = Map.empty,
      timestamps = Map.empty)
    // retry against the latest state when the swap did not succeed
    if (state.compareAndSet(current, cleared)) ()
    else clearAll()
  }

  log.debug("Resetting failure detector")
  clearAll()
}
}
private[cluster] object HeartbeatHistory {

View file

@ -27,6 +27,7 @@ import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
import scala.collection.GenTraversableOnce
/**
* Interface for membership change listener.
@ -179,6 +180,20 @@ object Member {
case (Joining, Joining) m1
case (Up, Up) m1
}
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
// The SortedSet `+` and `++` operators replace an existing element.
// Use these `:+` and `:++` operators for the Gossip members instead; they leave
// an element that is already contained in the set untouched.
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)

/**
 * Adds the non-replacing `:+` and `:++` operators to a `SortedSet[Member]`.
 *
 * The operators are plain methods: only the `sortedSetWorkaround` conversion above
 * needs to be implicit.
 */
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {

  /**
   * Returns the set unchanged when it already contains `elem`,
   * otherwise the set with `elem` added.
   */
  def :+(elem: Member): SortedSet[Member] =
    if (sortedSet.contains(elem)) sortedSet
    else sortedSet + elem

  /**
   * Adds only those of `elems` that are not already contained in the set.
   */
  def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
    sortedSet ++ (elems.toSet diff sortedSet)
}
}
/**
@ -226,6 +241,7 @@ case class GossipOverview(
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
@ -300,7 +316,7 @@ case class Gossip(
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
else this copy (members = members :+ member)
}
/**
@ -329,7 +345,7 @@ case class Gossip(
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
@ -364,20 +380,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
val log = Logging(context.system, this)
def receive = {
case JoinSeedNode joinSeedNode()
case InitJoin sender ! InitJoinAck(cluster.selfAddress)
case InitJoinAck(address) cluster.join(address)
case Join(address) cluster.joining(address)
case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address)
case Remove(address) cluster.removing(address)
case JoinSeedNode joinSeedNode()
case InitJoin sender ! InitJoinAck(cluster.selfAddress)
case InitJoinAck(address) cluster.join(address)
case Join(address) cluster.joining(address)
case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address)
case Remove(address) cluster.removing(address)
case Failure(e: AskTimeoutException) joinSeedNodeTimeout()
}
def joinSeedNode(): Unit = {
val seedRoutees = for (address cluster.seedNodes; if address != cluster.selfAddress)
yield self.path.toStringWithAddress(address)
if (seedRoutees.nonEmpty) {
if (seedRoutees.isEmpty) {
cluster join cluster.selfAddress
} else {
implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
@ -387,6 +406,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
}
}
def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
}
@ -534,10 +555,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
private val state = {
val member = Member(selfAddress, Joining)
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock
val seenVersionedGossip = versionedGossip seen selfAddress
new AtomicReference[State](State(seenVersionedGossip))
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers)))
}
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
@ -797,7 +817,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode
@ -939,11 +961,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!localGossip.overview.isNonDownUnreachable(from)) {
val winningGossip =
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
// a fresh singleton cluster that is joining, no need to merge, use received gossip
remoteGossip
} else if (remoteGossip.version <> localGossip.version) {
if (remoteGossip.version <> localGossip.version) {
// concurrent
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode

View file

@ -25,4 +25,9 @@ trait FailureDetector {
* Removes the heartbeat management for a connection.
*/
def remove(connection: Address): Unit
/**
* Removes all connections and starts over.
*/
def reset(): Unit
}

View file

@ -16,7 +16,9 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
val ordinary1 = role("ordinary1")
val ordinary2 = role("ordinary2")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
@ -33,6 +35,23 @@ abstract class JoinSeedNodeSpec
override def seedNodes = IndexedSeq(seed1, seed2)
"A cluster with configured seed nodes" must {
"start the seed nodes sequentially" taggedAs LongRunningTest in {
runOn(seed1) {
startClusterNode()
}
enterBarrier("seed1-started")
runOn(seed2) {
startClusterNode()
}
enterBarrier("seed2-started")
runOn(seed1, seed2) {
awaitUpConvergence(2)
}
enterBarrier("after-1")
}
"join the seed nodes at startup" taggedAs LongRunningTest in {
startClusterNode()
@ -40,7 +59,7 @@ abstract class JoinSeedNodeSpec
awaitUpConvergence(4)
enterBarrier("after")
enterBarrier("after-2")
}
}
}

View file

@ -20,6 +20,7 @@ import akka.actor.RootActorPath
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
akka.cluster {
auto-join = off
auto-down = off
gossip-interval = 200 ms
heartbeat-interval = 400 ms
@ -99,10 +100,15 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
def cluster: Cluster = clusterNode
/**
* Use this method instead of 'cluster.self'
* for the initial startup of the cluster node.
* Use this method for the initial startup of the cluster node.
*/
def startClusterNode(): Unit = cluster.self
def startClusterNode(): Unit = {
  if (cluster.latestGossip.members.nonEmpty) {
    // the latest gossip already has members: only access 'cluster.self'
    cluster.self
  } else {
    // no members yet: join this node itself and wait until its own address shows up as a member
    cluster join myself
    awaitCond(cluster.latestGossip.members.exists(_.address == address(myself)))
  }
}
/**
* Initialize the cluster with the specified member
@ -192,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
}
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)
nodesInCluster.sorted.head
}

View file

@ -16,6 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-join = on
auto-down = on
failure-detector.threshold = 4
}
@ -38,12 +39,20 @@ abstract class SingletonClusterSpec
"A cluster of 2 nodes" must {
"not be singleton cluster when joined" taggedAs LongRunningTest in {
"become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in {
startClusterNode()
awaitUpConvergence(1)
cluster.isSingletonCluster must be(true)
enterBarrier("after-1")
}
"not be singleton cluster when joined with other node" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
cluster.isSingletonCluster must be(false)
assertLeader(first, second)
enterBarrier("after-1")
enterBarrier("after-2")
}
"become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
@ -58,7 +67,7 @@ abstract class SingletonClusterSpec
assertLeader(first)
}
enterBarrier("after-2")
enterBarrier("after-3")
}
}
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.actor.Address
import akka.remote.testconductor.Direction
/**
 * Multi-node configuration for the split brain test.
 *
 * Declares the five roles `first` to `fifth` and a common configuration that layers
 * `debugConfig(on = false)` over an `akka.cluster` section with `auto-down = on` and
 * `failure-detector.threshold = 4`, falling back to `MultiNodeClusterSpec.clusterConfig`.
 */
object SplitBrainMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down = on
failure-detector.threshold = 4
}""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy
/**
 * Multi-node test that partitions a five member cluster into `side1` (first, second)
 * and `side2` (third, fourth, fifth).
 *
 * The test cases run in order: reach initial convergence, blackhole all traffic between
 * the two sides and wait until each side sees the other side as unreachable, then wait
 * until the unreachable members are Down and check the leader on each side.
 */
abstract class SplitBrainSpec
  extends MultiNodeSpec(SplitBrainMultiJvmSpec)
  with MultiNodeClusterSpec {

  import SplitBrainMultiJvmSpec._

  val side1 = IndexedSeq(first, second)
  val side2 = IndexedSeq(third, fourth, fifth)

  "A cluster of 5 members" must {

    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(first, second, third, fourth, fifth)

      enterBarrier("after-1")
    }

    "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in {
      // NOTE(review): thirdAddress is not used below; kept in case the address lookup
      // has to happen before the split — confirm and remove if it is not needed
      val thirdAddress = address(third)
      enterBarrier("before-split")

      runOn(first) {
        // split the cluster in two parts (first, second) / (third, fourth, fifth)
        for (role1 <- side1; role2 <- side2) {
          testConductor.blackhole(role1, role2, Direction.Both).await
        }
      }
      enterBarrier("after-split")

      // one node on each side marks all nodes of the other side as unavailable
      runOn(side1.last) {
        for (role <- side2) markNodeAsUnavailable(role)
      }
      runOn(side2.last) {
        for (role <- side1) markNodeAsUnavailable(role)
      }

      // every node must end up seeing exactly the other side as unreachable
      runOn(side1: _*) {
        awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds)
      }
      runOn(side2: _*) {
        awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds)
      }

      enterBarrier("after-2")
    }

    "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in {

      runOn(side1: _*) {
        // auto-down = on
        awaitCond(cluster.latestGossip.overview.unreachable.forall(m => m.status == MemberStatus.Down), 15 seconds)
        cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address)
        awaitUpConvergence(side1.size, side2 map address)
        assertLeader(side1: _*)
      }

      runOn(side2: _*) {
        // auto-down = on
        awaitCond(cluster.latestGossip.overview.unreachable.forall(m => m.status == MemberStatus.Down), 15 seconds)
        cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address)
        awaitUpConvergence(side2.size, side1 map address)
        assertLeader(side2: _*)
      }

      enterBarrier("after-3")
    }

  }

}

View file

@ -25,6 +25,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
akka.cluster {
# FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms
auto-join = off
}
akka.loglevel = INFO
"""))

View file

@ -99,12 +99,14 @@ abstract class TransitionSpec
"start nodes as singleton clusters" taggedAs LongRunningTest in {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
runOn(first) {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("after-1")
}
@ -244,13 +246,20 @@ abstract class TransitionSpec
}
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fifth) {
startClusterNode()
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("fifth-started")
runOn(fourth) {
cluster.join(fifth)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
}
testConductor.enter("fourth-joined")
enterBarrier("fourth-joined")
fifth gossipTo fourth
fourth gossipTo fifth

View file

@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
}
"mark node as dead after explicit removal of connection" in {
"mark node as available after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
@ -124,7 +124,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
fd.remove(conn)
fd.isAvailable(conn) must be(false)
fd.isAvailable(conn) must be(true)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
@ -140,7 +140,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.remove(conn)
fd.isAvailable(conn) must be(false) //3300
fd.isAvailable(conn) must be(true) //3300
// it receives heartbeat from an explicitly removed node
fd.heartbeat(conn) //4400

View file

@ -86,11 +86,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
"use the address of the remote transport" in {
cluster.selfAddress must be(selfAddress)
cluster.self.address must be(selfAddress)
}
"initially be singleton cluster and reach convergence immediately" in {
cluster.isSingletonCluster must be(true)
"initially become singleton cluster when joining itself and reach convergence" in {
cluster.isSingletonCluster must be(false) // auto-join = off
cluster.join(selfAddress)
awaitCond(cluster.isSingletonCluster)
cluster.self.address must be(selfAddress)
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
memberStatus(selfAddress) must be(Some(MemberStatus.Joining))
cluster.convergence.isDefined must be(true)

View file

@ -57,4 +57,9 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
/**
 * Logs the reset at debug level and clears all entries from `connections`.
 */
def reset(): Unit = {
log.debug("Resetting failure detector")
connections.clear()
}
}

View file

@ -388,8 +388,8 @@ object AkkaBuild extends Build {
if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-n", tags.mkString(" ")))
},
// show full stack traces
testOptions in Test += Tests.Argument("-oF")
// show full stack traces and test case durations
testOptions in Test += Tests.Argument("-oDF")
)
lazy val formatSettings = ScalariformPlugin.scalariformSettings ++ Seq(