From 5e83df74e9b100bdf80830bb5442cb490f2d927e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 16 Oct 2012 17:02:13 +0200 Subject: [PATCH] Solve wrong barrier problem, see #2583 * The problem was that we didn't wait for the testconductor.shutdown Future to complete, and therefore barriers could be triggered in an unexpected order. The reason we didn't await was that during shutdown the Future was completed with a client-disconnected failure. I have fixed that and added await to all shutdowns. --- .../cluster/ClientDowningNodeThatIsUnreachableSpec.scala | 2 +- .../akka/cluster/ClusterAccrualFailureDetectorSpec.scala | 2 +- .../multi-jvm/scala/akka/cluster/ConvergenceSpec.scala | 2 +- .../multi-jvm/scala/akka/cluster/LargeClusterSpec.scala | 2 +- .../cluster/LeaderDowningNodeThatIsUnreachableSpec.scala | 4 ++-- .../scala/akka/cluster/LeaderElectionSpec.scala | 4 ++-- .../scala/akka/cluster/SingletonClusterSpec.scala | 2 +- .../main/scala/akka/remote/testconductor/Conductor.scala | 9 +++++++-- 8 files changed, 16 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 49483d39ef..3efb891a3b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -44,7 +44,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await markNodeAsUnavailable(thirdAddress) // mark 'third' node as DOWN diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 05253a16b3..552f90bd49 100644 --- 
a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -46,7 +46,7 @@ abstract class ClusterAccrualFailureDetectorSpec "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { runOn(first) { - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await } enterBarrier("third-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index c8125f32b1..b2a9453035 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -61,7 +61,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await markNodeAsUnavailable(thirdAddress) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 6465c5ead8..1ebb930df6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -316,7 +316,7 @@ abstract class LargeClusterSpec } runOn(firstDatacenter) { - testConductor.shutdown(secondDatacenter, 0) + testConductor.shutdown(secondDatacenter, 0).await } enterBarrier("second-datacenter-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index bf026b43b3..134ed4d0d6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -51,7 +51,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow val fourthAddress = address(fourth) runOn(first) { // kill 'fourth' node - testConductor.shutdown(fourth, 0) + testConductor.shutdown(fourth, 0).await enterBarrier("down-fourth-node") // mark the node as unreachable in the failure detector @@ -81,7 +81,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow enterBarrier("before-down-second-node") runOn(first) { // kill 'second' node - testConductor.shutdown(second, 0) + testConductor.shutdown(second, 0).await enterBarrier("down-second-node") // mark the node as unreachable in the failure detector diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index ddc9a46ecb..fc8c4d2619 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -18,7 +18,7 @@ case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true) @@ -70,7 +70,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig case `controller` ⇒ val leaderAddress = address(leader) enterBarrier("before-shutdown" + n) - testConductor.shutdown(leader, 0) + testConductor.shutdown(leader, 0).await enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n) case `leader` ⇒ 
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 3575c5b6d4..291a59a44f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -61,7 +61,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { runOn(first) { val secondAddress = address(second) - testConductor.shutdown(second, 0) + testConductor.shutdown(second, 0).await markNodeAsUnavailable(secondAddress) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index bbccd3d0a5..eae82b6e6d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.util.{ Timeout } import scala.reflect.classTag import akka.ConfigurationException +import akka.AkkaException sealed trait Direction { def includes(other: Direction): Boolean @@ -207,7 +208,10 @@ trait Conductor { this: TestConductorExt ⇒ */ def shutdown(node: RoleName, exitValue: Int): Future[Done] = { import Settings.QueryTimeout - controller ? Terminate(node, exitValue) mapTo classTag[Done] + import system.dispatcher + // the recover is needed to handle ClientDisconnectedException exception, + // which is normal during shutdown + controller ? Terminate(node, exitValue) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done } } /** @@ -309,7 +313,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex whenUnhandled { case Event(ClientDisconnected, Some(s)) ⇒ - s ! 
Status.Failure(new RuntimeException("client disconnected in state " + stateName + ": " + channel)) + s ! Status.Failure(new ClientDisconnectedException("client disconnected in state " + stateName + ": " + channel)) stop() case Event(ClientDisconnected, None) ⇒ stop() } @@ -367,6 +371,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex */ private[akka] object Controller { case class ClientDisconnected(name: RoleName) + class ClientDisconnectedException(msg: String) extends AkkaException(msg) case object GetNodes case object GetSockAddr case class CreateServerFSM(channel: Channel)