From 4d64901228d3c78558cde4f48a1df20d73a4e5d6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sat, 26 Dec 2015 11:30:18 +0100 Subject: [PATCH] =clu #19274 failure detection of joining/down member status * Failure detection heartbeating was not performed to joining nodes, since it was expected that they would become Up first. * If a joining node is downed before it is changed to Up, failure detection will not be performed for that node. That resulted in the downed node not being removed from membership, since the unreachability signal is used as confirmation that the node is actually stopped before removing it. --- .../scala/akka/cluster/ClusterHeartbeat.scala | 12 +- .../scala/akka/cluster/RestartNode3Spec.scala | 144 ++++++++++++++++++ .../scala/akka/cluster/RestartNodeSpec.scala | 2 +- 3 files changed, 150 insertions(+), 8 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index e7f58ab1e3..68b791b68f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -110,25 +110,21 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def active: Actor.Receive = { case HeartbeatTick ⇒ heartbeat() case HeartbeatRsp(from) ⇒ heartbeatRsp(from) - case MemberUp(m) ⇒ addMember(m) - case MemberWeaklyUp(m) ⇒ addMember(m) case MemberRemoved(m, _) ⇒ removeMember(m) + case evt: MemberEvent ⇒ addMember(evt.member) case UnreachableMember(m) ⇒ unreachableMember(m) case ReachableMember(m) ⇒ reachableMember(m) - case _: MemberEvent ⇒ // not interested in other types of MemberEvent case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } def init(snapshot: CurrentClusterState): Unit = { - val nodes: Set[UniqueAddress] = snapshot.members.collect { - case m if 
m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress - }(collection.breakOut) + val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress) val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress) state = state.init(nodes, unreachable) } def addMember(m: Member): Unit = - if (m.uniqueAddress != selfUniqueAddress) + if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress)) state = state.addMember(m.uniqueAddress) def removeMember(m: Member): Unit = @@ -191,6 +187,8 @@ private[cluster] final case class ClusterHeartbeatSenderState( def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState = copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable)) + def contains(node: UniqueAddress): Boolean = ring.nodes(node) + def addMember(node: UniqueAddress): ClusterHeartbeatSenderState = membershipChange(ring :+ node) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala new file mode 100644 index 0000000000..f1733ffd8d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. 
+ */ +package akka.cluster + +import scala.collection.immutable +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Props +import akka.actor.RootActorPath +import akka.cluster.MemberStatus._ +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object RestartNode3MultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = off")). + withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) +} + +class RestartNode3MultiJvmNode1 extends RestartNode3Spec +class RestartNode3MultiJvmNode2 extends RestartNode3Spec +class RestartNode3MultiJvmNode3 extends RestartNode3Spec + +abstract class RestartNode3Spec + extends MultiNodeSpec(RestartNode3MultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender { + + import RestartNode3MultiJvmSpec._ + + @volatile var secondUniqueAddress: UniqueAddress = _ + + // use a separate ActorSystem, to be able to simulate restart + lazy val secondSystem = ActorSystem(system.name, system.settings.config) + + def seedNodes: immutable.IndexedSeq[Address] = Vector(first) + + lazy val restartedSecondSystem = ActorSystem(system.name, + ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get). 
+ withFallback(system.settings.config)) + + override def afterAll(): Unit = { + runOn(second) { + if (secondSystem.whenTerminated.isCompleted) + shutdown(restartedSecondSystem) + else + shutdown(secondSystem) + } + super.afterAll() + } + + override def expectedTestDuration = 2.minutes + + "Cluster nodes" must { + "be able to restart and join again when Down before Up" taggedAs LongRunningTest in within(60.seconds) { + // secondSystem is a separate ActorSystem, to be able to simulate restart + // we must transfer its address to first + runOn(first, third) { + system.actorOf(Props(new Actor { + def receive = { + case a: UniqueAddress ⇒ + secondUniqueAddress = a + sender() ! "ok" + } + }).withDeploy(Deploy.local), name = "address-receiver") + enterBarrier("second-address-receiver-ready") + } + + runOn(second) { + enterBarrier("second-address-receiver-ready") + secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress + List(first, third) foreach { r ⇒ + system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! 
secondUniqueAddress + expectMsg(5.seconds, "ok") + } + } + enterBarrier("second-address-transfered") + + // now we can join first, third together + runOn(first, third) { + cluster.joinSeedNodes(seedNodes) + awaitMembersUp(2) + } + enterBarrier("first-third-up") + + // make third unreachable, so that leader can't perform its duties + runOn(first) { + testConductor.blackhole(first, third, Direction.Both).await + val thirdAddress = address(third) + awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(Set(thirdAddress))) + } + enterBarrier("third-unreachable") + + runOn(second) { + Cluster(secondSystem).joinSeedNodes(seedNodes) + awaitAssert(Cluster(secondSystem).readView.members.size should ===(3)) + awaitAssert(Cluster(secondSystem).readView.members.collectFirst { + case m if m.address == Cluster(secondSystem).selfAddress ⇒ m.status + } should ===(Some(Joining))) + } + enterBarrier("second-joined") + + // shutdown secondSystem + runOn(second) { + shutdown(secondSystem, remaining) + } + enterBarrier("second-shutdown") + + // then immediately start restartedSecondSystem, which has the same address as secondSystem + runOn(first) { + testConductor.passThrough(first, third, Direction.Both).await + } + runOn(second) { + Cluster(restartedSecondSystem).joinSeedNodes(seedNodes) + awaitAssert(Cluster(restartedSecondSystem).readView.members.size should ===(3)) + awaitAssert(Cluster(restartedSecondSystem).readView.members.map(_.status) should ===(Set(Up))) + } + runOn(first, third) { + awaitAssert { + Cluster(system).readView.members.size should ===(3) + Cluster(system).readView.members.exists { m ⇒ + m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid + } + } + } + enterBarrier("second-restarted") + + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 2e43e4ce19..48aea86078 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -52,7 +52,7 @@ abstract class RestartNodeSpec override def afterAll(): Unit = { runOn(second) { - if (secondSystem.isTerminated) + if (secondSystem.whenTerminated.isCompleted) shutdown(restartedSecondSystem) else shutdown(secondSystem)