diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index e8c7de8643..e5b4d5bf60 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -387,7 +387,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - })) + }).withSupervisor(self)) }).withFaultHandler(faultHandler)) val pool2 = actorOf( @@ -411,7 +411,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - })) + }).withSupervisor(self)) }).withFaultHandler(faultHandler)) val pool3 = actorOf( @@ -526,7 +526,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { def instance = factory def receive = _route def pressureThreshold = 1 - def factory = actorOf(new Actor { + def factory = actorOf(Props(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { @@ -537,7 +537,7 @@ class ActorPoolSpec extends WordSpec with MustMatchers { throw new RuntimeException case _ ⇒ pingCount.incrementAndGet } - }) + }).withSupervisor(self)) }).withFaultHandler(OneForOneStrategy(List(classOf[IllegalStateException]), 5, 1000))) // actor comes back right away diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6a517d385f..0ebbb858db 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -322,31 +322,26 @@ private[akka] class ActorCell( } actor match { - case null ⇒ + case null if !recreation ⇒ val created = newActor() //TODO !!!! Notify supervisor on failure to create! actor = created created.preStart() checkReceiveTimeout if (Actor.debugLifecycle) EventHandler.debug(created, "started") - case instance if recreation ⇒ + case failedActor if recreation ⇒ val reason = new Exception("CRASHED") //FIXME TODO stash away the exception that caused the failure and reuse that? <------- !!!!!!!!!! RED RED RED try { - val failedActor = actor if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") + val freshActor = newActor() + clearActorContext() if (failedActor ne null) { val c = currentMessage //One read only plz failedActor.preRestart(reason, if (c ne null) Some(c.message) else None) } - val freshActor = newActor() - clearActorContext() actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(reason) if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") - } catch { - case e ⇒ - EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString)) - throw e } finally { currentMessage = null } @@ -358,10 +353,14 @@ private[akka] class ActorCell( case _ ⇒ } } catch { - case e ⇒ + case e ⇒ try { EventHandler.error(e, this, "error while creating actor") + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) envelope.channel.sendException(e) + } finally { if (supervisor.isDefined) supervisor.get ! Failed(self, e) else throw e + } } def suspend(): Unit = dispatcher suspend this @@ -432,7 +431,8 @@ private[akka] class ActorCell( def invoke(messageHandle: Envelope) { try { - if (!mailbox.isClosed) { + val isClosed = mailbox.isClosed //Fence plus volatile read + if (!isClosed) { currentMessage = messageHandle try { try { diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 0044baf69f..3f9ffacac4 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -89,11 +89,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ protected[akka] var _delegates = Vector[ActorRef]() override def postStop() { - _delegates foreach { delegate ⇒ - try { - delegate ! PoisonPill - } catch { case e: Exception ⇒ } //Ignore any exceptions here - } + _delegates foreach { _ ! PoisonPill } } protected def _route(): Receive = { @@ -101,7 +97,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ case Stat ⇒ tryReply(Stats(_delegates length)) case Terminated(victim, _) ⇒ - _delegates = _delegates filterNot { _.uuid == victim.uuid } + _delegates = _delegates filterNot { victim == } case msg ⇒ resizeIfAppropriate()