diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 687f49faaa..8988953d3a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -45,7 +45,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0).await + testConductor.exit(third, 0).await markNodeAsUnavailable(thirdAddress) // mark 'third' node as DOWN diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 3ef3beddcc..2777d1b300 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -46,7 +46,7 @@ abstract class ClusterAccrualFailureDetectorSpec "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { runOn(first) { - testConductor.shutdown(third, 0).await + testConductor.exit(third, 0).await } enterBarrier("third-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 6ccf9b4557..8483313589 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -235,7 +235,6 @@ abstract class ClusterDeathWatchSpec enterBarrier("first-unavailable") - system.shutdown() val timeout = remaining try system.awaitTermination(timeout) catch { case _: TimeoutException ⇒ @@ -266,7 +265,7 @@ abstract class ClusterDeathWatchSpec // don't end the test until the fourth is done runOn(first) { // fourth system will be shutdown, remove to not participate in barriers any more - testConductor.removeNode(fourth) + testConductor.shutdown(fourth).await expectMsg(EndActor.End) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 327a5cb52f..60b9af29c6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -61,7 +61,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) runOn(first) { // kill 'third' node - testConductor.shutdown(third, 0).await + testConductor.exit(third, 0).await markNodeAsUnavailable(thirdAddress) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index d5616d07ad..b7e7ed3fbf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -52,7 +52,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow val fourthAddress = address(fourth) runOn(first) { // kill 'fourth' node - testConductor.shutdown(fourth, 0).await + testConductor.exit(fourth, 0).await enterBarrier("down-fourth-node") // mark the node as unreachable in the failure detector @@ -82,7 +82,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow enterBarrier("before-down-second-node") runOn(first) { // kill 'second' node - testConductor.shutdown(second, 0).await + testConductor.exit(second, 0).await enterBarrier("down-second-node") // mark the node as unreachable in the failure detector diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 51787df18c..4f0f2fea3f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -71,7 +71,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig case `controller` ⇒ val leaderAddress = address(leader) enterBarrier("before-shutdown" + n) - testConductor.shutdown(leader, 0).await + testConductor.exit(leader, 0).await enterBarrier("after-shutdown" + n, "after-unavailable" + n, "after-down" + n, "completed" + n) case `leader` ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index 9b2959702e..e755051a55 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -98,7 +98,7 @@ abstract class MBeanSpec "support down" taggedAs LongRunningTest in within(20 seconds) { val fourthAddress = address(fourth) runOn(first) { - testConductor.shutdown(fourth, 0).await + testConductor.exit(fourth, 0).await } enterBarrier("fourth-shutdown") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index bfbf1f3ad4..81962d8bfc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -65,7 +65,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { runOn(first) { val secondAddress = address(second) - testConductor.shutdown(second, 0).await + testConductor.exit(second, 0).await markNodeAsUnavailable(secondAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index f05eb51f35..c52ff081e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -795,7 +795,7 @@ abstract class StressSpec runOn(roles.head) { if (shutdown) { log.info("Shutting down [{}]", removeAddress) - testConductor.shutdown(removeRole, 0).await + testConductor.exit(removeRole, 0).await } } awaitMembersUp(currentRoles.size, timeout = remaining) @@ -828,7 +828,7 @@ abstract class StressSpec runOn(roles.head) { if (shutdown) removeRoles.foreach { r ⇒ log.info("Shutting down [{}]", address(r)) - testConductor.shutdown(r, 0).await + testConductor.exit(r, 0).await } } awaitMembersUp(currentRoles.size, timeout = remaining) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index 673afbcd1c..3b723708b9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -160,12 +160,11 @@ abstract class UnreachableNodeJoinsAgainSpec runOn(first) { // will shutdown ActorSystem of victim - testConductor.removeNode(victim) + testConductor.shutdown(victim) } runOn(victim) { val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - system.shutdown() system.awaitTermination(10 seconds) Thread.sleep(5000) // create new ActorSystem with same host:port diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala index 443c47ead3..2b7b276aff 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala @@ -177,7 +177,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod case Some(r) ⇒ r case None ⇒ fail("unexpected missing roleName: " + lastSender.path.address) } - testConductor.shutdown(receptionistRoleName, 0).await + testConductor.exit(receptionistRoleName, 0).await awaitAssert { c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true) expectMsg(1 second, "ack") diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala index 9b745e60c4..0601e62d48 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala @@ -100,7 +100,7 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan runOn(controller) { roles foreach { r ⇒ log.info("Shutdown [{}]", node(r).address) - testConductor.shutdown(r, 0).await + testConductor.exit(r, 0).await } } } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index cc56bd410b..5973a968d4 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -281,7 +281,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS expectMsg(ResetOk) roles foreach { r ⇒ log.info("Shutdown [{}]", node(r).address) - testConductor.shutdown(r, 0).await + testConductor.exit(r, 0).await } } } diff --git a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java index bd8de8a052..978e4c9fa4 100644 --- a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java +++ b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java @@ -88,13 +88,15 @@ public final class TestConductorProtocol { Throttle(0, 1), Disconnect(1, 2), Abort(2, 3), - Shutdown(3, 4), + Exit(3, 4), + Shutdown(4, 5), ; public static final int Throttle_VALUE = 1; public static final int Disconnect_VALUE = 2; public static final int Abort_VALUE = 3; - public static final int Shutdown_VALUE = 4; + public static final int Exit_VALUE = 4; + public static final int Shutdown_VALUE = 5; public final int getNumber() { return value; } @@ -104,7 +106,8 @@ public final class TestConductorProtocol { case 1: return Throttle; case 2: return Disconnect; case 3: return Abort; - case 4: return Shutdown; + case 4: return Exit; + case 5: return Shutdown; default: return null; } } @@ -135,7 +138,7 @@ public final class TestConductorProtocol { } private static final FailType[] VALUES = { - Throttle, Disconnect, Abort, Shutdown, + Throttle, Disconnect, Abort, Exit, Shutdown, }; public static FailType valueOf( @@ -4215,11 +4218,11 @@ public final class TestConductorProtocol { "ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" + ".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" + "\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" + - "\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*A\n\010FailType\022\014" + + "\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*K\n\010FailType\022\014" + "\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" + - "\014\n\010Shutdown\020\004*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007" + - "Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testc" + - "onductorH\001" + "\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005*,\n\tDirection\022\010\n\004" + + "Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" + + "mote.testconductorH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/akka-multi-node-testkit/src/main/protocol/TestConductorProtocol.proto b/akka-multi-node-testkit/src/main/protocol/TestConductorProtocol.proto index c993d6d245..dadb4ecdec 100644 --- a/akka-multi-node-testkit/src/main/protocol/TestConductorProtocol.proto +++ b/akka-multi-node-testkit/src/main/protocol/TestConductorProtocol.proto @@ -7,7 +7,7 @@ option optimize_for = SPEED; /****************************************** Compile with: - cd ./akka-remote-tests/src/main/protocol + cd ./akka-multi-node-testkit/src/main/protocol protoc TestConductorProtocol.proto --java_out ../java cd ../../../.. ./scripts/fix-protobuf.sh @@ -55,7 +55,8 @@ enum FailType { Throttle = 1; Disconnect = 2; Abort = 3; - Shutdown = 4; + Exit = 4; + Shutdown = 5; } enum Direction { diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index c03eee6d14..51df5c8a9c 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -179,12 +179,26 @@ trait Conductor { this: TestConductorExt ⇒ * @param node is the symbolic name of the node which is to be affected * @param exitValue is the return code which shall be given to System.exit */ - def shutdown(node: RoleName, exitValue: Int): Future[Done] = { + def exit(node: RoleName, exitValue: Int): Future[Done] = { import Settings.QueryTimeout import system.dispatcher // the recover is needed to handle ClientDisconnectedException exception, // which is normal during shutdown - controller ? Terminate(node, exitValue) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done } + controller ? Terminate(node, Some(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done } + } + + /** + * Tell the actor system at the remote node to shut itself down. The node will also be + * removed, so that the remaining nodes may still pass subsequent barriers. + * + * @param node is the symbolic name of the node which is to be affected + */ + def shutdown(node: RoleName): Future[Done] = { + import Settings.QueryTimeout + import system.dispatcher + // the recover is needed to handle ClientDisconnectedException exception, + // which is normal during shutdown + controller ? Terminate(node, None) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done } } /** diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala index 594c28dcf9..a1b61e6d68 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala @@ -43,8 +43,8 @@ private[akka] case class ThrottleMsg(target: Address, direction: Direction, rate private[akka] case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp private[akka] case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp -private[akka] case class Terminate(node: RoleName, exitValue: Int) extends CommandOp -private[akka] case class TerminateMsg(exitValue: Int) extends ConfirmedClientOp with NetworkOp +private[akka] case class Terminate(node: RoleName, exitValue: Option[Int]) extends CommandOp +private[akka] case class TerminateMsg(exitValue: Option[Int]) extends ConfirmedClientOp with NetworkOp private[akka] case class GetAddress(node: RoleName) extends ServerOp with NetworkOp private[akka] case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp @@ -94,8 +94,10 @@ private[akka] class MsgEncoder extends OneToOneEncoder { case DisconnectMsg(target, abort) ⇒ w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) .setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect)) - case TerminateMsg(exitValue) ⇒ - w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown).setExitValue(exitValue)) + case TerminateMsg(Some(exitValue)) ⇒ + w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue)) + case TerminateMsg(None) ⇒ + w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown)) case GetAddress(node) ⇒ w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name)) case AddressReply(node, addr) ⇒ @@ -140,7 +142,8 @@ private[akka] class MsgDecoder extends OneToOneDecoder { case FT.Throttle ⇒ ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit) case FT.Abort ⇒ DisconnectMsg(f.getAddress, true) case FT.Disconnect ⇒ DisconnectMsg(f.getAddress, false) - case FT.Shutdown ⇒ TerminateMsg(f.getExitValue) + case FT.Exit ⇒ TerminateMsg(Some(f.getExitValue)) + case FT.Shutdown ⇒ TerminateMsg(None) } } else if (w.hasAddr) { val a = w.getAddr diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 0316e2608c..b3cd47f594 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -237,8 +237,11 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) import context.dispatcher // FIXME is this the right EC for the future below? // FIXME: Currently ignoring, needs support from Remoting stay - case TerminateMsg(exit) ⇒ - System.exit(exit) + case TerminateMsg(None) ⇒ + context.system.shutdown() + stay + case TerminateMsg(Some(exitValue)) ⇒ + System.exit(exitValue) stay // needed because Java doesn’t have Nothing case _: Done ⇒ stay //FIXME what should happen? } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala index db2965e346..6d51ec989a 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala @@ -76,7 +76,6 @@ abstract class RemoteDeploymentDeathWatchSpec sleep() // if the remote deployed actor is not removed the system will not shutdown - system.shutdown() val timeout = remaining try system.awaitTermination(timeout) catch { case _: TimeoutException ⇒ @@ -93,12 +92,12 @@ abstract class RemoteDeploymentDeathWatchSpec runOn(first) { enterBarrier("hello-deployed") sleep() - testConductor.shutdown(third, 0).await + testConductor.exit(third, 0).await enterBarrier("third-crashed") runOn(first) { - // second system will be shutdown, remove to not participate in barriers any more - testConductor.removeNode(second) + // second system will be shutdown + testConductor.shutdown(second) } enterBarrier("after-3") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index 9807f7ea3c..7c0b0bf209 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -333,7 +333,7 @@ abstract class RemoteNodeDeathWatchSpec sleep() log.info("shutdown second") - testConductor.shutdown(second, 0).await + testConductor.exit(second, 0).await expectMsgType[WrappedTerminated](15 seconds).t.actor must be(subject) // verify that things are cleaned up, and heartbeating is stopped