Merge pull request #2090 from akka/wip-remoting-multi-fix-∂π

Wip remoting multi fix ∂π (broken commits apart properly)
This commit is contained in:
Roland Kuhn 2014-03-22 15:19:45 +01:00
commit bf2bf24c4a
19 changed files with 460 additions and 132 deletions

View file

@ -564,6 +564,7 @@ private[akka] class VirtualPathContainer(
// this can happen from RemoteSystemDaemon if a new child is created
// before the old is removed from RemoteSystemDaemon children
log.debug("{} replacing child {} ({} -> {})", path, name, old, ref)
old.stop()
}
}

View file

@ -648,6 +648,19 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
guardian.stop()
}
// Raised by abort() immediately before it calls shutdown() and never reset here.
// Marked @volatile because it is written by the caller of abort() and may be
// read from other threads while the system is shutting down.
@volatile var aborting = false
/**
* This kind of shutdown attempts to bring the system down and release its
* resources more forcefully than plain shutdown. For example it will not
* wait for remote-deployed child actors to terminate before terminating their
* parents.
*
* Sets the `aborting` flag first and then delegates to `shutdown()`.
*/
def abort(): Unit = {
aborting = true
shutdown()
}
//#create-scheduler
/**
* Create the scheduler service. This one needs one special behavior: if

View file

@ -16,6 +16,7 @@ import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.Exception._
import scala.util.control.NonFatal
import akka.actor.ActorRefScope
private[akka] trait FaultHandling { this: ActorCell
@ -148,6 +149,14 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
if (systemImpl.aborting) {
// separate iteration because this is a very rare case that should not penalize normal operation
children foreach {
case ref: ActorRefScope if !ref.isLocal self.sendSystemMessage(DeathWatchNotification(ref, true, false))
case _
}
}
val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {

View file

@ -26,27 +26,37 @@ object LoggingReceive {
* This method does NOT modify the given Receive unless
* `akka.actor.debug.receive` is set in configuration.
*/
def apply(r: Receive)(implicit context: ActorContext): Receive = r match {
case _: LoggingReceive r
case _ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r
}
def apply(r: Receive)(implicit context: ActorContext): Receive = withLabel(null)(r)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def create(r: Receive, context: ActorContext): Receive = apply(r)(context)
/**
 * Create a decorated logger which will append `" in state " + label` to each message it logs.
 *
 * The given Receive is returned unchanged when it already is a [[LoggingReceive]]
 * or when `AddLoggingReceive` is not enabled in the settings of the context's system.
 *
 * @param label name of the state to append to each log entry; `null` is accepted
 *              and means "no label" (it is wrapped with `Option(label)`)
 * @param r     the behavior to decorate
 * @return either `r` itself or a new LoggingReceive wrapping `r`
 */
def withLabel(label: String)(r: Receive)(implicit context: ActorContext): Receive = r match {
  // never wrap twice
  case _: LoggingReceive => r
  case _ =>
    if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r, Option(label))
    else r
}
}
/**
* This decorator adds invocation logging to a Receive function.
* @param source the log source, if not defined the actor of the context will be used
*/
class LoggingReceive(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) extends Receive {
class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) extends Receive {
def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None)
def isDefinedAt(o: Any): Boolean = {
val handled = r.isDefinedAt(o)
val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o
+ (label match {
case Some(l) " in state " + l
case _ ""
})))
handled
}
def apply(o: Any): Unit = r(o)

View file

@ -20,6 +20,7 @@ import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import akka.actor.ActorRef
import akka.dispatch.sysmsg.Failed
import akka.actor.PoisonPill
object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -113,10 +114,13 @@ abstract class SurviveNetworkInstabilitySpec
runOn(alive: _*) {
for (to alive) {
val sel = system.actorSelection(node(to) / "user" / "echo")
val msg = s"ping-$to"
val p = TestProbe()
awaitAssert {
sel ! "ping"
expectMsg(1.second, "ping")
sel.tell(msg, p.ref)
p.expectMsg(1.second, msg)
}
p.ref ! PoisonPill
}
}
enterBarrier("ping-ok")
@ -269,7 +273,10 @@ abstract class SurviveNetworkInstabilitySpec
for (_ 1 to sysMsgBufferSize + 1) {
// remote deployment to third
parent ! Props[RemoteChild]
val child = expectMsgType[ActorRef]
val child = receiveOne(remainingOrDefault) match {
case a: ActorRef a
case other fail(s"expected ActorRef, got $other")
}
child ! "hello"
expectMsg("hello")
lastSender.path.address should be(address(third))

View file

@ -74,7 +74,10 @@ object TestkitDocSpec {
//#logging-receive
import akka.event.LoggingReceive
def receive = LoggingReceive {
case msg => // Do something...
case msg => // Do something ...
}
def otherState: Receive = LoggingReceive.withLabel("other") {
case msg => // Do something else ...
}
//#logging-receive
}

View file

@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: TestConductorProtocol.proto
// source: protobuf/TestConductorProtocol.proto
package akka.remote.testconductor;
@ -133,6 +133,10 @@ public final class TestConductorProtocol {
* <code>Shutdown = 5;</code>
*/
Shutdown(4, 5),
/**
* <code>ShutdownAbrupt = 6;</code>
*/
ShutdownAbrupt(5, 6),
;
/**
@ -155,6 +159,10 @@ public final class TestConductorProtocol {
* <code>Shutdown = 5;</code>
*/
public static final int Shutdown_VALUE = 5;
/**
* <code>ShutdownAbrupt = 6;</code>
*/
public static final int ShutdownAbrupt_VALUE = 6;
public final int getNumber() { return value; }
@ -166,6 +174,7 @@ public final class TestConductorProtocol {
case 3: return Abort;
case 4: return Exit;
case 5: return Shutdown;
case 6: return ShutdownAbrupt;
default: return null;
}
}
@ -5427,26 +5436,27 @@ public final class TestConductorProtocol {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\033TestConductorProtocol.proto\"\216\001\n\007Wrappe" +
"r\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(" +
"\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" +
"ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." +
"AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" +
"\007address\030\002 \002(\0132\010.Address\"E\n\014EnterBarrier" +
"\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.BarrierOp\022\017" +
"\n\007timeout\030\003 \001(\003\"6\n\016AddressRequest\022\014\n\004nod" +
"e\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"G\n\007Addre" +
"ss\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004",
"host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailu" +
"re\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n\tdirect" +
"ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" +
".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" +
"\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" +
"\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*K\n\010FailType\022\014" +
"\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" +
"\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005*,\n\tDirection\022\010\n\004" +
"Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" +
"mote.testconductorH\001"
"\n$protobuf/TestConductorProtocol.proto\"\216" +
"\001\n\007Wrapper\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007bar" +
"rier\030\002 \001(\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001" +
"(\0132\016.InjectFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr" +
"\030\005 \001(\0132\017.AddressRequest\"0\n\005Hello\022\014\n\004name" +
"\030\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"E\n\014Ent" +
"erBarrier\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.Ba" +
"rrierOp\022\017\n\007timeout\030\003 \001(\003\"6\n\016AddressReque" +
"st\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address" +
"\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030",
"\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rIn" +
"jectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022" +
"\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007addres" +
"s\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\te" +
"xitValue\030\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010" +
"\n\004Fail\020\002\022\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*_\n\010F" +
"ailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n" +
"\005Abort\020\003\022\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005\022\022\n\016Shut" +
"downAbrupt\020\006*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007R" +
"eceive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testco",
"nductorH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View file

@ -49,6 +49,7 @@ enum FailType {
Abort = 3;
Exit = 4;
Shutdown = 5;
ShutdownAbrupt = 6;
}
enum Direction {

View file

@ -185,7 +185,7 @@ trait Conductor { this: TestConductorExt ⇒
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
controller ? Terminate(node, Some(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
controller ? Terminate(node, Right(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
}
/**
@ -194,12 +194,21 @@ trait Conductor { this: TestConductorExt ⇒
*
* @param node is the symbolic name of the node which is to be affected
*/
def shutdown(node: RoleName): Future[Done] = {
def shutdown(node: RoleName): Future[Done] = shutdown(node, abort = false)
/**
* Tell the actor system at the remote node to shut itself down without
* awaiting termination of remote-deployed children. The node will also be
* removed, so that the remaining nodes may still pass subsequent barriers.
*
* @param node is the symbolic name of the node which is to be affected
*/
def shutdown(node: RoleName, abort: Boolean): Future[Done] = {
import Settings.QueryTimeout
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
controller ? Terminate(node, None) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
controller ? Terminate(node, Left(abort)) mapTo classTag[Done] recover { case _: ClientDisconnectedException Done }
}
/**
@ -455,9 +464,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case Disconnect(node, target, abort)
val t = nodes(target)
nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort))
case Terminate(node, exitValue)
case Terminate(node, shutdownOrExit)
barrier ! BarrierCoordinator.RemoveClient(node)
nodes(node).fsm forward ToClient(TerminateMsg(exitValue))
nodes(node).fsm forward ToClient(TerminateMsg(shutdownOrExit))
nodes -= node
case Remove(node)
barrier ! BarrierCoordinator.RemoveClient(node)
}

View file

@ -43,8 +43,8 @@ private[akka] final case class ThrottleMsg(target: Address, direction: Direction
private[akka] final case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
private[akka] final case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
private[akka] final case class Terminate(node: RoleName, exitValue: Option[Int]) extends CommandOp
private[akka] final case class TerminateMsg(exitValue: Option[Int]) extends ConfirmedClientOp with NetworkOp
private[akka] final case class Terminate(node: RoleName, shutdownOrExit: Either[Boolean, Int]) extends CommandOp
private[akka] final case class TerminateMsg(shutdownOrExit: Either[Boolean, Int]) extends ConfirmedClientOp with NetworkOp
private[akka] final case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
private[akka] final case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
@ -94,10 +94,12 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
case DisconnectMsg(target, abort)
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect))
case TerminateMsg(Some(exitValue))
case TerminateMsg(Right(exitValue))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue))
case TerminateMsg(None)
case TerminateMsg(Left(false))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown))
case TerminateMsg(Left(true))
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
case GetAddress(node)
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
case AddressReply(node, addr)
@ -139,11 +141,12 @@ private[akka] class MsgDecoder extends OneToOneDecoder {
val f = w.getFailure
import TCP.{ FailType FT }
f.getFailure match {
case FT.Throttle ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
case FT.Abort DisconnectMsg(f.getAddress, true)
case FT.Disconnect DisconnectMsg(f.getAddress, false)
case FT.Exit TerminateMsg(Some(f.getExitValue))
case FT.Shutdown TerminateMsg(None)
case FT.Throttle ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
case FT.Abort DisconnectMsg(f.getAddress, true)
case FT.Disconnect DisconnectMsg(f.getAddress, false)
case FT.Exit TerminateMsg(Right(f.getExitValue))
case FT.Shutdown TerminateMsg(Left(false))
case FT.ShutdownAbrupt TerminateMsg(Left(true))
}
} else if (w.hasAddr) {
val a = w.getAddr

View file

@ -31,8 +31,9 @@ trait Player { this: TestConductorExt ⇒
private var _client: ActorRef = _
private def client = _client match {
case null throw new IllegalStateException("TestConductor client not yet started")
case x x
case null throw new IllegalStateException("TestConductor client not yet started")
case _ if system.isTerminated throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point")
case x x
}
/**
@ -239,10 +240,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
import context.dispatcher // FIXME is this the right EC for the future below?
// FIXME: Currently ignoring, needs support from Remoting
stay
case TerminateMsg(None)
case TerminateMsg(Left(false))
context.system.shutdown()
stay
case TerminateMsg(Some(exitValue))
case TerminateMsg(Left(true))
context.system.asInstanceOf[ActorSystemImpl].abort()
stay
case TerminateMsg(Right(exitValue))
System.exit(exitValue)
stay // needed because Java doesnt have Nothing
case _: Done stay //FIXME what should happen?

View file

@ -327,7 +327,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* been started either in Conductor or Player mode when the constructor of
* MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
*/
val testConductor: TestConductorExt = TestConductor(system)
var testConductor: TestConductorExt = null
/**
* Execute the given block of code only on the given nodes (names according
@ -376,52 +376,85 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
*/
private val controllerAddr = new InetSocketAddress(serverName, serverPort)
if (selfIndex == 0) {
Await.result(testConductor.startController(initialParticipants, myself, controllerAddr),
testConductor.Settings.BarrierTimeout.duration)
} else {
Await.result(testConductor.startClient(myself, controllerAddr),
testConductor.Settings.BarrierTimeout.duration)
/**
 * Start the given TestConductor extension for this node and, once that has
 * succeeded, make it the current `testConductor`.
 *
 * The node with `selfIndex == 0` starts the controller, every other node
 * starts a client that connects to `controllerAddr`. The call blocks for at
 * most the conductor's `Settings.BarrierTimeout`.
 *
 * @param tc the conductor extension to start and attach
 * @throws RuntimeException wrapping any non-fatal failure (including a timeout)
 *                          that occurs while waiting for the start to complete
 */
protected def attachConductor(tc: TestConductorExt): Unit = {
  val timeout = tc.Settings.BarrierTimeout.duration
  val startFuture =
    if (selfIndex == 0) tc.startController(initialParticipants, myself, controllerAddr)
    else tc.startClient(myself, controllerAddr)
  try Await.result(startFuture, timeout)
  catch {
    case NonFatal(x) => throw new RuntimeException("failure while attaching new conductor", x)
  }
  // only reached when the start succeeded, so a failed attach keeps the previous conductor
  testConductor = tc
}
attachConductor(TestConductor(system))
// now add deployments, if so desired
private final case class Replacement(tag: String, role: RoleName) {
lazy val addr = node(role).address.toString
}
private val replacements = roles map (r Replacement("@" + r.name + "@", r))
private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
deployments(myself) foreach { str
val deployString = (str /: replacements) {
case (base, r @ Replacement(tag, _))
base.indexOf(tag) match {
case -1 base
case start
val replaceWith = try
r.addr
catch {
case NonFatal(e)
// might happen if all test cases are ignored (excluded) and
// controller node is finished/exited before r.addr is run
// on the other nodes
val unresolved = "akka://unresolved-replacement-" + r.role.name
log.warning(unresolved + " due to: " + e.getMessage)
unresolved
}
base.replace(tag, replaceWith)
}
}
import scala.collection.JavaConverters._
ConfigFactory.parseString(deployString).root.asScala foreach {
case (key, value: ConfigObject) deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
case (key, x) throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
/**
 * Register the deployment sections configured for `role` with the deployer of `sys`.
 *
 * Every deployment string is first scanned for the `@roleName@` tags held in
 * `replacements`; each tag that occurs is replaced by the address of that role.
 * The resulting string is parsed as configuration and every top-level key is
 * handed to the deployer.
 *
 * @param sys  the actor system whose deployer receives the deployments
 * @param role the role whose configured deployments are injected
 * @throws IllegalArgumentException if a top-level key of a deployment string
 *                                  does not map to a configuration object
 */
protected def injectDeployments(sys: ActorSystem, role: RoleName): Unit = {
  val deployer = sys.asInstanceOf[ExtendedActorSystem].provider.deployer

  // Resolve the address for a tag, falling back to a placeholder when lookup fails.
  def addressFor(r: Replacement): String =
    try r.addr
    catch {
      case NonFatal(e) =>
        // might happen if all test cases are ignored (excluded) and
        // controller node is finished/exited before r.addr is run
        // on the other nodes
        val unresolved = "akka://unresolved-replacement-" + r.role.name
        log.warning(unresolved + " due to: " + e.getMessage)
        unresolved
    }

  deployments(role) foreach { str =>
    val deployString = replacements.foldLeft(str) { (base, r) =>
      // the (lazy) address is only resolved for tags that actually occur
      if (base.contains(r.tag)) base.replace(r.tag, addressFor(r)) else base
    }
    import scala.collection.JavaConverters._
    ConfigFactory.parseString(deployString).root.asScala foreach {
      case (key, value: ConfigObject) => deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
      case (key, x)                   => throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
    }
  }
}
// useful to see which jvm is running which role, used by LogRoleReplace utility
log.info("Role [{}] started with address [{}]", myself.name,
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress)
injectDeployments(system, myself)
protected val myAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
// useful to see which jvm is running which role, used by LogRoleReplace utility
log.info("Role [{}] started with address [{}]", myself.name, myAddress)
/**
 * This method starts a new ActorSystem with the same configuration as the
 * previous one on the current node, including deployments. It also creates
 * a new TestConductor client and registers itself with the conductor so
 * that it is possible to use barriers etc. normally after this method has
 * been called.
 *
 * The new system is configured with the host and port of `myAddress`.
 *
 * NOTICE: you MUST start a new system before trying to enter a barrier or
 * otherwise using the TestConductor after having terminated this node's
 * system.
 *
 * @return the newly started actor system
 * @throws IllegalStateException if `myAddress` carries no host or no port
 */
protected def startNewSystem(): ActorSystem = {
  // fail with a descriptive message instead of an anonymous `None.get`
  def required[T](what: String, value: Option[T]): T =
    value getOrElse {
      throw new IllegalStateException(s"cannot start new system: address [$myAddress] has no $what")
    }
  val port = required("port", myAddress.port)
  val host = required("host", myAddress.host)
  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp{port=${port}\nhostname=${host}}")
    .withFallback(system.settings.config)
  val sys = ActorSystem(system.name, config)
  injectDeployments(sys, myself)
  attachConductor(TestConductor(sys))
  sys
}
}
/**

View file

@ -0,0 +1,156 @@
package akka.remote
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit.ImplicitSender
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.remote.transport.ThrottlerTransportAdapter.Direction._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.remote.testconductor.TestConductor
import akka.testkit.TestProbe
/**
 * Node configuration and helper actors for the remote re-deployment test:
 * two roles, a test transport, and a deployment rule on `second` that places
 * `/parent/hello` on the address substituted for the `@first@` tag.
 */
object RemoteReDeploymentMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")

  // short failure-detector settings for both the transport and the watch detector
  commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
    """akka.remote.transport-failure-detector {
threshold=0.1
heartbeat-interval=0.1s
acceptable-heartbeat-pause=1s
}
akka.remote.watch-failure-detector {
threshold=0.1
heartbeat-interval=0.1s
acceptable-heartbeat-pause=2.5s
}""")))

  testTransport(on = true)

  deployOn(second, "/parent/hello.remote = \"@first@\"")

  /**
   * Creates a named child for every `(Props, String)` pair it receives and
   * passes every other message on to the `/user/echo` selection.
   */
  class Parent extends Actor {
    val monitor = context.actorSelection("/user/echo")
    def receive = {
      case (p: Props, n: String) => context.actorOf(p, n)
      case msg                   => monitor ! msg
    }
  }

  /**
   * Reports its lifecycle: sends "HelloParent" to its parent on construction,
   * and "PreStart" / "PostStop" to the `/user/echo` selection from the
   * corresponding hooks. Handles no messages itself.
   */
  class Hello extends Actor {
    val monitor = context.actorSelection("/user/echo")
    context.parent ! "HelloParent"
    override def preStart(): Unit = monitor ! "PreStart"
    override def postStop(): Unit = monitor ! "PostStop"
    def receive = Actor.emptyBehavior
  }

  /** Logs every message together with its sender and passes it on to `target`. */
  class Echo(target: ActorRef) extends Actor with ActorLogging {
    def receive = {
      case msg =>
        log.info(s"received $msg from $sender")
        target ! msg
    }
  }

  /** Props for an [[Echo]] that passes messages on to `target`. */
  def echoProps(target: ActorRef) = Props(new Echo(target))
}
// Three timing variants of the same scenario follow. Each variant has two
// concrete classes (Node1/Node2) — presumably one per participating JVM,
// following the multi-JVM test naming convention; confirm against the build.

class RemoteReDeploymentFastMultiJvmNode1 extends RemoteReDeploymentFastMultiJvmSpec
class RemoteReDeploymentFastMultiJvmNode2 extends RemoteReDeploymentFastMultiJvmSpec
/** Fast variant: no waiting after the kill (`sleepAfterKill` = 0 seconds), `expectQuarantine` = false. */
abstract class RemoteReDeploymentFastMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
override def sleepAfterKill = 0.seconds // new association will come in while old is still healthy
override def expectQuarantine = false
}
class RemoteReDeploymentMediumMultiJvmNode1 extends RemoteReDeploymentMediumMultiJvmSpec
class RemoteReDeploymentMediumMultiJvmNode2 extends RemoteReDeploymentMediumMultiJvmSpec
/** Medium variant: waits 1 second after the kill, `expectQuarantine` = false. */
abstract class RemoteReDeploymentMediumMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
override def sleepAfterKill = 1.seconds // new association will come in while old is gated in ReliableDeliverySupervisor
override def expectQuarantine = false
}
class RemoteReDeploymentSlowMultiJvmNode1 extends RemoteReDeploymentSlowMultiJvmSpec
class RemoteReDeploymentSlowMultiJvmNode2 extends RemoteReDeploymentSlowMultiJvmSpec
/** Slow variant: waits 10 seconds after the kill, `expectQuarantine` = true. */
abstract class RemoteReDeploymentSlowMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
override def sleepAfterKill = 10.seconds // new association will come in after old has been quarantined
override def expectQuarantine = true
}
/**
 * Shared scenario for all timing variants: node `second` creates `/parent/hello`
 * (configured for remote deployment via `@first@`), the actor system on `second`
 * is aborted and replaced by a new one, and the new system creates the same
 * actor path again. Node `first` observes the "PreStart"/"PostStop" messages
 * that arrive at its local echo actor.
 */
abstract class RemoteReDeploymentMultiJvmSpec extends MultiNodeSpec(RemoteReDeploymentMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender {
/** How long both nodes stay quiet after `second` has been shut down. */
def sleepAfterKill: FiniteDuration
/**
 * true: `first` expects "PostStop" within `sleepAfterKill` and only "PreStart" after re-deployment;
 * false: `first` expects nothing during `sleepAfterKill` and both "PostStop" and "PreStart" afterwards.
 */
def expectQuarantine: Boolean
def initialParticipants = roles.size
import RemoteReDeploymentMultiJvmSpec._
"A remote deployment target system" must {
"terminate the child when its parent system is replaced by a new one" in {
// every node runs an echo actor that passes whatever it receives on to testActor
val echo = system.actorOf(echoProps(testActor), "echo")
val address = node(second).address
// phase 1: second creates parent and child; the child's constructor answers "HelloParent"
runOn(second) {
system.actorOf(Props[Parent], "parent") ! ((Props[Hello], "hello"))
expectMsg("HelloParent")
}
// the child's preStart message shows up at first's echo actor
runOn(first) {
expectMsg("PreStart")
}
enterBarrier("first-deployed")
// phase 2: first blackholes traffic between the nodes in both directions and
// then asks the conductor to shut second down with abort = true
runOn(first) {
testConductor.blackhole(second, first, Both).await
testConductor.shutdown(second, abort = true).await
if (expectQuarantine)
within(sleepAfterKill) {
expectMsg("PostStop")
expectNoMsg()
}
else expectNoMsg(sleepAfterKill)
// retry until node(second) can be resolved again without throwing
awaitAssert(node(second), 10.seconds, 100.millis)
}
// stays null on first; only second assigns the replacement system
var sys: ActorSystem = null
runOn(second) {
system.awaitTermination(30.seconds)
expectNoMsg(sleepAfterKill)
sys = startNewSystem()
}
enterBarrier("cable-cut")
// phase 3: the replacement system on second creates echo, parent and child again,
// using a probe of the new system because the old testActor's system has terminated
runOn(second) {
val p = TestProbe()(sys)
sys.actorOf(echoProps(p.ref), "echo")
p.send(sys.actorOf(Props[Parent], "parent"), (Props[Hello], "hello"))
p.expectMsg("HelloParent")
}
enterBarrier("re-deployed")
runOn(first) {
if (expectQuarantine) expectMsg("PreStart")
else expectMsgAllOf("PostStop", "PreStart")
}
enterBarrier("the-end")
// no further lifecycle messages may arrive
expectNoMsg(1.second)
}
}
}

View file

@ -306,7 +306,8 @@ private[remote] class ReliableDeliverySupervisor(
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
case GotUid(receivedUid)
case g @ GotUid(receivedUid)
context.parent ! g
// New system that has the same address as the old - need to start from fresh state
uidConfirmed = true
if (uid.exists(_ != receivedUid)) reset()

View file

@ -21,6 +21,8 @@ import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.EmptyLocalActorRef
import akka.event.AddressTerminatedTopic
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.sysmsg.Unwatch
/**
* INTERNAL API
@ -55,6 +57,33 @@ private[akka] class RemoteSystemDaemon(
AddressTerminatedTopic(system).subscribe(this)
private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]]
/**
 * Record `child` in the set stored for `parent` in `parent2children`.
 *
 * The update is done with the map's atomic `putIfAbsent` / `replace`
 * operations and is retried whenever a concurrent modification makes the
 * attempt fail.
 *
 * @return `true` if this call created the entry for `parent` (no children
 *         were recorded for it before), `false` if the child was added to an
 *         already existing entry
 */
@tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean =
  parent2children.get(parent) match {
    case null =>
      // no entry yet: try to create it, retry if somebody else was faster
      if (parent2children.putIfAbsent(parent, Set(child)) == null) true
      else addChildParentNeedsWatch(parent, child)
    case children =>
      // entry exists: swap in the enlarged set only if it is still the one we read
      if (parent2children.replace(parent, children, children + child)) false
      else addChildParentNeedsWatch(parent, child)
  }
/**
 * Remove `child` from the set stored for `parent` in `parent2children`.
 *
 * The update is done with the map's atomic `remove(key, value)` /
 * `replace` operations and is retried whenever a concurrent modification
 * makes the attempt fail.
 *
 * @return `true` if this call removed the whole entry for `parent` because
 *         no children remain, `false` if children remain or if there was no
 *         entry for `parent` at all
 */
@tailrec private def removeChildParentNeedsUnwatch(parent: ActorRef, child: ActorRef): Boolean =
  parent2children.get(parent) match {
    case null => false // no-op
    case children =>
      val next = children - child
      if (next.isEmpty) {
        // last child gone: drop the entry, but only if it is still the set we read
        if (!parent2children.remove(parent, children)) removeChildParentNeedsUnwatch(parent, child)
        else true
      } else {
        if (!parent2children.replace(parent, children, next)) removeChildParentNeedsUnwatch(parent, child)
        else false
      }
  }
/**
* Find the longest matching path which we know about and return that ref
* (or ask that ref to continue searching if elements are left).
@ -87,8 +116,22 @@ private[akka] class RemoteSystemDaemon(
case DeathWatchNotification(child: ActorRefWithCell with ActorRefScope, _, _) if child.isLocal
terminating.locked {
removeChild(child.path.elements.drop(1).mkString("/"), child)
val parent = child.getParent
if (removeChildParentNeedsUnwatch(parent, child)) parent.sendSystemMessage(Unwatch(parent, this))
terminationHookDoneWhenNoChildren()
}
case DeathWatchNotification(parent: ActorRef with ActorRefScope, _, _) if !parent.isLocal
terminating.locked {
parent2children.remove(parent) match {
case null
case children
for (c children) {
system.stop(c)
removeChild(c.path.elements.drop(1).mkString("/"), c)
}
terminationHookDoneWhenNoChildren()
}
}
case _ super.sendSystemMessage(message)
}
@ -111,11 +154,13 @@ private[akka] class RemoteSystemDaemon(
else s.substring(0, i)
}
val isTerminating = !terminating.whileOff {
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
val parent = supervisor.asInstanceOf[InternalActorRef]
val actor = system.provider.actorOf(system, props, parent,
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(childName, actor)
actor.sendSystemMessage(Watch(actor, this))
actor.start()
if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
case _

View file

@ -265,7 +265,7 @@ private[remote] object EndpointManager {
*/
def isTombstone: Boolean
}
final case class Pass(endpoint: ActorRef) extends EndpointPolicy {
final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy {
override def isTombstone: Boolean = false
}
final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
@ -282,15 +282,23 @@ private[remote] object EndpointManager {
private var addressToReadonly = HashMap[Address, ActorRef]()
private var readonlyToAddress = HashMap[ActorRef, Address]()
def registerWritableEndpoint(address: Address, endpoint: ActorRef): ActorRef = addressToWritable.get(address) match {
case Some(Pass(e))
def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match {
case Some(Pass(e, _))
throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
case _
addressToWritable += address -> Pass(endpoint)
addressToWritable += address -> Pass(endpoint, uid)
writableToAddress += endpoint -> address
endpoint
}
/**
 * Store the UID reported by a writable endpoint in that endpoint's `Pass` policy.
 *
 * This is a no-op when `writer` is not (or no longer) present in
 * `writableToAddress`, or when the policy currently stored for its address
 * is not a `Pass`.
 *
 * @param writer the writable endpoint that reported the UID
 * @param uid    the UID to record for the writer's address
 */
def registerWritableEndpointUid(writer: ActorRef, uid: Int): Unit = {
  // Use a total lookup: `writableToAddress(writer)` would throw
  // NoSuchElementException for a writer that is no longer registered
  // (presumably possible once it has been unregistered — confirm against
  // unregisterEndpoint, which is not visible here).
  writableToAddress.get(writer) foreach { address =>
    addressToWritable.get(address) match {
      case Some(Pass(ep, _)) => addressToWritable += address -> Pass(ep, Some(uid))
      case _                 => // the GotUid might have lost the race with some failure
    }
  }
}
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
addressToReadonly += address -> endpoint
readonlyToAddress += endpoint -> address
@ -313,8 +321,8 @@ private[remote] object EndpointManager {
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
case Some(Pass(_)) true
case _ false
case Some(Pass(_, _)) true
case _ false
}
def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)
@ -387,6 +395,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
else None
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
override val supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) {
@ -479,7 +488,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Quarantine(address, uidOption)
// Stop writers
endpoints.writableEndpointWithPolicyFor(address) match {
case Some(Pass(endpoint))
case Some(Pass(endpoint, _))
context.stop(endpoint)
if (uidOption.isEmpty) {
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
@ -505,6 +514,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
endpoints.registerWritableEndpoint(
recipientAddress,
None,
createEndpoint(
recipientAddress,
recipientRef.localAddressToUse,
@ -515,7 +525,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
refuseUid))
endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
case Some(Pass(endpoint))
case Some(Pass(endpoint, _))
endpoint ! s
case Some(Gated(timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
@ -529,7 +539,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
case InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint)
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
pendingReadHandoffs += endpoint -> handle
@ -538,33 +548,21 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
handle.disassociate(AssociationHandle.Quarantined)
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(ep))
pendingReadHandoffs.get(ep) foreach (_.disassociate())
pendingReadHandoffs += ep -> handle
ep ! EndpointWriter.StopReading(ep)
case _
val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
val endpoint = createEndpoint(
handle.remoteAddress,
handle.localAddress,
transportMapping(handle.localAddress),
settings,
Some(handle),
writing,
refuseUid = None)
if (writing)
endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint)
else {
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(_)) // Leave it alone
case _
// Since we just communicated with the guy we can lift gate, quarantine, etc. New writer will be
// opened at first write.
endpoints.removePolicy(handle.remoteAddress)
}
case Some(Pass(ep, None))
stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
case Some(Pass(ep, Some(uid)))
if (handle.handshakeInfo.uid == uid) {
pendingReadHandoffs.get(ep) foreach (_.disassociate())
pendingReadHandoffs += ep -> handle
ep ! EndpointWriter.StopReading(ep)
} else {
context.stop(ep)
endpoints.unregisterEndpoint(ep)
pendingReadHandoffs -= ep
createAndRegisterEndpoint(handle, Some(uid))
}
case state
createAndRegisterEndpoint(handle, None)
}
}
case EndpointWriter.StoppedReading(endpoint)
@ -572,8 +570,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(endpoint)
acceptPendingReader(takingOverFrom = endpoint)
endpoints.unregisterEndpoint(endpoint)
stashedInbound -= endpoint
case EndpointWriter.TookOver(endpoint, handle)
removePendingReader(takingOverFrom = endpoint, withHandle = handle)
case ReliableDeliverySupervisor.GotUid(uid)
endpoints.registerWritableEndpointUid(sender, uid)
stashedInbound.getOrElse(sender, Vector.empty) foreach (self ! _)
stashedInbound -= sender
case Prune
endpoints.prune()
case ShutdownAndFlush
@ -604,6 +607,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(_) // why should we care now?
}
/**
 * Spawns the endpoint actor that serves an inbound association and records it in the
 * endpoint registry.
 *
 * Listeners are told about the association (flagged as inbound) before the endpoint is
 * created. When passive connections are enabled in the settings and the registry reports no
 * writable endpoint for the remote address, the new endpoint is created as a writer and is
 * registered as writable together with the UID taken from the handshake. Otherwise it is
 * registered as read-only and the policy stored for the remote address is removed.
 *
 * @param handle    the inbound association handle the new endpoint is created for
 * @param refuseUid handed through unchanged to `createEndpoint`
 */
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = {
  val remote = handle.remoteAddress
  val local = handle.localAddress
  // The role is decided up front: it drives both endpoint creation and the kind of registration.
  val actAsWriter = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(remote)

  eventPublisher.notifyListeners(AssociatedEvent(local, remote, inbound = true))

  val endpointRef =
    createEndpoint(remote, local, transportMapping(local), settings, Some(handle), actAsWriter, refuseUid = refuseUid)

  if (actAsWriter) {
    endpoints.registerWritableEndpoint(remote, Some(handle.handshakeInfo.uid), endpointRef)
  } else {
    endpoints.registerReadOnlyEndpoint(remote, endpointRef)
    // NOTE(review): presumably this lifts any gate/quarantine recorded for the address so that a
    // new writer can be opened on the next write — confirm against EndpointRegistry.removePolicy.
    endpoints.removePolicy(remote)
  }
}
private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
/*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks

View file

@ -453,7 +453,7 @@ private[transport] class ThrottledAssociation(
sender() ! SetThrottleAck
stay()
case Event(Disassociated(info), _)
stop()
stop() // not notifying the upstream handler is intentional: we are relying on heartbeating
}
// This method captures ASSOCIATE packets and extracts the origin address

View file

@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.writableEndpointWithPolicyFor(address1) should be(None)
reg.registerWritableEndpoint(address1, actorA) should be(actorA)
reg.registerWritableEndpoint(address1, None, actorA) should be(actorA)
reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA)))
reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA, None)))
reg.readOnlyEndpointFor(address1) should be(None)
reg.isWritable(actorA) should be(true)
reg.isReadOnly(actorA) should be(false)
@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.writableEndpointWithPolicyFor(address1) should be(None)
reg.registerReadOnlyEndpoint(address1, actorA) should be(actorA)
reg.registerWritableEndpoint(address1, actorB) should be(actorB)
reg.registerWritableEndpoint(address1, None, actorB) should be(actorB)
reg.readOnlyEndpointFor(address1) should be(Some(actorA))
reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB)))
reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB, None)))
reg.isWritable(actorA) should be(false)
reg.isWritable(actorB) should be(true)
@ -66,7 +66,7 @@ class EndpointRegistrySpec extends AkkaSpec {
val reg = new EndpointRegistry
reg.writableEndpointWithPolicyFor(address1) should be(None)
reg.registerWritableEndpoint(address1, actorA)
reg.registerWritableEndpoint(address1, None, actorA)
val deadline = Deadline.now
reg.markAsFailed(actorA, deadline)
reg.writableEndpointWithPolicyFor(address1) should be(Some(Gated(deadline)))
@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec {
"keep tombstones when removing an endpoint" in {
val reg = new EndpointRegistry
reg.registerWritableEndpoint(address1, actorA)
reg.registerWritableEndpoint(address2, actorB)
reg.registerWritableEndpoint(address1, None, actorA)
reg.registerWritableEndpoint(address2, None, actorB)
val deadline = Deadline.now
reg.markAsFailed(actorA, deadline)
reg.markAsQuarantined(address2, 42, deadline)
@ -102,8 +102,8 @@ class EndpointRegistrySpec extends AkkaSpec {
"prune outdated Gated directives properly" in {
val reg = new EndpointRegistry
reg.registerWritableEndpoint(address1, actorA)
reg.registerWritableEndpoint(address2, actorB)
reg.registerWritableEndpoint(address1, None, actorA)
reg.registerWritableEndpoint(address2, None, actorB)
reg.markAsFailed(actorA, Deadline.now)
val farInTheFuture = Deadline.now + Duration(60, SECONDS)
reg.markAsFailed(actorB, farInTheFuture)

View file

@ -630,7 +630,7 @@ trait TestKitBase {
for { x 1 to n } yield {
val timeout = stop - now
val o = receiveOne(timeout)
assert(o ne null, s"timeout ($max) while expecting $n messages")
assert(o ne null, s"timeout ($max) while expecting $n messages (got ${x - 1})")
o
}
}