Improving the ActorPool code and fixing the tests for the new supervision

This commit is contained in:
Viktor Klang 2011-09-29 16:00:37 +02:00
parent a12ee36151
commit 950b11850a
3 changed files with 17 additions and 21 deletions

View file

@ -387,7 +387,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}))
}).withSupervisor(self))
}).withFaultHandler(faultHandler))
val pool2 = actorOf(
@ -411,7 +411,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
throw new RuntimeException
case _ pingCount.incrementAndGet
}
}))
}).withSupervisor(self))
}).withFaultHandler(faultHandler))
val pool3 = actorOf(
@ -526,7 +526,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
def instance = factory
def receive = _route
def pressureThreshold = 1
def factory = actorOf(new Actor {
def factory = actorOf(Props(new Actor {
if (deathCount.get > 5) deathCount.set(0)
if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") }
def receive = {
@ -537,7 +537,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers {
throw new RuntimeException
case _ pingCount.incrementAndGet
}
})
}).withSupervisor(self))
}).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000)))
// actor comes back right away

View file

@ -322,31 +322,26 @@ private[akka] class ActorCell(
}
actor match {
case null
case null if !recreation
val created = newActor() //TODO !!!! Notify supervisor on failure to create!
actor = created
created.preStart()
checkReceiveTimeout
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
case instance if recreation
case failedActor if recreation
val reason = new Exception("CRASHED") //FIXME TODO stash away the exception that caused the failure and reuse that? <------- !!!!!!!!!! RED RED RED
try {
val failedActor = actor
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
val freshActor = newActor()
clearActorContext()
if (failedActor ne null) {
val c = currentMessage //One read only plz
failedActor.preRestart(reason, if (c ne null) Some(c.message) else None)
}
val freshActor = newActor()
clearActorContext()
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(reason)
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
} catch {
case e
EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString))
throw e
} finally {
currentMessage = null
}
@ -358,10 +353,14 @@ private[akka] class ActorCell(
case _
}
} catch {
case e
case e try {
EventHandler.error(e, this, "error while creating actor")
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
envelope.channel.sendException(e)
} finally {
if (supervisor.isDefined) supervisor.get ! Failed(self, e) else throw e
}
}
def suspend(): Unit = dispatcher suspend this
@ -432,7 +431,8 @@ private[akka] class ActorCell(
def invoke(messageHandle: Envelope) {
try {
if (!mailbox.isClosed) {
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
currentMessage = messageHandle
try {
try {

View file

@ -89,11 +89,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
protected[akka] var _delegates = Vector[ActorRef]()
override def postStop() {
_delegates foreach { delegate
try {
delegate ! PoisonPill
} catch { case e: Exception } //Ignore any exceptions here
}
_delegates foreach { _ ! PoisonPill }
}
protected def _route(): Receive = {
@ -101,7 +97,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
case Stat
tryReply(Stats(_delegates length))
case Terminated(victim, _)
_delegates = _delegates filterNot { _.uuid == victim.uuid }
_delegates = _delegates filterNot { victim == }
case msg
resizeIfAppropriate()