Maintain AddressTerminated subscription in DeathWatch, see #1588

This commit is contained in:
Patrik Nordwall 2012-09-03 20:37:33 +02:00
parent dad04cf9e5
commit 6b40ddc755
4 changed files with 77 additions and 37 deletions

View file

@ -74,11 +74,11 @@ case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanPropert
*
* Used for remote death watch. Failure detector publish this to the
* `eventStream` when a remote node is detected to be unreachable.
* [[akka.actor.DeathWatch]] subscribes to the `eventStream` and translates this
* event to [[akka.actor.Terminated]], which is received by the watcher.
* The watcher ([[akka.actor.DeathWatch]]) subscribes to the `eventStream`
* and translates this event to [[akka.actor.Terminated]], which is sent itself.
*/
@SerialVersionUID(1L)
private[akka] case class NodeUnreachable(address: Address) extends AutoReceivedMessage
private[akka] case class AddressTerminated(address: Address) extends AutoReceivedMessage
abstract class ReceiveTimeout extends PossiblyHarmful

View file

@ -377,14 +377,14 @@ private[akka] class ActorCell(
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated watchedActorTerminated(t)
case NodeUnreachable(address) watchedNodeUnreachable(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}

View file

@ -4,7 +4,7 @@
package akka.actor.cell
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, NodeUnreachable }
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
@ -17,12 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
// start subscription to NodeUnreachable if non-local subject and not already subscribing
if (!a.isLocal && !isSubscribingToNodeUnreachable)
system.eventStream.subscribe(self, classOf[NodeUnreachable])
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
}
a
}
@ -30,8 +28,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
}
a
}
@ -41,7 +41,9 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
* it will be propagated to user's receive.
*/
protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) {
watching -= t.actor
maintainAddressTerminatedSubscription(t.actor) {
watching -= t.actor
}
receiveMessage(t)
}
@ -67,7 +69,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
} finally watching = ActorCell.emptyActorRefSet
} finally {
watching = ActorCell.emptyActorRefSet
unsubscribeAddressTerminated()
}
}
}
@ -76,7 +81,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
@ -92,7 +97,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
@ -103,18 +108,49 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
}
}
protected def watchedNodeUnreachable(address: Address): Unit = {
val subjects = watching filter { _.path.address == address }
// FIXME should we cleanup (remove watchedBy) since we know they are dead?
protected def addressTerminated(address: Address): Unit = {
// cleanup watchedBy since we know they are dead
for (a watchedBy; if a.path.address == address) maintainAddressTerminatedSubscription(a) {
watchedBy -= a
}
// send Terminated to self for all matching subjects
// FIXME existenceConfirmed?
subjects foreach { self ! Terminated(_)(existenceConfirmed = false) }
for (a watching; if a.path.address == address) {
self ! Terminated(a)(existenceConfirmed = false)
}
}
private def isSubscribingToNodeUnreachable: Boolean = watching.exists {
case a: InternalActorRef if !a.isLocal true
case _ false
/**
* Starts subscription to AddressTerminated if not already subscribing and the
* block adds a non-local ref to watching or watchedBy.
* Ends subscription to AddressTerminated if subscribing and the
* block removes the last non-local ref from watching and watchedBy.
*/
private def maintainAddressTerminatedSubscription[T](change: ActorRef)(block: T): T = {
def isNonLocal(ref: ActorRef) = ref match {
case a: InternalActorRef if !a.isLocal true
case _ false
}
def hasNonLocalAddress: Boolean = {
(watching exists isNonLocal) || (watchedBy exists isNonLocal)
}
if (isNonLocal(change)) {
val had = hasNonLocalAddress
val result = block
val has = hasNonLocalAddress
if (had && !has) unsubscribeAddressTerminated()
else if (!had && has) subscribeAddressTerminated()
result
} else {
block
}
}
private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated])
private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated])
}

View file

@ -9,7 +9,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.actor.NodeUnreachable
import akka.actor.AddressTerminated
/**
* Domain events published to the event bus.
@ -200,10 +200,14 @@ private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnv
def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = {
// keep the latestGossip to be sent to new subscribers
latestGossip = newGossip
val events = diff(oldGossip, newGossip)
events foreach { eventStream publish }
// notify DeathWatch about the unreachable node
events collect { case MemberUnreachable(m) NodeUnreachable(m.address) } foreach { eventStream publish }
diff(oldGossip, newGossip) foreach { event
eventStream publish event
// notify DeathWatch about unreachable node
event match {
case MemberUnreachable(m) eventStream publish AddressTerminated(m.address)
case _
}
}
}
def publishInternalStats(currentStats: CurrentInternalStats): Unit = {