diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 9c5dd95dc5..f7a9844c9d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -215,8 +215,9 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt EventFilter[Exception]("hello", occurrences = 1) intercept { a ! "die" } - val t = probe.expectMsg(Terminated(a)(true)) + val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false)) t.existenceConfirmed must be(true) + t.addressTerminated must be(false) } "shut down when /user escalates" in { diff --git a/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala new file mode 100644 index 0000000000..cf272cba88 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.util + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +class SwitchSpec extends WordSpec with MustMatchers { + + "Switch" must { + + "on and off" in { + val s = new Switch(false) + s.isOff must be(true) + s.isOn must be(false) + + s.switchOn("hello") must be(true) + s.isOn must be(true) + s.isOff must be(false) + s.switchOn("hello") must be(false) + s.isOn must be(true) + s.isOff must be(false) + + s.switchOff("hello") must be(true) + s.isOff must be(true) + s.isOn must be(false) + s.switchOff("hello") must be(false) + s.isOff must be(true) + s.isOn must be(false) + } + + "revert when exception" in { + val s = new Switch(false) + intercept[RuntimeException] { + s.switchOn(throw new RuntimeException) + } + s.isOff must be(true) + } + + "run action without locking" in { + val s = new Switch(false) + s.ifOffYield("yes") must be(Some("yes")) + s.ifOnYield("no") must be(None) + s.ifOff("yes") must be(true) + s.ifOn("no") must be(false) + + s.switchOn() + s.ifOnYield("yes") must be(Some("yes")) + s.ifOffYield("no") must be(None) + s.ifOn("yes") must be(true) + s.ifOff("no") must be(false) + } + + "run action with locking" in { + val s = new Switch(false) + s.whileOffYield("yes") must be(Some("yes")) + s.whileOnYield("no") must be(None) + s.whileOff("yes") must be(true) + s.whileOn("no") must be(false) + + s.switchOn() + s.whileOnYield("yes") must be(Some("yes")) + s.whileOffYield("no") must be(None) + s.whileOn("yes") must be(true) + s.whileOff("no") must be(false) + } + + "run first or second action depending on state" in { + val s = new Switch(false) + s.fold("on")("off") must be("off") + s.switchOn() + s.fold("on")("off") must be("on") + } + + "do proper locking" in { + val s = new Switch(false) + + s.locked { + Thread.sleep(500) + s.switchOn() + s.isOn must be(true) + } + + val latch = new CountDownLatch(1) + new Thread { + override def run(): Unit = { + s.switchOff() + latch.countDown() + } + }.start() + + latch.await(5, TimeUnit.SECONDS) + s.isOff must be(true) + } + } +}
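The new SwitchSpec also pins down the semantics restored by the one-character whileOff fix in akka/util/LockUtil.scala further down in this diff (the guard previously tested switch.get instead of !switch.get, so whileOff never ran its action). A minimal usage sketch, with an invented SwitchDemo object, using only the Switch API exercised by the spec:

    import akka.util.Switch

    object SwitchDemo extends App {
      val s = new Switch(false) // starts in the off position
      // whileOff holds the monitor lock and runs the action only while the switch is off
      println(s.whileOff(println("initializing")))  // prints "initializing", then true
      s.switchOn()
      println(s.whileOff(println("never printed"))) // action skipped, prints false
    }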
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b5f2093f90..633458953e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -65,9 +65,18 @@ case object Kill extends Kill { * Terminated message can't be forwarded to another actor, since that actor * might not be watching the subject. Instead, if you need to forward Terminated * to another actor you should send the information in your own message. + * + * @param actor the watched actor that terminated + * @param existenceConfirmed is false when the Terminated message was not sent + * directly from the watched actor, but derived from another source, such as + * when watching a non-local ActorRef, which might not have been resolved + * @param addressTerminated is true when the Terminated message was derived from + * the remote node hosting the watched actor being detected as unreachable */ @SerialVersionUID(1L) -case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +case class Terminated private[akka] (@BeanProperty actor: ActorRef)( + @BeanProperty val existenceConfirmed: Boolean, + @BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage /** * INTERNAL API
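With both flags available, a watcher can tell a confirmed termination apart from one that was synthesized because the node hosting the watched actor became unreachable. A minimal sketch (the Watcher class and its log messages are invented for illustration; only the Terminated fields come from the change above):

    import akka.actor.{ Actor, ActorRef, Terminated }

    class Watcher(subject: ActorRef) extends Actor {
      context.watch(subject)

      def receive = {
        case t @ Terminated(`subject`) ⇒
          if (t.addressTerminated)
            // derived from unreachable-node detection, not from the actor itself
            context.system.log.warning("node hosting {} detected as unreachable", subject)
          else if (t.existenceConfirmed)
            // the termination was reported by the watched actor itself
            context.system.log.info("{} terminated", subject)
          context.stop(self)
      }
    }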
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 504ec5f762..5ec4545fd1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -380,8 +380,10 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ + if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) + watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() @@ -391,6 +393,18 @@ } } + /** + * When a parent is watching a child and the child terminates due to AddressTerminated, + * the child should be removed to support immediate creation of a child with the same name. + * + * For remote deployed actors a ChildTerminated message should also be sent to the + * supervisor to clean up its child references when the remote node goes down, + * i.e. triggered by AddressTerminated, but handling that scenario is the + * responsibility of the ActorRefProvider. + */ + private def removeChildWhenToAddressTerminated(child: ActorRef): Unit = + childrenRefs.getByRef(child) foreach { crs ⇒ removeChildAndGetStateChange(crs.child) } + final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 615c4ef92e..2cb2f984f2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -10,9 +10,10 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.{ Serialization, JavaSerializer } import akka.event.EventStream import scala.annotation.tailrec -import java.util.concurrent.{ ConcurrentHashMap } +import java.util.concurrent.ConcurrentHashMap import akka.event.LoggingAdapter import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.collection.JavaConverters /** * Immutable and serializable handle to an actor, which may or may not reside @@ -442,7 +443,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee == this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case _: Unwatch ⇒ true // Just ignore case _ ⇒ false @@ -467,7 +468,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ if (w.watchee != this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) true case w: Unwatch ⇒ true // Just ignore case NullMessage ⇒ true @@ -516,4 +517,11 @@ private[akka] class VirtualPathContainer( } } } + + def hasChildren: Boolean = !children.isEmpty + + def foreachChild(f: ActorRef ⇒ Unit) = { + val iter = children.values.iterator + while (iter.hasNext) f(iter.next) + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3f96bd839c..d60a46d497 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -300,6 +300,23 @@ trait ActorRefFactory { */ private[akka] case class StopChild(child: ActorRef) +/** + * INTERNAL API + */ +private[akka] object SystemGuardian { + /** + * For the purpose of orderly shutdown it's possible + * to register interest in the termination of systemGuardian + * and receive a notification [[akka.actor.SystemGuardian.TerminationHook]] + * before systemGuardian is stopped. The registered hook is supposed + * to reply with [[akka.actor.SystemGuardian.TerminationHookDone]] and the + * systemGuardian will not stop until all registered hooks have replied. + */ + case object RegisterTerminationHook + case object TerminationHook + case object TerminationHookDone +} +
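The protocol is a simple request/acknowledge handshake: a hook registers once, is notified when the termination process starts, performs its cleanup, and replies. These messages are INTERNAL API; the RemoteSystemDaemon change further down registers itself through system.provider.systemGuardian. A hedged sketch of the shape of such a hook (the DrainHook actor and the actorFor("/system") lookup are illustrative assumptions, not part of this change):

    import akka.actor.Actor
    import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }

    class DrainHook extends Actor {
      override def preStart(): Unit =
        // assumption: resolving the system guardian by path; internal Akka code
        // holds a direct reference via system.provider.systemGuardian instead
        context.actorFor("/system") ! RegisterTerminationHook

      def receive = {
        case TerminationHook ⇒
          // perform orderly cleanup here, then acknowledge so that
          // the systemGuardian can finish stopping
          sender ! TerminationHookDone
      }
    }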
/** * Local ActorRef provider. */ @@ -373,10 +390,13 @@ class LocalActorRefProvider( } } - private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor { + /* + * Root and user guardian + */ + private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { def receive = { - case Terminated(_) ⇒ if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self) + case Terminated(_) ⇒ context.stop(self) case StopChild(child) ⇒ context.stop(child) case m ⇒ deadLetters ! DeadLetter(m, sender, self) } @@ -385,6 +405,53 @@ class LocalActorRefProvider( override def preRestart(cause: Throwable, msg: Option[Any]) {} } + /** + * System guardian + */ + private class SystemGuardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { + import SystemGuardian._ + + var terminationHooks = Set.empty[ActorRef] + + def receive = { + case Terminated(`guardian`) ⇒ + // time for the systemGuardian to stop, but first notify all the + // termination hooks; they will reply with TerminationHookDone + // and when all have replied the systemGuardian is stopped + context.become(terminating) + terminationHooks foreach { _ ! TerminationHook } + stopWhenAllTerminationHooksDone() + case Terminated(a) ⇒ + // a registered and watched termination hook terminated before + // the termination process of the guardian started + terminationHooks -= a + case StopChild(child) ⇒ context.stop(child) + case RegisterTerminationHook if sender != context.system.deadLetters ⇒ + terminationHooks += sender + context watch sender + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def terminating: Receive = { + case Terminated(a) ⇒ stopWhenAllTerminationHooksDone(a) + case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) + } + + def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = { + terminationHooks -= remove + stopWhenAllTerminationHooksDone() + } + + def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { + eventStream.stopDefaultLoggers() + context.stop(self) + } + + // guardian MUST NOT lose its children during restart + override def preRestart(cause: Throwable, msg: Option[Any]) {} + } + /* * The problem is that ActorRefs need a reference to the ActorSystem to * provide their service.
Hence they cannot be created while the @@ -435,7 +502,7 @@ class LocalActorRefProvider( protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy lazy val rootGuardian: LocalActorRef = - new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { + new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { case "temp" ⇒ tempContainer @@ -447,7 +514,7 @@ class LocalActorRefProvider( lazy val guardian: LocalActorRef = { val cell = rootGuardian.underlying cell.reserveChild("user") - val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user") + val ref = new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") cell.initChild(ref) ref } @@ -455,7 +522,7 @@ class LocalActorRefProvider( lazy val systemGuardian: LocalActorRef = { val cell = rootGuardian.underlying cell.reserveChild("system") - val ref = new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system") + val ref = new LocalActorRef(system, Props(new SystemGuardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") cell.initChild(ref) ref } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 5407afc2c8..70f79f1d48 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -49,7 +49,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) + val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false) try { watchedBy foreach { watcher ⇒ @@ -118,7 +118,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ // existenceConfirmed = false because we could have been watching a // non-local ActorRef that had never resolved before the other node went down for (a ← watching; if a.path.address == address) { - self ! Terminated(a)(existenceConfirmed = false) + self ! Terminated(a)(existenceConfirmed = false, addressTerminated = true) } } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 92e34a2294..704ce43d8d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -261,7 +261,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _: Terminate ⇒ stop() case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true) + if (!addWatcher(watcher)) watcher ! 
Terminated(watchee)(existenceConfirmed = true, addressTerminated = false) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -280,7 +280,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide result tryComplete Failure(new ActorKilledException("Stopped")) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(existenceConfirmed = true) + val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } } } diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 91c837063a..8279fcc7df 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -115,7 +115,7 @@ class Switch(startAsOn: Boolean = false) { * Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance */ def whileOff(action: ⇒ Unit): Boolean = synchronized { - if (switch.get) { + if (!switch.get) { action true } else false diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 03d9982bd1..024dfdc00c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -5,19 +5,30 @@ package akka.cluster import com.typesafe.config.Config import akka.ConfigurationException +import akka.actor.Actor +import akka.actor.ActorPath +import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl import akka.actor.Deploy import akka.actor.DynamicAccess +import akka.actor.InternalActorRef import akka.actor.NoScopeGiven +import akka.actor.Props import akka.actor.Scheduler import akka.actor.Scope +import akka.actor.Terminated import akka.cluster.routing.ClusterRouterConfig +import akka.cluster.routing.ClusterRouterSettings +import akka.dispatch.ChildTerminated import akka.event.EventStream import akka.remote.RemoteActorRefProvider import akka.remote.RemoteDeployer import akka.remote.routing.RemoteRouterConfig -import akka.cluster.routing.ClusterRouterSettings +/** + * INTERNAL API + */ class ClusterActorRefProvider( _systemName: String, _settings: ActorSystem.Settings, @@ -26,10 +37,55 @@ class ClusterActorRefProvider( _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( _systemName, _settings, _eventStream, _scheduler, _dynamicAccess) { + @volatile private var remoteDeploymentWatcher: ActorRef = _ + + override def init(system: ActorSystemImpl): Unit = { + super.init(system) + + remoteDeploymentWatcher = system.systemActorOf(Props[RemoteDeploymentWatcher], "RemoteDeploymentWatcher") + } + override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess) + /** + * This method is overridden here to keep track of remote deployed actors to + * be able to clean up corresponding child references. + */ + override def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { + super.useActorOnNode(path, props, deploy, supervisor) + remoteDeploymentWatcher ! 
(actorFor(path), supervisor) + } + } +/** + * INTERNAL API + * + * Responsible for cleaning up child references of remote deployed actors when the remote node + * goes down (JVM crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]]. + */ +private[akka] class RemoteDeploymentWatcher extends Actor { + var supervisors = Map.empty[ActorRef, InternalActorRef] + + def receive = { + case (a: ActorRef, supervisor: InternalActorRef) ⇒ + supervisors += (a -> supervisor) + context.watch(a) + + case t @ Terminated(a) if supervisors isDefinedAt a ⇒ + // send an extra ChildTerminated to the supervisor so that it will remove the child + if (t.addressTerminated) supervisors(a).sendSystemMessage(ChildTerminated(a)) + supervisors -= a + + case _: Terminated ⇒ + } +} + +/** + * INTERNAL API + * + * Deployer of cluster aware routers. + */ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) { override def parseConfig(path: String, config: Config): Option[Deploy] = { super.parseConfig(path, config) match { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 2c891531b5..b8554c995b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -14,6 +14,9 @@ import akka.actor.Address import akka.actor.RootActorPath import akka.actor.Terminated import akka.actor.Address +import akka.remote.RemoteActorRef +import java.util.concurrent.TimeoutException +import akka.actor.ActorSystemImpl object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) + + deployOn(fourth, """/hello.remote = "@first@" """) + + class Hello extends Actor { + def receive = Actor.emptyBehavior + } } class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec @@ -114,5 +123,41 @@ abstract class ClusterDeathWatchSpec enterBarrier("after-3") } + "be able to shut down system when using remote deployed actor on node that crashes" taggedAs LongRunningTest in { + runOn(fourth) { + val hello = system.actorOf(Props[Hello], "hello") + hello.isInstanceOf[RemoteActorRef] must be(true) + hello.path.address must be(address(first)) + watch(hello) + enterBarrier("hello-deployed") + + markNodeAsUnavailable(first) + val t = expectMsgType[Terminated] + t.actor must be(hello) + + enterBarrier("first-unavailable") + + system.shutdown() + val timeout = remaining + try system.awaitTermination(timeout) catch { + case _: TimeoutException ⇒ + fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout, + system.asInstanceOf[ActorSystemImpl].printTree)) + } + } + + runOn(first, second, third) { + enterBarrier("hello-deployed") + enterBarrier("first-unavailable") + runOn(first) { + // fourth system will be shut down, remove it so it does not participate in barriers any more + testConductor.removeNode(fourth) + } + + enterBarrier("after-4") + } + + } + } }
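Given the DeathWatch change earlier in this diff, the Terminated that fourth receives in this test is derived from AddressTerminated rather than from the actor itself, so its flags could additionally be asserted. The two extra lines below are hypothetical additions to the existing runOn(fourth) block, not part of the original spec:

    // after the existing `val t = expectMsgType[Terminated]` above:
    t.addressTerminated must be(true)    // derived from the unreachable-node detection
    t.existenceConfirmed must be(false)  // the actor on the crashed node could not confirm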
diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index ceb1dfbe15..e457542e89 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -12,7 +12,7 @@ import akka.actor._ import akka.util.Timeout import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.remote.RemoteActorRefProvider -import akka.testkit.TestKit +import akka.testkit._ import scala.concurrent.{ Await, Awaitable } import scala.util.control.NonFatal import scala.concurrent.util.Duration @@ -246,14 +246,23 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: } } system.shutdown() - try system.awaitTermination(5 seconds) catch { + val shutdownTimeout = 5.seconds.dilated + try system.awaitTermination(shutdownTimeout) catch { case _: TimeoutException ⇒ - system.log.warning("Failed to stop [{}] within 5 seconds", system.name) - println(system.asInstanceOf[ActorSystemImpl].printTree) + val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout, + system.asInstanceOf[ActorSystemImpl].printTree) + if (verifySystemShutdown) throw new RuntimeException(msg) + else system.log.warning(msg) } atTermination() } + /** + * Override this and return `true` to assert that the + * shutdown of the `ActorSystem` was done properly. + */ + def verifySystemShutdown: Boolean = false + /* * Test Class Interface */ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index 6b3f081968..f7820ae8d3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -3,14 +3,17 @@ */ package akka.remote +import language.postfixOps import com.typesafe.config.ConfigFactory - import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props import akka.pattern.ask -import testkit.{STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec} +import testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } import akka.testkit._ +import akka.actor.Terminated +import scala.concurrent.util.duration._ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { @@ -20,12 +23,16 @@ } } - commonConfig(debugConfig(on = false)) + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))) val master = role("master") val slave = role("slave") - deployOn(master, """/service-hello.remote = "@slave@" """) + deployOn(master, """ + /service-hello.remote = "@slave@" + /service-hello3.remote = "@slave@" + """) deployOnAll("""/service-hello2.remote = "@slave@" """) } @@ -37,7 +44,10 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec) with STMultiNodeSpec with ImplicitSender with DefaultTimeout { import NewRemoteActorMultiJvmSpec._ - def initialParticipants = 2 + def initialParticipants = roles.size + + // ensure that system shutdown is successful + override def verifySystemShutdown = true "A new remote actor" must { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { runOn(master) { val actor = system.actorOf(Props[SomeActor], "service-hello") actor.isInstanceOf[RemoteActorRef] must be(true) + 
actor.path.address must be(node(slave).address) val slaveAddress = testConductor.getAddressFor(slave).await actor ! "identify" expectMsgType[ActorRef].path.address must equal(slaveAddress) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) } enterBarrier("done") } @@ -63,17 +70,37 @@ runOn(master) { val actor = system.actorOf(Props[SomeActor], "service-hello2") actor.isInstanceOf[RemoteActorRef] must be(true) + actor.path.address must be(node(slave).address) val slaveAddress = testConductor.getAddressFor(slave).await actor ! "identify" expectMsgType[ActorRef].path.address must equal(slaveAddress) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) } enterBarrier("done") } + + "be able to shut down system when using remote deployed actor" taggedAs LongRunningTest in within(10 seconds) { + runOn(master) { + val actor = system.actorOf(Props[SomeActor], "service-hello3") + actor.isInstanceOf[RemoteActorRef] must be(true) + actor.path.address must be(node(slave).address) + watch(actor) + + enterBarrier("deployed") + + // master system is supposed to be shut down after the slave; + // this should be triggered by the slave system shutdown + expectMsgPF(remaining) { case Terminated(`actor`) ⇒ true } + } + + runOn(slave) { + enterBarrier("deployed") + } + + // It is important that this is the last test. + // There must not be any barriers here. + // verifySystemShutdown = true will ensure that system shutdown is successful + } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 53023687c0..60a2e7b4b0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -5,11 +5,12 @@ package akka.remote import scala.annotation.tailrec -import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } +import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } import akka.event.LoggingAdapter import akka.dispatch.Watch import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope +import akka.util.Switch private[akka] sealed trait DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg @@ -24,6 +25,14 @@ private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: Str private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { + import akka.actor.SystemGuardian._ + + private val terminating = new Switch(false) + + system.provider.systemGuardian.tell(RegisterTerminationHook, this) + + system.eventStream.subscribe(this, classOf[AddressTerminated]) + /** * Find the longest matching path which we know about and return that ref * (or ask that ref to continue searching if elements are left). 
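The constructor above both registers a termination hook and subscribes the daemon to AddressTerminated on the system event stream, which is how it learns that a remote node died. A hedged sketch of the same subscription pattern in isolation (the AddressTerminatedListener class is invented, and it assumes AddressTerminated is internal API, hence the akka.remote package):

    package akka.remote

    import akka.actor.{ Actor, AddressTerminated }

    class AddressTerminatedListener extends Actor {
      override def preStart(): Unit =
        context.system.eventStream.subscribe(self, classOf[AddressTerminated])

      override def postStop(): Unit =
        context.system.eventStream.unsubscribe(self)

      def receive = {
        case AddressTerminated(address) ⇒
          // react to the lost node, e.g. stop local refs deployed from it
          context.system.log.warning("address {} detected as unreachable", address)
      }
    }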
@@ -60,21 +69,40 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = this.path / subpath - val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], - path, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(subpath.mkString("/"), actor) - this.sendSystemMessage(Watch(actor, this)) + val isTerminating = !terminating.whileOff { + val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], + path, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(subpath.mkString("/"), actor) + actor.sendSystemMessage(Watch(actor, this)) + } + if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } } case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ - removeChild(child.path.elements.drop(1).mkString("/")) + terminating.locked { + removeChild(child.path.elements.drop(1).mkString("/")) + terminationHookDoneWhenNoChildren() + } case t: Terminated ⇒ - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case TerminationHook ⇒ + terminating.switchOn { + terminationHookDoneWhenNoChildren() + foreachChild { system.stop } + } + + case AddressTerminated(address) ⇒ + foreachChild { + case a: InternalActorRef if a.getParent.path.address == address ⇒ system.stop(a) + case _ ⇒ // child is not supervised from the terminated address, keep it + } + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + } + + def terminationHookDoneWhenNoChildren(): Unit = terminating.whileOn { + if (!hasChildren) system.provider.systemGuardian.tell(TerminationHookDone, this) + } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 0b66cd3b7c..4a391af5f0 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -20,6 +20,7 @@ import akka.actor.{ DeadLetter, Address, ActorRef } import akka.util.Switch import scala.util.control.NonFatal import org.jboss.netty.handler.ssl.SslHandler +import scala.concurrent.util.Deadline /** * This is the abstract baseclass for netty remote clients, currently there's only an @@ -106,7 +107,7 @@ private[akka] class ActiveRemoteClient private[akka] ( private[remote] var openChannels: DefaultChannelGroup = _ @volatile - private var reconnectionTimeWindowStart = 0L + private var reconnectionDeadline: Option[Deadline] = None def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg) @@ -208,20 +209,18 @@ private[akka] class ActiveRemoteClient private[akka] ( log.debug("[{}] has been shut down", name) } - private[akka] def isWithinReconnectionTimeWindow: Boolean = { - if (reconnectionTimeWindowStart == 0L) { - reconnectionTimeWindowStart = System.currentTimeMillis + private[akka] def isWithinReconnectionTimeWindow: Boolean = reconnectionDeadline match { + case None ⇒ + reconnectionDeadline = Some(Deadline.now + settings.ReconnectionTimeWindow) true - } else { - val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 - if (timeLeft) - log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) - - timeLeft - } + 
case Some(deadline) ⇒ + val hasTimeLeft = deadline.hasTimeLeft + if (hasTimeLeft) + log.info("Will try to reconnect to remote server for another [{}] milliseconds", deadline.timeLeft.toMillis) + hasTimeLeft } - private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L + private[akka] def resetReconnectionTimeWindow = reconnectionDeadline = None } @ChannelHandler.Sharable
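The Deadline refactoring above replaces manual System.currentTimeMillis bookkeeping (which previously logged a Boolean where milliseconds were intended) with an armed expiry point that can simply be queried. The pattern in isolation, as a minimal sketch (the ReconnectWindow object is invented; the Deadline API is the one already imported by Client.scala):

    import scala.concurrent.util.{ Deadline, FiniteDuration }

    object ReconnectWindow {
      private var deadline: Option[Deadline] = None

      // arm the window on first use; afterwards just ask the Deadline
      def isWithin(window: FiniteDuration): Boolean = deadline match {
        case None ⇒
          deadline = Some(Deadline.now + window)
          true
        case Some(d) ⇒ d.hasTimeLeft
      }

      def reset(): Unit = deadline = None
    }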