Merge pull request #1113 from akka/wip-3006-handle-interruption-in-the-dungeon-ban

Catch InterruptedException in the dungeon and mark the thread as interrupted. See #3006
This commit is contained in:
Björn Antonsson 2013-02-18 00:05:09 -08:00
commit 392b1ebef3
4 changed files with 59 additions and 47 deletions

View file

@ -382,8 +382,8 @@ private[akka] class ActorCell(
case ChildTerminated(child) todo = handleChildTerminated(child) case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning case NoMessage // only here to suppress warning
} }
} catch { } catch handleNonFatalOrInterruptedException { e
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, "error while processing " + message) handleInvokeFailure(Nil, e, "error while processing " + message)
} }
if (todo != null) systemInvoke(todo) if (todo != null) systemInvoke(todo)
} }
@ -397,8 +397,8 @@ private[akka] class ActorCell(
case msg receiveMessage(msg) case msg receiveMessage(msg)
} }
currentMessage = null // reset current message after successful invocation currentMessage = null // reset current message after successful invocation
} catch { } catch handleNonFatalOrInterruptedException { e
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(Nil, e, e.getMessage) handleInvokeFailure(Nil, e, e.getMessage)
} finally { } finally {
checkReceiveTimeout // Reschedule receive timeout checkReceiveTimeout // Reschedule receive timeout
} }
@ -471,7 +471,13 @@ private[akka] class ActorCell(
} }
} }
protected def create(uid: Int): Unit = protected def create(uid: Int): Unit = {
def clearOutActorIfNonNull(): Unit = {
if (actor != null) {
clearActorFields(actor)
actor = null // ensure that we know that we failed during creation
}
}
try { try {
this.uid = uid this.uid = uid
val created = newActor() val created = newActor()
@ -480,11 +486,12 @@ private[akka] class ActorCell(
checkReceiveTimeout checkReceiveTimeout
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch { } catch {
case e: InterruptedException
clearOutActorIfNonNull()
Thread.currentThread().interrupt()
throw ActorInitializationException(self, "interruption during creation", e)
case NonFatal(e) case NonFatal(e)
if (actor != null) { clearOutActorIfNonNull()
clearActorFields(actor)
actor = null // ensure that we know that we failed during creation
}
e match { e match {
case i: InstantiationException throw ActorInitializationException(self, case i: InstantiationException throw ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
@ -494,6 +501,7 @@ private[akka] class ActorCell(
case x throw ActorInitializationException(self, "exception during creation", x) case x throw ActorInitializationException(self, "exception during creation", x)
} }
} }
}
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit =
if (!isTerminating) { if (!isTerminating) {

View file

@ -185,6 +185,10 @@ private[akka] trait Children { this: ActorCell ⇒
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
systemService = systemService, deploy = None, lookupDeploy = true, async = async) systemService = systemService, deploy = None, lookupDeploy = true, async = async)
} catch { } catch {
case e: InterruptedException
unreserveChild(name)
Thread.interrupted() // clear interrupted flag before throwing according to java convention
throw e
case NonFatal(e) case NonFatal(e)
unreserveChild(name) unreserveChild(name)
throw e throw e

View file

@ -17,6 +17,7 @@ import akka.actor.Failed
import akka.actor.PostRestartException import akka.actor.PostRestartException
import akka.event.Logging.Debug import akka.event.Logging.Debug
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.util.control.Exception._
private[akka] trait FaultHandling { this: ActorCell private[akka] trait FaultHandling { this: ActorCell
@ -65,10 +66,9 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
try { try {
// if the actor fails in preRestart, we can do nothing but log it: its best-effort // if the actor fails in preRestart, we can do nothing but log it: its best-effort
if (failedActor.context ne null) failedActor.preRestart(cause, optionalMessage) if (failedActor.context ne null) failedActor.preRestart(cause, optionalMessage)
} catch { } catch handleNonFatalOrInterruptedException { e
case NonFatal(e) val ex = new PreRestartException(self, e, cause, optionalMessage)
val ex = new PreRestartException(self, e, cause, optionalMessage) publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally { } finally {
clearActorFields(failedActor) clearActorFields(failedActor)
} }
@ -174,20 +174,16 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
case _ setFailed(self); Set.empty case _ setFailed(self); Set.empty
} }
suspendChildren(exceptFor = skip ++ childrenNotToSuspend) suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
// tell supervisor t match {
t match { // Wrap InterruptedExceptions and, clear the flag and rethrow // tell supervisor
case _: InterruptedException case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t), uid), self)
parent.tell(Failed(new ActorInterruptedException(t), uid), self) case _ parent.tell(Failed(t, uid), self)
Thread.interrupted() // clear interrupted flag before throwing according to java convention
throw t
case _ parent.tell(Failed(t, uid), self)
} }
} catch { } catch handleNonFatalOrInterruptedException { e
case NonFatal(e) publish(Error(e, self.path.toString, clazz(actor),
publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t)))
"emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) try children foreach stop
try children foreach stop finally finishTerminate()
finally finishTerminate()
} }
} }
@ -199,8 +195,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
* specific order. * specific order.
*/ */
try if (a ne null) a.postStop() try if (a ne null) a.postStop()
catch { catch handleNonFatalOrInterruptedException { e
case NonFatal(e) publish(Error(e, self.path.toString, clazz(a), e.getMessage)) publish(Error(e, self.path.toString, clazz(a), e.getMessage))
} finally try dispatcher.detach(this) } finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self)) finally try parent.sendSystemMessage(ChildTerminated(self))
finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why
@ -232,13 +228,12 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
// only after parent is up and running again do restart the children which were not stopped // only after parent is up and running again do restart the children which were not stopped
survivors foreach (child survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause) try child.asInstanceOf[InternalActorRef].restart(cause)
catch { catch handleNonFatalOrInterruptedException { e
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
}) })
} catch { } catch handleNonFatalOrInterruptedException { e
case NonFatal(e) clearActorFields(actor) // in order to prevent preRestart() from happening again
clearActorFields(actor) // in order to prevent preRestart() from happening again handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage)
handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage)
} }
} }
@ -267,8 +262,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
*/ */
if (actor != null) { if (actor != null) {
try actor.supervisorStrategy.handleChildTerminated(this, child, children) try actor.supervisorStrategy.handleChildTerminated(this, child, children)
catch { catch handleNonFatalOrInterruptedException { e
case NonFatal(e) handleInvokeFailure(Nil, e, "handleChildTerminated failed") handleInvokeFailure(Nil, e, "handleChildTerminated failed")
} }
} }
/* /*
@ -282,4 +277,12 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
case _ null case _ null
} }
} }
final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) Unit): Catcher[Unit] = {
case e: InterruptedException
thunk(e)
Thread.currentThread().interrupt()
case NonFatal(e)
thunk(e)
}
} }

View file

@ -231,12 +231,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
if (next ne null) { if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next) if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next actor invoke next
processAllSystemMessages() if (Thread.interrupted())
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) {
processMailbox(left - 1, deadlineNs)
} else if (Thread.interrupted()) {
throw new InterruptedException("Interrupted while processing actor messages") throw new InterruptedException("Interrupted while processing actor messages")
} processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs)
} }
} }
@ -255,12 +254,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
nextMessage = nextMessage.next nextMessage = nextMessage.next
msg.next = null msg.next = null
if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs) if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
try { // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
actor systemInvoke msg actor systemInvoke msg
} catch { if (Thread.interrupted())
// we know here that systemInvoke ensures that only InterruptedException and "fatal" exceptions get rethrown interruption = new InterruptedException("Interrupted while processing system messages")
case e: InterruptedException interruption = e
}
// dont ever execute normal message when system message present! // dont ever execute normal message when system message present!
if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null) if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null)
} }