Merge pull request #997 from drewhk/wip-2835-ordering-of-terminated

Enforcing ordering of Terminated wrt remote/local #2835
This commit is contained in:
drewhk 2013-01-04 05:32:26 -08:00
commit cdd86cbf1c
3 changed files with 41 additions and 8 deletions

View file

@ -19,10 +19,27 @@ import scala.collection.immutable
*/
@SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
// Please note that local/non-local distinction must be preserved:
// host.isDefined == hasGlobalScope
// host.isEmpty == hasLocalScope
// hasLocalScope == !hasGlobalScope
def this(protocol: String, system: String) = this(protocol, system, None, None)
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
/**
* Returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote
* hosts. See also [[akka.actor.Address#hasGlobalScope]].
*/
def hasLocalScope: Boolean = host.isEmpty
/**
* Returns true if this Address is usable globally. Unlike locally defined addresses ([[akka.actor.Address#hasLocalScope]])
* addresses of global scope are safe to sent to other hosts, as they globally and uniquely identify an addressable
* entity.
*/
def hasGlobalScope: Boolean = host.isDefined
/**
* Returns the canonical String representation of this Address formatted as:
*

View file

@ -4,7 +4,7 @@
package akka.actor.dungeon
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated }
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
@ -51,12 +51,24 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.tell(terminated, self)
/*
* It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
* the remoting to shut down as well. At this point Terminated messages to remote watchers are no longer
* deliverable.
*
* The problematic case is:
* 1. Terminated is sent to RemoteDaemon
* 1a. RemoteDaemon is fast enough to notify the terminator actor in RemoteActorRefProvider
* 1b. The terminator is fast enough to enqueue the shutdown command in the remoting
* 2. Only at this point is the Terminated (to be sent remotely) enqueued in the mailbox of remoting
*
* If the remote watchers are notified first, then the mailbox of the Remoting will guarantee the correct order.
*/
watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
} finally watchedBy = ActorCell.emptyActorRefSet
}
}

View file

@ -190,7 +190,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
private def finishTerminate() {
val a = actor
// The following order is crucial for things to work properly. Only chnage this if you're very confident and lucky.
/* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
*
* Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
* specific order.
*/
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))