diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 3d1d06f3a1..0e03c92bc5 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -564,6 +564,7 @@ private[akka] class VirtualPathContainer(
// this can happen from RemoteSystemDaemon if a new child is created
// before the old is removed from RemoteSystemDaemon children
log.debug("{} replacing child {} ({} -> {})", path, name, old, ref)
+ old.stop()
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
index 88b41c0073..6d2e3fbba7 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
@@ -648,6 +648,19 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
guardian.stop()
}
+ @volatile var aborting = false
+
+ /**
+ * This kind of shutdown attempts to bring the system down and release its
+ * resources more forcefully than plain shutdown. For example it will not
+ * wait for remote-deployed child actors to terminate before terminating their
+ * parents.
+ */
+ def abort(): Unit = {
+ aborting = true
+ shutdown()
+ }
+
//#create-scheduler
/**
* Create the scheduler service. This one needs one special behavior: if
diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
index ae4c6532fa..26b7c2e626 100644
--- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
+++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala
@@ -16,6 +16,7 @@ import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.Exception._
import scala.util.control.NonFatal
+import akka.actor.ActorRefScope
private[akka] trait FaultHandling { this: ActorCell ⇒
@@ -148,6 +149,14 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
+ if (systemImpl.aborting) {
+ // separate iteration because this is a very rare case that should not penalize normal operation
+ children foreach {
+ case ref: ActorRefScope if !ref.isLocal ⇒ self.sendSystemMessage(DeathWatchNotification(ref, true, false))
+ case _ ⇒
+ }
+ }
+
val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala
index 8388ab5c9c..ac3e9d6fe1 100644
--- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala
+++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala
@@ -26,27 +26,37 @@ object LoggingReceive {
* This method does NOT modify the given Receive unless
* `akka.actor.debug.receive` is set in configuration.
*/
- def apply(r: Receive)(implicit context: ActorContext): Receive = r match {
- case _: LoggingReceive ⇒ r
- case _ ⇒ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r
- }
+ def apply(r: Receive)(implicit context: ActorContext): Receive = withLabel(null)(r)
/**
* Java API: compatible with lambda expressions
* This is an EXPERIMENTAL feature and is subject to change until it has received more real world testing.
*/
def create(r: Receive, context: ActorContext): Receive = apply(r)(context)
+
+ /**
+ * Create a decorated logger which will append `" in state " + label` to each message it logs.
+ */
+ def withLabel(label: String)(r: Receive)(implicit context: ActorContext): Receive = r match {
+ case _: LoggingReceive ⇒ r
+ case _ ⇒ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r, Option(label)) else r
+ }
}
/**
* This decorator adds invocation logging to a Receive function.
* @param source the log source, if not defined the actor of the context will be used
*/
-class LoggingReceive(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) extends Receive {
+class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) extends Receive {
+ def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None)
def isDefinedAt(o: Any): Boolean = {
val handled = r.isDefinedAt(o)
val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
- context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
+ context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o
+ + (label match {
+ case Some(l) ⇒ " in state " + l
+ case _ ⇒ ""
+ })))
handled
}
def apply(o: Any): Unit = r(o)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala
index bb527e7bc3..a6516a838f 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala
@@ -20,6 +20,7 @@ import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import akka.actor.ActorRef
import akka.dispatch.sysmsg.Failed
+import akka.actor.PoisonPill
object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@@ -113,10 +114,13 @@ abstract class SurviveNetworkInstabilitySpec
runOn(alive: _*) {
for (to ← alive) {
val sel = system.actorSelection(node(to) / "user" / "echo")
+ val msg = s"ping-$to"
+ val p = TestProbe()
awaitAssert {
- sel ! "ping"
- expectMsg(1.second, "ping")
+ sel.tell(msg, p.ref)
+ p.expectMsg(1.second, msg)
}
+ p.ref ! PoisonPill
}
}
enterBarrier("ping-ok")
@@ -269,7 +273,10 @@ abstract class SurviveNetworkInstabilitySpec
for (_ ← 1 to sysMsgBufferSize + 1) {
// remote deployment to third
parent ! Props[RemoteChild]
- val child = expectMsgType[ActorRef]
+ val child = receiveOne(remainingOrDefault) match {
+ case a: ActorRef ⇒ a
+ case other ⇒ fail(s"expected ActorRef, got $other")
+ }
child ! "hello"
expectMsg("hello")
lastSender.path.address should be(address(third))
diff --git a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala
index 78d965b710..14d3070120 100644
--- a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala
+++ b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala
@@ -74,7 +74,10 @@ object TestkitDocSpec {
//#logging-receive
import akka.event.LoggingReceive
def receive = LoggingReceive {
- case msg => // Do something...
+ case msg => // Do something ...
+ }
+ def otherState: Receive = LoggingReceive.withLabel("other") {
+ case msg => // Do something else ...
}
//#logging-receive
}
diff --git a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
index 66ad015941..1384937522 100644
--- a/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
+++ b/akka-multi-node-testkit/src/main/java/akka/remote/testconductor/TestConductorProtocol.java
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: TestConductorProtocol.proto
+// source: protobuf/TestConductorProtocol.proto
package akka.remote.testconductor;
@@ -133,6 +133,10 @@ public final class TestConductorProtocol {
* Shutdown = 5;
*/
Shutdown(4, 5),
+ /**
+ * ShutdownAbrupt = 6;
+ */
+ ShutdownAbrupt(5, 6),
;
/**
@@ -155,6 +159,10 @@ public final class TestConductorProtocol {
* Shutdown = 5;
*/
public static final int Shutdown_VALUE = 5;
+ /**
+ * ShutdownAbrupt = 6;
+ */
+ public static final int ShutdownAbrupt_VALUE = 6;
public final int getNumber() { return value; }
@@ -166,6 +174,7 @@ public final class TestConductorProtocol {
case 3: return Abort;
case 4: return Exit;
case 5: return Shutdown;
+ case 6: return ShutdownAbrupt;
default: return null;
}
}
@@ -5427,26 +5436,27 @@ public final class TestConductorProtocol {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\033TestConductorProtocol.proto\"\216\001\n\007Wrappe" +
- "r\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007barrier\030\002 \001(" +
- "\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001(\0132\016.Inje" +
- "ctFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr\030\005 \001(\0132\017." +
- "AddressRequest\"0\n\005Hello\022\014\n\004name\030\001 \002(\t\022\031\n" +
- "\007address\030\002 \002(\0132\010.Address\"E\n\014EnterBarrier" +
- "\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.BarrierOp\022\017" +
- "\n\007timeout\030\003 \001(\003\"6\n\016AddressRequest\022\014\n\004nod" +
- "e\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address\"G\n\007Addre" +
- "ss\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\014\n\004",
- "host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rInjectFailu" +
- "re\022\032\n\007failure\030\001 \002(\0162\t.FailType\022\035\n\tdirect" +
- "ion\030\002 \001(\0162\n.Direction\022\031\n\007address\030\003 \001(\0132\010" +
- ".Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\texitValue\030" +
- "\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010\n\004Fail\020\002\022" +
- "\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*K\n\010FailType\022\014" +
- "\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n\005Abort\020\003\022" +
- "\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005*,\n\tDirection\022\010\n\004" +
- "Send\020\001\022\013\n\007Receive\020\002\022\010\n\004Both\020\003B\035\n\031akka.re" +
- "mote.testconductorH\001"
+ "\n$protobuf/TestConductorProtocol.proto\"\216" +
+ "\001\n\007Wrapper\022\025\n\005hello\030\001 \001(\0132\006.Hello\022\036\n\007bar" +
+ "rier\030\002 \001(\0132\r.EnterBarrier\022\037\n\007failure\030\003 \001" +
+ "(\0132\016.InjectFailure\022\014\n\004done\030\004 \001(\t\022\035\n\004addr" +
+ "\030\005 \001(\0132\017.AddressRequest\"0\n\005Hello\022\014\n\004name" +
+ "\030\001 \002(\t\022\031\n\007address\030\002 \002(\0132\010.Address\"E\n\014Ent" +
+ "erBarrier\022\014\n\004name\030\001 \002(\t\022\026\n\002op\030\002 \002(\0162\n.Ba" +
+ "rrierOp\022\017\n\007timeout\030\003 \001(\003\"6\n\016AddressReque" +
+ "st\022\014\n\004node\030\001 \002(\t\022\026\n\004addr\030\002 \001(\0132\010.Address" +
+ "\"G\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030",
+ "\002 \002(\t\022\014\n\004host\030\003 \002(\t\022\014\n\004port\030\004 \002(\005\"\212\001\n\rIn" +
+ "jectFailure\022\032\n\007failure\030\001 \002(\0162\t.FailType\022" +
+ "\035\n\tdirection\030\002 \001(\0162\n.Direction\022\031\n\007addres" +
+ "s\030\003 \001(\0132\010.Address\022\020\n\010rateMBit\030\006 \001(\002\022\021\n\te" +
+ "xitValue\030\007 \001(\005*;\n\tBarrierOp\022\t\n\005Enter\020\001\022\010" +
+ "\n\004Fail\020\002\022\r\n\tSucceeded\020\003\022\n\n\006Failed\020\004*_\n\010F" +
+ "ailType\022\014\n\010Throttle\020\001\022\016\n\nDisconnect\020\002\022\t\n" +
+ "\005Abort\020\003\022\010\n\004Exit\020\004\022\014\n\010Shutdown\020\005\022\022\n\016Shut" +
+ "downAbrupt\020\006*,\n\tDirection\022\010\n\004Send\020\001\022\013\n\007R" +
+ "eceive\020\002\022\010\n\004Both\020\003B\035\n\031akka.remote.testco",
+ "nductorH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto b/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
index 2c8b65908a..bb86537b11 100644
--- a/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
+++ b/akka-multi-node-testkit/src/main/protobuf/TestConductorProtocol.proto
@@ -49,6 +49,7 @@ enum FailType {
Abort = 3;
Exit = 4;
Shutdown = 5;
+ ShutdownAbrupt = 6;
}
enum Direction {
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
index 4b1e61bf46..c73fdbf247 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala
@@ -185,7 +185,7 @@ trait Conductor { this: TestConductorExt ⇒
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
- controller ? Terminate(node, Some(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
+ controller ? Terminate(node, Right(exitValue)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
}
/**
@@ -194,12 +194,21 @@ trait Conductor { this: TestConductorExt ⇒
*
* @param node is the symbolic name of the node which is to be affected
*/
- def shutdown(node: RoleName): Future[Done] = {
+ def shutdown(node: RoleName): Future[Done] = shutdown(node, abort = false)
+
+ /**
+ * Tell the actor system at the remote node to shut itself down without
+ * awaiting termination of remote-deployed children. The node will also be
+ * removed, so that the remaining nodes may still pass subsequent barriers.
+ *
+ * @param node is the symbolic name of the node which is to be affected
+ */
+ def shutdown(node: RoleName, abort: Boolean): Future[Done] = {
import Settings.QueryTimeout
import system.dispatcher
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
- controller ? Terminate(node, None) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
+ controller ? Terminate(node, Left(abort)) mapTo classTag[Done] recover { case _: ClientDisconnectedException ⇒ Done }
}
/**
@@ -455,9 +464,10 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case Disconnect(node, target, abort) ⇒
val t = nodes(target)
nodes(node).fsm forward ToClient(DisconnectMsg(t.addr, abort))
- case Terminate(node, exitValue) ⇒
+ case Terminate(node, shutdownOrExit) ⇒
barrier ! BarrierCoordinator.RemoveClient(node)
- nodes(node).fsm forward ToClient(TerminateMsg(exitValue))
+ nodes(node).fsm forward ToClient(TerminateMsg(shutdownOrExit))
+ nodes -= node
case Remove(node) ⇒
barrier ! BarrierCoordinator.RemoveClient(node)
}
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
index 03af7f8614..cc8361f158 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/DataTypes.scala
@@ -43,8 +43,8 @@ private[akka] final case class ThrottleMsg(target: Address, direction: Direction
private[akka] final case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
private[akka] final case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
-private[akka] final case class Terminate(node: RoleName, exitValue: Option[Int]) extends CommandOp
-private[akka] final case class TerminateMsg(exitValue: Option[Int]) extends ConfirmedClientOp with NetworkOp
+private[akka] final case class Terminate(node: RoleName, shutdownOrExit: Either[Boolean, Int]) extends CommandOp
+private[akka] final case class TerminateMsg(shutdownOrExit: Either[Boolean, Int]) extends ConfirmedClientOp with NetworkOp
private[akka] final case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
private[akka] final case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
@@ -94,10 +94,12 @@ private[akka] class MsgEncoder extends OneToOneEncoder {
case DisconnectMsg(target, abort) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setAddress(target)
.setFailure(if (abort) TCP.FailType.Abort else TCP.FailType.Disconnect))
- case TerminateMsg(Some(exitValue)) ⇒
+ case TerminateMsg(Right(exitValue)) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Exit).setExitValue(exitValue))
- case TerminateMsg(None) ⇒
+ case TerminateMsg(Left(false)) ⇒
w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.Shutdown))
+ case TerminateMsg(Left(true)) ⇒
+ w.setFailure(TCP.InjectFailure.newBuilder.setFailure(TCP.FailType.ShutdownAbrupt))
case GetAddress(node) ⇒
w.setAddr(TCP.AddressRequest.newBuilder.setNode(node.name))
case AddressReply(node, addr) ⇒
@@ -139,11 +141,12 @@ private[akka] class MsgDecoder extends OneToOneDecoder {
val f = w.getFailure
import TCP.{ FailType ⇒ FT }
f.getFailure match {
- case FT.Throttle ⇒ ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
- case FT.Abort ⇒ DisconnectMsg(f.getAddress, true)
- case FT.Disconnect ⇒ DisconnectMsg(f.getAddress, false)
- case FT.Exit ⇒ TerminateMsg(Some(f.getExitValue))
- case FT.Shutdown ⇒ TerminateMsg(None)
+ case FT.Throttle ⇒ ThrottleMsg(f.getAddress, f.getDirection, f.getRateMBit)
+ case FT.Abort ⇒ DisconnectMsg(f.getAddress, true)
+ case FT.Disconnect ⇒ DisconnectMsg(f.getAddress, false)
+ case FT.Exit ⇒ TerminateMsg(Right(f.getExitValue))
+ case FT.Shutdown ⇒ TerminateMsg(Left(false))
+ case FT.ShutdownAbrupt ⇒ TerminateMsg(Left(true))
}
} else if (w.hasAddr) {
val a = w.getAddr
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
index d39d563654..80cd9a87fa 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala
@@ -31,8 +31,9 @@ trait Player { this: TestConductorExt ⇒
private var _client: ActorRef = _
private def client = _client match {
- case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
- case x ⇒ x
+ case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
+ case _ if system.isTerminated ⇒ throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point")
+ case x ⇒ x
}
/**
@@ -239,10 +240,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
import context.dispatcher // FIXME is this the right EC for the future below?
// FIXME: Currently ignoring, needs support from Remoting
stay
- case TerminateMsg(None) ⇒
+ case TerminateMsg(Left(false)) ⇒
context.system.shutdown()
stay
- case TerminateMsg(Some(exitValue)) ⇒
+ case TerminateMsg(Left(true)) ⇒
+ context.system.asInstanceOf[ActorSystemImpl].abort()
+ stay
+ case TerminateMsg(Right(exitValue)) ⇒
System.exit(exitValue)
stay // needed because Java doesn’t have Nothing
case _: Done ⇒ stay //FIXME what should happen?
diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
index 7377e9cc14..65ceb61ee9 100644
--- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
+++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -327,7 +327,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* been started either in Conductor or Player mode when the constructor of
* MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
*/
- val testConductor: TestConductorExt = TestConductor(system)
+ var testConductor: TestConductorExt = null
/**
* Execute the given block of code only on the given nodes (names according
@@ -376,52 +376,85 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
*/
private val controllerAddr = new InetSocketAddress(serverName, serverPort)
- if (selfIndex == 0) {
- Await.result(testConductor.startController(initialParticipants, myself, controllerAddr),
- testConductor.Settings.BarrierTimeout.duration)
- } else {
- Await.result(testConductor.startClient(myself, controllerAddr),
- testConductor.Settings.BarrierTimeout.duration)
+
+ protected def attachConductor(tc: TestConductorExt): Unit = {
+ val timeout = tc.Settings.BarrierTimeout.duration
+ val startFuture =
+ if (selfIndex == 0) tc.startController(initialParticipants, myself, controllerAddr)
+ else tc.startClient(myself, controllerAddr)
+ try Await.result(startFuture, timeout)
+ catch {
+ case NonFatal(x) ⇒ throw new RuntimeException("failure while attaching new conductor", x)
+ }
+ testConductor = tc
}
+ attachConductor(TestConductor(system))
+
// now add deployments, if so desired
private final case class Replacement(tag: String, role: RoleName) {
lazy val addr = node(role).address.toString
}
+
private val replacements = roles map (r ⇒ Replacement("@" + r.name + "@", r))
- private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
- deployments(myself) foreach { str ⇒
- val deployString = (str /: replacements) {
- case (base, r @ Replacement(tag, _)) ⇒
- base.indexOf(tag) match {
- case -1 ⇒ base
- case start ⇒
- val replaceWith = try
- r.addr
- catch {
- case NonFatal(e) ⇒
- // might happen if all test cases are ignored (excluded) and
- // controller node is finished/exited before r.addr is run
- // on the other nodes
- val unresolved = "akka://unresolved-replacement-" + r.role.name
- log.warning(unresolved + " due to: " + e.getMessage)
- unresolved
- }
- base.replace(tag, replaceWith)
- }
- }
- import scala.collection.JavaConverters._
- ConfigFactory.parseString(deployString).root.asScala foreach {
- case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
- case (key, x) ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
+
+ protected def injectDeployments(sys: ActorSystem, role: RoleName): Unit = {
+ val deployer = sys.asInstanceOf[ExtendedActorSystem].provider.deployer
+ deployments(role) foreach { str ⇒
+ val deployString = (str /: replacements) {
+ case (base, r @ Replacement(tag, _)) ⇒
+ base.indexOf(tag) match {
+ case -1 ⇒ base
+ case start ⇒
+ val replaceWith = try
+ r.addr
+ catch {
+ case NonFatal(e) ⇒
+ // might happen if all test cases are ignored (excluded) and
+ // controller node is finished/exited before r.addr is run
+ // on the other nodes
+ val unresolved = "akka://unresolved-replacement-" + r.role.name
+ log.warning(unresolved + " due to: " + e.getMessage)
+ unresolved
+ }
+ base.replace(tag, replaceWith)
+ }
+ }
+ import scala.collection.JavaConverters._
+ ConfigFactory.parseString(deployString).root.asScala foreach {
+ case (key, value: ConfigObject) ⇒ deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
+ case (key, x) ⇒ throw new IllegalArgumentException(s"key $key must map to deployment section, not simple value $x")
+ }
}
}
- // useful to see which jvm is running which role, used by LogRoleReplace utility
- log.info("Role [{}] started with address [{}]", myself.name,
- system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress)
+ injectDeployments(system, myself)
+ protected val myAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+
+ // useful to see which jvm is running which role, used by LogRoleReplace utility
+ log.info("Role [{}] started with address [{}]", myself.name, myAddress)
+
+ /**
+ * This method starts a new ActorSystem with the same configuration as the
+ * previous one on the current node, including deployments. It also creates
+ * a new TestConductor client and registers itself with the conductor so
+ * that it is possible to use barriers etc. normally after this method has
+ * been called.
+ *
+ * NOTICE: you MUST start a new system before trying to enter a barrier or
+ * otherwise using the TestConductor after having terminated this node’s
+ * system.
+ */
+ protected def startNewSystem(): ActorSystem = {
+ val config = ConfigFactory.parseString(s"akka.remote.netty.tcp{port=${myAddress.port.get}\nhostname=${myAddress.host.get}}")
+ .withFallback(system.settings.config)
+ val sys = ActorSystem(system.name, config)
+ injectDeployments(sys, myself)
+ attachConductor(TestConductor(sys))
+ sys
+ }
}
/**
diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala
new file mode 100644
index 0000000000..d82bd428ef
--- /dev/null
+++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala
@@ -0,0 +1,156 @@
+package akka.remote
+
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.testkit.ImplicitSender
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.remote.transport.ThrottlerTransportAdapter.Direction._
+import com.typesafe.config.ConfigFactory
+import akka.actor.ActorSystem
+import scala.concurrent.duration._
+import akka.actor.ActorLogging
+import akka.remote.testconductor.TestConductor
+import akka.testkit.TestProbe
+
+object RemoteReDeploymentMultiJvmSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+
+ commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
+ """akka.remote.transport-failure-detector {
+ threshold=0.1
+ heartbeat-interval=0.1s
+ acceptable-heartbeat-pause=1s
+ }
+ akka.remote.watch-failure-detector {
+ threshold=0.1
+ heartbeat-interval=0.1s
+ acceptable-heartbeat-pause=2.5s
+ }""")))
+ testTransport(on = true)
+
+ deployOn(second, "/parent/hello.remote = \"@first@\"")
+
+ class Parent extends Actor {
+ val monitor = context.actorSelection("/user/echo")
+ def receive = {
+ case (p: Props, n: String) ⇒ context.actorOf(p, n)
+ case msg ⇒ monitor ! msg
+ }
+ }
+
+ class Hello extends Actor {
+ val monitor = context.actorSelection("/user/echo")
+ context.parent ! "HelloParent"
+ override def preStart(): Unit = monitor ! "PreStart"
+ override def postStop(): Unit = monitor ! "PostStop"
+ def receive = Actor.emptyBehavior
+ }
+
+ class Echo(target: ActorRef) extends Actor with ActorLogging {
+ def receive = {
+ case msg ⇒
+ log.info(s"received $msg from $sender")
+ target ! msg
+ }
+ }
+ def echoProps(target: ActorRef) = Props(new Echo(target))
+}
+
+class RemoteReDeploymentFastMultiJvmNode1 extends RemoteReDeploymentFastMultiJvmSpec
+class RemoteReDeploymentFastMultiJvmNode2 extends RemoteReDeploymentFastMultiJvmSpec
+abstract class RemoteReDeploymentFastMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
+ override def sleepAfterKill = 0.seconds // new association will come in while old is still “healthy”
+ override def expectQuarantine = false
+}
+
+class RemoteReDeploymentMediumMultiJvmNode1 extends RemoteReDeploymentMediumMultiJvmSpec
+class RemoteReDeploymentMediumMultiJvmNode2 extends RemoteReDeploymentMediumMultiJvmSpec
+abstract class RemoteReDeploymentMediumMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
+ override def sleepAfterKill = 1.seconds // new association will come in while old is gated in ReliableDeliverySupervisor
+ override def expectQuarantine = false
+}
+
+class RemoteReDeploymentSlowMultiJvmNode1 extends RemoteReDeploymentSlowMultiJvmSpec
+class RemoteReDeploymentSlowMultiJvmNode2 extends RemoteReDeploymentSlowMultiJvmSpec
+abstract class RemoteReDeploymentSlowMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec {
+ override def sleepAfterKill = 10.seconds // new association will come in after old has been quarantined
+ override def expectQuarantine = true
+}
+
+abstract class RemoteReDeploymentMultiJvmSpec extends MultiNodeSpec(RemoteReDeploymentMultiJvmSpec)
+ with STMultiNodeSpec with ImplicitSender {
+
+ def sleepAfterKill: FiniteDuration
+ def expectQuarantine: Boolean
+
+ def initialParticipants = roles.size
+
+ import RemoteReDeploymentMultiJvmSpec._
+
+ "A remote deployment target system" must {
+
+ "terminate the child when its parent system is replaced by a new one" in {
+
+ val echo = system.actorOf(echoProps(testActor), "echo")
+ val address = node(second).address
+
+ runOn(second) {
+ system.actorOf(Props[Parent], "parent") ! ((Props[Hello], "hello"))
+ expectMsg("HelloParent")
+ }
+
+ runOn(first) {
+ expectMsg("PreStart")
+ }
+
+ enterBarrier("first-deployed")
+
+ runOn(first) {
+ testConductor.blackhole(second, first, Both).await
+ testConductor.shutdown(second, abort = true).await
+ if (expectQuarantine)
+ within(sleepAfterKill) {
+ expectMsg("PostStop")
+ expectNoMsg()
+ }
+ else expectNoMsg(sleepAfterKill)
+ awaitAssert(node(second), 10.seconds, 100.millis)
+ }
+
+ var sys: ActorSystem = null
+
+ runOn(second) {
+ system.awaitTermination(30.seconds)
+ expectNoMsg(sleepAfterKill)
+ sys = startNewSystem()
+ }
+
+ enterBarrier("cable-cut")
+
+ runOn(second) {
+ val p = TestProbe()(sys)
+ sys.actorOf(echoProps(p.ref), "echo")
+ p.send(sys.actorOf(Props[Parent], "parent"), (Props[Hello], "hello"))
+ p.expectMsg("HelloParent")
+ }
+
+ enterBarrier("re-deployed")
+
+ runOn(first) {
+ if (expectQuarantine) expectMsg("PreStart")
+ else expectMsgAllOf("PostStop", "PreStart")
+ }
+
+ enterBarrier("the-end")
+
+ expectNoMsg(1.second)
+
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
index bd48e45e4c..ed75707a46 100644
--- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
+++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
@@ -306,7 +306,8 @@ private[remote] class ReliableDeliverySupervisor(
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
- case GotUid(receivedUid) ⇒
+ case g @ GotUid(receivedUid) ⇒
+ context.parent ! g
// New system that has the same address as the old - need to start from fresh state
uidConfirmed = true
if (uid.exists(_ != receivedUid)) reset()
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala
index b9af025d91..5eed01664a 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala
@@ -21,6 +21,8 @@ import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.EmptyLocalActorRef
import akka.event.AddressTerminatedTopic
+import java.util.concurrent.ConcurrentHashMap
+import akka.dispatch.sysmsg.Unwatch
/**
* INTERNAL API
@@ -55,6 +57,33 @@ private[akka] class RemoteSystemDaemon(
AddressTerminatedTopic(system).subscribe(this)
+ private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]]
+
+ @tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean =
+ parent2children.get(parent) match {
+ case null ⇒
+ if (parent2children.putIfAbsent(parent, Set(child)) == null) true
+ else addChildParentNeedsWatch(parent, child)
+ case children ⇒
+ if (parent2children.replace(parent, children, children + child)) false
+ else addChildParentNeedsWatch(parent, child)
+ }
+
+ @tailrec private def removeChildParentNeedsUnwatch(parent: ActorRef, child: ActorRef): Boolean = {
+ parent2children.get(parent) match {
+ case null ⇒ false // no-op
+ case children ⇒
+ val next = children - child
+ if (next.isEmpty) {
+ if (!parent2children.remove(parent, children)) removeChildParentNeedsUnwatch(parent, child)
+ else true
+ } else {
+ if (!parent2children.replace(parent, children, next)) removeChildParentNeedsUnwatch(parent, child)
+ else false
+ }
+ }
+ }
+
/**
* Find the longest matching path which we know about and return that ref
* (or ask that ref to continue searching if elements are left).
@@ -87,8 +116,22 @@ private[akka] class RemoteSystemDaemon(
case DeathWatchNotification(child: ActorRefWithCell with ActorRefScope, _, _) if child.isLocal ⇒
terminating.locked {
removeChild(child.path.elements.drop(1).mkString("/"), child)
+ val parent = child.getParent
+ if (removeChildParentNeedsUnwatch(parent, child)) parent.sendSystemMessage(Unwatch(parent, this))
terminationHookDoneWhenNoChildren()
}
+ case DeathWatchNotification(parent: ActorRef with ActorRefScope, _, _) if !parent.isLocal ⇒
+ terminating.locked {
+ parent2children.remove(parent) match {
+ case null ⇒
+ case children ⇒
+ for (c ← children) {
+ system.stop(c)
+ removeChild(c.path.elements.drop(1).mkString("/"), c)
+ }
+ terminationHookDoneWhenNoChildren()
+ }
+ }
case _ ⇒ super.sendSystemMessage(message)
}
@@ -111,11 +154,13 @@ private[akka] class RemoteSystemDaemon(
else s.substring(0, i)
}
val isTerminating = !terminating.whileOff {
- val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
+ val parent = supervisor.asInstanceOf[InternalActorRef]
+ val actor = system.provider.actorOf(system, props, parent,
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(childName, actor)
actor.sendSystemMessage(Watch(actor, this))
actor.start()
+ if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
case _ ⇒
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index f9aba754e4..56734befa2 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -265,7 +265,7 @@ private[remote] object EndpointManager {
*/
def isTombstone: Boolean
}
- final case class Pass(endpoint: ActorRef) extends EndpointPolicy {
+ final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy {
override def isTombstone: Boolean = false
}
final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
@@ -282,15 +282,23 @@ private[remote] object EndpointManager {
private var addressToReadonly = HashMap[Address, ActorRef]()
private var readonlyToAddress = HashMap[ActorRef, Address]()
- def registerWritableEndpoint(address: Address, endpoint: ActorRef): ActorRef = addressToWritable.get(address) match {
- case Some(Pass(e)) ⇒
+ def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match {
+ case Some(Pass(e, _)) ⇒
throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
case _ ⇒
- addressToWritable += address -> Pass(endpoint)
+ addressToWritable += address -> Pass(endpoint, uid)
writableToAddress += endpoint -> address
endpoint
}
+ def registerWritableEndpointUid(writer: ActorRef, uid: Int): Unit = {
+ val address = writableToAddress(writer)
+ addressToWritable.get(address) match {
+ case Some(Pass(ep, _)) ⇒ addressToWritable += address -> Pass(ep, Some(uid))
+ case other ⇒ // the GotUid might have lost the race with some failure
+ }
+ }
+
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
addressToReadonly += address -> endpoint
readonlyToAddress += endpoint -> address
@@ -313,8 +321,8 @@ private[remote] object EndpointManager {
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
- case Some(Pass(_)) ⇒ true
- case _ ⇒ false
+ case Some(Pass(_, _)) ⇒ true
+ case _ ⇒ false
}
def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)
@@ -387,6 +395,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
else None
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
+ var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
override val supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) {
@@ -479,7 +488,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Quarantine(address, uidOption) ⇒
// Stop writers
endpoints.writableEndpointWithPolicyFor(address) match {
- case Some(Pass(endpoint)) ⇒
+ case Some(Pass(endpoint, _)) ⇒
context.stop(endpoint)
if (uidOption.isEmpty) {
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
@@ -505,6 +514,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
endpoints.registerWritableEndpoint(
recipientAddress,
+ None,
createEndpoint(
recipientAddress,
recipientRef.localAddressToUse,
@@ -515,7 +525,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
refuseUid))
endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
- case Some(Pass(endpoint)) ⇒
+ case Some(Pass(endpoint, _)) ⇒
endpoint ! s
case Some(Gated(timeOfRelease)) ⇒
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
@@ -529,7 +539,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
- case InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
+ case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint) ⇒
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
pendingReadHandoffs += endpoint -> handle
@@ -538,33 +548,21 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
handle.disassociate(AssociationHandle.Quarantined)
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
- case Some(Pass(ep)) ⇒
- pendingReadHandoffs.get(ep) foreach (_.disassociate())
- pendingReadHandoffs += ep -> handle
- ep ! EndpointWriter.StopReading(ep)
- case _ ⇒
- val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
- eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
- val endpoint = createEndpoint(
- handle.remoteAddress,
- handle.localAddress,
- transportMapping(handle.localAddress),
- settings,
- Some(handle),
- writing,
- refuseUid = None)
- if (writing)
- endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint)
- else {
- endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
- endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
- case Some(Pass(_)) ⇒ // Leave it alone
- case _ ⇒
- // Since we just communicated with the guy we can lift gate, quarantine, etc. New writer will be
- // opened at first write.
- endpoints.removePolicy(handle.remoteAddress)
- }
+ case Some(Pass(ep, None)) ⇒
+ stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
+ case Some(Pass(ep, Some(uid))) ⇒
+ if (handle.handshakeInfo.uid == uid) {
+ pendingReadHandoffs.get(ep) foreach (_.disassociate())
+ pendingReadHandoffs += ep -> handle
+ ep ! EndpointWriter.StopReading(ep)
+ } else {
+ context.stop(ep)
+ endpoints.unregisterEndpoint(ep)
+ pendingReadHandoffs -= ep
+ createAndRegisterEndpoint(handle, Some(uid))
}
+ case state ⇒
+ createAndRegisterEndpoint(handle, None)
}
}
case EndpointWriter.StoppedReading(endpoint) ⇒
@@ -572,8 +570,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(endpoint) ⇒
acceptPendingReader(takingOverFrom = endpoint)
endpoints.unregisterEndpoint(endpoint)
+ stashedInbound -= endpoint
case EndpointWriter.TookOver(endpoint, handle) ⇒
removePendingReader(takingOverFrom = endpoint, withHandle = handle)
+ case ReliableDeliverySupervisor.GotUid(uid) ⇒
+ endpoints.registerWritableEndpointUid(sender, uid)
+ stashedInbound.getOrElse(sender, Vector.empty) foreach (self ! _)
+ stashedInbound -= sender
case Prune ⇒
endpoints.prune()
case ShutdownAndFlush ⇒
@@ -604,6 +607,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(_) ⇒ // why should we care now?
}
+ private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = {
+ val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
+ eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
+ val endpoint = createEndpoint(
+ handle.remoteAddress,
+ handle.localAddress,
+ transportMapping(handle.localAddress),
+ settings,
+ Some(handle),
+ writing,
+ refuseUid = refuseUid)
+ if (writing)
+ endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint)
+ else {
+ endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
+ endpoints.removePolicy(handle.remoteAddress)
+ }
+ }
+
private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
/*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
index 1f2efef29d..f0a6cdfdac 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -453,7 +453,7 @@ private[transport] class ThrottledAssociation(
sender() ! SetThrottleAck
stay()
case Event(Disassociated(info), _) ⇒
- stop()
+ stop() // not notifying the upstream handler is intentional: we are relying on heartbeating
}
// This method captures ASSOCIATE packets and extracts the origin address
diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala
index 6f46062763..8cbae48cc4 100644
--- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala
@@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.writableEndpointWithPolicyFor(address1) should be(None)
- reg.registerWritableEndpoint(address1, actorA) should be(actorA)
+ reg.registerWritableEndpoint(address1, None, actorA) should be(actorA)
- reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA)))
+ reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA, None)))
reg.readOnlyEndpointFor(address1) should be(None)
reg.isWritable(actorA) should be(true)
reg.isReadOnly(actorA) should be(false)
@@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.writableEndpointWithPolicyFor(address1) should be(None)
reg.registerReadOnlyEndpoint(address1, actorA) should be(actorA)
- reg.registerWritableEndpoint(address1, actorB) should be(actorB)
+ reg.registerWritableEndpoint(address1, None, actorB) should be(actorB)
reg.readOnlyEndpointFor(address1) should be(Some(actorA))
- reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB)))
+ reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB, None)))
reg.isWritable(actorA) should be(false)
reg.isWritable(actorB) should be(true)
@@ -66,7 +66,7 @@ class EndpointRegistrySpec extends AkkaSpec {
val reg = new EndpointRegistry
reg.writableEndpointWithPolicyFor(address1) should be(None)
- reg.registerWritableEndpoint(address1, actorA)
+ reg.registerWritableEndpoint(address1, None, actorA)
val deadline = Deadline.now
reg.markAsFailed(actorA, deadline)
reg.writableEndpointWithPolicyFor(address1) should be(Some(Gated(deadline)))
@@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec {
"keep tombstones when removing an endpoint" in {
val reg = new EndpointRegistry
- reg.registerWritableEndpoint(address1, actorA)
- reg.registerWritableEndpoint(address2, actorB)
+ reg.registerWritableEndpoint(address1, None, actorA)
+ reg.registerWritableEndpoint(address2, None, actorB)
val deadline = Deadline.now
reg.markAsFailed(actorA, deadline)
reg.markAsQuarantined(address2, 42, deadline)
@@ -102,8 +102,8 @@ class EndpointRegistrySpec extends AkkaSpec {
"prune outdated Gated directives properly" in {
val reg = new EndpointRegistry
- reg.registerWritableEndpoint(address1, actorA)
- reg.registerWritableEndpoint(address2, actorB)
+ reg.registerWritableEndpoint(address1, None, actorA)
+ reg.registerWritableEndpoint(address2, None, actorB)
reg.markAsFailed(actorA, Deadline.now)
val farInTheFuture = Deadline.now + Duration(60, SECONDS)
reg.markAsFailed(actorB, farInTheFuture)
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
index 16b4517db3..fb9338c1a0 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
@@ -630,7 +630,7 @@ trait TestKitBase {
for { x ← 1 to n } yield {
val timeout = stop - now
val o = receiveOne(timeout)
- assert(o ne null, s"timeout ($max) while expecting $n messages")
+ assert(o ne null, s"timeout ($max) while expecting $n messages (got ${x - 1})")
o
}
}