Merge pull request #19295 from akka/wip-19274-down-joining-patrikw
=clu #19274 failure detection of joining/down member status
This commit is contained in:
commit
ebbd9c4f49
3 changed files with 150 additions and 8 deletions
|
|
@ -110,25 +110,21 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
def active: Actor.Receive = {
|
||||
case HeartbeatTick ⇒ heartbeat()
|
||||
case HeartbeatRsp(from) ⇒ heartbeatRsp(from)
|
||||
case MemberUp(m) ⇒ addMember(m)
|
||||
case MemberWeaklyUp(m) ⇒ addMember(m)
|
||||
case MemberRemoved(m, _) ⇒ removeMember(m)
|
||||
case evt: MemberEvent ⇒ addMember(evt.member)
|
||||
case UnreachableMember(m) ⇒ unreachableMember(m)
|
||||
case ReachableMember(m) ⇒ reachableMember(m)
|
||||
case _: MemberEvent ⇒ // not interested in other types of MemberEvent
|
||||
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
|
||||
}
|
||||
|
||||
def init(snapshot: CurrentClusterState): Unit = {
|
||||
val nodes: Set[UniqueAddress] = snapshot.members.collect {
|
||||
case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress
|
||||
}(collection.breakOut)
|
||||
val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress)
|
||||
val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
|
||||
state = state.init(nodes, unreachable)
|
||||
}
|
||||
|
||||
def addMember(m: Member): Unit =
|
||||
if (m.uniqueAddress != selfUniqueAddress)
|
||||
if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress))
|
||||
state = state.addMember(m.uniqueAddress)
|
||||
|
||||
def removeMember(m: Member): Unit =
|
||||
|
|
@ -191,6 +187,8 @@ private[cluster] final case class ClusterHeartbeatSenderState(
|
|||
def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState =
|
||||
copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable))
|
||||
|
||||
def contains(node: UniqueAddress): Boolean = ring.nodes(node)
|
||||
|
||||
def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
|
||||
membershipChange(ring :+ node)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.Props
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||
import akka.testkit._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object RestartNode3MultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = off")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class RestartNode3MultiJvmNode1 extends RestartNode3Spec
|
||||
class RestartNode3MultiJvmNode2 extends RestartNode3Spec
|
||||
class RestartNode3MultiJvmNode3 extends RestartNode3Spec
|
||||
|
||||
abstract class RestartNode3Spec
|
||||
extends MultiNodeSpec(RestartNode3MultiJvmSpec)
|
||||
with MultiNodeClusterSpec with ImplicitSender {
|
||||
|
||||
import RestartNode3MultiJvmSpec._
|
||||
|
||||
@volatile var secondUniqueAddress: UniqueAddress = _
|
||||
|
||||
// use a separate ActorSystem, to be able to simulate restart
|
||||
lazy val secondSystem = ActorSystem(system.name, system.settings.config)
|
||||
|
||||
def seedNodes: immutable.IndexedSeq[Address] = Vector(first)
|
||||
|
||||
lazy val restartedSecondSystem = ActorSystem(system.name,
|
||||
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get).
|
||||
withFallback(system.settings.config))
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
runOn(second) {
|
||||
if (secondSystem.whenTerminated.isCompleted)
|
||||
shutdown(restartedSecondSystem)
|
||||
else
|
||||
shutdown(secondSystem)
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
override def expectedTestDuration = 2.minutes
|
||||
|
||||
"Cluster nodes" must {
|
||||
"be able to restart and join again when Down before Up" taggedAs LongRunningTest in within(60.seconds) {
|
||||
// secondSystem is a separate ActorSystem, to be able to simulate restart
|
||||
// we must transfer its address to first and third
|
||||
runOn(first, third) {
|
||||
system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case a: UniqueAddress ⇒
|
||||
secondUniqueAddress = a
|
||||
sender() ! "ok"
|
||||
}
|
||||
}).withDeploy(Deploy.local), name = "address-receiver")
|
||||
enterBarrier("second-address-receiver-ready")
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
enterBarrier("second-address-receiver-ready")
|
||||
secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress
|
||||
List(first, third) foreach { r ⇒
|
||||
system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress
|
||||
expectMsg(5.seconds, "ok")
|
||||
}
|
||||
}
|
||||
enterBarrier("second-address-transfered")
|
||||
|
||||
// now we can join first, third together
|
||||
runOn(first, third) {
|
||||
cluster.joinSeedNodes(seedNodes)
|
||||
awaitMembersUp(2)
|
||||
}
|
||||
enterBarrier("first-third-up")
|
||||
|
||||
// make third unreachable, so that leader can't perform its duties
|
||||
runOn(first) {
|
||||
testConductor.blackhole(first, third, Direction.Both).await
|
||||
val thirdAddress = address(third)
|
||||
awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(Set(thirdAddress)))
|
||||
}
|
||||
enterBarrier("third-unreachable")
|
||||
|
||||
runOn(second) {
|
||||
Cluster(secondSystem).joinSeedNodes(seedNodes)
|
||||
awaitAssert(Cluster(secondSystem).readView.members.size should ===(3))
|
||||
awaitAssert(Cluster(secondSystem).readView.members.collectFirst {
|
||||
case m if m.address == Cluster(secondSystem).selfAddress ⇒ m.status
|
||||
} should ===(Some(Joining)))
|
||||
}
|
||||
enterBarrier("second-joined")
|
||||
|
||||
// shutdown secondSystem
|
||||
runOn(second) {
|
||||
shutdown(secondSystem, remaining)
|
||||
}
|
||||
enterBarrier("second-shutdown")
|
||||
|
||||
// then immediately start restartedSecondSystem, which has the same address as secondSystem
|
||||
runOn(first) {
|
||||
testConductor.passThrough(first, third, Direction.Both).await
|
||||
}
|
||||
runOn(second) {
|
||||
Cluster(restartedSecondSystem).joinSeedNodes(seedNodes)
|
||||
awaitAssert(Cluster(restartedSecondSystem).readView.members.size should ===(3))
|
||||
awaitAssert(Cluster(restartedSecondSystem).readView.members.map(_.status) should ===(Set(Up)))
|
||||
}
|
||||
runOn(first, third) {
  // Verify from the surviving nodes that the cluster again has three members and
  // that the member at second's address is the restarted incarnation, i.e. it has
  // the same address but a different uid than the original secondSystem.
  awaitAssert {
    val members = Cluster(system).readView.members
    members.size should ===(3)
    // The result of `exists` must be asserted; as a bare expression it would be
    // discarded and awaitAssert would succeed even if the old incarnation remained.
    members.exists { m ⇒
      m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid
    } should ===(true)
  }
}
|
||||
enterBarrier("second-restarted")
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -52,7 +52,7 @@ abstract class RestartNodeSpec
|
|||
|
||||
override def afterAll(): Unit = {
|
||||
runOn(second) {
|
||||
if (secondSystem.isTerminated)
|
||||
if (secondSystem.whenTerminated.isCompleted)
|
||||
shutdown(restartedSecondSystem)
|
||||
else
|
||||
shutdown(secondSystem)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue