Merge pull request #511 from akka/wip-2134-deathwatch2.0-√

Wip 2134 deathwatch2.0 √
This commit is contained in:
viktorklang 2012-06-05 03:33:01 -07:00
commit 9581ec8403
28 changed files with 334 additions and 1010 deletions

View file

@ -36,7 +36,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"notify with one Terminated message when an Actor is stopped" in { "notify with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props.empty) val terminal = system.actorOf(Props.empty)
startWatching(terminal) ! "hallo" startWatching(terminal) ! "hallo"
expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill expectMsg("hallo")
terminal ! PoisonPill terminal ! PoisonPill

View file

@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path } def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
} foreach { } foreach {
case cell: ActorCell case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain(null)))
} }
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)

View file

@ -73,7 +73,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
watch(router) watch(router)
watch(c2) watch(c2)
system.stop(c2) system.stop(c2)
expectMsg(Terminated(c2)) expectMsgPF() {
case t @ Terminated(`c2`) if t.existenceConfirmed == true t
}
// it might take a while until the Router has actually processed the Terminated message // it might take a while until the Router has actually processed the Terminated message
awaitCond { awaitCond {
router ! "" router ! ""
@ -84,7 +86,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
res == Seq(c1, c1) res == Seq(c1, c1)
} }
system.stop(c1) system.stop(c1)
expectMsg(Terminated(router)) expectMsgPF() {
case t @ Terminated(`router`) if t.existenceConfirmed == true t
}
} }
"be able to send their routees" in { "be able to send their routees" in {

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import akka.util.Unsafe;
final class AbstractActorCell {
final static long mailboxOffset;
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
}

View file

@ -8,10 +8,12 @@ import akka.util.Unsafe;
final class AbstractPromiseActorRef { final class AbstractPromiseActorRef {
final static long stateOffset; final static long stateOffset;
final static long watchedByOffset;
static { static {
try { try {
stateOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_stateDoNotCallMeDirectly")); stateOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_stateDoNotCallMeDirectly"));
watchedByOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_watchedByDoNotCallMeDirectly"));
} catch(Throwable t){ } catch(Throwable t){
throw new ExceptionInInitializerError(t); throw new ExceptionInInitializerError(t);
} }

View file

@ -59,7 +59,7 @@ case object Kill extends Kill {
/** /**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
*/ */
case class Terminated(@BeanProperty actor: ActorRef) extends PossiblyHarmful case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean)
abstract class ReceiveTimeout extends PossiblyHarmful abstract class ReceiveTimeout extends PossiblyHarmful

View file

@ -6,16 +6,15 @@ package akka.actor
import akka.dispatch._ import akka.dispatch._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.event.Logging.{ Debug, Warning, Error } import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.{ Duration, Helpers }
import akka.japi.Procedure import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream } import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.event.Logging.LogEventException import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
//TODO: everything here for current compatibility - could be limited more //TODO: everything here for current compatibility - could be limited more
@ -187,6 +186,8 @@ private[akka] object ActorCell {
final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
sealed trait SuspendReason sealed trait SuspendReason
case object UserRequest extends SuspendReason case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason case class Recreation(cause: Throwable) extends SuspendReason
@ -317,7 +318,7 @@ private[akka] class ActorCell(
val props: Props, val props: Props,
@volatile var parent: InternalActorRef, @volatile var parent: InternalActorRef,
/*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext { /*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext {
import AbstractActorCell.mailboxOffset
import ActorCell._ import ActorCell._
final def systemImpl = system final def systemImpl = system
@ -407,16 +408,13 @@ private[akka] class ActorCell(
actor.asInstanceOf[InternalActorRef].stop() actor.asInstanceOf[InternalActorRef].stop()
} }
var currentMessage: Envelope = null var currentMessage: Envelope = _
var actor: Actor = _ var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty private var behaviorStack: Stack[Actor.Receive] = Stack.empty
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
@volatile //This must be volatile since it isn't protected by the mailbox status
var mailbox: Mailbox = _
var nextNameSequence: Long = 0 var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell //Not thread safe, so should only be used inside the actor that inhabits this ActorCell
final protected def randomName(): String = { final protected def randomName(): String = {
@ -428,6 +426,24 @@ private[akka] class ActorCell(
@inline @inline
final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* INTERNAL API
*
* Returns a reference to the current mailbox
*/
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox]
/**
* INTERNAL API
*
* replaces the current mailbox using getAndSet semantics
*/
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
/** /**
* UntypedActorContext impl * UntypedActorContext impl
*/ */
@ -440,7 +456,7 @@ private[akka] class ActorCell(
* Create the mailbox and enqueue the Create() message to ensure that * Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else. * this is processed before anything else.
*/ */
mailbox = dispatcher.createMailbox(this) swapMailbox(dispatcher.createMailbox(this))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create()) mailbox.systemEnqueue(self, Create())
@ -460,16 +476,22 @@ private[akka] class ActorCell(
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
override final def watch(subject: ActorRef): ActorRef = { override final def watch(subject: ActorRef): ActorRef = subject match {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ case a: InternalActorRef
dispatcher.systemDispatch(this, Link(subject)) if (a != self && !watching.contains(a)) {
subject a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
} }
override final def unwatch(subject: ActorRef): ActorRef = { override final def unwatch(subject: ActorRef): ActorRef = subject match {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ case a: InternalActorRef
dispatcher.systemDispatch(this, Unlink(subject)) if (a != self && watching.contains(a)) {
subject a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
} }
final def children: Iterable[ActorRef] = childrenRefs.children final def children: Iterable[ActorRef] = childrenRefs.children
@ -477,10 +499,8 @@ private[akka] class ActorCell(
/** /**
* Impl UntypedActorContext * Impl UntypedActorContext
*/ */
final def getChildren(): java.lang.Iterable[ActorRef] = { final def getChildren(): java.lang.Iterable[ActorRef] =
import scala.collection.JavaConverters.asJavaIterableConverter scala.collection.JavaConverters.asJavaIterableConverter(children).asJava
asJavaIterableConverter(children).asJava
}
final def tell(message: Any, sender: ActorRef): Unit = final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system)) dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system))
@ -567,15 +587,35 @@ private[akka] class ActorCell(
def resume(): Unit = if (isNormal) dispatcher resume this def resume(): Unit = if (isNormal) dispatcher resume this
def link(subject: ActorRef): Unit = if (!isTerminating) { def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
if (system.deathWatch.subscribe(self, subject)) { val watcheeSelf = watchee == self
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
} }
} }
def unlink(subject: ActorRef): Unit = if (!isTerminating) { def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
if (system.deathWatch.unsubscribe(self, subject)) { val watcheeSelf = watchee == self
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
} }
} }
@ -603,15 +643,15 @@ private[akka] class ActorCell(
try { try {
message match { message match {
case Create() create() case Create() create()
case Recreate(cause) recreate(cause) case Recreate(cause) recreate(cause)
case Link(subject) link(subject) case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unlink(subject) unlink(subject) case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() suspend() case Suspend() suspend()
case Resume() resume() case Resume() resume()
case Terminate() terminate() case Terminate() terminate()
case Supervise(child) supervise(child) case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child) case ChildTerminated(child) handleChildTerminated(child)
} }
} catch { } catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message) case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
@ -698,11 +738,32 @@ private[akka] class ActorCell(
} finally { } finally {
try { try {
parent.sendSystemMessage(ChildTerminated(self)) parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self))
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watchedBy = emptyActorRefSet
}
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
if (system.settings.DebugLifecycle) if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally { } finally {
behaviorStack = ActorCell.behaviorStackPlaceHolder behaviorStack = behaviorStackPlaceHolder
clearActorFields(a) clearActorFields(a)
actor = null actor = null
} }

View file

@ -409,16 +409,26 @@ private[akka] object DeadLetterActorRef {
* *
* INTERNAL API * INTERNAL API
*/ */
private[akka] class EmptyLocalActorRef( private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
override val provider: ActorRefProvider, override val path: ActorPath,
override val path: ActorPath, val eventStream: EventStream) extends MinimalActorRef {
val eventStream: EventStream) extends MinimalActorRef {
override def isTerminated(): Boolean = true override def isTerminated(): Boolean = true
override def sendSystemMessage(message: SystemMessage): Unit = specialHandle(message)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops, since deadLetters will resend! case d: DeadLetter specialHandle(d.message) // do NOT form endless loops, since deadLetters will resend!
case _ eventStream.publish(DeadLetter(message, sender, this)) case _ if (!specialHandle(message)) eventStream.publish(DeadLetter(message, sender, this))
}
protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee == this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
true
case _: Unwatch true // Just ignore
case _ false
} }
} }
@ -428,12 +438,22 @@ private[akka] class EmptyLocalActorRef(
* *
* INTERNAL API * INTERNAL API
*/ */
private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, _path: ActorPath, _eventStream: EventStream) private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
extends EmptyLocalActorRef(_provider, _path, _eventStream) { _path: ActorPath,
_eventStream: EventStream) extends EmptyLocalActorRef(_provider, _path, _eventStream) {
override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
case d: DeadLetter eventStream.publish(d) case d: DeadLetter if (!specialHandle(d.message)) eventStream.publish(d)
case _ eventStream.publish(DeadLetter(message, sender, this)) case _ if (!specialHandle(message)) eventStream.publish(DeadLetter(message, sender, this))
}
override protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch
if (w.watchee != this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
true
case w: Unwatch true // Just ignore
case _ false
} }
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])

View file

@ -8,8 +8,8 @@ import java.util.concurrent.atomic.AtomicLong
import akka.dispatch._ import akka.dispatch._
import akka.routing._ import akka.routing._
import akka.AkkaException import akka.AkkaException
import akka.util.{ Switch, Helpers }
import akka.event._ import akka.event._
import akka.util.{ NonFatal, Switch, Helpers }
/** /**
* Interface for all ActorRef providers to implement. * Interface for all ActorRef providers to implement.
@ -38,11 +38,6 @@ trait ActorRefProvider {
*/ */
def deadLetters: ActorRef def deadLetters: ActorRef
/**
* Reference to the death watch service.
*/
def deathWatch: DeathWatch
/** /**
* The root path for all actors within this actor system, including remote * The root path for all actors within this actor system, including remote
* address if enabled. * address if enabled.
@ -162,8 +157,9 @@ trait ActorRefFactory {
* INTERNAL USE ONLY * INTERNAL USE ONLY
*/ */
protected def provider: ActorRefProvider protected def provider: ActorRefProvider
/** /**
* Returns the default MessageDispatcher used by this ActorRefFactory * Returns the default MessageDispatcher associated with this ActorRefFactory
*/ */
implicit def dispatcher: MessageDispatcher implicit def dispatcher: MessageDispatcher
@ -339,8 +335,6 @@ class LocalActorRefProvider(
override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream) override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
override val deathWatch: DeathWatch = new LocalDeathWatch(1024) //TODO make configrable
/* /*
* generate name for temporary actor refs * generate name for temporary actor refs
*/ */
@ -379,9 +373,9 @@ class LocalActorRefProvider(
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match { message match {
case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead case Supervise(_) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(child) stop() case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]") case _ log.error(this + " received unexpected system message [" + message + "]")
} }
} }
} }
@ -409,8 +403,8 @@ class LocalActorRefProvider(
def receive = { def receive = {
case Terminated(_) context.stop(self) case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok" case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
@ -441,8 +435,8 @@ class LocalActorRefProvider(
def receive = { def receive = {
case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self) case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok" case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
@ -480,18 +474,10 @@ class LocalActorRefProvider(
lazy val rootGuardian: InternalActorRef = lazy val rootGuardian: InternalActorRef =
new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) { new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
object Extra {
def unapply(s: String): Option[InternalActorRef] = extraNames.get(s)
}
override def getParent: InternalActorRef = this override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
override def getSingleChild(name: String): InternalActorRef = { case "temp" tempContainer
name match { case other extraNames.get(other).getOrElse(super.getSingleChild(other))
case "temp" tempContainer
case Extra(e) e
case _ super.getSingleChild(name)
}
} }
} }
@ -516,8 +502,8 @@ class LocalActorRefProvider(
def init(_system: ActorSystemImpl) { def init(_system: ActorSystemImpl) {
system = _system system = _system
// chain death watchers so that killing guardian stops the application // chain death watchers so that killing guardian stops the application
deathWatch.subscribe(systemGuardian, guardian) systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
deathWatch.subscribe(rootGuardian, systemGuardian) rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system) eventStream.startDefaultLoggers(_system)
} }
@ -566,19 +552,3 @@ class LocalActorRefProvider(
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
} }
class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification {
override def publish(event: Event): Unit = {
val monitors = dissociate(classify(event))
if (monitors.nonEmpty) monitors.foreach(_ ! event)
}
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
if (!super.subscribe(subscriber, to)) {
subscriber ! Terminated(to)
false
} else true
}
}

View file

@ -408,11 +408,6 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/ */
def systemGuardian: InternalActorRef def systemGuardian: InternalActorRef
/**
* Implementation of the mechanism which is used for watch()/unwatch().
*/
def deathWatch: DeathWatch
/** /**
* A ThreadFactory that can be used if the transport needs to create any Threads * A ThreadFactory that can be used if the transport needs to create any Threads
*/ */
@ -485,26 +480,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
private[akka] def systemActorOf(props: Props, name: String): ActorRef = { private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match { Await.result((systemGuardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def actorOf(props: Props, name: String): ActorRef = { def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(guardian ? CreateChild(props, name), timeout.duration) match { Await.result((guardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def actorOf(props: Props): ActorRef = { def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match { Await.result((guardian ? CreateRandomNameChild(props)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def stop(actor: ActorRef): Unit = { def stop(actor: ActorRef): Unit = {
@ -547,7 +533,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
//FIXME Why do we need this at all? //FIXME Why do we need this at all?
val deadLetterQueue: MessageQueue = new MessageQueue { val deadLetterQueue: MessageQueue = new MessageQueue {
def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver)
def dequeue() = null def dequeue() = null
def hasMessages = false def hasMessages = false
def numberOfMessages = 0 def numberOfMessages = 0
@ -556,8 +543,9 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
//FIXME Why do we need this at all? //FIXME Why do we need this at all?
val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) { val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) {
becomeClosed() becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
def systemDrain(): SystemMessage = null deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(newContents: SystemMessage): SystemMessage = null
def hasSystemMessages = false def hasSystemMessages = false
} }
@ -570,7 +558,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def lookupRoot: InternalActorRef = provider.rootGuardian def lookupRoot: InternalActorRef = provider.rootGuardian
def guardian: InternalActorRef = provider.guardian def guardian: InternalActorRef = provider.guardian
def systemGuardian: InternalActorRef = provider.systemGuardian def systemGuardian: InternalActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def /(actorName: String): ActorPath = guardian.path / actorName def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path def /(path: Iterable[String]): ActorPath = guardian.path / path

View file

@ -102,11 +102,15 @@ private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
/**
* INTERNAL API
*/
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable { final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run(): Unit = def run(): Unit =
@ -310,16 +314,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
case 0 case 0
shutdownSchedule match { shutdownSchedule match {
case UNSCHEDULED case UNSCHEDULED
if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) { if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
scheduleShutdownAction() else ifSensibleToDoSoThenScheduleShutdown()
()
} else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED case SCHEDULED
if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) () if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
else ifSensibleToDoSoThenScheduleShutdown() else ifSensibleToDoSoThenScheduleShutdown()
case RESCHEDULED () case RESCHEDULED
} }
case _ () case _
} }
private def scheduleShutdownAction(): Unit = { private def scheduleShutdownAction(): Unit = {
@ -349,9 +351,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
protected[akka] def unregister(actor: ActorCell) { protected[akka] def unregister(actor: ActorCell) {
if (debug) actors.remove(this, actor.self) if (debug) actors.remove(this, actor.self)
addInhabitants(-1) addInhabitants(-1)
val mailBox = actor.mailbox val mailBox = actor.swapMailbox(deadLetterMailbox)
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
actor.mailbox = deadLetterMailbox
mailBox.cleanUp() mailBox.cleanUp()
} }
@ -359,7 +360,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
@tailrec @tailrec
final def run() { final def run() {
shutdownSchedule match { shutdownSchedule match {
case UNSCHEDULED ()
case SCHEDULED case SCHEDULED
try { try {
if (inhabitants == 0) shutdown() //Warning, racy if (inhabitants == 0) shutdown() //Warning, racy
@ -369,6 +369,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
case RESCHEDULED case RESCHEDULED
if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction() if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
else run() else run()
case UNSCHEDULED
} }
} }
} }

View file

@ -50,17 +50,15 @@ class BalancingDispatcher(
private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue { private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = { override def cleanUp(): Unit = {
val dlq = actor.systemImpl.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that //Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) { var message = systemDrain(NoMessage)
val dlq = actor.systemImpl.deadLetterMailbox while (message ne null) {
var message = systemDrain() // message must be virgin before being able to systemEnqueue again
while (message ne null) { val next = message.next
// message must be virgin before being able to systemEnqueue again message.next = null
val next = message.next dlq.systemEnqueue(actor.self, message)
message.next = null message = next
dlq.systemEnqueue(actor.self, message)
message = next
}
} }
} }
} }

View file

@ -169,6 +169,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
*/ */
protected final def systemQueueGet: SystemMessage = protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage] Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
@ -208,14 +209,14 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
} }
final def processAllSystemMessages() { final def processAllSystemMessages() {
var nextMessage = systemDrain() var nextMessage = systemDrain(null)
try { try {
while ((nextMessage ne null) && !isClosed) { while ((nextMessage ne null) && !isClosed) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs) if (debug) println(actor.self + " processing system message " + nextMessage + " with " + actor.childrenRefs)
actor systemInvoke nextMessage actor systemInvoke nextMessage
nextMessage = nextMessage.next nextMessage = nextMessage.next
// dont ever execute normal message when system message present! // dont ever execute normal message when system message present!
if (nextMessage eq null) nextMessage = systemDrain() if (nextMessage eq null) nextMessage = systemDrain(null)
} }
} catch { } catch {
case NonFatal(e) case NonFatal(e)
@ -235,15 +236,13 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
protected[dispatch] def cleanUp(): Unit = protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) { var message = systemDrain(NoMessage)
var message = systemDrain() while (message ne null) {
while (message ne null) { // message must be virgin before being able to systemEnqueue again
// message must be virgin before being able to systemEnqueue again val next = message.next
val next = message.next message.next = null
message.next = null dlm.systemEnqueue(actor.self, message)
dlm.systemEnqueue(actor.self, message) message = next
message = next
}
} }
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
@ -300,7 +299,7 @@ private[akka] trait SystemMessageQueue {
/** /**
* Dequeue all messages from system queue and return them as single-linked list. * Dequeue all messages from system queue and return them as single-linked list.
*/ */
def systemDrain(): SystemMessage def systemDrain(newContents: SystemMessage): SystemMessage
def hasSystemMessages: Boolean def hasSystemMessages: Boolean
} }
@ -315,26 +314,30 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
assert(message.next eq null) assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message) if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet val head = systemQueueGet
/* if (head == NoMessage) actor.system.deadLetterMailbox.systemEnqueue(receiver, message)
* this write is safely published by the compareAndSet contained within else {
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec /*
* guarantees that head uses the value obtained from systemQueueGet above. * this write is safely published by the compareAndSet contained within
* Hence, SystemMessage.next does not need to be volatile. * systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
*/ * guarantees that head uses the value obtained from systemQueueGet above.
message.next = head * Hence, SystemMessage.next does not need to be volatile.
if (!systemQueuePut(head, message)) { */
message.next = null message.next = head
systemEnqueue(receiver, message) if (!systemQueuePut(head, message)) {
message.next = null
systemEnqueue(receiver, message)
}
} }
} }
@tailrec @tailrec
final def systemDrain(): SystemMessage = { final def systemDrain(newContents: SystemMessage): SystemMessage = {
val head = systemQueueGet val head = systemQueueGet
if (systemQueuePut(head, null)) SystemMessage.reverse(head) else systemDrain() if (systemQueuePut(head, newContents)) SystemMessage.reverse(head) else systemDrain(newContents)
} }
def hasSystemMessages: Boolean = systemQueueGet ne null def hasSystemMessages: Boolean = systemQueueGet ne null
} }
/** /**

View file

@ -29,30 +29,20 @@ object ThreadPoolConfig {
val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler() val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler()
def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = { def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int =
import scala.math.{ min, max } math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)
min(max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)
}
def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = () new ArrayBlockingQueue[Runnable](capacity, fair)
() new ArrayBlockingQueue[Runnable](capacity, fair)
def synchronousQueue(fair: Boolean): QueueFactory = def synchronousQueue(fair: Boolean): QueueFactory = () new SynchronousQueue[Runnable](fair)
() new SynchronousQueue[Runnable](fair)
def linkedBlockingQueue(): QueueFactory = def linkedBlockingQueue(): QueueFactory = () new LinkedBlockingQueue[Runnable]()
() new LinkedBlockingQueue[Runnable]()
def linkedBlockingQueue(capacity: Int): QueueFactory = def linkedBlockingQueue(capacity: Int): QueueFactory = () new LinkedBlockingQueue[Runnable](capacity)
() new LinkedBlockingQueue[Runnable](capacity)
def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = () queue
() queue
def reusableQueue(queueFactory: QueueFactory): QueueFactory = { def reusableQueue(queueFactory: QueueFactory): QueueFactory = reusableQueue(queueFactory())
val queue = queueFactory()
() queue
}
} }
/** /**
@ -157,7 +147,8 @@ case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory)) this.copy(config = config.copy(queueFactory = newQueueFactory))
def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c)) def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder =
fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
} }
object MonitorableThreadFactory { object MonitorableThreadFactory {

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor._
/**
* The contract of DeathWatch is not properly expressed using the type system
* Whenever there is a publish, all listeners to the Terminated Actor should be atomically removed
* A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down
* See LocalDeathWatch for semantics
*/
abstract class DeathWatch extends ActorEventBus with ActorClassifier {
type Event = Terminated
protected final def classify(event: Event): Classifier = event.actor
}

View file

@ -4,12 +4,10 @@
package akka.pattern package akka.pattern
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.{ Promise, Terminate, SystemMessage, Future }
import akka.event.DeathWatch
import akka.util.Timeout
import annotation.tailrec import annotation.tailrec
import akka.util.Unsafe
import akka.actor._ import akka.actor._
import akka.dispatch._
import akka.util.{ NonFatal, Timeout, Unsafe }
/** /**
* This is what is used to complete a Future that is returned from an ask/? call, * This is what is used to complete a Future that is returned from an ask/? call,
@ -164,6 +162,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
extends MinimalActorRef { extends MinimalActorRef {
import PromiseActorRef._ import PromiseActorRef._
import AbstractPromiseActorRef.stateOffset import AbstractPromiseActorRef.stateOffset
import AbstractPromiseActorRef.watchedByOffset
/** /**
* As an optimization for the common (local) case we only register this PromiseActorRef * As an optimization for the common (local) case we only register this PromiseActorRef
@ -180,14 +179,43 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@volatile @volatile
private[this] var _stateDoNotCallMeDirectly: AnyRef = _ private[this] var _stateDoNotCallMeDirectly: AnyRef = _
@inline @volatile
private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet
@inline @inline
private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]
@inline @inline
private def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState) private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)
@tailrec // Returns false if the Promise is already completed
private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
case null false
case other updateWatchedBy(other, other + watcher) || addWatcher(watcher)
}
@tailrec
private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
case null ()
case other if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
}
@tailrec
private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
case null ActorCell.emptyActorRefSet
case other if (!updateWatchedBy(other, null)) clearWatchers() else other
}
@inline
private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@inline
private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
@inline
private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
override def getParent: InternalActorRef = provider.tempContainer override def getParent: InternalActorRef = provider.tempContainer
@ -218,20 +246,25 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match { override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match {
case Stopped | _: StoppedWithPath provider.deadLetters ! message case Stopped | _: StoppedWithPath provider.deadLetters ! message
case _ case _ if (!(result.tryComplete {
val completedJustNow = result.tryComplete { message match {
message match { case Status.Success(r) Right(r)
case Status.Success(r) Right(r) case Status.Failure(f) Left(f)
case Status.Failure(f) Left(f) case other Right(other)
case other Right(other)
}
} }
if (!completedJustNow) provider.deadLetters ! message })) provider.deadLetters ! message
} }
override def sendSystemMessage(message: SystemMessage): Unit = message match { override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop() case _: Terminate stop()
case _ case Watch(watchee, watcher)
if (watchee == this && watcher != this) {
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
case Unwatch(watchee, watcher)
if (watchee == this && watcher != this) remWatcher(watcher)
else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
case _
} }
override def isTerminated: Boolean = state match { override def isTerminated: Boolean = state match {
@ -241,23 +274,20 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@tailrec @tailrec
override def stop(): Unit = { override def stop(): Unit = {
def ensurePromiseCompleted(): Unit = def ensureCompleted(): Unit = {
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(existenceConfirmed = true)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
}
}
state match { state match {
case null case null // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
// if path was never queried nobody can possibly be watching us, so we don't have to publish termination either if (updateState(null, Stopped)) ensureCompleted() else stop()
if (updateState(null, Stopped)) ensurePromiseCompleted()
else stop()
case p: ActorPath case p: ActorPath
if (updateState(p, StoppedWithPath(p))) { if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
try { case Stopped | _: StoppedWithPath // already stopped
ensurePromiseCompleted()
provider.deathWatch.publish(Terminated(this))
} finally {
provider.unregisterTempActor(p)
}
} else stop()
case Stopped | _: StoppedWithPath
case Registering stop() // spin until registration is completed before stopping case Registering stop() // spin until registration is completed before stopping
} }
} }

View file

@ -4,9 +4,9 @@
package akka.pattern package akka.pattern
import akka.dispatch.{ Promise, Future }
import akka.actor._ import akka.actor._
import akka.util.{ Timeout, Duration } import akka.util.{ Timeout, Duration }
import akka.dispatch.{ Unwatch, Watch, Promise, Future }
trait GracefulStopSupport { trait GracefulStopSupport {
/** /**
@ -34,18 +34,21 @@ trait GracefulStopSupport {
* is completed with failure [[akka.pattern.AskTimeoutException]]. * is completed with failure [[akka.pattern.AskTimeoutException]].
*/ */
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) { if (target.isTerminated) Promise.successful(true)
Promise.successful(true) else system match {
} else system match {
case e: ExtendedActorSystem case e: ExtendedActorSystem
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(e.provider, Timeout(timeout)) val ref = PromiseActorRef(e.provider, Timeout(timeout))
e.deathWatch.subscribe(ref, target) internalTarget.sendSystemMessage(Watch(target, ref))
ref.result onComplete { ref.result onComplete { // Just making sure we're not leaking here
case Right(Terminated(`target`)) () // Ignore case Right(Terminated(`target`)) ()
case _ e.deathWatch.unsubscribe(ref, target) case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
} // Just making sure we're not leaking here }
target ! PoisonPill target ! PoisonPill
ref.result map { case Terminated(`target`) true } ref.result map {
case Terminated(`target`) true
case _ false
}
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
} }
} }

View file

@ -182,7 +182,7 @@ public class FaultHandlingTestBase {
final TestProbe probe = new TestProbe(system); final TestProbe probe = new TestProbe(system);
probe.watch(child); probe.watch(child);
child.tell(new IllegalArgumentException()); child.tell(new IllegalArgumentException());
probe.expectMsg(new Terminated(child)); probe.expectMsg(new Terminated(child, true));
//#stop //#stop
//#escalate-kill //#escalate-kill
@ -190,7 +190,7 @@ public class FaultHandlingTestBase {
probe.watch(child); probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception()); child.tell(new Exception());
probe.expectMsg(new Terminated(child)); probe.expectMsg(new Terminated(child, true));
//#escalate-kill //#escalate-kill
//#escalate-restart //#escalate-restart

View file

@ -111,7 +111,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
//#stop //#stop
watch(child) // have testActor watch child watch(child) // have testActor watch child
child ! new IllegalArgumentException // break it child ! new IllegalArgumentException // break it
expectMsg(Terminated(child)) expectMsg(Terminated(child)(existenceConfirmed = true))
child.isTerminated must be(true) child.isTerminated must be(true)
//#stop //#stop
} }
@ -125,7 +125,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
expectMsg(0) expectMsg(0)
child2 ! new Exception("CRASH") // escalate failure child2 ! new Exception("CRASH") // escalate failure
expectMsg(Terminated(child2)) expectMsg(Terminated(child2)(existenceConfirmed = true))
//#escalate-kill //#escalate-kill
//#escalate-restart //#escalate-restart
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2") val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

View file

@ -6351,605 +6351,6 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(class_scope:DeployProtocol) // @@protoc_insertion_point(class_scope:DeployProtocol)
} }
public interface DaemonMsgWatchProtocolOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required .ActorRefProtocol watcher = 1;
boolean hasWatcher();
akka.remote.RemoteProtocol.ActorRefProtocol getWatcher();
akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder();
// required .ActorRefProtocol watched = 2;
boolean hasWatched();
akka.remote.RemoteProtocol.ActorRefProtocol getWatched();
akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder();
}
public static final class DaemonMsgWatchProtocol extends
com.google.protobuf.GeneratedMessage
implements DaemonMsgWatchProtocolOrBuilder {
// Use DaemonMsgWatchProtocol.newBuilder() to construct.
private DaemonMsgWatchProtocol(Builder builder) {
super(builder);
}
private DaemonMsgWatchProtocol(boolean noInit) {}
private static final DaemonMsgWatchProtocol defaultInstance;
public static DaemonMsgWatchProtocol getDefaultInstance() {
return defaultInstance;
}
public DaemonMsgWatchProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
}
private int bitField0_;
// required .ActorRefProtocol watcher = 1;
public static final int WATCHER_FIELD_NUMBER = 1;
private akka.remote.RemoteProtocol.ActorRefProtocol watcher_;
public boolean hasWatcher() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() {
return watcher_;
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() {
return watcher_;
}
// required .ActorRefProtocol watched = 2;
public static final int WATCHED_FIELD_NUMBER = 2;
private akka.remote.RemoteProtocol.ActorRefProtocol watched_;
public boolean hasWatched() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() {
return watched_;
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() {
return watched_;
}
private void initFields() {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasWatcher()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasWatched()) {
memoizedIsInitialized = 0;
return false;
}
if (!getWatcher().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
if (!getWatched().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, watcher_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, watched_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, watcher_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, watched_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.remote.RemoteProtocol.DaemonMsgWatchProtocolOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
}
// Construct using akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getWatcherFieldBuilder();
getWatchedFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
if (watcherBuilder_ == null) {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
} else {
watcherBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
if (watchedBuilder_ == null) {
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
} else {
watchedBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDescriptor();
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol getDefaultInstanceForType() {
return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance();
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol build() {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildPartial() {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = new akka.remote.RemoteProtocol.DaemonMsgWatchProtocol(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
if (watcherBuilder_ == null) {
result.watcher_ = watcher_;
} else {
result.watcher_ = watcherBuilder_.build();
}
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
if (watchedBuilder_ == null) {
result.watched_ = watched_;
} else {
result.watched_ = watchedBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.remote.RemoteProtocol.DaemonMsgWatchProtocol) {
return mergeFrom((akka.remote.RemoteProtocol.DaemonMsgWatchProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol other) {
if (other == akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance()) return this;
if (other.hasWatcher()) {
mergeWatcher(other.getWatcher());
}
if (other.hasWatched()) {
mergeWatched(other.getWatched());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasWatcher()) {
return false;
}
if (!hasWatched()) {
return false;
}
if (!getWatcher().isInitialized()) {
return false;
}
if (!getWatched().isInitialized()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 10: {
akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder();
if (hasWatcher()) {
subBuilder.mergeFrom(getWatcher());
}
input.readMessage(subBuilder, extensionRegistry);
setWatcher(subBuilder.buildPartial());
break;
}
case 18: {
akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder();
if (hasWatched()) {
subBuilder.mergeFrom(getWatched());
}
input.readMessage(subBuilder, extensionRegistry);
setWatched(subBuilder.buildPartial());
break;
}
}
}
}
private int bitField0_;
// required .ActorRefProtocol watcher = 1;
private akka.remote.RemoteProtocol.ActorRefProtocol watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watcherBuilder_;
public boolean hasWatcher() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() {
if (watcherBuilder_ == null) {
return watcher_;
} else {
return watcherBuilder_.getMessage();
}
}
public Builder setWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watcherBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
watcher_ = value;
onChanged();
} else {
watcherBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
return this;
}
public Builder setWatcher(
akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) {
if (watcherBuilder_ == null) {
watcher_ = builderForValue.build();
onChanged();
} else {
watcherBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
return this;
}
public Builder mergeWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watcherBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
watcher_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) {
watcher_ =
akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watcher_).mergeFrom(value).buildPartial();
} else {
watcher_ = value;
}
onChanged();
} else {
watcherBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000001;
return this;
}
public Builder clearWatcher() {
if (watcherBuilder_ == null) {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
onChanged();
} else {
watcherBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatcherBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getWatcherFieldBuilder().getBuilder();
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() {
if (watcherBuilder_ != null) {
return watcherBuilder_.getMessageOrBuilder();
} else {
return watcher_;
}
}
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>
getWatcherFieldBuilder() {
if (watcherBuilder_ == null) {
watcherBuilder_ = new com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>(
watcher_,
getParentForChildren(),
isClean());
watcher_ = null;
}
return watcherBuilder_;
}
// required .ActorRefProtocol watched = 2;
private akka.remote.RemoteProtocol.ActorRefProtocol watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watchedBuilder_;
public boolean hasWatched() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() {
if (watchedBuilder_ == null) {
return watched_;
} else {
return watchedBuilder_.getMessage();
}
}
public Builder setWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watchedBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
watched_ = value;
onChanged();
} else {
watchedBuilder_.setMessage(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder setWatched(
akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) {
if (watchedBuilder_ == null) {
watched_ = builderForValue.build();
onChanged();
} else {
watchedBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000002;
return this;
}
public Builder mergeWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watchedBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002) &&
watched_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) {
watched_ =
akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watched_).mergeFrom(value).buildPartial();
} else {
watched_ = value;
}
onChanged();
} else {
watchedBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder clearWatched() {
if (watchedBuilder_ == null) {
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
onChanged();
} else {
watchedBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatchedBuilder() {
bitField0_ |= 0x00000002;
onChanged();
return getWatchedFieldBuilder().getBuilder();
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() {
if (watchedBuilder_ != null) {
return watchedBuilder_.getMessageOrBuilder();
} else {
return watched_;
}
}
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>
getWatchedFieldBuilder() {
if (watchedBuilder_ == null) {
watchedBuilder_ = new com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>(
watched_,
getParentForChildren(),
isClean());
watched_ = null;
}
return watchedBuilder_;
}
// @@protoc_insertion_point(builder_scope:DaemonMsgWatchProtocol)
}
static {
defaultInstance = new DaemonMsgWatchProtocol(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:DaemonMsgWatchProtocol)
}
private static com.google.protobuf.Descriptors.Descriptor private static com.google.protobuf.Descriptors.Descriptor
internal_static_AkkaRemoteProtocol_descriptor; internal_static_AkkaRemoteProtocol_descriptor;
private static private static
@ -7000,11 +6401,6 @@ public final class RemoteProtocol {
private static private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_DeployProtocol_fieldAccessorTable; internal_static_DeployProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_DaemonMsgWatchProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() { getDescriptor() {
@ -7039,11 +6435,9 @@ public final class RemoteProtocol {
"ssCreator\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014rout" + "ssCreator\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014rout" +
"erConfig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path" + "erConfig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path" +
"\030\001 \002(\t\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003" + "\030\001 \002(\t\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003" +
" \001(\014\022\r\n\005scope\030\004 \001(\014\"`\n\026DaemonMsgWatchPro" + " \001(\014\022\r\n\005scope\030\004 \001(\014*7\n\013CommandType\022\013\n\007CO" +
"tocol\022\"\n\007watcher\030\001 \002(\0132\021.ActorRefProtoco" + "NNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013" +
"l\022\"\n\007watched\030\002 \002(\0132\021.ActorRefProtocol*7\n" + "akka.remoteH\001"
"\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022" +
"\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -7130,14 +6524,6 @@ public final class RemoteProtocol {
new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", }, new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", },
akka.remote.RemoteProtocol.DeployProtocol.class, akka.remote.RemoteProtocol.DeployProtocol.class,
akka.remote.RemoteProtocol.DeployProtocol.Builder.class); akka.remote.RemoteProtocol.DeployProtocol.Builder.class);
internal_static_DaemonMsgWatchProtocol_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_DaemonMsgWatchProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_DaemonMsgWatchProtocol_descriptor,
new java.lang.String[] { "Watcher", "Watched", },
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.class,
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.Builder.class);
return null; return null;
} }
}; };

View file

@ -107,12 +107,4 @@ message DeployProtocol {
optional bytes config = 2; optional bytes config = 2;
optional bytes routerConfig = 3; optional bytes routerConfig = 3;
optional bytes scope = 4; optional bytes scope = 4;
} }
/**
* Serialization of akka.remote.DaemonMsgWatch
*/
message DaemonMsgWatchProtocol {
required ActorRefProtocol watcher = 1;
required ActorRefProtocol watched = 2;
}

View file

@ -15,7 +15,6 @@ akka {
serializers { serializers {
proto = "akka.serialization.ProtobufSerializer" proto = "akka.serialization.ProtobufSerializer"
daemon-create = "akka.serialization.DaemonMsgCreateSerializer" daemon-create = "akka.serialization.DaemonMsgCreateSerializer"
daemon-watch = "akka.serialization.DaemonMsgWatchSerializer"
} }
@ -24,7 +23,6 @@ akka {
# does, need to use the more specific one here in order to avoid ambiguity # does, need to use the more specific one here in order to avoid ambiguity
"com.google.protobuf.GeneratedMessage" = proto "com.google.protobuf.GeneratedMessage" = proto
"akka.remote.DaemonMsgCreate" = daemon-create "akka.remote.DaemonMsgCreate" = daemon-create
"akka.remote.DaemonMsgWatch" = daemon-watch
} }
deployment { deployment {

View file

@ -6,7 +6,7 @@ package akka.remote
import akka.actor._ import akka.actor._
import akka.dispatch._ import akka.dispatch._
import akka.event.{ DeathWatch, Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
import akka.event.EventStream import akka.event.EventStream
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
@ -34,8 +34,6 @@ private[akka] class RemoteActorRefProvider(
override def rootPath: ActorPath = local.rootPath override def rootPath: ActorPath = local.rootPath
override def deadLetters: InternalActorRef = local.deadLetters override def deadLetters: InternalActorRef = local.deadLetters
override val deathWatch: DeathWatch = new RemoteDeathWatch(local.deathWatch, this)
// these are only available after init() // these are only available after init()
override def rootGuardian: InternalActorRef = local.rootGuardian override def rootGuardian: InternalActorRef = local.rootGuardian
override def guardian: InternalActorRef = local.guardian override def guardian: InternalActorRef = local.guardian
@ -246,25 +244,4 @@ private[akka] class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path) private def writeReplace(): AnyRef = SerializedActorRef(path)
} }
private[akka] class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
override def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
ret
case l: LocalRef
local.subscribe(watcher, watched)
case _
provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)
false
}
override def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
override def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
override def publish(event: Terminated): Unit = local.publish(event)
}

View file

@ -8,10 +8,10 @@ import scala.annotation.tailrec
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.dispatch.Watch
private[akka] sealed trait DaemonMsg private[akka] sealed trait DaemonMsg
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
private[akka] case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg
/** /**
* Internal system "daemon" actor for remote internal communication. * Internal system "daemon" actor for remote internal communication.
@ -62,18 +62,15 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, false, Some(deploy), true) path, false, Some(deploy), true)
addChild(subpath.mkString("/"), actor) addChild(subpath.mkString("/"), actor)
system.deathWatch.subscribe(this, actor) this.sendSystemMessage(Watch(actor, this))
case _ case _
log.error("remote path does not match path from message [{}]", message) log.error("remote path does not match path from message [{}]", message)
} }
case DaemonMsgWatch(watcher, watched)
val other = system.actorFor(watcher.path.root / "remote")
system.deathWatch.subscribe(other, watched)
} }
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/")) case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case t: Terminated system.deathWatch.publish(t) case t: Terminated
case unknown log.warning("Unknown message {} received by {}", unknown, this) case unknown log.warning("Unknown message {} received by {}", unknown, this)
} }

View file

@ -287,10 +287,9 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
case l: LocalRef case l: LocalRef
if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage)
remoteMessage.payload match { remoteMessage.payload match {
case _: SystemMessage if useUntrustedMode log.warning("operating in UntrustedMode, dropping inbound system message") case msg: PossiblyHarmful if useUntrustedMode log.warning("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass)
case _: AutoReceivedMessage if useUntrustedMode log.warning("operating in UntrustedMode, dropping inbound AutoReceivedMessage") case msg: SystemMessage l.sendSystemMessage(msg)
case msg: SystemMessage l.sendSystemMessage(msg) case msg l.!(msg)(remoteMessage.sender)
case msg l.!(msg)(remoteMessage.sender)
} }
case r: RemoteRef case r: RemoteRef
if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage) if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage)

View file

@ -1,43 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor.ActorRef
import akka.remote.DaemonMsgWatch
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol
import akka.actor.ExtendedActorSystem
/**
* Serializes akka's internal DaemonMsgWatch using protobuf.
*
* INTERNAL API
*/
private[akka] class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 4
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgWatch(watcher, watched)
DaemonMsgWatchProtocol.newBuilder.
setWatcher(serializeActorRef(watcher)).
setWatched(serializeActorRef(watched)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgWatchProtocol.parseFrom(bytes)
DaemonMsgWatch(
watcher = deserializeActorRef(system, proto.getWatcher),
watched = deserializeActorRef(system, proto.getWatched))
}
}

View file

@ -1,49 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.testkit.AkkaSpec
import akka.remote.DaemonMsgWatch
import akka.actor.Actor
import akka.actor.Props
object DaemonMsgWatchSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgWatchSerializerSpec extends AkkaSpec {
import DaemonMsgWatchSerializerSpec._
val ser = SerializationExtension(system)
"Serialization" must {
"resolve DaemonMsgWatchSerializer" in {
ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer])
}
"serialize and de-serialize DaemonMsgWatch" in {
val watcher = system.actorOf(Props[MyActor], "watcher")
val watched = system.actorOf(Props[MyActor], "watched")
val msg = DaemonMsgWatch(watcher, watched)
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match {
case Left(exception) fail(exception)
case Right(m) assert(m === msg)
}
}
}
}

View file

@ -12,7 +12,6 @@ import akka.util.duration._
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.CreateChild
import akka.actor.DeadLetter import akka.actor.DeadLetter
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.{ Await, MessageDispatcher } import akka.dispatch.{ Await, MessageDispatcher }
@ -115,9 +114,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
"akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true, "akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true,
"akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG") "akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG")
val system = ActorSystem("AkkaSpec1", ConfigFactory.parseMap(conf.asJava).withFallback(AkkaSpec.testConf)) val system = ActorSystem("AkkaSpec1", ConfigFactory.parseMap(conf.asJava).withFallback(AkkaSpec.testConf))
val spec = new AkkaSpec(system) { val spec = new AkkaSpec(system) { val ref = Seq(testActor, system.actorOf(Props.empty, "name")) }
val ref = Seq(testActor, system.actorOf(Props.empty, "name"))
}
spec.ref foreach (_.isTerminated must not be true) spec.ref foreach (_.isTerminated must not be true)
system.shutdown() system.shutdown()
spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds) spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds)