Solve wrong barrier problem, see #2583
* The problem was that we didn't wait for the testConductor.shutdown Future to complete, and therefore barriers could be triggered in an unexpected order. The reason we didn't await was that during shutdown the Future was completed with a "client disconnected" failure. I have fixed that and added await to all shutdowns.
parent b68c7a8469
commit 5e83df74e9
8 changed files with 16 additions and 11 deletions

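For orientation before the diff, here is a minimal stand-alone sketch of the pattern the commit message describes, using plain Scala Futures; Done and ClientDisconnectedException are local stand-ins here, not the real TestConductor types. The shutdown acknowledgement fails because the terminating client drops its connection; recovering that failure to Done and then awaiting the Future ensures the next barrier cannot be entered before the shutdown has completed.

import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Simplified stand-ins for the real TestConductor types.
final class ClientDisconnectedException(msg: String) extends RuntimeException(msg)
case object Done

object ShutdownAwaitSketch extends App {
  // The shutdown ack: the remote client drops its connection while terminating,
  // so the Future fails with ClientDisconnectedException instead of completing with Done.
  val ack = Promise[Done.type]()
  ack.failure(new ClientDisconnectedException("client disconnected during shutdown"))

  // The fix in miniature: treat the disconnect as a normal part of shutdown
  // (recover it to Done) and then block until the Future completes, so the
  // next barrier cannot be entered before the shutdown has been processed.
  val shutdown: Future[Done.type] =
    ack.future.recover { case _: ClientDisconnectedException => Done }

  println(Await.result(shutdown, 3.seconds)) // prints Done
}
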
@@ -44,7 +44,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow
       runOn(first) {
         // kill 'third' node
-        testConductor.shutdown(third, 0)
+        testConductor.shutdown(third, 0).await
         markNodeAsUnavailable(thirdAddress)
 
         // mark 'third' node as DOWN

@@ -46,7 +46,7 @@ abstract class ClusterAccrualFailureDetectorSpec
     "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
       runOn(first) {
-        testConductor.shutdown(third, 0)
+        testConductor.shutdown(third, 0).await
       }
 
       enterBarrier("third-shutdown")

@@ -61,7 +61,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
       runOn(first) {
         // kill 'third' node
-        testConductor.shutdown(third, 0)
+        testConductor.shutdown(third, 0).await
         markNodeAsUnavailable(thirdAddress)
       }
 

@@ -316,7 +316,7 @@ abstract class LargeClusterSpec
       }
 
       runOn(firstDatacenter) {
-        testConductor.shutdown(secondDatacenter, 0)
+        testConductor.shutdown(secondDatacenter, 0).await
       }
 
       enterBarrier("second-datacenter-shutdown")

@@ -51,7 +51,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
       val fourthAddress = address(fourth)
       runOn(first) {
         // kill 'fourth' node
-        testConductor.shutdown(fourth, 0)
+        testConductor.shutdown(fourth, 0).await
         enterBarrier("down-fourth-node")
 
         // mark the node as unreachable in the failure detector

@@ -81,7 +81,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow
       enterBarrier("before-down-second-node")
       runOn(first) {
         // kill 'second' node
-        testConductor.shutdown(second, 0)
+        testConductor.shutdown(second, 0).await
         enterBarrier("down-second-node")
 
         // mark the node as unreachable in the failure detector

@@ -18,7 +18,7 @@ case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends
   val third = role("third")
   val fourth = role("fourth")
 
-  commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
+  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
 }
 
 class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true)

@@ -70,7 +70,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
         case `controller` ⇒
           val leaderAddress = address(leader)
           enterBarrier("before-shutdown" + n)
-          testConductor.shutdown(leader, 0)
+          testConductor.shutdown(leader, 0).await
          enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n)
 
         case `leader` ⇒

@@ -61,7 +61,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo
     "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
       runOn(first) {
         val secondAddress = address(second)
-        testConductor.shutdown(second, 0)
+        testConductor.shutdown(second, 0).await
 
         markNodeAsUnavailable(secondAddress)
 

@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
 import akka.util.{ Timeout }
 import scala.reflect.classTag
 import akka.ConfigurationException
+import akka.AkkaException
 
 sealed trait Direction {
   def includes(other: Direction): Boolean

@@ -207,7 +208,10 @@ trait Conductor { this: TestConductorExt ⇒
    */
   def shutdown(node: RoleName, exitValue: Int): Future[Done] = {
     import Settings.QueryTimeout
-    controller ? Terminate(node, exitValue) mapTo classTag[Done]
+    import system.dispatcher
+    // the recover is needed to handle ClientDisconnectedException exception,
+    // which is normal during shutdown
+    controller ? Terminate(node, exitValue) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
   }
 
   /**

@@ -309,7 +313,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
   whenUnhandled {
     case Event(ClientDisconnected, Some(s)) ⇒
-      s ! Status.Failure(new RuntimeException("client disconnected in state " + stateName + ": " + channel))
+      s ! Status.Failure(new ClientDisconnectedException("client disconnected in state " + stateName + ": " + channel))
       stop()
     case Event(ClientDisconnected, None) ⇒ stop()
   }

@@ -367,6 +371,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
  */
 private[akka] object Controller {
   case class ClientDisconnected(name: RoleName)
+  class ClientDisconnectedException(msg: String) extends AkkaException(msg)
   case object GetNodes
   case object GetSockAddr
   case class CreateServerFSM(channel: Channel)
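
The last three hunks fit together: ServerFSM now fails a pending reply with the new ClientDisconnectedException instead of a bare RuntimeException, the Controller object defines that exception, and Conductor.shutdown recovers exactly that exception to Done. The stand-alone sketch below reproduces this shape with Akka classic actors; StubController, Terminate, Done and the local ClientDisconnectedException are simplified stand-ins for the real TestConductor protocol, not the actual types.

import akka.actor.{ Actor, ActorSystem, Props, Status }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Simplified stand-ins for the TestConductor protocol.
case class Terminate(node: String, exitValue: Int)
case object Done
final class ClientDisconnectedException(msg: String) extends RuntimeException(msg)

// Plays the role of the controller/ServerFSM pair: it fails the asker's Future with
// a ClientDisconnectedException, as ServerFSM does when the client channel drops.
class StubController extends Actor {
  def receive = {
    case Terminate(_, _) =>
      sender() ! Status.Failure(new ClientDisconnectedException("client disconnected"))
  }
}

object ShutdownRecoverSketch extends App {
  val system = ActorSystem("sketch")
  import system.dispatcher
  implicit val timeout: Timeout = Timeout(3.seconds)

  val controller = system.actorOf(Props(new StubController), "controller")

  // Same shape as the patched Conductor.shutdown: ask, narrow the type,
  // and recover the expected disconnect into a successful Done.
  val shutdown = (controller ? Terminate("third", 0)).mapTo[Done.type].recover {
    case _: ClientDisconnectedException => Done
  }

  println(Await.result(shutdown, 3.seconds)) // prints Done
  system.terminate()
}

Using a dedicated exception type, rather than matching on RuntimeException, means the recover only swallows the disconnect that is expected during shutdown and still propagates any other failure, which appears to be the point of introducing ClientDisconnectedException.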