diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d226506acc..a06e9273cb 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -22,7 +22,9 @@ akka { # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on - # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Using auto-down implies that two separate clusters will automatically be formed in case of + # network partition. auto-down = on # the number of gossip daemon actors diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c397d065e5..f1c761dec7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -80,12 +80,12 @@ class AccrualFailureDetector( settings: ClusterSettings) = this( system, - settings.FailureDetectorThreshold, - settings.FailureDetectorMaxSampleSize, - settings.FailureDetectorAcceptableHeartbeatPause, - settings.FailureDetectorMinStdDeviation, - settings.HeartbeatInterval, - AccrualFailureDetector.realClock) + threshold = settings.FailureDetectorThreshold, + maxSampleSize = settings.FailureDetectorMaxSampleSize, + minStdDeviation = settings.FailureDetectorMinStdDeviation, + acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause, + firstHeartbeatEstimate = settings.HeartbeatInterval, + clock = AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") @@ -107,8 +107,7 @@ class AccrualFailureDetector( private case class State( version: Long = 0L, history: Map[Address, HeartbeatHistory] = Map.empty, - timestamps: Map[Address, Long] = 
Map.empty[Address, Long], - explicitRemovals: Set[Address] = Set.empty[Address]) + timestamps: Map[Address, Long] = Map.empty[Address, Long]) private val state = new AtomicReference[State](State()) @@ -141,8 +140,7 @@ class AccrualFailureDetector( val newState = oldState copy (version = oldState.version + 1, history = oldState.history + (connection -> newHistory), - timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, - explicitRemovals = oldState.explicitRemovals - connection) + timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -158,9 +156,7 @@ class AccrualFailureDetector( val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) - // if connection has been removed explicitly - if (oldState.explicitRemovals.contains(connection)) Double.MaxValue - else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections + if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. 
with zero heartbeats, as healthy connections else { val timeDiff = clock() - oldTimestamp.get @@ -208,13 +204,24 @@ class AccrualFailureDetector( if (oldState.history.contains(connection)) { val newState = oldState copy (version = oldState.version + 1, history = oldState.history - connection, - timestamps = oldState.timestamps - connection, - explicitRemovals = oldState.explicitRemovals + connection) + timestamps = oldState.timestamps - connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) remove(connection) // recur } } + + def reset(): Unit = { + @tailrec + def doReset(): Unit = { + val oldState = state.get + val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty) + // if we won the race then update else try again + if (!state.compareAndSet(oldState, newState)) doReset() // recur + } + log.debug("Resetting failure detector") + doReset() + } } private[cluster] object HeartbeatHistory { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 091f250dec..f6cf9b6864 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -27,6 +27,7 @@ import javax.management._ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } +import scala.collection.GenTraversableOnce /** * Interface for membership change listener. 
@@ -179,6 +180,20 @@ object Member { case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } + + // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 + // SortedSet + and ++ operators replace an existing equal element + // Use these :+ and :++ operators for the Gossip members + implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) + class SortedSetWorkaround(sortedSet: SortedSet[Member]) { + def :+(elem: Member): SortedSet[Member] = { + if (sortedSet.contains(elem)) sortedSet + else sortedSet + elem + } + + def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = + sortedSet ++ (elems.toSet diff sortedSet) + } } /** @@ -226,6 +241,7 @@ case class GossipOverview( object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty + } /** @@ -300,7 +316,7 @@ case class Gossip( */ def :+(member: Member): Gossip = { if (members contains member) this - else this copy (members = members + member) + else this copy (members = members :+ member) } /** @@ -329,7 +345,7 @@ case class Gossip( // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) + val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) // 5. 
fresh seen table val mergedSeen = Map.empty[Address, VectorClock] @@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) + val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 60af0a1c41..1aa926c5e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -25,4 +25,9 @@ trait FailureDetector { * Removes the heartbeat management for a connection. */ def remove(connection: Address): Unit + + /** + * Removes all connections and starts over. 
+ */ + def reset(): Unit } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index ed95013bf4..3264c661b0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -198,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu } } - def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala new file mode 100644 index 0000000000..24e94f715d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address +import akka.remote.testconductor.Direction + +object SplitBrainMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down = on + failure-detector.threshold = 4 + }""")). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy + +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy + +abstract class SplitBrainSpec + extends MultiNodeSpec(SplitBrainMultiJvmSpec) + with MultiNodeClusterSpec { + + import SplitBrainMultiJvmSpec._ + + val side1 = IndexedSeq(first, second) + val side2 = IndexedSeq(third, fourth, fifth) + + "A cluster of 5 members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth, fifth) + + enterBarrier("after-1") + } + + "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in { + val thirdAddress = address(third) + enterBarrier("before-split") + + runOn(first) { + // split the cluster in two parts (first, second) / (third, fourth, fifth) + for (role1 ← side1; role2 ← side2) { + testConductor.blackhole(role1, role2, Direction.Both).await + 
} + } + enterBarrier("after-split") + + runOn(side1.last) { + for (role ← side2) markNodeAsUnavailable(role) + } + runOn(side2.last) { + for (role ← side1) markNodeAsUnavailable(role) + } + + runOn(side1: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds) + } + runOn(side2: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds) + } + + enterBarrier("after-2") + } + + "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in { + + runOn(side1: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address) + awaitUpConvergence(side1.size, side2 map address) + assertLeader(side1: _*) + } + + runOn(side2: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address) + awaitUpConvergence(side2.size, side1 map address) + assertLeader(side2: _*) + } + + enterBarrier("after-3") + } + + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 0376545b41..397d824ef4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -99,12 +99,14 @@ abstract class TransitionSpec "start nodes as singleton clusters" taggedAs LongRunningTest in { - startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) - cluster.leaderActions() - cluster.status must be(Up) + runOn(first) { + startClusterNode() + 
cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + } enterBarrier("after-1") } @@ -244,13 +246,20 @@ abstract class TransitionSpec } "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fifth) { + startClusterNode() + cluster.leaderActions() + cluster.status must be(Up) + } + enterBarrier("fifth-started") + runOn(fourth) { cluster.join(fifth) } runOn(fifth) { awaitMembers(fourth, fifth) } - testConductor.enter("fourth-joined") + enterBarrier("fourth-joined") fifth gossipTo fourth fourth gossipTo fifth diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 5c7186502c..df69a52e19 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) } - "mark node as dead after explicit removal of connection" in { + "mark node as available after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) @@ -124,7 +124,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) fd.remove(conn) - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(true) } "mark node as available after explicit removal of connection and receiving heartbeat again" in { @@ -140,7 +140,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.remove(conn) - fd.isAvailable(conn) must be(false) //3300 + fd.isAvailable(conn) must be(true) //3300 // it receives heartbeat from an explicitly removed node fd.heartbeat(conn) //4400 diff --git 
a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index f35bca381d..9ddc9942b0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -57,4 +57,9 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte log.debug("Removing cluster node [{}]", connection) connections.remove(connection) } + + def reset(): Unit = { + log.debug("Resetting failure detector") + connections.clear() + } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala index f323b75e23..eea5f079d3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/QuietReporter.scala @@ -8,8 +8,8 @@ import org.scalatest.tools.StandardOutReporter import org.scalatest.events._ import java.lang.Boolean.getBoolean -class QuietReporter(inColor: Boolean) extends StandardOutReporter(false, inColor, false, true) { - def this() = this(!getBoolean("akka.test.nocolor")) +class QuietReporter(inColor: Boolean, withDurations: Boolean = false) extends StandardOutReporter(withDurations, inColor, false, true) { + def this() = this(!getBoolean("akka.test.nocolor"), !getBoolean("akka.test.nodurations")) override def apply(event: Event): Unit = event match { case _: RunStarting ⇒ () diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a8d2cb2680..f365d5ce19 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -71,6 +71,9 @@ akka { # If this is "on", Akka will log all outbound messages at DEBUG level, if off then they are not logged log-sent-messages = off + # If this is "on", Akka will log all 
RemoteLifeCycleEvents at the level defined for each, if off then they are not logged + log-remote-lifecycle-events = off + # Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections. # The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts # active client connections whenever sending to a destination which is not yet connected; if configured diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 951c007fbc..88a7003309 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -14,4 +14,5 @@ class RemoteSettings(val config: Config, val systemName: String) { val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages") val RemoteSystemDaemonAckTimeout: Duration = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode") + val LogRemoteLifeCycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index df76fe58a5..3aa3818de7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -202,7 +202,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ def notifyListeners(message: RemoteLifeCycleEvent): Unit = { system.eventStream.publish(message) - system.log.log(message.logLevel, "{}", message) + if (logRemoteLifeCycleEvents) log.log(message.logLevel, "{}", message) } /** @@ -220,6 +220,11 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ protected def useUntrustedMode: Boolean + /** + * When this 
method returns true, RemoteLifeCycleEvents will be logged as well as be put onto the eventStream. + */ + protected def logRemoteLifeCycleEvents: Boolean + /** * Returns a newly created AkkaRemoteProtocol with the given message payload. */ diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 9c6e4c85f2..5e3c989fd5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -128,6 +128,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider override protected def useUntrustedMode = remoteSettings.UntrustedMode + override protected def logRemoteLifeCycleEvents = remoteSettings.LogRemoteLifeCycleEvents + val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) ⇒ shutdown(); throw ex } /** diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 3ec52e2243..43a46240ac 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -29,6 +29,7 @@ class RemoteConfigSpec extends AkkaSpec( RemoteTransport must be("akka.remote.netty.NettyRemoteTransport") UntrustedMode must be(false) RemoteSystemDaemonAckTimeout must be(30 seconds) + LogRemoteLifeCycleEvents must be(false) } "be able to parse Netty config elements" in {