diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index f8dd381c06..fbc023e884 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -4,6 +4,7 @@ package akka.cluster import language.postfixOps +import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.{ ActorLogging, ActorRef, ActorSelection, Address, Actor, RootActorPath, Props } @@ -70,8 +71,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val selfHeartbeat = Heartbeat(selfAddress) var state = ClusterHeartbeatSenderState( - ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), MonitoredByNrOfMembers), - unreachable = Set.empty[UniqueAddress], + ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers), + oldReceiversNowUnreachable = Set.empty[UniqueAddress], failureDetector) // start periodic heartbeat to other nodes in cluster @@ -79,7 +80,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg HeartbeatInterval, self, HeartbeatTick) override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent]) } override def postStop(): Unit = { @@ -108,6 +109,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case HeartbeatRsp(from) ⇒ heartbeatRsp(from) case MemberUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) + case UnreachableMember(m) ⇒ unreachableMember(m) + case ReachableMember(m) ⇒ reachableMember(m) case _: MemberEvent ⇒ // not interested in other types of MemberEvent case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } @@ -116,7 +119,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val nodes: Set[UniqueAddress] = snapshot.members.collect { case m if m.status == MemberStatus.Up ⇒ m.uniqueAddress }(collection.breakOut) - state = state.init(nodes) + val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress) + state = state.init(nodes, unreachable) } def addMember(m: Member): Unit = @@ -132,6 +136,12 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg state = state.removeMember(m.uniqueAddress) } + def unreachableMember(m: Member): Unit = + state = state.unreachableMember(m.uniqueAddress) + + def reachableMember(m: Member): Unit = + state = state.reachableMember(m.uniqueAddress) + def heartbeat(): Unit = { state.activeReceivers foreach { to ⇒ if (cluster.failureDetector.isMonitoring(to.address)) @@ -167,15 +177,15 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg */ private[cluster] final case class ClusterHeartbeatSenderState( ring: HeartbeatNodeRing, - unreachable: Set[UniqueAddress], + oldReceiversNowUnreachable: Set[UniqueAddress], failureDetector: FailureDetectorRegistry[Address]) { - val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ unreachable + val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ oldReceiversNowUnreachable def selfAddress = ring.selfAddress - def init(nodes: Set[UniqueAddress]): ClusterHeartbeatSenderState = - copy(ring = ring.copy(nodes = nodes + selfAddress)) + def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState = + copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable)) def addMember(node: UniqueAddress): ClusterHeartbeatSenderState = membershipChange(ring :+ node) @@ -184,33 +194,39 @@ private[cluster] final case class ClusterHeartbeatSenderState( val newState = membershipChange(ring :- node) failureDetector remove node.address - if (newState.unreachable(node)) - newState.copy(unreachable = newState.unreachable - node) + if (newState.oldReceiversNowUnreachable(node)) + newState.copy(oldReceiversNowUnreachable = newState.oldReceiversNowUnreachable - node) else newState } + def unreachableMember(node: UniqueAddress): ClusterHeartbeatSenderState = + membershipChange(ring.copy(unreachable = ring.unreachable + node)) + + def reachableMember(node: UniqueAddress): ClusterHeartbeatSenderState = + membershipChange(ring.copy(unreachable = ring.unreachable - node)) + private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = { val oldReceivers = ring.myReceivers val removedReceivers = oldReceivers -- newRing.myReceivers - var newUnreachable = unreachable + var adjustedOldReceiversNowUnreachable = oldReceiversNowUnreachable removedReceivers foreach { a ⇒ if (failureDetector.isAvailable(a.address)) failureDetector remove a.address else - newUnreachable += a + adjustedOldReceiversNowUnreachable += a } - copy(newRing, newUnreachable) + copy(newRing, adjustedOldReceiversNowUnreachable) } def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState = if (activeReceivers(from)) { failureDetector heartbeat from.address - if (unreachable(from)) { + if (oldReceiversNowUnreachable(from)) { // back from unreachable, ok to stop heartbeating to it if (!ring.myReceivers(from)) failureDetector remove from.address - copy(unreachable = unreachable - from) + copy(oldReceiversNowUnreachable = oldReceiversNowUnreachable - from) } else this } else this @@ -225,7 +241,11 @@ private[cluster] final case class ClusterHeartbeatSenderState( * * It is immutable, i.e. the methods return new instances. */ -private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress, nodes: Set[UniqueAddress], monitoredByNrOfMembers: Int) { +private[cluster] final case class HeartbeatNodeRing( + selfAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], + monitoredByNrOfMembers: Int) { require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]") @@ -249,14 +269,41 @@ private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress, /** * The receivers to use from a specified sender. */ - def receivers(sender: UniqueAddress): immutable.Set[UniqueAddress] = + def receivers(sender: UniqueAddress): Set[UniqueAddress] = if (useAllAsReceivers) nodeRing - sender else { - val slice = nodeRing.from(sender).tail.take(monitoredByNrOfMembers) - if (slice.size < monitoredByNrOfMembers) - (slice ++ nodeRing.take(monitoredByNrOfMembers - slice.size)) - else slice + + // Pick nodes from the iterator until n nodes that are not unreachable have been selected. + // Intermediate unreachable nodes up to `monitoredByNrOfMembers` are also included in the result. + // The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must + // be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown + // at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes. + // This was reported in issue #16624. + @tailrec def take(n: Int, iter: Iterator[UniqueAddress], acc: Set[UniqueAddress]): (Int, Set[UniqueAddress]) = + if (iter.isEmpty || n == 0) (n, acc) + else { + val next = iter.next() + val isUnreachable = unreachable(next) + if (isUnreachable && acc.size >= monitoredByNrOfMembers) + take(n, iter, acc) // skip the unreachable, since we have already picked `monitoredByNrOfMembers` + else if (isUnreachable) + take(n, iter, acc + next) // include the unreachable, but don't count it + else + take(n - 1, iter, acc + next) // include the reachable + } + + val (remaining, slice1) = take(monitoredByNrOfMembers, nodeRing.from(sender).tail.iterator, Set.empty) + val slice = + if (remaining == 0) + slice1 + else { + // wrap around + val (_, slice2) = take(remaining, nodeRing.to(sender).iterator.filterNot(_ == sender), slice1) + slice2 + } + + slice } /** @@ -267,6 +314,9 @@ private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress, /** * Remove a node from the ring. */ - def :-(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this + def :-(node: UniqueAddress): HeartbeatNodeRing = + if (nodes.contains(node) || unreachable.contains(node)) + copy(nodes = nodes - node, unreachable = unreachable - node) + else this } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningAllOtherNodesSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningAllOtherNodesSpec.scala new file mode 100644 index 0000000000..e7f7b21e07 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningAllOtherNodesSpec.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.cluster + +import scala.concurrent.duration._ +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object LeaderDowningAllOtherNodesMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.cluster.failure-detector.monitored-by-nr-of-members = 2 + akka.cluster.auto-down-unreachable-after = 1s + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class LeaderDowningAllOtherNodesMultiJvmNode1 extends LeaderDowningAllOtherNodesSpec +class LeaderDowningAllOtherNodesMultiJvmNode2 extends LeaderDowningAllOtherNodesSpec +class LeaderDowningAllOtherNodesMultiJvmNode3 extends LeaderDowningAllOtherNodesSpec +class LeaderDowningAllOtherNodesMultiJvmNode4 extends LeaderDowningAllOtherNodesSpec +class LeaderDowningAllOtherNodesMultiJvmNode5 extends LeaderDowningAllOtherNodesSpec +class LeaderDowningAllOtherNodesMultiJvmNode6 extends LeaderDowningAllOtherNodesSpec + +abstract class LeaderDowningAllOtherNodesSpec + extends MultiNodeSpec(LeaderDowningAllOtherNodesMultiJvmSpec) + with MultiNodeClusterSpec { + + import LeaderDowningAllOtherNodesMultiJvmSpec._ + import ClusterEvent._ + + "A cluster of 6 nodes with monitored-by-nr-of-members=2" must { + "setup" taggedAs LongRunningTest in { + // start some + awaitClusterUp(roles: _*) + enterBarrier("after-1") + } + + "remove all shutdown nodes" taggedAs LongRunningTest in { + val others = roles.drop(1) + val shutdownAddresses = others.map(address).toSet + runOn(first) { + for (node ← others) + testConductor.exit(node, 0).await + } + enterBarrier("all-other-shutdown") + awaitMembersUp(numberOfMembers = 1, canNotBePartOfMemberRing = shutdownAddresses, 30.seconds) + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index f694c104c4..184cfea15c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -52,8 +52,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { def emptyState: ClusterHeartbeatSenderState = emptyState(aa) def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState( - ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), monitoredByNrOfMembers = 3), - unreachable = Set.empty[UniqueAddress], + ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, monitoredByNrOfMembers = 3), + oldReceiversNowUnreachable = Set.empty[UniqueAddress], failureDetector = new DefaultFailureDetectorRegistry[Address](() ⇒ new FailureDetectorStub)) def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub = @@ -67,21 +67,25 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { } "init with empty" in { - emptyState.init(Set.empty).activeReceivers should be(Set.empty) + emptyState.init(Set.empty, Set.empty).activeReceivers should be(Set.empty) } "init with self" in { - emptyState.init(Set(aa, bb, cc)).activeReceivers should be(Set(bb, cc)) + emptyState.init(Set(aa, bb, cc), Set.empty).activeReceivers should be(Set(bb, cc)) } "init without self" in { - emptyState.init(Set(bb, cc)).activeReceivers should be(Set(bb, cc)) + emptyState.init(Set(bb, cc), Set.empty).activeReceivers should be(Set(bb, cc)) } "use added members" in { emptyState.addMember(bb).addMember(cc).activeReceivers should be(Set(bb, cc)) } + "use added members also when unreachable" in { + emptyState.addMember(bb).addMember(cc).unreachableMember(bb).activeReceivers should be(Set(bb, cc)) + } + "not use removed members" in { emptyState.addMember(bb).addMember(cc).removeMember(bb).activeReceivers should be(Set(cc)) } @@ -91,6 +95,12 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).activeReceivers should be(Set(bb, cc, dd)) } + "use specified number of members + unreachable" in { + // they are sorted by the hash (uid) of the UniqueAddress + emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).unreachableMember(cc) + .activeReceivers should be(Set(bb, cc, dd, ee)) + } + "update failure detector in active set" in { val s1 = emptyState.addMember(bb).addMember(cc).addMember(dd) val s2 = s1.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee) @@ -150,7 +160,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { operation match { case Add ⇒ if (node != selfUniqueAddress && !state.ring.nodes.contains(node)) { - val oldUnreachable = state.unreachable + val oldUnreachable = state.oldReceiversNowUnreachable state = state.addMember(node) // keep unreachable (oldUnreachable -- state.activeReceivers) should be(Set.empty) @@ -160,7 +170,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { case Remove ⇒ if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { - val oldUnreachable = state.unreachable + val oldUnreachable = state.oldReceiversNowUnreachable state = state.removeMember(node) // keep unreachable, unless it was the removed if (oldUnreachable(node)) @@ -179,17 +189,18 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { fd(state, node).markNodeAsUnavailable() state.failureDetector.isMonitoring(node.address) should be(true) state.failureDetector.isAvailable(node.address) should be(false) + state = state.unreachableMember(node) } case HeartbeatRsp ⇒ if (node != selfUniqueAddress && state.ring.nodes.contains(node)) { - val oldUnreachable = state.unreachable + val oldUnreachable = state.oldReceiversNowUnreachable val oldReceivers = state.activeReceivers val oldRingReceivers = state.ring.myReceivers state = state.heartbeatRsp(node) if (oldUnreachable(node)) - state.unreachable should not contain (node) + state.oldReceiversNowUnreachable should not contain (node) if (oldUnreachable(node) && !oldRingReceivers(node)) state.failureDetector.isMonitoring(node.address) should be(false) @@ -205,7 +216,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { } } catch { case e: Throwable ⇒ - println(s"Failure context: i=$i, node=$node, op=$operation, unreachable=${state.unreachable}, " + + println(s"Failure context: i=$i, node=$node, op=$operation, " + + s"oldReceiversNowUnreachable=${state.oldReceiversNowUnreachable}, " + s"ringReceivers=${state.ring.myReceivers}, ringNodes=${state.ring.nodes}") throw e } diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala index 8a540a21bb..a4012c2938 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -18,7 +18,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers { def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = { val nodes = (1 to size).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n)) val selfAddress = nodes(size / 2) - HeartbeatNodeRing(selfAddress, nodes.toSet, 5) + HeartbeatNodeRing(selfAddress, nodes.toSet, Set.empty, 5) } val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize) @@ -27,7 +27,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers { for (i ← 1 to times) thunk(ring) def myReceivers(ring: HeartbeatNodeRing): Unit = { - val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers) + val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, Set.empty, ring.monitoredByNrOfMembers) r.myReceivers.isEmpty should be(false) } diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala index e73293ca49..17ba1e2c4e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala @@ -19,13 +19,14 @@ class HeartbeatNodeRingSpec extends WordSpec with Matchers { val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) + val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6) - val nodes = Set(aa, bb, cc, dd, ee) + val nodes = Set(aa, bb, cc, dd, ee, ff) "A HashedNodeRing" must { "pick specified number of nodes as receivers" in { - val ring = HeartbeatNodeRing(cc, nodes, 3) + val ring = HeartbeatNodeRing(cc, nodes, Set.empty, 3) ring.myReceivers should be(ring.receivers(cc)) nodes foreach { n ⇒ @@ -35,15 +36,27 @@ class HeartbeatNodeRingSpec extends WordSpec with Matchers { } } + "pick specified number of nodes + unreachable as receivers" in { + val ring = HeartbeatNodeRing(cc, nodes, unreachable = Set(aa, dd, ee), monitoredByNrOfMembers = 3) + ring.myReceivers should be(ring.receivers(cc)) + + ring.receivers(aa) should be(Set(bb, cc, dd, ff)) // unreachable ee skipped + ring.receivers(bb) should be(Set(cc, dd, ee, ff)) // unreachable aa skipped + ring.receivers(cc) should be(Set(dd, ee, ff, bb)) // unreachable aa skipped + ring.receivers(dd) should be(Set(ee, ff, aa, bb, cc)) + ring.receivers(ee) should be(Set(ff, aa, bb, cc)) + ring.receivers(ff) should be(Set(aa, bb, cc)) // unreachable dd and ee skipped + } + "pick all except own as receivers when less than total number of nodes" in { - val expected = Set(aa, bb, dd, ee) - HeartbeatNodeRing(cc, nodes, 4).myReceivers should be(expected) - HeartbeatNodeRing(cc, nodes, 5).myReceivers should be(expected) - HeartbeatNodeRing(cc, nodes, 6).myReceivers should be(expected) + val expected = Set(aa, bb, dd, ee, ff) + HeartbeatNodeRing(cc, nodes, Set.empty, 5).myReceivers should be(expected) + HeartbeatNodeRing(cc, nodes, Set.empty, 6).myReceivers should be(expected) + HeartbeatNodeRing(cc, nodes, Set.empty, 7).myReceivers should be(expected) } "pick none when alone" in { - val ring = HeartbeatNodeRing(cc, Set(cc), 3) + val ring = HeartbeatNodeRing(cc, Set(cc), Set.empty, 3) ring.myReceivers should be(Set()) }