From 6b40ddc7555ff7d085c83d2db073c748963ff966 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 3 Sep 2012 20:37:33 +0200 Subject: [PATCH] Maintain AddressTerminated subscription in DeathWatch, see #1588 --- .../src/main/scala/akka/actor/Actor.scala | 6 +- .../src/main/scala/akka/actor/ActorCell.scala | 16 ++-- .../scala/akka/actor/cell/DeathWatch.scala | 78 ++++++++++++++----- .../scala/akka/cluster/ClusterEvent.scala | 14 ++-- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 42000734ca..b5f2093f90 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -74,11 +74,11 @@ case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanPropert * * Used for remote death watch. Failure detector publish this to the * `eventStream` when a remote node is detected to be unreachable. - * [[akka.actor.DeathWatch]] subscribes to the `eventStream` and translates this - * event to [[akka.actor.Terminated]], which is received by the watcher. + * The watcher ([[akka.actor.DeathWatch]]) subscribes to the `eventStream` + * and translates this event to [[akka.actor.Terminated]], which is sent itself. */ @SerialVersionUID(1L) -private[akka] case class NodeUnreachable(address: Address) extends AutoReceivedMessage +private[akka] case class AddressTerminated(address: Address) extends AutoReceivedMessage abstract class ReceiveTimeout extends PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4a9910bbe2..19c6d9785c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -377,14 +377,14 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t) - case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ watchedActorTerminated(t) + case AddressTerminated(address) ⇒ addressTerminated(address) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index d45c5ebfce..f8e8c21baf 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -4,7 +4,7 @@ package akka.actor.cell -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, NodeUnreachable } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated } import akka.dispatch.{ Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal @@ -17,12 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && !watching.contains(a)) { - // start subscription to NodeUnreachable if non-local subject and not already subscribing - if (!a.isLocal && !isSubscribingToNodeUnreachable) - system.eventStream.subscribe(self, classOf[NodeUnreachable]) - - a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching += a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } } a } @@ -30,8 +28,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def unwatch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && watching.contains(a)) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } } a } @@ -41,7 +41,9 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * it will be propagated to user's receive. */ protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) { - watching -= t.actor + maintainAddressTerminatedSubscription(t.actor) { + watching -= t.actor + } receiveMessage(t) } @@ -67,7 +69,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) } } - } finally watching = ActorCell.emptyActorRefSet + } finally { + watching = ActorCell.emptyActorRefSet + unsubscribeAddressTerminated() + } } } @@ -76,7 +81,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { + if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy += watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) } @@ -92,7 +97,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) { + if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy -= watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) } @@ -103,18 +108,49 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } - protected def watchedNodeUnreachable(address: Address): Unit = { - val subjects = watching filter { _.path.address == address } - - // FIXME should we cleanup (remove watchedBy) since we know they are dead? + protected def addressTerminated(address: Address): Unit = { + // cleanup watchedBy since we know they are dead + for (a ← watchedBy; if a.path.address == address) maintainAddressTerminatedSubscription(a) { + watchedBy -= a + } + // send Terminated to self for all matching subjects // FIXME existenceConfirmed? - subjects foreach { self ! Terminated(_)(existenceConfirmed = false) } + for (a ← watching; if a.path.address == address) { + self ! Terminated(a)(existenceConfirmed = false) + } } - private def isSubscribingToNodeUnreachable: Boolean = watching.exists { - case a: InternalActorRef if !a.isLocal ⇒ true - case _ ⇒ false + /** + * Starts subscription to AddressTerminated if not already subscribing and the + * block adds a non-local ref to watching or watchedBy. + * Ends subscription to AddressTerminated if subscribing and the + * block removes the last non-local ref from watching and watchedBy. + */ + private def maintainAddressTerminatedSubscription[T](change: ActorRef)(block: ⇒ T): T = { + def isNonLocal(ref: ActorRef) = ref match { + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + + def hasNonLocalAddress: Boolean = { + (watching exists isNonLocal) || (watchedBy exists isNonLocal) + } + + if (isNonLocal(change)) { + val had = hasNonLocalAddress + val result = block + val has = hasNonLocalAddress + if (had && !has) unsubscribeAddressTerminated() + else if (!had && has) subscribeAddressTerminated() + result + } else { + block + } } + private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated]) + + private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated]) + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index bcbeebb147..6b50ddbb3a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -9,7 +9,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream -import akka.actor.NodeUnreachable +import akka.actor.AddressTerminated /** * Domain events published to the event bus. @@ -200,10 +200,14 @@ private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnv def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - val events = diff(oldGossip, newGossip) - events foreach { eventStream publish } - // notify DeathWatch about the unreachable node - events collect { case MemberUnreachable(m) ⇒ NodeUnreachable(m.address) } foreach { eventStream publish } + diff(oldGossip, newGossip) foreach { event ⇒ + eventStream publish event + // notify DeathWatch about unreachable node + event match { + case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) + case _ ⇒ + } + } } def publishInternalStats(currentStats: CurrentInternalStats): Unit = {