diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index a202778fe5..2b946ec1da 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -26,6 +26,10 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /router2 { + router = round-robin + nr-of-instances = 3 + } /myrouter { router = "akka.routing.RoutingSpec$MyRouter" foo = bar @@ -129,7 +133,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "use configured nr-of-instances when router is specified" in { - val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router1") + val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) system.stop(router) } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4da4dd6620..a06e9273cb 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -18,10 +18,13 @@ akka { # how long to wait for one of the seed nodes to reply to initial join request seed-node-timeout = 5s - # automatic join the seed-nodes at startup + # Automatic join the seed-nodes at startup. + # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on - # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Using auto-down implies that two separate clusters will automatically be formed in case of + # network partition. auto-down = on # the number of gossip daemon actors diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c397d065e5..f1c761dec7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -80,12 +80,12 @@ class AccrualFailureDetector( settings: ClusterSettings) = this( system, - settings.FailureDetectorThreshold, - settings.FailureDetectorMaxSampleSize, - settings.FailureDetectorAcceptableHeartbeatPause, - settings.FailureDetectorMinStdDeviation, - settings.HeartbeatInterval, - AccrualFailureDetector.realClock) + threshold = settings.FailureDetectorThreshold, + maxSampleSize = settings.FailureDetectorMaxSampleSize, + minStdDeviation = settings.FailureDetectorMinStdDeviation, + acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause, + firstHeartbeatEstimate = settings.HeartbeatInterval, + clock = AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") @@ -107,8 +107,7 @@ class AccrualFailureDetector( private case class State( version: Long = 0L, history: Map[Address, HeartbeatHistory] = Map.empty, - timestamps: Map[Address, Long] = Map.empty[Address, Long], - explicitRemovals: Set[Address] = Set.empty[Address]) + timestamps: Map[Address, Long] = Map.empty[Address, Long]) private val state = new AtomicReference[State](State()) @@ -141,8 +140,7 @@ class AccrualFailureDetector( val newState = oldState copy (version = oldState.version + 1, history = oldState.history + (connection -> newHistory), - timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, - explicitRemovals = oldState.explicitRemovals - connection) + timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -158,9 +156,7 @@ class AccrualFailureDetector( val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) - // if connection has been removed explicitly - if (oldState.explicitRemovals.contains(connection)) Double.MaxValue - else if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections + if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { val timeDiff = clock() - oldTimestamp.get @@ -208,13 +204,24 @@ class AccrualFailureDetector( if (oldState.history.contains(connection)) { val newState = oldState copy (version = oldState.version + 1, history = oldState.history - connection, - timestamps = oldState.timestamps - connection, - explicitRemovals = oldState.explicitRemovals + connection) + timestamps = oldState.timestamps - connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) remove(connection) // recur } } + + def reset(): Unit = { + @tailrec + def doReset(): Unit = { + val oldState = state.get + val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty) + // if we won the race then update else try again + if (!state.compareAndSet(oldState, newState)) doReset() // recur + } + log.debug("Resetting failure detector") + doReset() + } } private[cluster] object HeartbeatHistory { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index caecf3906b..357d610ed5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -27,6 +27,7 @@ import javax.management._ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } +import scala.collection.GenTraversableOnce /** * Interface for membership change listener. @@ -179,6 +180,20 @@ object Member { case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } + + // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 + // SortedSet + and ++ operators replaces existing element + // Use these :+ and :++ operators for the Gossip members + implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) + class SortedSetWorkaround(sortedSet: SortedSet[Member]) { + implicit def :+(elem: Member): SortedSet[Member] = { + if (sortedSet.contains(elem)) sortedSet + else sortedSet + elem + } + + implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = + sortedSet ++ (elems.toSet diff sortedSet) + } } /** @@ -226,6 +241,7 @@ case class GossipOverview( object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty + } /** @@ -300,7 +316,7 @@ case class Gossip( */ def :+(member: Member): Gossip = { if (members contains member) this - else this copy (members = members + member) + else this copy (members = members :+ member) } /** @@ -329,7 +345,7 @@ case class Gossip( // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) + val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) // 5. fresh seen table val mergedSeen = Map.empty[Address, VectorClock] @@ -364,20 +380,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) - case InitJoinAck(address) ⇒ cluster.join(address) - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() } def joinSeedNode(): Unit = { val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) - if (seedRoutees.nonEmpty) { + if (seedRoutees.isEmpty) { + cluster join cluster.selfAddress + } else { implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( @@ -387,6 +406,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto } } + def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress + override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) } @@ -534,10 +555,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } private val state = { - val member = Member(selfAddress, Joining) - val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock - val seenVersionedGossip = versionedGossip seen selfAddress - new AtomicReference[State](State(seenVersionedGossip)) + // note that self is not initially member, + // and the Gossip is not versioned for this 'Node' yet + new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) } // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' @@ -797,7 +817,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val newMembers = localMembers + Member(node, Joining) // add joining node as Joining + // add joining node as Joining + // add self in case someone else joins before self has joined (Set discards duplicates) + val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode @@ -939,11 +961,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!localGossip.overview.isNonDownUnreachable(from)) { val winningGossip = - if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { - // a fresh singleton cluster that is joining, no need to merge, use received gossip - remoteGossip - - } else if (remoteGossip.version <> localGossip.version) { + if (remoteGossip.version <> localGossip.version) { // concurrent val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 60af0a1c41..1aa926c5e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -25,4 +25,9 @@ trait FailureDetector { * Removes the heartbeat management for a connection. */ def remove(connection: Address): Unit + + /** + * Removes all connections and starts over. + */ + def reset(): Unit } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 38f03a4e66..20dec26a45 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -16,7 +16,9 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary1 = role("ordinary1") val ordinary2 = role("ordinary2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(MultiNodeClusterSpec.clusterConfig)) } class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy @@ -33,6 +35,23 @@ abstract class JoinSeedNodeSpec override def seedNodes = IndexedSeq(seed1, seed2) "A cluster with configured seed nodes" must { + "start the seed nodes sequentially" taggedAs LongRunningTest in { + runOn(seed1) { + startClusterNode() + } + enterBarrier("seed1-started") + + runOn(seed2) { + startClusterNode() + } + enterBarrier("seed2-started") + + runOn(seed1, seed2) { + awaitUpConvergence(2) + } + enterBarrier("after-1") + } + "join the seed nodes at startup" taggedAs LongRunningTest in { startClusterNode() @@ -40,7 +59,7 @@ abstract class JoinSeedNodeSpec awaitUpConvergence(4) - enterBarrier("after") + enterBarrier("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 79e3a67e1e..3264c661b0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { + auto-join = off auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms @@ -99,10 +100,15 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def cluster: Cluster = clusterNode /** - * Use this method instead of 'cluster.self' - * for the initial startup of the cluster node. + * Use this method for the initial startup of the cluster node. */ - def startClusterNode(): Unit = cluster.self + def startClusterNode(): Unit = { + if (cluster.latestGossip.members.isEmpty) { + cluster join myself + awaitCond(cluster.latestGossip.members.exists(_.address == address(myself))) + } else + cluster.self + } /** * Initialize the cluster with the specified member @@ -192,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu } } - def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 9f79af2f13..3c35e95333 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -16,6 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { + auto-join = on auto-down = on failure-detector.threshold = 4 } @@ -38,12 +39,20 @@ abstract class SingletonClusterSpec "A cluster of 2 nodes" must { - "not be singleton cluster when joined" taggedAs LongRunningTest in { + "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { + startClusterNode() + awaitUpConvergence(1) + cluster.isSingletonCluster must be(true) + + enterBarrier("after-1") + } + + "not be singleton cluster when joined with other node" taggedAs LongRunningTest in { awaitClusterUp(first, second) cluster.isSingletonCluster must be(false) assertLeader(first, second) - enterBarrier("after-1") + enterBarrier("after-2") } "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { @@ -58,7 +67,7 @@ abstract class SingletonClusterSpec assertLeader(first) } - enterBarrier("after-2") + enterBarrier("after-3") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala new file mode 100644 index 0000000000..24e94f715d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address +import akka.remote.testconductor.Direction + +object SplitBrainMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down = on + failure-detector.threshold = 4 + }""")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy + +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy + +abstract class SplitBrainSpec + extends MultiNodeSpec(SplitBrainMultiJvmSpec) + with MultiNodeClusterSpec { + + import SplitBrainMultiJvmSpec._ + + val side1 = IndexedSeq(first, second) + val side2 = IndexedSeq(third, fourth, fifth) + + "A cluster of 5 members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth, fifth) + + enterBarrier("after-1") + } + + "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in { + val thirdAddress = address(third) + enterBarrier("before-split") + + runOn(first) { + // split the cluster in two parts (first, second) / (third, fourth, fifth) + for (role1 ← side1; role2 ← side2) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("after-split") + + runOn(side1.last) { + for (role ← side2) markNodeAsUnavailable(role) + } + runOn(side2.last) { + for (role ← side1) markNodeAsUnavailable(role) + } + + runOn(side1: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds) + } + runOn(side2: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds) + } + + enterBarrier("after-2") + } + + "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in { + + runOn(side1: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address) + awaitUpConvergence(side1.size, side2 map address) + assertLeader(side1: _*) + } + + runOn(side2: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address) + awaitUpConvergence(side2.size, side1 map address) + assertLeader(side2: _*) + } + + enterBarrier("after-3") + } + + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index d661f0cc51..3be082d2f3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -25,6 +25,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { akka.cluster { # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms + auto-join = off } akka.loglevel = INFO """)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 0376545b41..397d824ef4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -99,12 +99,14 @@ abstract class TransitionSpec "start nodes as singleton clusters" taggedAs LongRunningTest in { - startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) - cluster.leaderActions() - cluster.status must be(Up) + runOn(first) { + startClusterNode() + cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + } enterBarrier("after-1") } @@ -244,13 +246,20 @@ abstract class TransitionSpec } "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fifth) { + startClusterNode() + cluster.leaderActions() + cluster.status must be(Up) + } + enterBarrier("fifth-started") + runOn(fourth) { cluster.join(fifth) } runOn(fifth) { awaitMembers(fourth, fifth) } - testConductor.enter("fourth-joined") + enterBarrier("fourth-joined") fifth gossipTo fourth fourth gossipTo fifth diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 5c7186502c..df69a52e19 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) } - "mark node as dead after explicit removal of connection" in { + "mark node as available after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) @@ -124,7 +124,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) fd.remove(conn) - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(true) } "mark node as available after explicit removal of connection and receiving heartbeat again" in { @@ -140,7 +140,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.remove(conn) - fd.isAvailable(conn) must be(false) //3300 + fd.isAvailable(conn) must be(true) //3300 // it receives heartbeat from an explicitly removed node fd.heartbeat(conn) //4400 diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 6f70193715..e818847969 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -86,11 +86,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "use the address of the remote transport" in { cluster.selfAddress must be(selfAddress) - cluster.self.address must be(selfAddress) } - "initially be singleton cluster and reach convergence immediately" in { - cluster.isSingletonCluster must be(true) + "initially become singleton cluster when joining itself and reach convergence" in { + cluster.isSingletonCluster must be(false) // auto-join = off + cluster.join(selfAddress) + awaitCond(cluster.isSingletonCluster) + cluster.self.address must be(selfAddress) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(true) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index f35bca381d..9ddc9942b0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -57,4 +57,9 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte log.debug("Removing cluster node [{}]", connection) connections.remove(connection) } + + def reset(): Unit = { + log.debug("Resetting failure detector") + connections.clear() + } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 584625cc82..24e803aecc 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -388,8 +388,8 @@ object AkkaBuild extends Build { if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-n", tags.mkString(" "))) }, - // show full stack traces - testOptions in Test += Tests.Argument("-oF") + // show full stack traces and test case durations + testOptions in Test += Tests.Argument("-oDF") ) lazy val formatSettings = ScalariformPlugin.scalariformSettings ++ Seq(