Merge pull request #422 from akka/wip-1880-revise-error-handling-in-system-invoke-√

Switching to a more streamlined approach for systemInvoke error handling...
This commit is contained in:
viktorklang 2012-04-27 04:25:58 -07:00
commit 855b88b486
2 changed files with 26 additions and 55 deletions

View file

@ -509,14 +509,7 @@ private[akka] class ActorCell(
checkReceiveTimeout checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch { } catch {
case NonFatal(e) case NonFatal(e) throw ActorInitializationException(self, "exception during creation", e)
try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self)
}
} }
} }
@ -540,13 +533,7 @@ private[akka] class ActorCell(
doRecreate(cause, failedActor) doRecreate(cause, failedActor)
} }
} catch { } catch {
case NonFatal(e) try { case NonFatal(e) throw ActorInitializationException(self, "exception during creation", e)
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self)
}
} }
} }
@ -601,18 +588,13 @@ private[akka] class ActorCell(
case ChildTerminated(child) handleChildTerminated(child) case ChildTerminated(child) handleChildTerminated(child)
} }
} catch { } catch {
case NonFatal(e) case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while processing " + message), e))
throw e
} }
} }
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope) { final def invoke(messageHandle: Envelope): Unit = try {
try {
currentMessage = messageHandle currentMessage = messageHandle
try {
try {
cancelReceiveTimeout() // FIXME: leave this here??? cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match { messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle) case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
@ -620,29 +602,20 @@ private[akka] class ActorCell(
} }
currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch {
case e: InterruptedException case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
val ex = ActorInterruptedException(e)
actor.supervisorStrategy.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
case NonFatal(e)
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children)
parent.tell(Failed(e), self)
} finally { } finally {
checkReceiveTimeout // Reschedule receive timeout checkReceiveTimeout // Reschedule receive timeout
} }
} catch {
case NonFatal(e) private final def handleInvokeFailure(t: Throwable, message: String): Unit = try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), e.getMessage), e)) dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t))
throw e // prevent any further messages to be processed until the actor has been restarted
} dispatcher.suspend(this)
if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children)
} finally {
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
} }
} }

View file

@ -19,13 +19,11 @@ package akka.util
* }}} * }}}
*/ */
object NonFatal { object NonFatal {
def unapply(t: Throwable): Option[Throwable] = t match { def unapply(t: Throwable): Option[Throwable] = t match {
case e: StackOverflowError Some(e) // StackOverflowError ok even though it is a VirtualMachineError case e: StackOverflowError Some(e) // StackOverflowError ok even though it is a VirtualMachineError
// VirtualMachineError includes OutOfMemoryError and other fatal errors // VirtualMachineError includes OutOfMemoryError and other fatal errors
case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError None case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError None
case e Some(e) case e Some(e)
} }
} }