From 66d0b811335849ec38d69b39c7ccd4a4f42d97be Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Apr 2012 17:24:16 +0200 Subject: [PATCH 1/3] Switching to a more streamlined approach for systemInvoke error handling, resembling the approach used by invoke --- .../src/main/scala/akka/actor/ActorCell.scala | 71 ++++++------------- .../src/main/scala/akka/util/NonFatal.scala | 9 +-- 2 files changed, 25 insertions(+), 55 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index d577688526..25db544306 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -509,14 +509,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { - case NonFatal(e) ⇒ - try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self) - } + case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e) } } @@ -540,13 +533,7 @@ private[akka] class ActorCell( doRecreate(cause) } } catch { - case NonFatal(e) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) - } + case NonFatal(e) ⇒ throw ActorInitializationException(self, "exception during creation", e) } } @@ -601,48 +588,34 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case NonFatal(e) ⇒ + case e @ (_: InterruptedException | NonFatal(_)) ⇒ dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e)) - throw e + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(if (e.isInstanceOf[InterruptedException]) ActorInterruptedException(e) else e), self) // Wrap InterruptedExceptions + throw e // FIXME should we rethrow? } } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def invoke(messageHandle: Envelope) { + currentMessage = messageHandle try { - currentMessage = messageHandle - try { - try { - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ actor(msg) - } - currentMessage = null // reset current message after successful invocation - } catch { - case e: InterruptedException ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - // make sure that InterruptedException does not leave this thread - val ex = ActorInterruptedException(e) - actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw e //Re-throw InterruptedExceptions as expected - case NonFatal(e) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) - } finally { - checkReceiveTimeout // Reschedule receive timeout - } - } catch { - case NonFatal(e) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - throw e + cancelReceiveTimeout() // FIXME: leave this here??? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ actor(msg) } + currentMessage = null // reset current message after successful invocation + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ + dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) + dispatcher.suspend(this) // prevent any further messages to be processed until the actor has been restarted + if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) + parent.tell(Failed(if (e.isInstanceOf[InterruptedException]) ActorInterruptedException(e) else e), self) // Wrap InterruptedExceptions + } finally { + checkReceiveTimeout // Reschedule receive timeout } } diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala index ae7d91c7a3..3fb51b14f8 100644 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -20,12 +20,9 @@ package akka.util */ object NonFatal { - def unapply(t: Throwable): Option[Throwable] = t match { - case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError - // VirtualMachineError includes OutOfMemoryError and other fatal errors - case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None - case e ⇒ Some(e) - } + def unapply(t: Throwable): Option[Throwable] = if (isFatal(t)) None else Some(t) + private def isFatal(t: Throwable): Boolean = + (t.isInstanceOf[VirtualMachineError] && !t.isInstanceOf[StackOverflowError]) || t.isInstanceOf[ThreadDeath] || t.isInstanceOf[InterruptedException] || t.isInstanceOf[LinkageError] } From f56dee1290f273702a1396e491bfd99dc1255aa7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Apr 2012 10:28:23 +0200 Subject: [PATCH 2/3] Reverting to the old NonFatal impl since the new wasn't needed and adding finally block and escalation only in the case of InterruptedException or fatal --- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 10 +++++++--- akka-actor/src/main/scala/akka/util/NonFatal.scala | 11 ++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 25db544306..7044ffdabe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -588,13 +588,17 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ + case e @ (_: InterruptedException | NonFatal(_)) ⇒ try { dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(if (e.isInstanceOf[InterruptedException]) ActorInterruptedException(e) else e), self) // Wrap InterruptedExceptions - throw e // FIXME should we rethrow? + } finally { + e match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(ActorInterruptedException(e)), self); throw e + case _ ⇒ parent.tell(Failed(e), self) + } + } } } diff --git a/akka-actor/src/main/scala/akka/util/NonFatal.scala b/akka-actor/src/main/scala/akka/util/NonFatal.scala index 3fb51b14f8..e14a491910 100644 --- a/akka-actor/src/main/scala/akka/util/NonFatal.scala +++ b/akka-actor/src/main/scala/akka/util/NonFatal.scala @@ -19,10 +19,11 @@ package akka.util * }}} */ object NonFatal { - - def unapply(t: Throwable): Option[Throwable] = if (isFatal(t)) None else Some(t) - - private def isFatal(t: Throwable): Boolean = - (t.isInstanceOf[VirtualMachineError] && !t.isInstanceOf[StackOverflowError]) || t.isInstanceOf[ThreadDeath] || t.isInstanceOf[InterruptedException] || t.isInstanceOf[LinkageError] + def unapply(t: Throwable): Option[Throwable] = t match { + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None + case e ⇒ Some(e) + } } From 20866be3afcb805ce0980090b5159cd059aac529 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 27 Apr 2012 11:51:19 +0200 Subject: [PATCH 3/3] Cleaning up and streamlining the error handling of the invoke methods as suggested by Roland --- .../src/main/scala/akka/actor/ActorCell.scala | 50 +++++++++---------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 7044ffdabe..eb7845a484 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -588,38 +588,34 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - } finally { - e match { // Wrap InterruptedExceptions and rethrow - case _: InterruptedException ⇒ parent.tell(Failed(ActorInterruptedException(e)), self); throw e - case _ ⇒ parent.tell(Failed(e), self) - } - } + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) } } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def invoke(messageHandle: Envelope) { + final def invoke(messageHandle: Envelope): Unit = try { currentMessage = messageHandle - try { - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ actor(msg) - } - currentMessage = null // reset current message after successful invocation - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) - dispatcher.suspend(this) // prevent any further messages to be processed until the actor has been restarted - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - parent.tell(Failed(if (e.isInstanceOf[InterruptedException]) ActorInterruptedException(e) else e), self) // Wrap InterruptedExceptions - } finally { - checkReceiveTimeout // Reschedule receive timeout + cancelReceiveTimeout() // FIXME: leave this here??? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ actor(msg) + } + currentMessage = null // reset current message after successful invocation + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) + } finally { + checkReceiveTimeout // Reschedule receive timeout + } + + private final def handleInvokeFailure(t: Throwable, message: String): Unit = try { + dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t)) + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) + } finally { + t match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(ActorInterruptedException(t)), self); throw t + case _ ⇒ parent.tell(Failed(t), self) } }