=clu #19274 failure detection of joining/down member status

* Failure detection heartbeating was not performed to joining
  nodes, since it was expected that they will become Up first.
* If a joining node is downed before it is changed to Up failure
  detection will not be performed for that node. That resulted in
  the downed node will not be removed from membership, since the
  unreachability signal is used as confirmation that the node is
  actually stopped before removing it.
This commit is contained in:
Patrik Nordwall 2015-12-26 11:30:18 +01:00
parent 290f402b79
commit 4d64901228
3 changed files with 150 additions and 8 deletions

View file

@ -110,25 +110,21 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def active: Actor.Receive = {
case HeartbeatTick heartbeat()
case HeartbeatRsp(from) heartbeatRsp(from)
case MemberUp(m) addMember(m)
case MemberWeaklyUp(m) addMember(m)
case MemberRemoved(m, _) removeMember(m)
case evt: MemberEvent addMember(evt.member)
case UnreachableMember(m) unreachableMember(m)
case ReachableMember(m) reachableMember(m)
case _: MemberEvent // not interested in other types of MemberEvent
case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from)
}
def init(snapshot: CurrentClusterState): Unit = {
val nodes: Set[UniqueAddress] = snapshot.members.collect {
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp m.uniqueAddress
}(collection.breakOut)
val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress)
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
state = state.init(nodes, unreachable)
}
def addMember(m: Member): Unit =
if (m.uniqueAddress != selfUniqueAddress)
if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress))
state = state.addMember(m.uniqueAddress)
def removeMember(m: Member): Unit =
@ -191,6 +187,8 @@ private[cluster] final case class ClusterHeartbeatSenderState(
def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState =
copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable))
def contains(node: UniqueAddress): Boolean = ring.nodes(node)
def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
membershipChange(ring :+ node)

View file

@ -0,0 +1,144 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MemberStatus._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.typesafe.config.ConfigFactory
object RestartNode3MultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = off")).
withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)
}
class RestartNode3MultiJvmNode1 extends RestartNode3Spec
class RestartNode3MultiJvmNode2 extends RestartNode3Spec
class RestartNode3MultiJvmNode3 extends RestartNode3Spec
abstract class RestartNode3Spec
extends MultiNodeSpec(RestartNode3MultiJvmSpec)
with MultiNodeClusterSpec with ImplicitSender {
import RestartNode3MultiJvmSpec._
@volatile var secondUniqueAddress: UniqueAddress = _
// use a separate ActorSystem, to be able to simulate restart
lazy val secondSystem = ActorSystem(system.name, system.settings.config)
def seedNodes: immutable.IndexedSeq[Address] = Vector(first)
lazy val restartedSecondSystem = ActorSystem(system.name,
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get).
withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(second) {
if (secondSystem.whenTerminated.isCompleted)
shutdown(restartedSecondSystem)
else
shutdown(secondSystem)
}
super.afterAll()
}
override def expectedTestDuration = 2.minutes
"Cluster nodes" must {
"be able to restart and join again when Down before Up" taggedAs LongRunningTest in within(60.seconds) {
// secondSystem is a separate ActorSystem, to be able to simulate restart
// we must transfer its address to first
runOn(first, third) {
system.actorOf(Props(new Actor {
def receive = {
case a: UniqueAddress
secondUniqueAddress = a
sender() ! "ok"
}
}).withDeploy(Deploy.local), name = "address-receiver")
enterBarrier("second-address-receiver-ready")
}
runOn(second) {
enterBarrier("second-address-receiver-ready")
secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress
List(first, third) foreach { r
system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress
expectMsg(5.seconds, "ok")
}
}
enterBarrier("second-address-transfered")
// now we can join first, third together
runOn(first, third) {
cluster.joinSeedNodes(seedNodes)
awaitMembersUp(2)
}
enterBarrier("first-third-up")
// make third unreachable, so that leader can't perform its duties
runOn(first) {
testConductor.blackhole(first, third, Direction.Both).await
val thirdAddress = address(third)
awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(Set(thirdAddress)))
}
enterBarrier("third-unreachable")
runOn(second) {
Cluster(secondSystem).joinSeedNodes(seedNodes)
awaitAssert(Cluster(secondSystem).readView.members.size should ===(3))
awaitAssert(Cluster(secondSystem).readView.members.collectFirst {
case m if m.address == Cluster(secondSystem).selfAddress m.status
} should ===(Some(Joining)))
}
enterBarrier("second-joined")
// shutdown secondSystem
runOn(second) {
shutdown(secondSystem, remaining)
}
enterBarrier("second-shutdown")
// then immediately start restartedSecondSystem, which has the same address as secondSystem
runOn(first) {
testConductor.passThrough(first, third, Direction.Both).await
}
runOn(second) {
Cluster(restartedSecondSystem).joinSeedNodes(seedNodes)
awaitAssert(Cluster(restartedSecondSystem).readView.members.size should ===(3))
awaitAssert(Cluster(restartedSecondSystem).readView.members.map(_.status) should ===(Set(Up)))
}
runOn(first, third) {
awaitAssert {
Cluster(system).readView.members.size should ===(3)
Cluster(system).readView.members.exists { m
m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid
}
}
}
enterBarrier("second-restarted")
}
}
}

View file

@ -52,7 +52,7 @@ abstract class RestartNodeSpec
override def afterAll(): Unit = {
runOn(second) {
if (secondSystem.isTerminated)
if (secondSystem.whenTerminated.isCompleted)
shutdown(restartedSecondSystem)
else
shutdown(secondSystem)