diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 387775ab7b..a48d5c4c45 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -382,8 +382,8 @@ private[akka] class ActorCell( case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) case NoMessage ⇒ // only here to suppress warning } - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, "error while processing " + message) + } catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e, "error while processing " + message) } if (todo != null) systemInvoke(todo) } @@ -397,8 +397,8 @@ private[akka] class ActorCell( case msg ⇒ receiveMessage(msg) } currentMessage = null // reset current message after successful invocation - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(Nil, e, e.getMessage) + } catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e, e.getMessage) } finally { checkReceiveTimeout // Reschedule receive timeout } @@ -471,7 +471,13 @@ private[akka] class ActorCell( } } - protected def create(uid: Int): Unit = + protected def create(uid: Int): Unit = { + def clearOutActorIfNonNull(): Unit = { + if (actor != null) { + clearActorFields(actor) + actor = null // ensure that we know that we failed during creation + } + } try { this.uid = uid val created = newActor() @@ -480,11 +486,12 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { + case e: InterruptedException ⇒ + clearOutActorIfNonNull() + Thread.currentThread().interrupt() + throw ActorInitializationException(self, "interruption during creation", e) case NonFatal(e) ⇒ - if (actor != null) { - clearActorFields(actor) - actor = null // ensure that we know that we failed during creation - } + clearOutActorIfNonNull() e match { case i: InstantiationException ⇒ throw ActorInitializationException(self, """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, @@ -494,6 +501,7 @@ private[akka] class ActorCell( case x ⇒ throw ActorInitializationException(self, "exception during creation", x) } } + } private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index f897535c91..9c64d79a64 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -185,6 +185,10 @@ private[akka] trait Children { this: ActorCell ⇒ cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { + case e: InterruptedException ⇒ + unreserveChild(name) + Thread.interrupted() // clear interrupted flag before throwing according to java convention + throw e case NonFatal(e) ⇒ unreserveChild(name) throw e diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index b97313d794..053765296a 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -17,6 +17,7 @@ import akka.actor.Failed import akka.actor.PostRestartException import akka.event.Logging.Debug import scala.concurrent.duration.Duration +import scala.util.control.Exception._ private[akka] trait FaultHandling { this: ActorCell ⇒ @@ -65,10 +66,9 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ try { // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort if (failedActor.context ne null) failedActor.preRestart(cause, optionalMessage) - } catch { - case NonFatal(e) ⇒ - val ex = new PreRestartException(self, e, cause, optionalMessage) - publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) + } catch handleNonFatalOrInterruptedException { e ⇒ + val ex = new PreRestartException(self, e, cause, optionalMessage) + publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) } finally { clearActorFields(failedActor) } @@ -174,20 +174,16 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ case _ ⇒ setFailed(self); Set.empty } suspendChildren(exceptFor = skip ++ childrenNotToSuspend) - // tell supervisor - t match { // Wrap InterruptedExceptions and, clear the flag and rethrow - case _: InterruptedException ⇒ - parent.tell(Failed(new ActorInterruptedException(t), uid), self) - Thread.interrupted() // clear interrupted flag before throwing according to java convention - throw t - case _ ⇒ parent.tell(Failed(t, uid), self) + t match { + // tell supervisor + case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t), uid), self) + case _ ⇒ parent.tell(Failed(t, uid), self) } - } catch { - case NonFatal(e) ⇒ - publish(Error(e, self.path.toString, clazz(actor), - "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) - try children foreach stop - finally finishTerminate() + } catch handleNonFatalOrInterruptedException { e ⇒ + publish(Error(e, self.path.toString, clazz(actor), + "emergency stop: exception in failure handling for " + t.getClass + Logging.stackTraceFor(t))) + try children foreach stop + finally finishTerminate() } } @@ -199,8 +195,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * specific order. */ try if (a ne null) a.postStop() - catch { - case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) + catch handleNonFatalOrInterruptedException { e ⇒ + publish(Error(e, self.path.toString, clazz(a), e.getMessage)) } finally try dispatcher.detach(this) finally try parent.sendSystemMessage(ChildTerminated(self)) finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why @@ -232,13 +228,12 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ // only after parent is up and running again do restart the children which were not stopped survivors foreach (child ⇒ try child.asInstanceOf[InternalActorRef].restart(cause) - catch { - case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) + catch handleNonFatalOrInterruptedException { e ⇒ + publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) }) - } catch { - case NonFatal(e) ⇒ - clearActorFields(actor) // in order to prevent preRestart() from happening again - handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage) + } catch handleNonFatalOrInterruptedException { e ⇒ + clearActorFields(actor) // in order to prevent preRestart() from happening again + handleInvokeFailure(survivors, new PostRestartException(self, e, cause), e.getMessage) } } @@ -267,8 +262,8 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ */ if (actor != null) { try actor.supervisorStrategy.handleChildTerminated(this, child, children) - catch { - case NonFatal(e) ⇒ handleInvokeFailure(Nil, e, "handleChildTerminated failed") + catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e, "handleChildTerminated failed") } } /* @@ -282,4 +277,12 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ case _ ⇒ null } } + + final protected def handleNonFatalOrInterruptedException(thunk: (Throwable) ⇒ Unit): Catcher[Unit] = { + case e: InterruptedException ⇒ + thunk(e) + Thread.currentThread().interrupt() + case NonFatal(e) ⇒ + thunk(e) + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index e3576a1184..86c908e95d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -231,12 +231,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next - processAllSystemMessages() - if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) { - processMailbox(left - 1, deadlineNs) - } else if (Thread.interrupted()) { + if (Thread.interrupted()) throw new InterruptedException("Interrupted while processing actor messages") - } + processAllSystemMessages() + if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) + processMailbox(left - 1, deadlineNs) } } @@ -255,12 +254,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) nextMessage = nextMessage.next msg.next = null if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs) - try { - actor systemInvoke msg - } catch { - // we know here that systemInvoke ensures that only InterruptedException and "fatal" exceptions get rethrown - case e: InterruptedException ⇒ interruption = e - } + // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown + actor systemInvoke msg + if (Thread.interrupted()) + interruption = new InterruptedException("Interrupted while processing system messages") // don’t ever execute normal message when system message present! if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null) }