=clu #16624 Expand the monitored nodes when unreachable

* Otherwise the leader might stall (cannot remove downed nodes)
  if many nodes are shutdown at the same time and nobody in the
  remaining cluster is monitoring some of the shutdown nodes.

(cherry picked from commit 1354524c4fde6f40499833bdd4c0edd479e6f906)

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
	project/AkkaBuild.scala
This commit is contained in:
Patrik Nordwall 2015-01-19 10:10:30 +01:00
parent dc0547dd73
commit bc87dcad78
5 changed files with 177 additions and 41 deletions

View file

@ -4,6 +4,7 @@
package akka.cluster
import language.postfixOps
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.{ ActorLogging, ActorRef, ActorSelection, Address, Actor, RootActorPath, Props }
@ -70,8 +71,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
val selfHeartbeat = Heartbeat(selfAddress)
var state = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), MonitoredByNrOfMembers),
unreachable = Set.empty[UniqueAddress],
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, MonitoredByNrOfMembers),
oldReceiversNowUnreachable = Set.empty[UniqueAddress],
failureDetector)
// start periodic heartbeat to other nodes in cluster
@ -79,7 +80,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
HeartbeatInterval, self, HeartbeatTick)
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
}
override def postStop(): Unit = {
@ -108,6 +109,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case HeartbeatRsp(from) heartbeatRsp(from)
case MemberUp(m) addMember(m)
case MemberRemoved(m, _) removeMember(m)
case UnreachableMember(m) unreachableMember(m)
case ReachableMember(m) reachableMember(m)
case _: MemberEvent // not interested in other types of MemberEvent
case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from)
}
@ -116,7 +119,8 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
val nodes: Set[UniqueAddress] = snapshot.members.collect {
case m if m.status == MemberStatus.Up m.uniqueAddress
}(collection.breakOut)
state = state.init(nodes)
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
state = state.init(nodes, unreachable)
}
def addMember(m: Member): Unit =
@ -132,6 +136,12 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
state = state.removeMember(m.uniqueAddress)
}
def unreachableMember(m: Member): Unit =
state = state.unreachableMember(m.uniqueAddress)
def reachableMember(m: Member): Unit =
state = state.reachableMember(m.uniqueAddress)
def heartbeat(): Unit = {
state.activeReceivers foreach { to
if (cluster.failureDetector.isMonitoring(to.address))
@ -167,15 +177,15 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
*/
private[cluster] final case class ClusterHeartbeatSenderState(
ring: HeartbeatNodeRing,
unreachable: Set[UniqueAddress],
oldReceiversNowUnreachable: Set[UniqueAddress],
failureDetector: FailureDetectorRegistry[Address]) {
val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ unreachable
val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ oldReceiversNowUnreachable
def selfAddress = ring.selfAddress
def init(nodes: Set[UniqueAddress]): ClusterHeartbeatSenderState =
copy(ring = ring.copy(nodes = nodes + selfAddress))
def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState =
copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable))
def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
membershipChange(ring :+ node)
@ -184,33 +194,39 @@ private[cluster] final case class ClusterHeartbeatSenderState(
val newState = membershipChange(ring :- node)
failureDetector remove node.address
if (newState.unreachable(node))
newState.copy(unreachable = newState.unreachable - node)
if (newState.oldReceiversNowUnreachable(node))
newState.copy(oldReceiversNowUnreachable = newState.oldReceiversNowUnreachable - node)
else
newState
}
def unreachableMember(node: UniqueAddress): ClusterHeartbeatSenderState =
membershipChange(ring.copy(unreachable = ring.unreachable + node))
def reachableMember(node: UniqueAddress): ClusterHeartbeatSenderState =
membershipChange(ring.copy(unreachable = ring.unreachable - node))
private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = {
val oldReceivers = ring.myReceivers
val removedReceivers = oldReceivers -- newRing.myReceivers
var newUnreachable = unreachable
var adjustedOldReceiversNowUnreachable = oldReceiversNowUnreachable
removedReceivers foreach { a
if (failureDetector.isAvailable(a.address))
failureDetector remove a.address
else
newUnreachable += a
adjustedOldReceiversNowUnreachable += a
}
copy(newRing, newUnreachable)
copy(newRing, adjustedOldReceiversNowUnreachable)
}
def heartbeatRsp(from: UniqueAddress): ClusterHeartbeatSenderState =
if (activeReceivers(from)) {
failureDetector heartbeat from.address
if (unreachable(from)) {
if (oldReceiversNowUnreachable(from)) {
// back from unreachable, ok to stop heartbeating to it
if (!ring.myReceivers(from))
failureDetector remove from.address
copy(unreachable = unreachable - from)
copy(oldReceiversNowUnreachable = oldReceiversNowUnreachable - from)
} else this
} else this
@ -225,7 +241,11 @@ private[cluster] final case class ClusterHeartbeatSenderState(
*
* It is immutable, i.e. the methods return new instances.
*/
private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress, nodes: Set[UniqueAddress], monitoredByNrOfMembers: Int) {
private[cluster] final case class HeartbeatNodeRing(
selfAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
monitoredByNrOfMembers: Int) {
require(nodes contains selfAddress, s"nodes [${nodes.mkString(", ")}] must contain selfAddress [${selfAddress}]")
@ -249,14 +269,41 @@ private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress,
/**
* The receivers to use from a specified sender.
*/
def receivers(sender: UniqueAddress): immutable.Set[UniqueAddress] =
def receivers(sender: UniqueAddress): Set[UniqueAddress] =
if (useAllAsReceivers)
nodeRing - sender
else {
val slice = nodeRing.from(sender).tail.take(monitoredByNrOfMembers)
if (slice.size < monitoredByNrOfMembers)
(slice ++ nodeRing.take(monitoredByNrOfMembers - slice.size))
else slice
// Pick nodes from the iterator until n nodes that are not unreachable have been selected.
// Intermediate unreachable nodes up to `monitoredByNrOfMembers` are also included in the result.
// The reason for not limiting it to strictly monitoredByNrOfMembers is that the leader must
// be able to continue its duties (e.g. removal of downed nodes) when many nodes are shutdown
// at the same time and nobody in the remaining cluster is monitoring some of the shutdown nodes.
// This was reported in issue #16624.
@tailrec def take(n: Int, iter: Iterator[UniqueAddress], acc: Set[UniqueAddress]): (Int, Set[UniqueAddress]) =
if (iter.isEmpty || n == 0) (n, acc)
else {
val next = iter.next()
val isUnreachable = unreachable(next)
if (isUnreachable && acc.size >= monitoredByNrOfMembers)
take(n, iter, acc) // skip the unreachable, since we have already picked `monitoredByNrOfMembers`
else if (isUnreachable)
take(n, iter, acc + next) // include the unreachable, but don't count it
else
take(n - 1, iter, acc + next) // include the reachable
}
val (remaining, slice1) = take(monitoredByNrOfMembers, nodeRing.from(sender).tail.iterator, Set.empty)
val slice =
if (remaining == 0)
slice1
else {
// wrap around
val (_, slice2) = take(remaining, nodeRing.to(sender).iterator.filterNot(_ == sender), slice1)
slice2
}
slice
}
/**
@ -267,6 +314,9 @@ private[cluster] final case class HeartbeatNodeRing(selfAddress: UniqueAddress,
/**
* Remove a node from the ring.
*/
def :-(node: UniqueAddress): HeartbeatNodeRing = if (nodes contains node) copy(nodes = nodes - node) else this
def :-(node: UniqueAddress): HeartbeatNodeRing =
if (nodes.contains(node) || unreachable.contains(node))
copy(nodes = nodes - node, unreachable = unreachable - node)
else this
}

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.concurrent.duration._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
object LeaderDowningAllOtherNodesMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.cluster.failure-detector.monitored-by-nr-of-members = 2
akka.cluster.auto-down-unreachable-after = 1s
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class LeaderDowningAllOtherNodesMultiJvmNode1 extends LeaderDowningAllOtherNodesSpec
class LeaderDowningAllOtherNodesMultiJvmNode2 extends LeaderDowningAllOtherNodesSpec
class LeaderDowningAllOtherNodesMultiJvmNode3 extends LeaderDowningAllOtherNodesSpec
class LeaderDowningAllOtherNodesMultiJvmNode4 extends LeaderDowningAllOtherNodesSpec
class LeaderDowningAllOtherNodesMultiJvmNode5 extends LeaderDowningAllOtherNodesSpec
class LeaderDowningAllOtherNodesMultiJvmNode6 extends LeaderDowningAllOtherNodesSpec
abstract class LeaderDowningAllOtherNodesSpec
extends MultiNodeSpec(LeaderDowningAllOtherNodesMultiJvmSpec)
with MultiNodeClusterSpec {
import LeaderDowningAllOtherNodesMultiJvmSpec._
import ClusterEvent._
"A cluster of 6 nodes with monitored-by-nr-of-members=2" must {
"setup" taggedAs LongRunningTest in {
// start some
awaitClusterUp(roles: _*)
enterBarrier("after-1")
}
"remove all shutdown nodes" taggedAs LongRunningTest in {
val others = roles.drop(1)
val shutdownAddresses = others.map(address).toSet
runOn(first) {
for (node others)
testConductor.exit(node, 0).await
}
enterBarrier("all-other-shutdown")
awaitMembersUp(numberOfMembers = 1, canNotBePartOfMemberRing = shutdownAddresses, 30.seconds)
}
}
}

View file

@ -52,8 +52,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
def emptyState: ClusterHeartbeatSenderState = emptyState(aa)
def emptyState(selfUniqueAddress: UniqueAddress) = ClusterHeartbeatSenderState(
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), monitoredByNrOfMembers = 3),
unreachable = Set.empty[UniqueAddress],
ring = HeartbeatNodeRing(selfUniqueAddress, Set(selfUniqueAddress), Set.empty, monitoredByNrOfMembers = 3),
oldReceiversNowUnreachable = Set.empty[UniqueAddress],
failureDetector = new DefaultFailureDetectorRegistry[Address](() new FailureDetectorStub))
def fd(state: ClusterHeartbeatSenderState, node: UniqueAddress): FailureDetectorStub =
@ -67,21 +67,25 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
}
"init with empty" in {
emptyState.init(Set.empty).activeReceivers should be(Set.empty)
emptyState.init(Set.empty, Set.empty).activeReceivers should be(Set.empty)
}
"init with self" in {
emptyState.init(Set(aa, bb, cc)).activeReceivers should be(Set(bb, cc))
emptyState.init(Set(aa, bb, cc), Set.empty).activeReceivers should be(Set(bb, cc))
}
"init without self" in {
emptyState.init(Set(bb, cc)).activeReceivers should be(Set(bb, cc))
emptyState.init(Set(bb, cc), Set.empty).activeReceivers should be(Set(bb, cc))
}
"use added members" in {
emptyState.addMember(bb).addMember(cc).activeReceivers should be(Set(bb, cc))
}
"use added members also when unreachable" in {
emptyState.addMember(bb).addMember(cc).unreachableMember(bb).activeReceivers should be(Set(bb, cc))
}
"not use removed members" in {
emptyState.addMember(bb).addMember(cc).removeMember(bb).activeReceivers should be(Set(cc))
}
@ -91,6 +95,12 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).activeReceivers should be(Set(bb, cc, dd))
}
"use specified number of members + unreachable" in {
// they are sorted by the hash (uid) of the UniqueAddress
emptyState.addMember(cc).addMember(dd).addMember(bb).addMember(ee).unreachableMember(cc)
.activeReceivers should be(Set(bb, cc, dd, ee))
}
"update failure detector in active set" in {
val s1 = emptyState.addMember(bb).addMember(cc).addMember(dd)
val s2 = s1.heartbeatRsp(bb).heartbeatRsp(cc).heartbeatRsp(dd).heartbeatRsp(ee)
@ -150,7 +160,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
operation match {
case Add
if (node != selfUniqueAddress && !state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
val oldUnreachable = state.oldReceiversNowUnreachable
state = state.addMember(node)
// keep unreachable
(oldUnreachable -- state.activeReceivers) should be(Set.empty)
@ -160,7 +170,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
case Remove
if (node != selfUniqueAddress && state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
val oldUnreachable = state.oldReceiversNowUnreachable
state = state.removeMember(node)
// keep unreachable, unless it was the removed
if (oldUnreachable(node))
@ -179,17 +189,18 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
fd(state, node).markNodeAsUnavailable()
state.failureDetector.isMonitoring(node.address) should be(true)
state.failureDetector.isAvailable(node.address) should be(false)
state = state.unreachableMember(node)
}
case HeartbeatRsp
if (node != selfUniqueAddress && state.ring.nodes.contains(node)) {
val oldUnreachable = state.unreachable
val oldUnreachable = state.oldReceiversNowUnreachable
val oldReceivers = state.activeReceivers
val oldRingReceivers = state.ring.myReceivers
state = state.heartbeatRsp(node)
if (oldUnreachable(node))
state.unreachable should not contain (node)
state.oldReceiversNowUnreachable should not contain (node)
if (oldUnreachable(node) && !oldRingReceivers(node))
state.failureDetector.isMonitoring(node.address) should be(false)
@ -205,7 +216,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
}
} catch {
case e: Throwable
println(s"Failure context: i=$i, node=$node, op=$operation, unreachable=${state.unreachable}, " +
println(s"Failure context: i=$i, node=$node, op=$operation, " +
s"oldReceiversNowUnreachable=${state.oldReceiversNowUnreachable}, " +
s"ringReceivers=${state.ring.myReceivers}, ringNodes=${state.ring.nodes}")
throw e
}

View file

@ -18,7 +18,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers {
def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = {
val nodes = (1 to size).map(n UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n))
val selfAddress = nodes(size / 2)
HeartbeatNodeRing(selfAddress, nodes.toSet, 5)
HeartbeatNodeRing(selfAddress, nodes.toSet, Set.empty, 5)
}
val heartbeatNodeRing = createHeartbeatNodeRingOfSize(nodesSize)
@ -27,7 +27,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers {
for (i 1 to times) thunk(ring)
def myReceivers(ring: HeartbeatNodeRing): Unit = {
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, ring.monitoredByNrOfMembers)
val r = HeartbeatNodeRing(ring.selfAddress, ring.nodes, Set.empty, ring.monitoredByNrOfMembers)
r.myReceivers.isEmpty should be(false)
}

View file

@ -19,13 +19,14 @@ class HeartbeatNodeRingSpec extends WordSpec with Matchers {
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6)
val nodes = Set(aa, bb, cc, dd, ee)
val nodes = Set(aa, bb, cc, dd, ee, ff)
"A HashedNodeRing" must {
"pick specified number of nodes as receivers" in {
val ring = HeartbeatNodeRing(cc, nodes, 3)
val ring = HeartbeatNodeRing(cc, nodes, Set.empty, 3)
ring.myReceivers should be(ring.receivers(cc))
nodes foreach { n
@ -35,15 +36,27 @@ class HeartbeatNodeRingSpec extends WordSpec with Matchers {
}
}
"pick specified number of nodes + unreachable as receivers" in {
val ring = HeartbeatNodeRing(cc, nodes, unreachable = Set(aa, dd, ee), monitoredByNrOfMembers = 3)
ring.myReceivers should be(ring.receivers(cc))
ring.receivers(aa) should be(Set(bb, cc, dd, ff)) // unreachable ee skipped
ring.receivers(bb) should be(Set(cc, dd, ee, ff)) // unreachable aa skipped
ring.receivers(cc) should be(Set(dd, ee, ff, bb)) // unreachable aa skipped
ring.receivers(dd) should be(Set(ee, ff, aa, bb, cc))
ring.receivers(ee) should be(Set(ff, aa, bb, cc))
ring.receivers(ff) should be(Set(aa, bb, cc)) // unreachable dd and ee skipped
}
"pick all except own as receivers when less than total number of nodes" in {
val expected = Set(aa, bb, dd, ee)
HeartbeatNodeRing(cc, nodes, 4).myReceivers should be(expected)
HeartbeatNodeRing(cc, nodes, 5).myReceivers should be(expected)
HeartbeatNodeRing(cc, nodes, 6).myReceivers should be(expected)
val expected = Set(aa, bb, dd, ee, ff)
HeartbeatNodeRing(cc, nodes, Set.empty, 5).myReceivers should be(expected)
HeartbeatNodeRing(cc, nodes, Set.empty, 6).myReceivers should be(expected)
HeartbeatNodeRing(cc, nodes, Set.empty, 7).myReceivers should be(expected)
}
"pick none when alone" in {
val ring = HeartbeatNodeRing(cc, Set(cc), 3)
val ring = HeartbeatNodeRing(cc, Set(cc), Set.empty, 3)
ring.myReceivers should be(Set())
}