From 9f5d77b4b0177ccc733ad1d1277e7aed197fde8a Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Mon, 16 Aug 2010 13:37:57 +0200 Subject: [PATCH] Added shutdown of un-supervised Temporary that have crashed --- akka-core/src/main/scala/actor/ActorRef.scala | 8 ++++--- .../ActorFireForgetRequestReplySpec.scala | 23 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 6c15c197dd..028d238887 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -992,6 +992,7 @@ class LocalActorRef private[akka]( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + "\n\tCan't send the message to the supervisor [%s].", sup) } + stop } else { _isBeingRestarted = true @@ -1182,15 +1183,16 @@ class LocalActorRef private[akka]( clearTransaction if (topLevelTransaction) clearTransactionSet - notifySupervisorWithMessage(Exit(this, reason)) + if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else lifeCycle match { case Some(LifeCycle(Temporary)) => shutDownTemporaryActor(this) } } private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client _supervisor.foreach { sup => if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - shutdownLinkedActors - stop + shutdownLinkedActors + stop } else sup ! notification // else notify supervisor } } diff --git a/akka-core/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala index 9fbd39f90e..6d3b4d5124 100644 --- a/akka-core/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -1,6 +1,7 @@ package se.scalablesolutions.akka.actor import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException} +import se.scalablesolutions.akka.config.ScalaConfig._ import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -19,6 +20,16 @@ object ActorFireForgetRequestReplySpec { } } + class CrashingTemporaryActor extends Actor { + self.lifeCycle = Some(LifeCycle(Temporary)) + + def receive = { + case "Die" => + state.finished.await + throw new Exception("Expected exception") + } + } + class SenderActor(replyActor: ActorRef) extends Actor { self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) @@ -66,4 +77,16 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite { catch { case e: TimeoutException => fail("Never got the message") } assert("ReplyImplicit" === state.s) } + + @Test + def shouldShutdownCrashedTemporaryActor = { + state.finished.reset + val actor = actorOf[CrashingTemporaryActor].start + assert(actor.isRunning) + actor ! "Die" + try { state.finished.await(1L, TimeUnit.SECONDS) } + catch { case e: TimeoutException => fail("Never got the message") } + Thread.sleep(100) + assert(actor.isShutdown) + } }