diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala new file mode 100644 index 0000000000..d82bd428ef --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala @@ -0,0 +1,156 @@ +package akka.remote + +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit.ImplicitSender +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.remote.transport.ThrottlerTransportAdapter.Direction._ +import com.typesafe.config.ConfigFactory +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import akka.actor.ActorLogging +import akka.remote.testconductor.TestConductor +import akka.testkit.TestProbe + +object RemoteReDeploymentMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( + """akka.remote.transport-failure-detector { + threshold=0.1 + heartbeat-interval=0.1s + acceptable-heartbeat-pause=1s + } + akka.remote.watch-failure-detector { + threshold=0.1 + heartbeat-interval=0.1s + acceptable-heartbeat-pause=2.5s + }"""))) + testTransport(on = true) + + deployOn(second, "/parent/hello.remote = \"@first@\"") + + class Parent extends Actor { + val monitor = context.actorSelection("/user/echo") + def receive = { + case (p: Props, n: String) ⇒ context.actorOf(p, n) + case msg ⇒ monitor ! msg + } + } + + class Hello extends Actor { + val monitor = context.actorSelection("/user/echo") + context.parent ! "HelloParent" + override def preStart(): Unit = monitor ! "PreStart" + override def postStop(): Unit = monitor ! "PostStop" + def receive = Actor.emptyBehavior + } + + class Echo(target: ActorRef) extends Actor with ActorLogging { + def receive = { + case msg ⇒ + log.info(s"received $msg from $sender") + target ! msg + } + } + def echoProps(target: ActorRef) = Props(new Echo(target)) +} + +class RemoteReDeploymentFastMultiJvmNode1 extends RemoteReDeploymentFastMultiJvmSpec +class RemoteReDeploymentFastMultiJvmNode2 extends RemoteReDeploymentFastMultiJvmSpec +abstract class RemoteReDeploymentFastMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec { + override def sleepAfterKill = 0.seconds // new association will come in while old is still “healthy” + override def expectQuarantine = false +} + +class RemoteReDeploymentMediumMultiJvmNode1 extends RemoteReDeploymentMediumMultiJvmSpec +class RemoteReDeploymentMediumMultiJvmNode2 extends RemoteReDeploymentMediumMultiJvmSpec +abstract class RemoteReDeploymentMediumMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec { + override def sleepAfterKill = 1.seconds // new association will come in while old is gated in ReliableDeliverySupervisor + override def expectQuarantine = false +} + +class RemoteReDeploymentSlowMultiJvmNode1 extends RemoteReDeploymentSlowMultiJvmSpec +class RemoteReDeploymentSlowMultiJvmNode2 extends RemoteReDeploymentSlowMultiJvmSpec +abstract class RemoteReDeploymentSlowMultiJvmSpec extends RemoteReDeploymentMultiJvmSpec { + override def sleepAfterKill = 10.seconds // new association will come in after old has been quarantined + override def expectQuarantine = true +} + +abstract class RemoteReDeploymentMultiJvmSpec extends MultiNodeSpec(RemoteReDeploymentMultiJvmSpec) + with STMultiNodeSpec with ImplicitSender { + + def sleepAfterKill: FiniteDuration + def expectQuarantine: Boolean + + def initialParticipants = roles.size + + import RemoteReDeploymentMultiJvmSpec._ + + "A remote deployment target system" must { + + "terminate the child when its parent system is replaced by a new one" in { + + val echo = system.actorOf(echoProps(testActor), "echo") + val address = node(second).address + + runOn(second) { + system.actorOf(Props[Parent], "parent") ! ((Props[Hello], "hello")) + expectMsg("HelloParent") + } + + runOn(first) { + expectMsg("PreStart") + } + + enterBarrier("first-deployed") + + runOn(first) { + testConductor.blackhole(second, first, Both).await + testConductor.shutdown(second, abort = true).await + if (expectQuarantine) + within(sleepAfterKill) { + expectMsg("PostStop") + expectNoMsg() + } + else expectNoMsg(sleepAfterKill) + awaitAssert(node(second), 10.seconds, 100.millis) + } + + var sys: ActorSystem = null + + runOn(second) { + system.awaitTermination(30.seconds) + expectNoMsg(sleepAfterKill) + sys = startNewSystem() + } + + enterBarrier("cable-cut") + + runOn(second) { + val p = TestProbe()(sys) + sys.actorOf(echoProps(p.ref), "echo") + p.send(sys.actorOf(Props[Parent], "parent"), (Props[Hello], "hello")) + p.expectMsg("HelloParent") + } + + enterBarrier("re-deployed") + + runOn(first) { + if (expectQuarantine) expectMsg("PreStart") + else expectMsgAllOf("PostStop", "PreStart") + } + + enterBarrier("the-end") + + expectNoMsg(1.second) + + } + + } + +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index bd48e45e4c..ed75707a46 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -306,7 +306,8 @@ private[remote] class ReliableDeliverySupervisor( if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) - case GotUid(receivedUid) ⇒ + case g @ GotUid(receivedUid) ⇒ + context.parent ! g // New system that has the same address as the old - need to start from fresh state uidConfirmed = true if (uid.exists(_ != receivedUid)) reset() diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index f9aba754e4..56734befa2 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -265,7 +265,7 @@ private[remote] object EndpointManager { */ def isTombstone: Boolean } - final case class Pass(endpoint: ActorRef) extends EndpointPolicy { + final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = false } final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { @@ -282,15 +282,23 @@ private[remote] object EndpointManager { private var addressToReadonly = HashMap[Address, ActorRef]() private var readonlyToAddress = HashMap[ActorRef, Address]() - def registerWritableEndpoint(address: Address, endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { - case Some(Pass(e)) ⇒ + def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { + case Some(Pass(e, _)) ⇒ throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ⇒ - addressToWritable += address -> Pass(endpoint) + addressToWritable += address -> Pass(endpoint, uid) writableToAddress += endpoint -> address endpoint } + def registerWritableEndpointUid(writer: ActorRef, uid: Int): Unit = { + val address = writableToAddress(writer) + addressToWritable.get(address) match { + case Some(Pass(ep, _)) ⇒ addressToWritable += address -> Pass(ep, Some(uid)) + case other ⇒ // the GotUid might have lost the race with some failure + } + } + def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = { addressToReadonly += address -> endpoint readonlyToAddress += endpoint -> address @@ -313,8 +321,8 @@ private[remote] object EndpointManager { def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(Pass(_)) ⇒ true - case _ ⇒ false + case Some(Pass(_, _)) ⇒ true + case _ ⇒ false } def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address) @@ -387,6 +395,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends else None var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]() + var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]() override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { @@ -479,7 +488,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Quarantine(address, uidOption) ⇒ // Stop writers endpoints.writableEndpointWithPolicyFor(address) match { - case Some(Pass(endpoint)) ⇒ + case Some(Pass(endpoint, _)) ⇒ context.stop(endpoint) if (uidOption.isEmpty) { log.warning("Association to [{}] with unknown UID is reported as quarantined, but " + @@ -505,6 +514,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = endpoints.registerWritableEndpoint( recipientAddress, + None, createEndpoint( recipientAddress, recipientRef.localAddressToUse, @@ -515,7 +525,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends refuseUid)) endpoints.writableEndpointWithPolicyFor(recipientAddress) match { - case Some(Pass(endpoint)) ⇒ + case Some(Pass(endpoint, _)) ⇒ endpoint ! s case Some(Gated(timeOfRelease)) ⇒ if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s @@ -529,7 +539,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } - case InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { + case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case Some(endpoint) ⇒ pendingReadHandoffs.get(endpoint) foreach (_.disassociate()) pendingReadHandoffs += endpoint -> handle @@ -538,33 +548,21 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate(AssociationHandle.Quarantined) else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep)) ⇒ - pendingReadHandoffs.get(ep) foreach (_.disassociate()) - pendingReadHandoffs += ep -> handle - ep ! EndpointWriter.StopReading(ep) - case _ ⇒ - val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) - eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) - val endpoint = createEndpoint( - handle.remoteAddress, - handle.localAddress, - transportMapping(handle.localAddress), - settings, - Some(handle), - writing, - refuseUid = None) - if (writing) - endpoints.registerWritableEndpoint(handle.remoteAddress, endpoint) - else { - endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) - endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(_)) ⇒ // Leave it alone - case _ ⇒ - // Since we just communicated with the guy we can lift gate, quarantine, etc. New writer will be - // opened at first write. - endpoints.removePolicy(handle.remoteAddress) - } + case Some(Pass(ep, None)) ⇒ + stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia) + case Some(Pass(ep, Some(uid))) ⇒ + if (handle.handshakeInfo.uid == uid) { + pendingReadHandoffs.get(ep) foreach (_.disassociate()) + pendingReadHandoffs += ep -> handle + ep ! EndpointWriter.StopReading(ep) + } else { + context.stop(ep) + endpoints.unregisterEndpoint(ep) + pendingReadHandoffs -= ep + createAndRegisterEndpoint(handle, Some(uid)) } + case state ⇒ + createAndRegisterEndpoint(handle, None) } } case EndpointWriter.StoppedReading(endpoint) ⇒ @@ -572,8 +570,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Terminated(endpoint) ⇒ acceptPendingReader(takingOverFrom = endpoint) endpoints.unregisterEndpoint(endpoint) + stashedInbound -= endpoint case EndpointWriter.TookOver(endpoint, handle) ⇒ removePendingReader(takingOverFrom = endpoint, withHandle = handle) + case ReliableDeliverySupervisor.GotUid(uid) ⇒ + endpoints.registerWritableEndpointUid(sender, uid) + stashedInbound.getOrElse(sender, Vector.empty) foreach (self ! _) + stashedInbound -= sender case Prune ⇒ endpoints.prune() case ShutdownAndFlush ⇒ @@ -604,6 +607,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Terminated(_) ⇒ // why should we care now? } + private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = { + val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) + eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) + val endpoint = createEndpoint( + handle.remoteAddress, + handle.localAddress, + transportMapping(handle.localAddress), + settings, + Some(handle), + writing, + refuseUid = refuseUid) + if (writing) + endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint) + else { + endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) + endpoints.removePolicy(handle.remoteAddress) + } + } + private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = { /* * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 1f2efef29d..f0a6cdfdac 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -453,7 +453,7 @@ private[transport] class ThrottledAssociation( sender() ! SetThrottleAck stay() case Event(Disassociated(info), _) ⇒ - stop() + stop() // not notifying the upstream handler is intentional: we are relying on heartbeating } // This method captures ASSOCIATE packets and extracts the origin address diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index 6f46062763..8cbae48cc4 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should be(None) - reg.registerWritableEndpoint(address1, actorA) should be(actorA) + reg.registerWritableEndpoint(address1, None, actorA) should be(actorA) - reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA))) + reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA, None))) reg.readOnlyEndpointFor(address1) should be(None) reg.isWritable(actorA) should be(true) reg.isReadOnly(actorA) should be(false) @@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should be(None) reg.registerReadOnlyEndpoint(address1, actorA) should be(actorA) - reg.registerWritableEndpoint(address1, actorB) should be(actorB) + reg.registerWritableEndpoint(address1, None, actorB) should be(actorB) reg.readOnlyEndpointFor(address1) should be(Some(actorA)) - reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB))) + reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB, None))) reg.isWritable(actorA) should be(false) reg.isWritable(actorB) should be(true) @@ -66,7 +66,7 @@ class EndpointRegistrySpec extends AkkaSpec { val reg = new EndpointRegistry reg.writableEndpointWithPolicyFor(address1) should be(None) - reg.registerWritableEndpoint(address1, actorA) + reg.registerWritableEndpoint(address1, None, actorA) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.writableEndpointWithPolicyFor(address1) should be(Some(Gated(deadline))) @@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec { "keep tombstones when removing an endpoint" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, actorA) - reg.registerWritableEndpoint(address2, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.markAsQuarantined(address2, 42, deadline) @@ -102,8 +102,8 @@ class EndpointRegistrySpec extends AkkaSpec { "prune outdated Gated directives properly" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, actorA) - reg.registerWritableEndpoint(address2, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) reg.markAsFailed(actorA, Deadline.now) val farInTheFuture = Deadline.now + Duration(60, SECONDS) reg.markAsFailed(actorB, farInTheFuture)