+act,mul #3948 add MultiNodeSpec.startNewSsytem() and system.abort()

abort() currently only changes that remote-deployed child actors are not
waited for during termination (because that would not change anything);
it is still a different operation than shutdown() since it changes what
you are guaranteed to observe after termination.

testConductor.shutdown(..., abort = true) uses this mode of termination.

improve MultiNodeSpec to allow injection of deployment configuration
into arbitrary actor systems

include number of received elements in the timeout failure message for
TestKit.receiveN
This commit is contained in:
Roland Kuhn 2014-03-21 20:10:34 +01:00
parent 4f4d1d959f
commit a9c022e92a
9 changed files with 158 additions and 75 deletions

View file

@ -648,6 +648,19 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
guardian.stop() guardian.stop()
} }
@volatile var aborting = false
/**
* This kind of shutdown attempts to bring the system down and release its
* resources more forcefully than plain shutdown. For example it will not
* wait for remote-deployed child actors to terminate before terminating their
* parents.
*/
def abort(): Unit = {
aborting = true
shutdown()
}
//#create-scheduler //#create-scheduler
/** /**
* Create the scheduler service. This one needs one special behavior: if * Create the scheduler service. This one needs one special behavior: if

View file

@ -16,6 +16,7 @@ import scala.collection.immutable
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.util.control.Exception._ import scala.util.control.Exception._
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.ActorRefScope
private[akka] trait FaultHandling { this: ActorCell private[akka] trait FaultHandling { this: ActorCell
@ -148,6 +149,14 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop children foreach stop
if (systemImpl.aborting) {
// separate iteration because this is a very rare case that should not penalize normal operation
children foreach {
case ref: ActorRefScope if !ref.isLocal self.sendSystemMessage(DeathWatchNotification(ref, true, false))
case _
}
}
val wasTerminating = isTerminating val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) { if (setChildrenTerminationReason(ChildrenContainer.Termination)) {

View file

@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT! // Generated by the protocol buffer compiler. DO NOT EDIT!
// source: TestConductorProtocol.proto // source: protobuf/TestConductorProtocol.proto
package akka.remote.testconductor; package akka.remote.testconductor;
@ -133,6 +133,10 @@ public final class TestConductorProtocol {
* <code>Shutdown = 5;</code> * <code>Shutdown = 5;</code>
*/ */
Shutdown(4, 5), Shutdown(4, 5),
/**
* <code>ShutdownAbrupt = 6;</code>
*/
ShutdownAbrupt(5, 6),
; ;
/** /**
@ -155,6 +159,10 @@ public final class TestConductorProtocol {
* <code>Shutdown = 5;</code> * <code>Shutdown = 5;</code>
*/ */
public static final int Shutdown_VALUE = 5; public static final int Shutdown_VALUE = 5;
/**
* <code>ShutdownAbrupt = 6;</code>
*/
public static final int ShutdownAbrupt_VALUE = 6;
public final int getNumber() { return value; } public final int getNumber() { return value; }
@ -166,6 +174,7 @@ public final class TestConductorProtocol {
case 3: return Abort; case 3: return Abort;
case 4: return Exit; case 4: return Exit;
case 5: return Shutdown; case 5: return Shutdown;
case 6: return ShutdownAbrupt;
default: return null; default: return null;
} }
} }
@ -5427,26 +5436,27 @@ public final class TestConductorProtocol {
descriptor; descriptor;
static { static {
java.lang.String[] descriptorData = { java.lang.String[] descriptorData = {
"\n\033TestConductorProtocol.proto\"\216\001\n\007Wrappe" + "\n$protobuf/TestConductorProtocol.proto\"\216" +
"r\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(" + "\001\n\007Wrapper\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007bar" +
"\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" + "rier\030\002 \001(\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001" +
"ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." + "(\0132\016.InjectFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr" +
"AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" + "\030\005 \001(\0132\017.AddressRequest\"0\n\005Hello\022\014\n\004name" +
"\007address\030\002 \002(\0132\010.Address\"E\n\014EnterBarrier" + "\030\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"E\n\014Ent" +
"\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.BarrierOp\022\017" + "erBarrier\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.Ba" +
"\n\007timeout\030\003 \001(\003\"6\n\016AddressRequest\022\014\n\004nod" + "rrierOp\022\017\n\007timeout\030\003 \001(\003\"6\n\016AddressReque" +
"e\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"G\n\007Addre" + "st\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address" +
"ss\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004", "\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030",
"host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailu" + "\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rIn" +
"re\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n\tdirect" + "jectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022" +
"ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" + "\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007addres" +
".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" + "s\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\te" +
"\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" + "xitValue\030\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010" +
"\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*K\n\010FailType\022\014" + "\n\004Fail\020\002\022\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*_\n\010F" +
"\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" + "ailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n" +
"\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005*,\n\tDirection\022\010\n\004" + "\005Abort\020\003\022\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005\022\022\n\016Shut" +
"Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" + "downAbrupt\020\006*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007R" +
"mote.testconductorH\001" "eceive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testco",
"nductorH\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View file

@ -49,6 +49,7 @@ enum FailType {
Abort = 3; Abort = 3;
Exit = 4; Exit = 4;
Shutdown = 5; Shutdown = 5;
ShutdownAbrupt = 6;
} }
enum Direction { enum Direction {

View file

@ -185,7 +185,7 @@ trait Conductor { this: TestConductorExt ⇒
import system.dispatcher import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception, // the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown // which is normal during shutdown
controller ? Terminate(node, Some(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done } controller ? Terminate(node, Right(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
} }
/** /**
@ -194,12 +194,21 @@ trait Conductor { this: TestConductorExt ⇒
* *
* @param node is the symbolic name of the node which is to be affected * @param node is the symbolic name of the node which is to be affected
*/ */
def shutdown(node: RoleName): Future[Done] = { def shutdown(node: RoleName): Future[Done] = shutdown(node, abort = false)
/**
* Tell the actor system at the remote node to shut itself down without
* awaiting termination of remote-deployed children. The node will also be
* removed, so that the remaining nodes may still pass subsequent barriers.
*
* @param node is the symbolic name of the node which is to be affected
*/
def shutdown(node: RoleName, abort: Boolean): Future[Done] = {
import Settings.QueryTimeout import Settings.QueryTimeout
import system.dispatcher import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception, // the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown // which is normal during shutdown
controller ? Terminate(node, None) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done } controller ? Terminate(node, Left(abort)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
} }
/** /**
@ -455,9 +464,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case Disconnect(node, target, abort) case Disconnect(node, target, abort)
val t = nodes(target) val t = nodes(target)
nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort)) nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort))
case Terminate(node, exitValue) case Terminate(node, shutdownOrExit)
barrier ! BarrierCoordinator.RemoveClient(node) barrier ! BarrierCoordinator.RemoveClient(node)
nodes(node).fsm forward ToClient(TerminateMsg(exitValue)) nodes(node).fsm forward ToClient(TerminateMsg(shutdownOrExit))
nodes -= node
case Remove(node) case Remove(node)
barrier ! BarrierCoordinator.RemoveClient(node) barrier ! BarrierCoordinator.RemoveClient(node)
} }

View file

@ -43,8 +43,8 @@ private[akka] final case class ThrottleMsg(target: Address, direction: Direction
private[akka] final case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp private[akka] final case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
private[akka] final case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp private[akka] final case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
private[akka] final case class Terminate(node: RoleName, exitValue: Option[Int]) extends CommandOp private[akka] final case class Terminate(node: RoleName, shutdownOrExit: Either[Boolean, Int]) extends CommandOp
private[akka] final case class TerminateMsg(exitValue: Option[Int]) extends ConfirmedClientOp with NetworkOp private[akka] final case class TerminateMsg(shutdownOrExit: Either[Boolean, Int]) extends ConfirmedClientOp with NetworkOp
private[akka] final case class GetAddress(node: RoleName) extends ServerOp with NetworkOp private[akka] final case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
private[akka] final case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp private[akka] final case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
@ -94,10 +94,12 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
case DisconnectMsg(target, abort) case DisconnectMsg(target, abort)
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target) w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect)) .setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect))
case TerminateMsg(Some(exitValue)) case TerminateMsg(Right(exitValue))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue)) w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue))
case TerminateMsg(None) case TerminateMsg(Left(false))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown)) w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown))
case TerminateMsg(Left(true))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
case GetAddress(node) case GetAddress(node)
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name)) w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
case AddressReply(node, addr) case AddressReply(node, addr)
@ -139,11 +141,12 @@ private[akka] class MsgDecoder extends OneToOneDecoder {
val f = w.getFailure val f = w.getFailure
import TCP.{ FailType FT } import TCP.{ FailType FT }
f.getFailure match { f.getFailure match {
case FT.Throttle ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit) case FT.Throttle ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
case FT.Abort DisconnectMsg(f.getAddress, true) case FT.Abort DisconnectMsg(f.getAddress, true)
case FT.Disconnect DisconnectMsg(f.getAddress, false) case FT.Disconnect DisconnectMsg(f.getAddress, false)
case FT.Exit TerminateMsg(Some(f.getExitValue)) case FT.Exit TerminateMsg(Right(f.getExitValue))
case FT.Shutdown TerminateMsg(None) case FT.Shutdown TerminateMsg(Left(false))
case FT.ShutdownAbrupt TerminateMsg(Left(true))
} }
} else if (w.hasAddr) { } else if (w.hasAddr) {
val a = w.getAddr val a = w.getAddr

View file

@ -31,8 +31,9 @@ trait Player { this: TestConductorExt ⇒
private var _client: ActorRef = _ private var _client: ActorRef = _
private def client = _client match { private def client = _client match {
case null throw new IllegalStateException("TestConductor client not yet started") case null throw new IllegalStateException("TestConductor client not yet started")
case x x case _ if system.isTerminated throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point")
case x x
} }
/** /**
@ -239,10 +240,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
import context.dispatcher // FIXME is this the right EC for the future below? import context.dispatcher // FIXME is this the right EC for the future below?
// FIXME: Currently ignoring, needs support from Remoting // FIXME: Currently ignoring, needs support from Remoting
stay stay
case TerminateMsg(None) case TerminateMsg(Left(false))
context.system.shutdown() context.system.shutdown()
stay stay
case TerminateMsg(Some(exitValue)) case TerminateMsg(Left(true))
context.system.asInstanceOf[ActorSystemImpl].abort()
stay
case TerminateMsg(Right(exitValue))
System.exit(exitValue) System.exit(exitValue)
stay // needed because Java doesnt have Nothing stay // needed because Java doesnt have Nothing
case _: Done stay //FIXME what should happen? case _: Done stay //FIXME what should happen?

View file

@ -327,7 +327,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* been started either in Conductor or Player mode when the constructor of * been started either in Conductor or Player mode when the constructor of
* MultiNodeSpec finishes, i.e. do not call the start*() methods yourself! * MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
*/ */
val testConductor: TestConductorExt = TestConductor(system) var testConductor: TestConductorExt = null
/** /**
* Execute the given block of code only on the given nodes (names according * Execute the given block of code only on the given nodes (names according
@ -376,52 +376,85 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
*/ */
private val controllerAddr = new InetSocketAddress(serverName, serverPort) private val controllerAddr = new InetSocketAddress(serverName, serverPort)
if (selfIndex == 0) {
Await.result(testConductor.startController(initialParticipants, myself, controllerAddr), protected def attachConductor(tc: TestConductorExt): Unit = {
testConductor.Settings.BarrierTimeout.duration) val timeout = tc.Settings.BarrierTimeout.duration
} else { val startFuture =
Await.result(testConductor.startClient(myself, controllerAddr), if (selfIndex == 0) tc.startController(initialParticipants, myself, controllerAddr)
testConductor.Settings.BarrierTimeout.duration) else tc.startClient(myself, controllerAddr)
try Await.result(startFuture, timeout)
catch {
case NonFatal(x) throw new RuntimeException("failure while attaching new conductor", x)
}
testConductor = tc
} }
attachConductor(TestConductor(system))
// now add deployments, if so desired // now add deployments, if so desired
private final case class Replacement(tag: String, role: RoleName) { private final case class Replacement(tag: String, role: RoleName) {
lazy val addr = node(role).address.toString lazy val addr = node(role).address.toString
} }
private val replacements = roles map (r Replacement("@" + r.name + "@", r)) private val replacements = roles map (r Replacement("@" + r.name + "@", r))
private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
deployments(myself) foreach { str protected def injectDeployments(sys: ActorSystem, role: RoleName): Unit = {
val deployString = (str /: replacements) { val deployer = sys.asInstanceOf[ExtendedActorSystem].provider.deployer
case (base, r @ Replacement(tag, _)) deployments(role) foreach { str
base.indexOf(tag) match { val deployString = (str /: replacements) {
case -1 base case (base, r @ Replacement(tag, _))
case start base.indexOf(tag) match {
val replaceWith = try case -1 base
r.addr case start
catch { val replaceWith = try
case NonFatal(e) r.addr
// might happen if all test cases are ignored (excluded) and catch {
// controller node is finished/exited before r.addr is run case NonFatal(e)
// on the other nodes // might happen if all test cases are ignored (excluded) and
val unresolved = "akka://unresolved-replacement-" + r.role.name // controller node is finished/exited before r.addr is run
log.warning(unresolved + " due to: " + e.getMessage) // on the other nodes
unresolved val unresolved = "akka://unresolved-replacement-" + r.role.name
} log.warning(unresolved + " due to: " + e.getMessage)
base.replace(tag, replaceWith) unresolved
} }
} base.replace(tag, replaceWith)
import scala.collection.JavaConverters._ }
ConfigFactory.parseString(deployString).root.asScala foreach { }
case (key, value: ConfigObject) deployer.parseConfig(key, value.toConfig) foreach deployer.deploy import scala.collection.JavaConverters._
case (key, x) throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x") ConfigFactory.parseString(deployString).root.asScala foreach {
case (key, value: ConfigObject) deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
case (key, x) throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
}
} }
} }
// useful to see which jvm is running which role, used by LogRoleReplace utility injectDeployments(system, myself)
log.info("Role [{}] started with address [{}]", myself.name,
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress)
protected val myAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
// useful to see which jvm is running which role, used by LogRoleReplace utility
log.info("Role [{}] started with address [{}]", myself.name, myAddress)
/**
* This method starts a new ActorSystem with the same configuration as the
* previous one on the current node, including deployments. It also creates
* a new TestConductor client and registers itself with the conductor so
* that it is possible to use barriers etc. normally after this method has
* been called.
*
* NOTICE: you MUST start a new system before trying to enter a barrier or
* otherwise using the TestConductor after having terminated this nodes
* system.
*/
protected def startNewSystem(): ActorSystem = {
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp{port=${myAddress.port.get}\nhostname=${myAddress.host.get}}")
.withFallback(system.settings.config)
val sys = ActorSystem(system.name, config)
injectDeployments(sys, myself)
attachConductor(TestConductor(sys))
sys
}
} }
/** /**

View file

@ -630,7 +630,7 @@ trait TestKitBase {
for { x 1 to n } yield { for { x 1 to n } yield {
val timeout = stop - now val timeout = stop - now
val o = receiveOne(timeout) val o = receiveOne(timeout)
assert(o ne null, s"timeout ($max) while expecting $n messages") assert(o ne null, s"timeout ($max) while expecting $n messages (got ${x - 1})")
o o
} }
} }