Fix shutdown/remove race as described by @rkuhn, see #2137

* Skip nodes removal
* Ignore removed client when enter barrier
* Change order of testConductor.shutdown and testConductor.removeNode
parent e7cf92e72a
commit 52f122107c
6 changed files with 8 additions and 9 deletions
@@ -43,8 +43,8 @@ class ClientDowningNodeThatIsUnreachableSpec
       testConductor.enter("all-up")
 
       // kill 'third' node
-      testConductor.shutdown(third, 0)
       testConductor.removeNode(third)
+      testConductor.shutdown(third, 0)
 
       // mark 'third' node as DOWN
       cluster.down(thirdAddress)
@@ -57,8 +57,8 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
 
     "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
       runOn(first) {
-        testConductor.shutdown(third, 0)
         testConductor.removeNode(third)
+        testConductor.shutdown(third, 0)
       }
 
       runOn(first, second) {
@@ -51,8 +51,8 @@ class LeaderDowningNodeThatIsUnreachableSpec
       testConductor.enter("all-up")
 
       // kill 'fourth' node
-      testConductor.shutdown(fourth, 0)
       testConductor.removeNode(fourth)
+      testConductor.shutdown(fourth, 0)
       testConductor.enter("down-fourth-node")
 
       // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
@@ -91,8 +91,8 @@ class LeaderDowningNodeThatIsUnreachableSpec
       testConductor.enter("all-up")
 
       // kill 'second' node
-      testConductor.shutdown(second, 0)
       testConductor.removeNode(second)
+      testConductor.shutdown(second, 0)
       testConductor.enter("down-second-node")
 
       // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
@@ -65,8 +65,8 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
 
         case `controller` ⇒
           testConductor.enter("before-shutdown")
-          testConductor.shutdown(leader, 0)
           testConductor.removeNode(leader)
+          testConductor.shutdown(leader, 0)
           testConductor.enter("after-shutdown", "after-down", "completed")
 
         case `leader` ⇒
@@ -57,8 +57,8 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
     "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
       runOn(first) {
         val secondAddress = node(second).address
-        testConductor.shutdown(second, 0)
         testConductor.removeNode(second)
+        testConductor.shutdown(second, 0)
         awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
         cluster.isSingletonCluster must be(true)
         assertLeader(first)
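The six spec hunks above all make the same mechanical change, corresponding to the third bullet of the commit message: the node under test is now deregistered from the conductor before it is shut down. Condensed to the shared pattern (an illustrative excerpt, not a standalone program: runOn, testConductor and the role names come from akka's multi-node testkit exactly as used in these specs):

runOn(first) {
  // Old order (removed in this commit): shut the node down, then remove it
  // from the conductor, the ordering that raced per the commit title.
  // testConductor.shutdown(second, 0)
  // testConductor.removeNode(second)

  // New order: first take the node out of conductor/barrier bookkeeping...
  testConductor.removeNode(second)
  // ...and only then terminate it, with exit value 0.
  testConductor.shutdown(second, 0)
}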
@@ -444,7 +444,6 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
           nodes(node).fsm forward ToClient(TerminateMsg(exitValueOrKill))
         }
       case Remove(node) ⇒
-        nodes -= node
         barrier ! BarrierCoordinator.RemoveClient(node)
     }
   case GetNodes ⇒ sender ! nodes.keys
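The Controller hunk above is the first bullet ("Skip nodes removal"): on Remove the node is no longer dropped from the Controller's nodes map; only the BarrierCoordinator is told to forget the client. Below is a minimal self-contained sketch of that before/after behaviour (plain Scala; NodeInfo, RemoveClient and the "inbox" standing in for barrier ! ... are simplified placeholders, not the real testconductor types), under the assumption, suggested by the Terminate context line above, that the Controller must still be able to look the node up after it has left barrier coordination:

object ControllerRemoveSketch extends App {
  // Simplified stand-ins for the real testconductor types (assumption only).
  final case class NodeInfo(name: String)
  final case class RemoveClient(name: String)

  var nodes = Map("first" -> NodeInfo("first"), "third" -> NodeInfo("third"))
  var barrierInbox = List.empty[RemoveClient] // plays the role of `barrier ! RemoveClient(...)`

  // Old behaviour (shown only for contrast): forget the node immediately.
  def removeOld(name: String): Unit = {
    nodes -= name
    barrierInbox ::= RemoveClient(name)
  }

  // New behaviour: only the barrier is notified; the Controller keeps its entry,
  // so a later Terminate (the reordered specs now shut the node down after
  // removing it) can still find nodes(name).
  def removeNew(name: String): Unit =
    barrierInbox ::= RemoveClient(name)

  removeNew("third")
  println(nodes.keys)   // Set(first, third): 'third' is still known to the Controller
  println(barrierInbox) // List(RemoveClient(third)): the barrier no longer waits for it
}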
@@ -540,8 +539,8 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
 
   when(Waiting) {
     case Event(EnterBarrier(name), d @ Data(clients, barrier, arrived)) ⇒
-      if (name != barrier || clients.find(_.fsm == sender).isEmpty) throw WrongBarrier(name, sender, d)
-      val together = sender :: arrived
+      if (name != barrier) throw WrongBarrier(name, sender, d)
+      val together = if (clients.find(_.fsm == sender).isDefined) sender :: arrived else arrived
       handleBarrier(d.copy(arrived = together))
     case Event(RemoveClient(name), d @ Data(clients, barrier, arrived)) ⇒
       clients find (_.name == name) match {
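The BarrierCoordinator hunk above is the second bullet ("Ignore removed client when enter barrier"): entering the wrong barrier still throws WrongBarrier, but an EnterBarrier from a sender that is no longer in clients (for example a node that was just removed with removeNode and is on its way down) is simply not counted, instead of failing the barrier for everyone. A small self-contained sketch of just that decision (plain Scala; Client, the fsm field, WrongBarrier and the names below are simplified placeholders, not the actual akka.remote.testconductor types):

object EnterBarrierSketch extends App {
  // Simplified stand-ins for the coordinator's data (assumption, not the real types).
  final case class Client(name: String, fsm: String)
  final case class WrongBarrier(name: String) extends RuntimeException(name)

  // Mirrors the new EnterBarrier handling: a wrong barrier name still throws,
  // but a sender unknown to `clients` is ignored rather than treated as an error.
  def arrive(clients: Set[Client], arrived: List[String], sender: String,
             name: String, barrier: String): List[String] = {
    if (name != barrier) throw WrongBarrier(name)
    if (clients.exists(_.fsm == sender)) sender :: arrived else arrived
  }

  val clients = Set(Client("first", "fsm-first"), Client("second", "fsm-second"))

  // A live client is counted as arrived.
  println(arrive(clients, Nil, "fsm-first", "all-up", "all-up")) // List(fsm-first)
  // A client that was already removed (say, 'third') no longer breaks the barrier.
  println(arrive(clients, Nil, "fsm-third", "all-up", "all-up")) // List()
}

Together with the reordered specs and the Controller change, this is presumably what closes the shutdown/remove race from the commit title: a node that has just been removed and is shutting down can still send a stray EnterBarrier without aborting the barrier for the remaining nodes.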