diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 583429ad6b..fb33b2ab85 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -509,14 +509,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - case NonFatal(e) ⇒ - try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self) - } + case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e) } } @@ -540,13 +533,7 @@ private[akka] class ActorCell( doRecreate(cause, failedActor) } } catch { - case NonFatal(e) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) - } + case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e) } } @@ -601,48 +588,34 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case NonFatal(e) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e)) - throw e + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) } } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def invoke(messageHandle: Envelope) { - try { - currentMessage = messageHandle - try { - try { - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ actor(msg) - } - currentMessage = null // reset current message after successful invocation - } catch { - case e: InterruptedException ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - // make sure that InterruptedException does not leave this thread - val ex = ActorInterruptedException(e) - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw e //Re-throw InterruptedExceptions as expected - case NonFatal(e) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) - } finally { - checkReceiveTimeout // Reschedule receive timeout - } - } catch { - case NonFatal(e) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - throw e - } + final def invoke(messageHandle: Envelope): Unit = try { + currentMessage = messageHandle + cancelReceiveTimeout() // FIXME: leave this here??? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ actor(msg) + } + currentMessage = null // reset current message after successful invocation + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) + } finally { + checkReceiveTimeout // Reschedule receive timeout + } + + private final def handleInvokeFailure(t: Throwable, message: String): Unit = try { + dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) + } finally { + t match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(ActorInterruptedException(t)), self); throw t + case _ ⇒ parent.tell(Failed(t), self) } } diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala index ae7d91c7a3..e14a491910 100644 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -19,13 +19,11 @@ package akka.util * }}} */ object NonFatal { - def unapply(t: Throwable): Option[Throwable] = t match { case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError // VirtualMachineError includes OutOfMemoryError and other fatal errors case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None case e ⇒ Some(e) } - }