Merge branch 'master' into wip-scala210M4-√

This commit is contained in:
Viktor Klang 2012-06-28 15:55:47 +02:00
commit dec7824de8
15 changed files with 200 additions and 33 deletions

View file

@ -22,7 +22,9 @@ akka {
# If seed-nodes is empty it will join itself and become a single node cluster.
auto-join = on
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
# Using auto-down implies that two separate clusters will automatically be formed in case of
# network partition.
auto-down = on
# the number of gossip daemon actors

View file

@ -80,12 +80,12 @@ class AccrualFailureDetector(
settings: ClusterSettings) =
this(
system,
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
settings.FailureDetectorAcceptableHeartbeatPause,
settings.FailureDetectorMinStdDeviation,
settings.HeartbeatInterval,
AccrualFailureDetector.realClock)
threshold = settings.FailureDetectorThreshold,
maxSampleSize = settings.FailureDetectorMaxSampleSize,
minStdDeviation = settings.FailureDetectorMinStdDeviation,
acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause,
firstHeartbeatEstimate = settings.HeartbeatInterval,
clock = AccrualFailureDetector.realClock)
private val log = Logging(system, "FailureDetector")
@ -107,8 +107,7 @@ class AccrualFailureDetector(
private case class State(
version: Long = 0L,
history: Map[Address, HeartbeatHistory] = Map.empty,
timestamps: Map[Address, Long] = Map.empty[Address, Long],
explicitRemovals: Set[Address] = Set.empty[Address])
timestamps: Map[Address, Long] = Map.empty[Address, Long])
private val state = new AtomicReference[State](State())
@ -141,8 +140,7 @@ class AccrualFailureDetector(
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history + (connection -> newHistory),
timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp,
explicitRemovals = oldState.explicitRemovals - connection)
timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur
@ -158,9 +156,7 @@ class AccrualFailureDetector(
val oldState = state.get
val oldTimestamp = oldState.timestamps.get(connection)
// if connection has been removed explicitly
if (oldState.explicitRemovals.contains(connection)) Double.MaxValue
else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
else {
val timeDiff = clock() - oldTimestamp.get
@ -208,13 +204,24 @@ class AccrualFailureDetector(
if (oldState.history.contains(connection)) {
val newState = oldState copy (version = oldState.version + 1,
history = oldState.history - connection,
timestamps = oldState.timestamps - connection,
explicitRemovals = oldState.explicitRemovals + connection)
timestamps = oldState.timestamps - connection)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) remove(connection) // recur
}
}
def reset(): Unit = {
@tailrec
def doReset(): Unit = {
val oldState = state.get
val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty)
// if we won the race then update else try again
if (!state.compareAndSet(oldState, newState)) doReset() // recur
}
log.debug("Resetting failure detector")
doReset()
}
}
private[cluster] object HeartbeatHistory {

View file

@ -27,6 +27,7 @@ import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
import scala.collection.GenTraversableOnce
/**
* Interface for membership change listener.
@ -179,6 +180,20 @@ object Member {
case (Joining, Joining) m1
case (Up, Up) m1
}
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
// SortedSet + and ++ operators replaces existing element
// Use these :+ and :++ operators for the Gossip members
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
implicit def :+(elem: Member): SortedSet[Member] = {
if (sortedSet.contains(elem)) sortedSet
else sortedSet + elem
}
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
sortedSet ++ (elems.toSet diff sortedSet)
}
}
/**
@ -226,6 +241,7 @@ case class GossipOverview(
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
@ -300,7 +316,7 @@ case class Gossip(
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
else this copy (members = members :+ member)
}
/**
@ -329,7 +345,7 @@ case class Gossip(
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode

View file

@ -25,4 +25,9 @@ trait FailureDetector {
* Removes the heartbeat management for a connection.
*/
def remove(connection: Address): Unit
/**
* Removes all connections and starts over.
*/
def reset(): Unit
}

View file

@ -198,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
}
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)
nodesInCluster.sorted.head
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.actor.Address
import akka.remote.testconductor.Direction
object SplitBrainMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down = on
failure-detector.threshold = 4
}""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy
class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy
abstract class SplitBrainSpec
extends MultiNodeSpec(SplitBrainMultiJvmSpec)
with MultiNodeClusterSpec {
import SplitBrainMultiJvmSpec._
val side1 = IndexedSeq(first, second)
val side2 = IndexedSeq(third, fourth, fifth)
"A cluster of 5 members" must {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third, fourth, fifth)
enterBarrier("after-1")
}
"detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in {
val thirdAddress = address(third)
enterBarrier("before-split")
runOn(first) {
// split the cluster in two parts (first, second) / (third, fourth, fifth)
for (role1 side1; role2 side2) {
testConductor.blackhole(role1, role2, Direction.Both).await
}
}
enterBarrier("after-split")
runOn(side1.last) {
for (role side2) markNodeAsUnavailable(role)
}
runOn(side2.last) {
for (role side1) markNodeAsUnavailable(role)
}
runOn(side1: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds)
}
runOn(side2: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds)
}
enterBarrier("after-2")
}
"auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in {
runOn(side1: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address)
awaitUpConvergence(side1.size, side2 map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address)
awaitUpConvergence(side2.size, side1 map address)
assertLeader(side2: _*)
}
enterBarrier("after-3")
}
}
}

View file

@ -99,12 +99,14 @@ abstract class TransitionSpec
"start nodes as singleton clusters" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("after-1")
}
@ -244,13 +246,20 @@ abstract class TransitionSpec
}
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fifth) {
startClusterNode()
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("fifth-started")
runOn(fourth) {
cluster.join(fifth)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
}
testConductor.enter("fourth-joined")
enterBarrier("fourth-joined")
fifth gossipTo fourth
fourth gossipTo fifth

View file

@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
}
"mark node as dead after explicit removal of connection" in {
"mark node as available after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval))
@ -124,7 +124,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.isAvailable(conn) must be(true)
fd.remove(conn)
fd.isAvailable(conn) must be(false)
fd.isAvailable(conn) must be(true)
}
"mark node as available after explicit removal of connection and receiving heartbeat again" in {
@ -140,7 +140,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
fd.remove(conn)
fd.isAvailable(conn) must be(false) //3300
fd.isAvailable(conn) must be(true) //3300
// it receives heartbeat from an explicitly removed node
fd.heartbeat(conn) //4400

View file

@ -57,4 +57,9 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
def reset(): Unit = {
log.debug("Resetting failure detector")
connections.clear()
}
}

View file

@ -8,8 +8,8 @@ import org.scalatest.tools.StandardOutReporter
import org.scalatest.events._
import java.lang.Boolean.getBoolean
class QuietReporter(inColor: Boolean) extends StandardOutReporter(false, inColor, false, true) {
def this() = this(!getBoolean("akka.test.nocolor"))
class QuietReporter(inColor: Boolean, withDurations: Boolean = false) extends StandardOutReporter(withDurations, inColor, false, true) {
def this() = this(!getBoolean("akka.test.nocolor"), !getBoolean("akka.test.nodurations"))
override def apply(event: Event): Unit = event match {
case _: RunStarting ()

View file

@ -71,6 +71,9 @@ akka {
# If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged
log-sent-messages = off
# If this is "on", Akka will log all RemoteLifeCycleEvents at the level defined for each, if off then they are not logged
log-remote-lifecycle-events = off
# Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections.
# The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts
# active client connections whenever sending to a destination which is not yet connected; if configured

View file

@ -14,4 +14,5 @@ class RemoteSettings(val config: Config, val systemName: String) {
val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages")
val RemoteSystemDaemonAckTimeout: Duration = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode")
val LogRemoteLifeCycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events")
}

View file

@ -202,7 +202,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
*/
def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
system.eventStream.publish(message)
system.log.log(message.logLevel, "{}", message)
if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message)
}
/**
@ -220,6 +220,11 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
*/
protected def useUntrustedMode: Boolean
/**
* When this method returns true, RemoteLifeCycleEvents will be logged as well as be put onto the eventStream.
*/
protected def logRemoteLifeCycleEvents: Boolean
/**
* Returns a newly created AkkaRemoteProtocol with the given message payload.
*/

View file

@ -128,6 +128,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
override protected def useUntrustedMode = remoteSettings.UntrustedMode
override protected def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents
val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) shutdown(); throw ex }
/**

View file

@ -29,6 +29,7 @@ class RemoteConfigSpec extends AkkaSpec(
RemoteTransport must be("akka.remote.netty.NettyRemoteTransport")
UntrustedMode must be(false)
RemoteSystemDaemonAckTimeout must be(30 seconds)
LogRemoteLifeCycleEvents must be(false)
}
"be able to parse Netty config elements" in {