From b1e251e0bc06e8830dd3643f7eafdd353938b43a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 31 Aug 2012 16:35:23 +0200 Subject: [PATCH 1/6] Prototype of death watch hooked up with failure detector, see #1588 * Probably a lot of things missing, but wanted to try the first idea * The test is green :) --- .../src/main/scala/akka/actor/Actor.scala | 11 +++ .../src/main/scala/akka/actor/ActorCell.scala | 5 +- .../scala/akka/actor/cell/DeathWatch.scala | 21 ++++- .../scala/akka/cluster/ClusterEvent.scala | 7 +- .../akka/cluster/ClusterDeathWatchSpec.scala | 83 +++++++++++++++++++ 5 files changed, 122 insertions(+), 5 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 77e1add959..bb72d9cdff 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -65,6 +65,17 @@ case object Kill extends Kill { @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +/** + * INTERNAL API + * + * Used for remote death watch. Failure detector publish this to the + * `eventStream` when a remote node is detected to be unreachable. + * [[akka.actor.DeathWatch]] subscribes to the `eventStream` and translates this + * event to [[akka.actor.Terminated]], which is received by the watcher. + */ +@SerialVersionUID(1L) +private[akka] case class NodeUnreachable(address: Address) extends AutoReceivedMessage + abstract class ReceiveTimeout extends PossiblyHarmful /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1699a27505..69deafa306 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -316,9 +316,9 @@ private[akka] class ActorCell( @tailrec final def systemInvoke(message: SystemMessage): Unit = { /* * When recreate/suspend/resume are received while restarting (i.e. between - * preRestart and postRestart, waiting for children to terminate), these + * preRestart and postRestart, waiting for children to terminate), these * must not be executed immediately, but instead queued and released after - * finishRecreate returns. This can only ever be triggered by + * finishRecreate returns. This can only ever be triggered by * ChildTerminated, and ChildTerminated is not one of the queued message * types (hence the overwrite further down). Mailbox sets message.next=null * before systemInvoke, so this will only be non-null during such a replay. @@ -377,6 +377,7 @@ private[akka] class ActorCell( msg.message match { case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) + case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) foreach receiveMessage case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index 031019f3f6..908539ef69 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -4,7 +4,7 @@ package akka.actor.cell -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, NodeUnreachable } import akka.dispatch.{ Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal @@ -17,6 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && !watching.contains(a)) { + // start subscription to NodeUnreachable if non-local subject and not already subscribing + if (!a.isLocal && !isSubscribingToNodeUnreachable) + system.eventStream.subscribe(self, classOf[NodeUnreachable]) + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ watching += a } @@ -92,4 +96,19 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } + protected def watchedNodeUnreachable(address: Address): Iterable[Terminated] = { + val subjects = watching filter { _.path.address == address } + subjects foreach watchedActorTerminated + + // FIXME should we cleanup (remove watchedBy) since we know they are dead? + + // FIXME existenceConfirmed? + subjects map { Terminated(_)(existenceConfirmed = false) } + } + + private def isSubscribingToNodeUnreachable: Boolean = watching.exists { + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index cbcee78cc9..bcbeebb147 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,11 +5,11 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet - import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream +import akka.actor.NodeUnreachable /** * Domain events published to the event bus. @@ -200,7 +200,10 @@ private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnv def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - diff(oldGossip, newGossip) foreach { eventStream publish } + val events = diff(oldGossip, newGossip) + events foreach { eventStream publish } + // notify DeathWatch about the unreachable node + events collect { case MemberUnreachable(m) ⇒ NodeUnreachable(m.address) } foreach { eventStream publish } } def publishInternalStats(currentStats: CurrentInternalStats): Unit = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala new file mode 100644 index 0000000000..467b2e49c0 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Props +import akka.actor.Actor +import akka.actor.RootActorPath +import akka.actor.Terminated + +object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy + +abstract class ClusterDeathWatchSpec + extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec) + with MultiNodeClusterSpec { + + import ClusterDeathWatchMultiJvmSpec._ + + "An actor watching a remote actor in the cluster" must { + "receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in { + awaitClusterUp(roles: _*) + enterBarrier("cluster-up") + + runOn(first) { + enterBarrier("subjected-started") + + val path2 = RootActorPath(second) / "user" / "subject" + val path3 = RootActorPath(third) / "user" / "subject" + val watchEstablished = TestLatch(1) + system.actorOf(Props(new Actor { + + context.watch(context.actorFor(path2)) + context.watch(context.actorFor(path3)) + watchEstablished.countDown + + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer") + + watchEstablished.await + enterBarrier("watch-established") + expectMsg(path2) + enterBarrier("second-terminated") + + markNodeAsUnavailable(third) + expectMsg(path3) + enterBarrier("third-terminated") + + } + + runOn(second, third) { + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") + enterBarrier("subjected-started") + enterBarrier("watch-established") + runOn(third) { + markNodeAsUnavailable(second) + } + enterBarrier("second-terminated") + enterBarrier("third-terminated") + } + + enterBarrier("after") + + } + + } +} From dad04cf9e5e85bda03c24aa56c81d7b949559322 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 3 Sep 2012 18:36:11 +0200 Subject: [PATCH 2/6] DeathWatch must only notify when watching, see #1588 * Discard Terminated when not watching the subject * This will filter eventual duplicates * Note about the fw case in Scaladoc of Terminated * Added description of changed behaviour in migration guide --- .../scala/akka/actor/DeathWatchSpec.scala | 32 +++++++++++++++---- .../src/main/scala/akka/actor/Actor.scala | 6 +++- .../src/main/scala/akka/actor/ActorCell.scala | 6 ++-- .../scala/akka/actor/cell/DeathWatch.scala | 14 +++++--- .../project/migration-guide-2.0.x-2.1.x.rst | 23 +++++++++++++ .../code/docs/routing/RouterDocSpec.scala | 2 +- .../scala/akka/testkit/TestActorRefSpec.scala | 13 ++++++-- 7 files changed, 80 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 7dc121604f..ea491dcbd1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps - import akka.testkit._ import scala.concurrent.util.duration._ import java.util.concurrent.atomic._ @@ -18,8 +17,17 @@ class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeo object DeathWatchSpec { def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { context.watch(target) - def receive = { case x ⇒ testActor forward x } + def receive = { + case t: Terminated ⇒ testActor forward WrappedTerminated(t) + case x ⇒ testActor forward x + } }) + + /** + * Forwarding `Terminated` to non-watching testActor is not possible, + * and therefore the `Terminated` message is wrapped. + */ + case class WrappedTerminated(t: Terminated) } trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ @@ -32,7 +40,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { - case Terminated(`actorRef`) ⇒ true + case WrappedTerminated(Terminated(`actorRef`)) ⇒ true } "notify with one Terminated message when an Actor is stopped" in { @@ -77,7 +85,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout context.unwatch(terminal) def receive = { case "ping" ⇒ sender ! "pong" - case t: Terminated ⇒ testActor ! t + case t: Terminated ⇒ testActor ! WrappedTerminated(t) } })) @@ -137,9 +145,9 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { - case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed ⇒ 1 + case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed ⇒ 1 case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother ⇒ 2 - case Terminated(`brother`) ⇒ 3 + case WrappedTerminated(Terminated(`brother`)) ⇒ 3 } testActor.isTerminated must not be true result must be(Seq(1, 2, 3)) @@ -165,6 +173,18 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout parent ! "NKOTB" expectMsg("GREEN") } + + "only notify when watching" in { + val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) + val observer = system.actorOf(Props(new Actor { + context.watch(subject) + def receive = { case x ⇒ testActor forward x } + })) + + subject ! PoisonPill + // the testActor is not watching subject and will discard Terminated msg + expectNoMsg + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index bb72d9cdff..42000734ca 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -60,7 +60,11 @@ case object Kill extends Kill { } /** - * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. + * When Death Watch is used, the watcher will receive a Terminated(watched) + * message when watched is terminated. + * Terminated message can't be forwarded to another actor, since that actor + * might not be watching the subject. Instead, if you need to forward Terminated + * to another actor you should send the information in your own message. */ @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 69deafa306..4a9910bbe2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -125,6 +125,8 @@ trait ActorContext extends ActorRefFactory { /** * Registers this actor as a Monitor for the provided ActorRef. + * This actor will receive a Terminated(watched) message when watched + * is terminated. * @return the provided ActorRef */ def watch(subject: ActorRef): ActorRef @@ -376,8 +378,8 @@ private[akka] class ActorCell( msg.message match { case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) - case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) foreach receiveMessage + case t: Terminated ⇒ watchedActorTerminated(t) + case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index 908539ef69..d45c5ebfce 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -36,7 +36,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ a } - protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref + /** + * When this actor is watching the subject of [[akka.actor.Terminated]] message + * it will be propagated to user's receive. + */ + protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) { + watching -= t.actor + receiveMessage(t) + } protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { @@ -96,14 +103,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } - protected def watchedNodeUnreachable(address: Address): Iterable[Terminated] = { + protected def watchedNodeUnreachable(address: Address): Unit = { val subjects = watching filter { _.path.address == address } - subjects foreach watchedActorTerminated // FIXME should we cleanup (remove watchedBy) since we know they are dead? // FIXME existenceConfirmed? - subjects map { Terminated(_)(existenceConfirmed = false) } + subjects foreach { self ! Terminated(_)(existenceConfirmed = false) } } private def isSubscribingToNodeUnreachable: Boolean = watching.exists { diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 010f544582..6de9eb8ca3 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -275,3 +275,26 @@ stashed messages are put into the dead letters when the actor stops, make sure y super.postStop if you override it. +Forward of Terminated message +============================= + +Forward of ``Terminated`` message is no longer supported. Instead, if you forward +``Terminated`` you should send the information in you own message. + +v2.0:: + + context.watch(subject) + + def receive = { + case t @ Terminated => someone forward t + } + +v2.1:: + + case class MyTerminated(subject: ActorRef) + + context.watch(subject) + + def receive = { + case Terminated(s) => someone forward MyTerminated(s) + } diff --git a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala index 972da038ca..bea1bf16f4 100644 --- a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala @@ -6,7 +6,7 @@ package docs.routing import RouterDocSpec.MyActor import akka.testkit.AkkaSpec import akka.routing.RoundRobinRouter -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ ActorRef, Props, Actor } object RouterDocSpec { class MyActor extends Actor { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 9d88c7b111..f033df302f 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -96,6 +96,12 @@ object TestActorRefSpec { } } + /** + * Forwarding `Terminated` to non-watching testActor is not possible, + * and therefore the `Terminated` message is wrapped. + */ + case class WrappedTerminated(t: Terminated) + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -169,11 +175,14 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA val a = TestActorRef(Props[WorkerActor]) val forwarder = system.actorOf(Props(new Actor { context.watch(a) - def receive = { case x ⇒ testActor forward x } + def receive = { + case t: Terminated ⇒ testActor forward WrappedTerminated(t) + case x ⇒ testActor forward x + } })) a.!(PoisonPill)(testActor) expectMsgPF(5 seconds) { - case Terminated(`a`) ⇒ true + case WrappedTerminated(Terminated(`a`)) ⇒ true } a.isTerminated must be(true) assertThread From 6b40ddc7555ff7d085c83d2db073c748963ff966 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 3 Sep 2012 20:37:33 +0200 Subject: [PATCH 3/6] Maintain AddressTerminated subscription in DeathWatch, see #1588 --- .../src/main/scala/akka/actor/Actor.scala | 6 +- .../src/main/scala/akka/actor/ActorCell.scala | 16 ++-- .../scala/akka/actor/cell/DeathWatch.scala | 78 ++++++++++++++----- .../scala/akka/cluster/ClusterEvent.scala | 14 ++-- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 42000734ca..b5f2093f90 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -74,11 +74,11 @@ case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanPropert * * Used for remote death watch. Failure detector publish this to the * `eventStream` when a remote node is detected to be unreachable. - * [[akka.actor.DeathWatch]] subscribes to the `eventStream` and translates this - * event to [[akka.actor.Terminated]], which is received by the watcher. + * The watcher ([[akka.actor.DeathWatch]]) subscribes to the `eventStream` + * and translates this event to [[akka.actor.Terminated]], which is sent itself. */ @SerialVersionUID(1L) -private[akka] case class NodeUnreachable(address: Address) extends AutoReceivedMessage +private[akka] case class AddressTerminated(address: Address) extends AutoReceivedMessage abstract class ReceiveTimeout extends PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4a9910bbe2..19c6d9785c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -377,14 +377,14 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t) - case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ watchedActorTerminated(t) + case AddressTerminated(address) ⇒ addressTerminated(address) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index d45c5ebfce..f8e8c21baf 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -4,7 +4,7 @@ package akka.actor.cell -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, NodeUnreachable } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated } import akka.dispatch.{ Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal @@ -17,12 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && !watching.contains(a)) { - // start subscription to NodeUnreachable if non-local subject and not already subscribing - if (!a.isLocal && !isSubscribingToNodeUnreachable) - system.eventStream.subscribe(self, classOf[NodeUnreachable]) - - a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching += a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } } a } @@ -30,8 +28,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def unwatch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && watching.contains(a)) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } } a } @@ -41,7 +41,9 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * it will be propagated to user's receive. */ protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) { - watching -= t.actor + maintainAddressTerminatedSubscription(t.actor) { + watching -= t.actor + } receiveMessage(t) } @@ -67,7 +69,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) } } - } finally watching = ActorCell.emptyActorRefSet + } finally { + watching = ActorCell.emptyActorRefSet + unsubscribeAddressTerminated() + } } } @@ -76,7 +81,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { + if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy += watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) } @@ -92,7 +97,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) { + if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy -= watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) } @@ -103,18 +108,49 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } - protected def watchedNodeUnreachable(address: Address): Unit = { - val subjects = watching filter { _.path.address == address } - - // FIXME should we cleanup (remove watchedBy) since we know they are dead? + protected def addressTerminated(address: Address): Unit = { + // cleanup watchedBy since we know they are dead + for (a ← watchedBy; if a.path.address == address) maintainAddressTerminatedSubscription(a) { + watchedBy -= a + } + // send Terminated to self for all matching subjects // FIXME existenceConfirmed? - subjects foreach { self ! Terminated(_)(existenceConfirmed = false) } + for (a ← watching; if a.path.address == address) { + self ! Terminated(a)(existenceConfirmed = false) + } } - private def isSubscribingToNodeUnreachable: Boolean = watching.exists { - case a: InternalActorRef if !a.isLocal ⇒ true - case _ ⇒ false + /** + * Starts subscription to AddressTerminated if not already subscribing and the + * block adds a non-local ref to watching or watchedBy. + * Ends subscription to AddressTerminated if subscribing and the + * block removes the last non-local ref from watching and watchedBy. + */ + private def maintainAddressTerminatedSubscription[T](change: ActorRef)(block: ⇒ T): T = { + def isNonLocal(ref: ActorRef) = ref match { + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + + def hasNonLocalAddress: Boolean = { + (watching exists isNonLocal) || (watchedBy exists isNonLocal) + } + + if (isNonLocal(change)) { + val had = hasNonLocalAddress + val result = block + val has = hasNonLocalAddress + if (had && !has) unsubscribeAddressTerminated() + else if (!had && has) subscribeAddressTerminated() + result + } else { + block + } } + private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated]) + + private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated]) + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index bcbeebb147..6b50ddbb3a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -9,7 +9,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream -import akka.actor.NodeUnreachable +import akka.actor.AddressTerminated /** * Domain events published to the event bus. @@ -200,10 +200,14 @@ private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnv def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - val events = diff(oldGossip, newGossip) - events foreach { eventStream publish } - // notify DeathWatch about the unreachable node - events collect { case MemberUnreachable(m) ⇒ NodeUnreachable(m.address) } foreach { eventStream publish } + diff(oldGossip, newGossip) foreach { event ⇒ + eventStream publish event + // notify DeathWatch about unreachable node + event match { + case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) + case _ ⇒ + } + } } def publishInternalStats(currentStats: CurrentInternalStats): Unit = { From ea59b952ce5270d1349b36e819f75bc881af9151 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 4 Sep 2012 09:55:08 +0200 Subject: [PATCH 4/6] Direct failed remote sends to deadLetters, see #1588 * When Watch is sent to deadLetters it will generate Terminated * Test: receive Terminated when watched node is unknown host * Test: receive Terminated when watched path doesn't exist --- .../akka/cluster/ClusterDeathWatchSpec.scala | 43 +++++++++++++++++-- .../akka/remote/RemoteActorRefProvider.scala | 2 + 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index 467b2e49c0..d6287ed099 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -10,13 +10,16 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Props import akka.actor.Actor +import akka.actor.Address import akka.actor.RootActorPath import akka.actor.Terminated +import akka.actor.Address object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") + val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -24,6 +27,7 @@ object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy abstract class ClusterDeathWatchSpec extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec) @@ -43,19 +47,18 @@ abstract class ClusterDeathWatchSpec val path3 = RootActorPath(third) / "user" / "subject" val watchEstablished = TestLatch(1) system.actorOf(Props(new Actor { - context.watch(context.actorFor(path2)) context.watch(context.actorFor(path3)) watchEstablished.countDown - def receive = { case t: Terminated ⇒ testActor ! t.actor.path } - }), name = "observer") + }), name = "observer1") watchEstablished.await enterBarrier("watch-established") expectMsg(path2) + expectNoMsg enterBarrier("second-terminated") markNodeAsUnavailable(third) @@ -64,7 +67,7 @@ abstract class ClusterDeathWatchSpec } - runOn(second, third) { + runOn(second, third, fourth) { system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") enterBarrier("subjected-started") enterBarrier("watch-established") @@ -79,5 +82,37 @@ abstract class ClusterDeathWatchSpec } + "receive Terminated when watched node is unknown host" taggedAs LongRunningTest in { + runOn(first) { + val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject" + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path)) + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer2") + + expectMsg(path) + } + + enterBarrier("after") + } + + "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in { + runOn(first) { + val path = RootActorPath(second) / "user" / "non-existing" + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path)) + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer3") + + expectMsg(path) + } + + enterBarrier("after") + } + } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5377013a42..24b38ed868 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -236,6 +236,7 @@ private[akka] class RemoteActorRef private[akka] ( catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + provider.deadLetters ! message } override def !(message: Any)(implicit sender: ActorRef = null): Unit = @@ -243,6 +244,7 @@ private[akka] class RemoteActorRef private[akka] ( catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + provider.deadLetters ! message } def suspend(): Unit = sendSystemMessage(Suspend()) From 7460ef467c660895fb9f1b28c2a59e738ebf5158 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 7 Sep 2012 13:11:45 +0200 Subject: [PATCH 5/6] Adjustments based on feedback, see #1588 --- .../src/main/scala/akka/actor/cell/DeathWatch.scala | 12 +++++------- .../scala/akka/cluster/ClusterDeathWatchSpec.scala | 6 +++--- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index f8e8c21baf..ca6b37dd71 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -110,8 +110,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def addressTerminated(address: Address): Unit = { // cleanup watchedBy since we know they are dead - for (a ← watchedBy; if a.path.address == address) maintainAddressTerminatedSubscription(a) { - watchedBy -= a + maintainAddressTerminatedSubscription() { + for (a ← watchedBy; if a.path.address == address) watchedBy -= a } // send Terminated to self for all matching subjects @@ -127,17 +127,15 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * Ends subscription to AddressTerminated if subscribing and the * block removes the last non-local ref from watching and watchedBy. */ - private def maintainAddressTerminatedSubscription[T](change: ActorRef)(block: ⇒ T): T = { + private def maintainAddressTerminatedSubscription[T](change: ActorRef = null)(block: ⇒ T): T = { def isNonLocal(ref: ActorRef) = ref match { + case null ⇒ true case a: InternalActorRef if !a.isLocal ⇒ true case _ ⇒ false } - def hasNonLocalAddress: Boolean = { - (watching exists isNonLocal) || (watchedBy exists isNonLocal) - } - if (isNonLocal(change)) { + def hasNonLocalAddress: Boolean = ((watching exists isNonLocal) || (watchedBy exists isNonLocal)) val had = hasNonLocalAddress val result = block val has = hasNonLocalAddress diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala index d6287ed099..8fb7357289 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -78,7 +78,7 @@ abstract class ClusterDeathWatchSpec enterBarrier("third-terminated") } - enterBarrier("after") + enterBarrier("after-1") } @@ -95,7 +95,7 @@ abstract class ClusterDeathWatchSpec expectMsg(path) } - enterBarrier("after") + enterBarrier("after-2") } "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in { @@ -111,7 +111,7 @@ abstract class ClusterDeathWatchSpec expectMsg(path) } - enterBarrier("after") + enterBarrier("after-3") } } From 6d51d6d647b16b3bcfece24bcd62a5a96fd57fc1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2012 09:23:09 +0200 Subject: [PATCH 6/6] Explain use of existenceConfirmed, see #1588 --- akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index ca6b37dd71..f994e956c6 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -115,7 +115,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } // send Terminated to self for all matching subjects - // FIXME existenceConfirmed? + // existenceConfirmed = false because we could have been watching a + // non-local ActorRef that had never resolved before the other node went down for (a ← watching; if a.path.address == address) { self ! Terminated(a)(existenceConfirmed = false) }