diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 049a5891e2..39b4299af1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -147,7 +147,7 @@ object FSMTimingSpec { } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: ActorRefWithCell ⇒ l.resume(inResponseToFailure = null) + case l: ActorRefWithCell ⇒ l.resume(causedByFailure = null) case _ ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 52b3686aa1..64d283d72a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assertNoCountDown(done, 1000, "Should not process messages while suspended") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) - a.resume(inResponseToFailure = null) + a.resume(causedByFailure = null) assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 41ce7db15f..a1de1f84bd 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit val msgs = (1 to 100).toList for (m ← msgs) actor ! m - actor.resume(inResponseToFailure = null) //Signal the actor to start treating it's message backlog + actor.resume(causedByFailure = null) //Signal the actor to start treating it's message backlog Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 86f910ac08..04a9499ebf 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -8,6 +8,7 @@ import akka.AkkaException import scala.reflect.BeanProperty import scala.util.control.NoStackTrace import java.util.regex.Pattern +import scala.annotation.tailrec /** * Marker trait to show which Messages are automatically handled by Akka @@ -99,19 +100,12 @@ private[akka] case class SelectParent(next: Any) extends SelectionPath * IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated. * For instance, if you try to create an Actor that doesn't extend Actor. */ -case class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) - extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} +case class IllegalActorStateException private[akka] (message: String) extends AkkaException(message) /** * ActorKilledException is thrown when an Actor receives the akka.actor.Kill message */ -case class ActorKilledException private[akka] (message: String, cause: Throwable) - extends AkkaException(message, cause) - with NoStackTrace { - def this(msg: String) = this(msg, null) -} +case class ActorKilledException private[akka] (message: String) extends AkkaException(message) with NoStackTrace /** * An InvalidActorNameException is thrown when you try to convert something, usually a String, to an Actor name @@ -152,9 +146,12 @@ object ActorInitializationException { * @param origCause is the exception which caused the restart in the first place * @param msg is the message which was optionally passed into preRestart() */ -case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable, msg: Option[Any]) +case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable, messageOption: Option[Any]) extends ActorInitializationException(actor, - "exception in preRestart(" + (if (origCause == null) "null" else origCause.getClass) + ", " + msg.map(_.getClass) + ")", cause) + "exception in preRestart(" + + (if (originalCause == null) "null" else originalCause.getClass) + ", " + + (messageOption match { case Some(m: AnyRef) ⇒ m.getClass; case _ ⇒ "None" }) + + ")", cause) /** * A PostRestartException is thrown when constructor or postRestart() method @@ -164,9 +161,9 @@ case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, * @param cause is the exception thrown by that actor within preRestart() * @param origCause is the exception which caused the restart in the first place */ -case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable) +case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable) extends ActorInitializationException(actor, - "exception post restart (" + (if (origCause == null) "null" else origCause.getClass) + ")", cause) + "exception post restart (" + (if (originalCause == null) "null" else originalCause.getClass) + ")", cause) /** * This is an extractor for retrieving the original cause (i.e. the first @@ -176,7 +173,7 @@ case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable */ object OriginalRestartException { def unapply(ex: PostRestartException): Option[Throwable] = { - def rec(ex: PostRestartException): Option[Throwable] = ex match { + @tailrec def rec(ex: PostRestartException): Option[Throwable] = ex match { case PostRestartException(_, _, e: PostRestartException) ⇒ rec(e) case PostRestartException(_, _, e) ⇒ Some(e) } @@ -188,10 +185,7 @@ object OriginalRestartException { * InvalidMessageException is thrown when an invalid message is sent to an Actor; * Currently only `null` is an invalid message. */ -case class InvalidMessageException private[akka] (message: String, cause: Throwable = null) - extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} +case class InvalidMessageException private[akka] (message: String) extends AkkaException(message) /** * A DeathPactException is thrown by an Actor that receives a Terminated(someActor) message diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 09523fbab3..13c449d99c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -188,7 +188,7 @@ private[akka] trait Cell { /** * Recursively resume this actor and all its children. */ - def resume(inResponseToFailure: Throwable): Unit + def resume(causedByFailure: Throwable): Unit /** * Restart this actor (will recursively restart or stop all children). */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e8cdf2955e..d74fc5293d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe /* * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). */ - def resume(inResponseToFailure: Throwable): Unit + def resume(causedByFailure: Throwable): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit @@ -288,7 +288,7 @@ private[akka] class LocalActorRef private[akka] ( /** * Resumes a suspended actor. */ - override def resume(inResponseToFailure: Throwable): Unit = actorCell.resume(inResponseToFailure) + override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure) /** * Shuts down the actor and its message queue @@ -388,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody override def suspend(): Unit = () - override def resume(inResponseToFailure: Throwable): Unit = () + override def resume(causedByFailure: Throwable): Unit = () override def stop(): Unit = () override def isTerminated = false diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 64a4c2c7b1..9ef60f2316 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -265,7 +265,7 @@ abstract class SupervisorStrategy { * is not the currently failing child. Suspend/resume needs to be done in * matching pairs, otherwise actors will wake up too soon or never at all. */ - final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = cause) + final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(causedByFailure = cause) /** * Restart the given child, possibly suspending it first. diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 7977be5e5b..57a375dbad 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -81,7 +81,7 @@ private[akka] class RepointableActorRef( def suspend(): Unit = underlying.suspend() - def resume(inResponseToFailure: Throwable): Unit = underlying.resume(inResponseToFailure) + def resume(causedByFailure: Throwable): Unit = underlying.resume(causedByFailure) def stop(): Unit = underlying.stop() @@ -171,7 +171,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep def system: ActorSystem = systemImpl def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() } - def resume(inResponseToFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } + def resume(causedByFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } def stop(): Unit = sendSystemMessage(Terminate()) def isTerminated: Boolean = false diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 900bc60a0c..2b0fa76db4 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -118,16 +118,16 @@ private[akka] trait Children { this: ActorCell ⇒ case _ ⇒ null } - protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit = + protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() + case ChildRestartStats(child, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() case _ ⇒ } - protected def resumeChildren(inResponseToFailure: Throwable, perp: ActorRef): Unit = + protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit = childrenRefs.stats foreach { case ChildRestartStats(child: InternalActorRef, _, _) ⇒ - child.resume(if (perp == child) inResponseToFailure else null) + child.resume(if (perp == child) causedByFailure else null) } def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index 09303bd24d..1fc21c08eb 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -44,6 +44,7 @@ private[akka] object ChildrenContainer { sealed trait SuspendReason case object UserRequest extends SuspendReason + // careful with those system messages, all handling to be taking place in ActorCell.scala! case class Recreation(cause: Throwable, var todo: SystemMessage = null) extends SuspendReason case object Termination extends SuspendReason diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala index a473e4df66..3e11071f82 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -62,7 +62,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(inResponseToFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) + final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index feabe3104d..b9155887e5 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -67,7 +67,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) } else { // need to keep that suspend counter balanced - faultResume(inResponseToFailure = null) + faultResume(causedByFailure = null) } /** @@ -83,21 +83,21 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ /** * Do resume the actor in response to a failure. * - * @param inResponseToFailure signifies if it was our own failure which + * @param causedByFailure signifies if it was our own failure which * prompted this action. */ - protected def faultResume(inResponseToFailure: Throwable): Unit = { - if ((actor == null || actor.context == null) && inResponseToFailure != null) { + protected def faultResume(causedByFailure: Throwable): Unit = { + if ((actor == null || actor.context == null) && causedByFailure != null) { system.eventStream.publish(Error(self.path.toString, clazz(actor), - "changing Resume into Restart after " + inResponseToFailure)) - faultRecreate(inResponseToFailure) + "changing Resume into Restart after " + causedByFailure)) + faultRecreate(causedByFailure) } else { val perp = perpetrator // done always to keep that suspend counter balanced // must happen “atomically” try resumeNonRecursive() - finally if (inResponseToFailure != null) clearFailed() - resumeChildren(inResponseToFailure, perp) + finally if (causedByFailure != null) clearFailed() + resumeChildren(causedByFailure, perp) } } @@ -134,7 +134,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ case Envelope(Failed(_), child) ⇒ setFailed(child); Set(child) case _ ⇒ setFailed(self); Set.empty } - suspendChildren(skip ++ childrenNotToSuspend) + suspendChildren(exceptFor = skip ++ childrenNotToSuspend) // tell supervisor t match { // Wrap InterruptedExceptions and rethrow case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 34047677aa..31308ab4f7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac /** * INTERNAL API */ -private[akka] case class Resume(inResponseToFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume +private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume /** * INTERNAL API */ diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index 03ba363c89..09bf6f2ad1 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Resumes processing of `send` actions for the agent. */ - def resume(): Unit = updater.resume(inResponseToFailure = null) + def resume(): Unit = updater.resume(causedByFailure = null) /** * Closes the agents and makes it eligible for garbage collection. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 910c57502a..131b5d76be 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] ( def suspend(): Unit = sendSystemMessage(Suspend()) - def resume(inResponseToFailure: Throwable): Unit = sendSystemMessage(Resume(inResponseToFailure)) + def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure)) def stop(): Unit = sendSystemMessage(Terminate()) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 5028ab7dc9..831033da03 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -171,7 +171,7 @@ class CallingThreadDispatcher( if (switched && !wasActive) { runQueue(mbox, queue) } - case m ⇒ m.systemEnqueue(actor.self, Resume(inResponseToFailure = null)) + case m ⇒ m.systemEnqueue(actor.self, Resume(causedByFailure = null)) } }