diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
index 88b41c0073..6d2e3fbba7 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
@@ -648,6 +648,19 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
guardian.stop()
}
+ @volatile var aborting = false
+
+ /**
+ * This kind of shutdown attempts to bring the system down and release its
+ * resources more forcefully than plain shutdown. For example it will not
+ * wait for remote-deployed child actors to terminate before terminating their
+ * parents.
+ */
+ def abort(): Unit = {
+ aborting = true
+ shutdown()
+ }
+
//#create-scheduler
/**
* Create the scheduler service. This one needs one special behavior: if
diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
index ae4c6532fa..26b7c2e626 100644
--- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
+++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
@@ -16,6 +16,7 @@ import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.Exception._
import scala.util.control.NonFatal
+import akka.actor.ActorRefScope
private[akka] trait FaultHandling { this: ActorCell ⇒
@@ -148,6 +149,14 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
+ if (systemImpl.aborting) {
+ // separate iteration because this is a very rare case that should not penalize normal operation
+ children foreach {
+ case ref: ActorRefScope if !ref.isLocal ⇒ self.sendSystemMessage(DeathWatchNotification(ref, true, false))
+ case _ ⇒
+ }
+ }
+
val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
diff --git a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
index 66ad015941..1384937522 100644
--- a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
+++ b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: TestConductorProtocol.proto
+// source: protobuf/TestConductorProtocol.proto
package akka.remote.testconductor;
@@ -133,6 +133,10 @@ public final class TestConductorProtocol {
* Shutdown = 5;
*/
Shutdown(4, 5),
+ /**
+ * ShutdownAbrupt = 6;
+ */
+ ShutdownAbrupt(5, 6),
;
/**
@@ -155,6 +159,10 @@ public final class TestConductorProtocol {
* Shutdown = 5;
*/
public static final int Shutdown_VALUE = 5;
+ /**
+ * ShutdownAbrupt = 6;
+ */
+ public static final int ShutdownAbrupt_VALUE = 6;
public final int getNumber() { return value; }
@@ -166,6 +174,7 @@ public final class TestConductorProtocol {
case 3: return Abort;
case 4: return Exit;
case 5: return Shutdown;
+ case 6: return ShutdownAbrupt;
default: return null;
}
}
@@ -5427,26 +5436,27 @@ public final class TestConductorProtocol {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\033TestConductorProtocol.proto\"\216\001\n\007Wrappe" +
- "r\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(" +
- "\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" +
- "ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." +
- "AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" +
- "\007address\030\002 \002(\0132\010.Address\"E\n\014EnterBarrier" +
- "\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.BarrierOp\022\017" +
- "\n\007timeout\030\003 \001(\003\"6\n\016AddressRequest\022\014\n\004nod" +
- "e\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"G\n\007Addre" +
- "ss\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004",
- "host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailu" +
- "re\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n\tdirect" +
- "ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" +
- ".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" +
- "\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" +
- "\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*K\n\010FailType\022\014" +
- "\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" +
- "\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005*,\n\tDirection\022\010\n\004" +
- "Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" +
- "mote.testconductorH\001"
+ "\n$protobuf/TestConductorProtocol.proto\"\216" +
+ "\001\n\007Wrapper\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007bar" +
+ "rier\030\002 \001(\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001" +
+ "(\0132\016.InjectFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr" +
+ "\030\005 \001(\0132\017.AddressRequest\"0\n\005Hello\022\014\n\004name" +
+ "\030\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"E\n\014Ent" +
+ "erBarrier\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.Ba" +
+ "rrierOp\022\017\n\007timeout\030\003 \001(\003\"6\n\016AddressReque" +
+ "st\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address" +
+ "\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030",
+ "\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rIn" +
+ "jectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022" +
+ "\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007addres" +
+ "s\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\te" +
+ "xitValue\030\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010" +
+ "\n\004Fail\020\002\022\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*_\n\010F" +
+ "ailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n" +
+ "\005Abort\020\003\022\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005\022\022\n\016Shut" +
+ "downAbrupt\020\006*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007R" +
+ "eceive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testco",
+ "nductorH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto b/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
index 2c8b65908a..bb86537b11 100644
--- a/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
+++ b/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
@@ -49,6 +49,7 @@ enum FailType {
Abort = 3;
Exit = 4;
Shutdown = 5;
+ ShutdownAbrupt = 6;
}
enum Direction {
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
index 4b1e61bf46..c73fdbf247 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
@@ -185,7 +185,7 @@ trait Conductor { this: TestConductorExt ⇒
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
- controller ? Terminate(node, Some(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
+ controller ? Terminate(node, Right(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
}
/**
@@ -194,12 +194,21 @@ trait Conductor { this: TestConductorExt ⇒
*
* @param node is the symbolic name of the node which is to be affected
*/
- def shutdown(node: RoleName): Future[Done] = {
+ def shutdown(node: RoleName): Future[Done] = shutdown(node, abort = false)
+
+ /**
+ * Tell the actor system at the remote node to shut itself down without
+ * awaiting termination of remote-deployed children. The node will also be
+ * removed, so that the remaining nodes may still pass subsequent barriers.
+ *
+ * @param node is the symbolic name of the node which is to be affected
+ */
+ def shutdown(node: RoleName, abort: Boolean): Future[Done] = {
import Settings.QueryTimeout
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
- controller ? Terminate(node, None) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
+ controller ? Terminate(node, Left(abort)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
}
/**
@@ -455,9 +464,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case Disconnect(node, target, abort) ⇒
val t = nodes(target)
nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort))
- case Terminate(node, exitValue) ⇒
+ case Terminate(node, shutdownOrExit) ⇒
barrier ! BarrierCoordinator.RemoveClient(node)
- nodes(node).fsm forward ToClient(TerminateMsg(exitValue))
+ nodes(node).fsm forward ToClient(TerminateMsg(shutdownOrExit))
+ nodes -= node
case Remove(node) ⇒
barrier ! BarrierCoordinator.RemoveClient(node)
}
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
index 03af7f8614..cc8361f158 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
@@ -43,8 +43,8 @@ private[akka] final case class ThrottleMsg(target: Address, direction: Direction
private[akka] final case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
private[akka] final case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
-private[akka] final case class Terminate(node: RoleName, exitValue: Option[Int]) extends CommandOp
-private[akka] final case class TerminateMsg(exitValue: Option[Int]) extends ConfirmedClientOp with NetworkOp
+private[akka] final case class Terminate(node: RoleName, shutdownOrExit: Either[Boolean, Int]) extends CommandOp
+private[akka] final case class TerminateMsg(shutdownOrExit: Either[Boolean, Int]) extends ConfirmedClientOp with NetworkOp
private[akka] final case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
private[akka] final case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
@@ -94,10 +94,12 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
case DisconnectMsg(target, abort) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect))
- case TerminateMsg(Some(exitValue)) ⇒
+ case TerminateMsg(Right(exitValue)) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue))
- case TerminateMsg(None) ⇒
+ case TerminateMsg(Left(false)) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown))
+ case TerminateMsg(Left(true)) ⇒
+ w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
case GetAddress(node) ⇒
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
case AddressReply(node, addr) ⇒
@@ -139,11 +141,12 @@ private[akka] class MsgDecoder extends OneToOneDecoder {
val f = w.getFailure
import TCP.{ FailType ⇒ FT }
f.getFailure match {
- case FT.Throttle ⇒ ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
- case FT.Abort ⇒ DisconnectMsg(f.getAddress, true)
- case FT.Disconnect ⇒ DisconnectMsg(f.getAddress, false)
- case FT.Exit ⇒ TerminateMsg(Some(f.getExitValue))
- case FT.Shutdown ⇒ TerminateMsg(None)
+ case FT.Throttle ⇒ ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
+ case FT.Abort ⇒ DisconnectMsg(f.getAddress, true)
+ case FT.Disconnect ⇒ DisconnectMsg(f.getAddress, false)
+ case FT.Exit ⇒ TerminateMsg(Right(f.getExitValue))
+ case FT.Shutdown ⇒ TerminateMsg(Left(false))
+ case FT.ShutdownAbrupt ⇒ TerminateMsg(Left(true))
}
} else if (w.hasAddr) {
val a = w.getAddr
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
index d39d563654..80cd9a87fa 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
@@ -31,8 +31,9 @@ trait Player { this: TestConductorExt ⇒
private var _client: ActorRef = _
private def client = _client match {
- case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
- case x ⇒ x
+ case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
+ case _ if system.isTerminated ⇒ throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point")
+ case x ⇒ x
}
/**
@@ -239,10 +240,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
import context.dispatcher // FIXME is this the right EC for the future below?
// FIXME: Currently ignoring, needs support from Remoting
stay
- case TerminateMsg(None) ⇒
+ case TerminateMsg(Left(false)) ⇒
context.system.shutdown()
stay
- case TerminateMsg(Some(exitValue)) ⇒
+ case TerminateMsg(Left(true)) ⇒
+ context.system.asInstanceOf[ActorSystemImpl].abort()
+ stay
+ case TerminateMsg(Right(exitValue)) ⇒
System.exit(exitValue)
stay // needed because Java doesn’t have Nothing
case _: Done ⇒ stay //FIXME what should happen?
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
index 7377e9cc14..65ceb61ee9 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -327,7 +327,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* been started either in Conductor or Player mode when the constructor of
* MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
*/
- val testConductor: TestConductorExt = TestConductor(system)
+ var testConductor: TestConductorExt = null
/**
* Execute the given block of code only on the given nodes (names according
@@ -376,52 +376,85 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
*/
private val controllerAddr = new InetSocketAddress(serverName, serverPort)
- if (selfIndex == 0) {
- Await.result(testConductor.startController(initialParticipants, myself, controllerAddr),
- testConductor.Settings.BarrierTimeout.duration)
- } else {
- Await.result(testConductor.startClient(myself, controllerAddr),
- testConductor.Settings.BarrierTimeout.duration)
+
+ protected def attachConductor(tc: TestConductorExt): Unit = {
+ val timeout = tc.Settings.BarrierTimeout.duration
+ val startFuture =
+ if (selfIndex == 0) tc.startController(initialParticipants, myself, controllerAddr)
+ else tc.startClient(myself, controllerAddr)
+ try Await.result(startFuture, timeout)
+ catch {
+ case NonFatal(x) ⇒ throw new RuntimeException("failure while attaching new conductor", x)
+ }
+ testConductor = tc
}
+ attachConductor(TestConductor(system))
+
// now add deployments, if so desired
private final case class Replacement(tag: String, role: RoleName) {
lazy val addr = node(role).address.toString
}
+
private val replacements = roles map (r ⇒ Replacement("@" + r.name + "@", r))
- private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
- deployments(myself) foreach { str ⇒
- val deployString = (str /: replacements) {
- case (base, r @ Replacement(tag, _)) ⇒
- base.indexOf(tag) match {
- case -1 ⇒ base
- case start ⇒
- val replaceWith = try
- r.addr
- catch {
- case NonFatal(e) ⇒
- // might happen if all test cases are ignored (excluded) and
- // controller node is finished/exited before r.addr is run
- // on the other nodes
- val unresolved = "akka://unresolved-replacement-" + r.role.name
- log.warning(unresolved + " due to: " + e.getMessage)
- unresolved
- }
- base.replace(tag, replaceWith)
- }
- }
- import scala.collection.JavaConverters._
- ConfigFactory.parseString(deployString).root.asScala foreach {
- case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
- case (key, x) ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
+
+ protected def injectDeployments(sys: ActorSystem, role: RoleName): Unit = {
+ val deployer = sys.asInstanceOf[ExtendedActorSystem].provider.deployer
+ deployments(role) foreach { str ⇒
+ val deployString = (str /: replacements) {
+ case (base, r @ Replacement(tag, _)) ⇒
+ base.indexOf(tag) match {
+ case -1 ⇒ base
+ case start ⇒
+ val replaceWith = try
+ r.addr
+ catch {
+ case NonFatal(e) ⇒
+ // might happen if all test cases are ignored (excluded) and
+ // controller node is finished/exited before r.addr is run
+ // on the other nodes
+ val unresolved = "akka://unresolved-replacement-" + r.role.name
+ log.warning(unresolved + " due to: " + e.getMessage)
+ unresolved
+ }
+ base.replace(tag, replaceWith)
+ }
+ }
+ import scala.collection.JavaConverters._
+ ConfigFactory.parseString(deployString).root.asScala foreach {
+ case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
+ case (key, x) ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
+ }
}
}
- // useful to see which jvm is running which role, used by LogRoleReplace utility
- log.info("Role [{}] started with address [{}]", myself.name,
- system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress)
+ injectDeployments(system, myself)
+ protected val myAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+
+ // useful to see which jvm is running which role, used by LogRoleReplace utility
+ log.info("Role [{}] started with address [{}]", myself.name, myAddress)
+
+ /**
+ * This method starts a new ActorSystem with the same configuration as the
+ * previous one on the current node, including deployments. It also creates
+ * a new TestConductor client and registers itself with the conductor so
+ * that it is possible to use barriers etc. normally after this method has
+ * been called.
+ *
+ * NOTICE: you MUST start a new system before trying to enter a barrier or
+ * otherwise using the TestConductor after having terminated this node’s
+ * system.
+ */
+ protected def startNewSystem(): ActorSystem = {
+ val config = ConfigFactory.parseString(s"akka.remote.netty.tcp{port=${myAddress.port.get}\nhostname=${myAddress.host.get}}")
+ .withFallback(system.settings.config)
+ val sys = ActorSystem(system.name, config)
+ injectDeployments(sys, myself)
+ attachConductor(TestConductor(sys))
+ sys
+ }
}
/**
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
index 16b4517db3..fb9338c1a0 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
@@ -630,7 +630,7 @@ trait TestKitBase {
for { x ← 1 to n } yield {
val timeout = stop - now
val o = receiveOne(timeout)
- assert(o ne null, s"timeout ($max) while expecting $n messages")
+ assert(o ne null, s"timeout ($max) while expecting $n messages (got ${x - 1})")
o
}
}