Added shutdown of un-supervised Temporary that have crashed
This commit is contained in:
parent
f6c64ce007
commit
9f5d77b4b0
2 changed files with 28 additions and 3 deletions
|
|
@ -992,6 +992,7 @@ class LocalActorRef private[akka](
|
|||
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
|
||||
"\n\tCan't send the message to the supervisor [%s].", sup)
|
||||
}
|
||||
|
||||
stop
|
||||
} else {
|
||||
_isBeingRestarted = true
|
||||
|
|
@ -1182,15 +1183,16 @@ class LocalActorRef private[akka](
|
|||
clearTransaction
|
||||
if (topLevelTransaction) clearTransactionSet
|
||||
|
||||
notifySupervisorWithMessage(Exit(this, reason))
|
||||
if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
|
||||
else lifeCycle match { case Some(LifeCycle(Temporary)) => shutDownTemporaryActor(this) }
|
||||
}
|
||||
|
||||
private def notifySupervisorWithMessage(notification: LifeCycleMessage) = {
|
||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||
_supervisor.foreach { sup =>
|
||||
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
|
||||
shutdownLinkedActors
|
||||
stop
|
||||
shutdownLinkedActors
|
||||
stop
|
||||
} else sup ! notification // else notify supervisor
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CyclicBarrier, TimeoutException}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -19,6 +20,16 @@ object ActorFireForgetRequestReplySpec {
|
|||
}
|
||||
}
|
||||
|
||||
class CrashingTemporaryActor extends Actor {
|
||||
self.lifeCycle = Some(LifeCycle(Temporary))
|
||||
|
||||
def receive = {
|
||||
case "Die" =>
|
||||
state.finished.await
|
||||
throw new Exception("Expected exception")
|
||||
}
|
||||
}
|
||||
|
||||
class SenderActor(replyActor: ActorRef) extends Actor {
|
||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
||||
|
||||
|
|
@ -66,4 +77,16 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite {
|
|||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
assert("ReplyImplicit" === state.s)
|
||||
}
|
||||
|
||||
@Test
|
||||
def shouldShutdownCrashedTemporaryActor = {
|
||||
state.finished.reset
|
||||
val actor = actorOf[CrashingTemporaryActor].start
|
||||
assert(actor.isRunning)
|
||||
actor ! "Die"
|
||||
try { state.finished.await(1L, TimeUnit.SECONDS) }
|
||||
catch { case e: TimeoutException => fail("Never got the message") }
|
||||
Thread.sleep(100)
|
||||
assert(actor.isShutdown)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue