Merge pull request #1963 from drewhk/wip-3562-mistakenly-generated-terminated-drewhk

=act #3562: Do not generate Terminated when unwatched in preRestart
This commit is contained in:
drewhk 2014-01-30 06:13:25 -08:00
commit 6c75b8a1e9
3 changed files with 105 additions and 5 deletions

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit.{ TestProbe, AkkaSpec }
import akka.actor.SupervisorStrategy.{ Restart, Stop }
import akka.dispatch.sysmsg.SystemMessage
import akka.event.EventStream
import scala.util.control.NoStackTrace
object UidClashTest {
class TerminatedForNonWatchedActor extends Exception("Received Terminated for actor that was not actually watched")
with NoStackTrace
@volatile var oldActor: ActorRef = _
class EvilCollidingActorRef(override val provider: ActorRefProvider,
override val path: ActorPath,
val eventStream: EventStream) extends MinimalActorRef {
//Ignore everything
override def isTerminated(): Boolean = true
override def sendSystemMessage(message: SystemMessage): Unit = ()
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = ()
}
def createCollidingRef(system: ActorSystem): ActorRef =
new EvilCollidingActorRef(system.asInstanceOf[ActorSystemImpl].provider, oldActor.path, system.eventStream)
case object PleaseRestart
case object PingMyself
case object RestartedSafely
class RestartedActor extends Actor {
def receive = {
case PleaseRestart throw new Exception("restart")
case Terminated(ref) throw new TerminatedForNonWatchedActor
// This is the tricky part to make this test a positive one (avoid expectNoMsg).
// Since anything enqueued in postRestart will arrive before the Terminated
// the bug triggers, there needs to be a bounce:
// 1. Ping is sent from postRestart to self
// 2. As a response to pint, RestartedSafely is sent to self
// 3a. if Terminated was enqueued during the restart procedure it will arrive before the RestartedSafely message
// 3b. otherwise only the RestartedSafely message arrives
case PingMyself self ! RestartedSafely
case RestartedSafely context.parent ! RestartedSafely
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child
oldActor = child
context.unwatch(child)
context.stop(child)
}
}
override def preStart(): Unit = context.watch(context.actorOf(Props.empty, "child"))
override def postRestart(reason: Throwable): Unit = {
context.watch(createCollidingRef(context.system))
self ! PingMyself
} // Simulate UID clash
}
class RestartingActor(probe: ActorRef) extends Actor {
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case _: TerminatedForNonWatchedActor
context.stop(self)
Stop
case _ Restart
}
val theRestartedOne = context.actorOf(Props[RestartedActor], "theRestartedOne")
def receive = {
case PleaseRestart theRestartedOne ! PleaseRestart
case RestartedSafely probe ! RestartedSafely
}
}
}
class UidClashTest extends AkkaSpec {
import UidClashTest._
"The Terminated message for an old child stopped in preRestart" should {
"not arrive after restart" in {
val watcher = TestProbe()
val topActor = system.actorOf(Props(classOf[RestartingActor], watcher.ref), "top")
watcher.watch(topActor)
topActor ! PleaseRestart
watcher.expectMsg(RestartedSafely)
}
}
}

View file

@ -50,7 +50,6 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
* it will be propagated to user's receive.
*/
protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = {
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
if (watchingContains(actor)) {
maintainAddressTerminatedSubscription(actor) {
watching = removeFromSet(actor, watching)
@ -60,6 +59,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
terminatedQueuedFor(actor)
}
}
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
}
private[akka] def terminatedQueuedFor(subject: ActorRef): Unit =
@ -77,12 +77,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject)
else set filterNot (_.path == subject.path)
protected def tellWatchersWeDied(actor: Actor): Unit =
protected def tellWatchersWeDied(): Unit =
if (!watchedBy.isEmpty) {
try {
// Don't need to send to parent parent since it receives a DWN by default
def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage(
DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal && watcher != parent)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
/*
* It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing

View file

@ -202,7 +202,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
catch handleNonFatalOrInterruptedException { e publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
finally try tellWatchersWeDied(a)
finally try tellWatchersWeDied()
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
finally {
if (system.settings.DebugLifecycle)