diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 49483d39ef..3efb891a3b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -44,7 +44,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await markNodeAsUnavailable(thirdAddress) // mark 'third' node as DOWN diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 05253a16b3..552f90bd49 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -46,7 +46,7 @@ abstract class ClusterAccrualFailureDetectorSpec "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { runOn(first) { - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await } enterBarrier("third-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index c8125f32b1..b2a9453035 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -61,7 +61,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0) + testConductor.shutdown(third, 0).await markNodeAsUnavailable(thirdAddress) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 6465c5ead8..1ebb930df6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -316,7 +316,7 @@ abstract class LargeClusterSpec } runOn(firstDatacenter) { - testConductor.shutdown(secondDatacenter, 0) + testConductor.shutdown(secondDatacenter, 0).await } enterBarrier("second-datacenter-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index bf026b43b3..134ed4d0d6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -51,7 +51,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow val fourthAddress = address(fourth) runOn(first) { // kill 'fourth' node - testConductor.shutdown(fourth, 0) + testConductor.shutdown(fourth, 0).await enterBarrier("down-fourth-node") // mark the node as unreachable in the failure detector @@ -81,7 +81,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow enterBarrier("before-down-second-node") runOn(first) { // kill 'second' node - testConductor.shutdown(second, 0) + testConductor.shutdown(second, 0).await enterBarrier("down-second-node") // mark the node as unreachable in the failure detector diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index ddc9a46ecb..fc8c4d2619 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -18,7 +18,7 @@ case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true) @@ -70,7 +70,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig case `controller` ⇒ val leaderAddress = address(leader) enterBarrier("before-shutdown" + n) - testConductor.shutdown(leader, 0) + testConductor.shutdown(leader, 0).await enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n) case `leader` ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 3575c5b6d4..291a59a44f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -61,7 +61,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { runOn(first) { val secondAddress = address(second) - testConductor.shutdown(second, 0) + testConductor.shutdown(second, 0).await markNodeAsUnavailable(secondAddress) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index bbccd3d0a5..eae82b6e6d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.util.{ Timeout } import scala.reflect.classTag import akka.ConfigurationException +import akka.AkkaException sealed trait Direction { def includes(other: Direction): Boolean @@ -207,7 +208,10 @@ trait Conductor { this: TestConductorExt ⇒ */ def shutdown(node: RoleName, exitValue: Int): Future[Done] = { import Settings.QueryTimeout - controller ? Terminate(node, exitValue) mapTo classTag[Done] + import system.dispatcher + // the recover is needed to handle ClientDisconnectedException exception, + // which is normal during shutdown + controller ? Terminate(node, exitValue) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done } } /** @@ -309,7 +313,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex whenUnhandled { case Event(ClientDisconnected, Some(s)) ⇒ - s ! Status.Failure(new RuntimeException("client disconnected in state " + stateName + ": " + channel)) + s ! Status.Failure(new ClientDisconnectedException("client disconnected in state " + stateName + ": " + channel)) stop() case Event(ClientDisconnected, None) ⇒ stop() } @@ -367,6 +371,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex */ private[akka] object Controller { case class ClientDisconnected(name: RoleName) + class ClientDisconnectedException(msg: String) extends AkkaException(msg) case object GetNodes case object GetSockAddr case class CreateServerFSM(channel: Channel)