some cleanup after Viktor’s comments
- remove useless `cause` argument from some akka exceptions - improve names of some method arguments - eliminate some closure allocation
This commit is contained in:
parent
810d65068e
commit
6145d4313b
16 changed files with 41 additions and 46 deletions
|
|
@ -147,7 +147,7 @@ object FSMTimingSpec {
|
|||
}
|
||||
|
||||
def resume(actorRef: ActorRef): Unit = actorRef match {
|
||||
case l: ActorRefWithCell ⇒ l.resume(inResponseToFailure = null)
|
||||
case l: ActorRefWithCell ⇒ l.resume(causedByFailure = null)
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
|
|||
assertNoCountDown(done, 1000, "Should not process messages while suspended")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
|
||||
|
||||
a.resume(inResponseToFailure = null)
|
||||
a.resume(causedByFailure = null)
|
||||
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
|
||||
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
|
||||
suspensions = 1, resumes = 1)
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
|
|||
val msgs = (1 to 100).toList
|
||||
for (m ← msgs) actor ! m
|
||||
|
||||
actor.resume(inResponseToFailure = null) //Signal the actor to start treating it's message backlog
|
||||
actor.resume(causedByFailure = null) //Signal the actor to start treating it's message backlog
|
||||
|
||||
Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.AkkaException
|
|||
import scala.reflect.BeanProperty
|
||||
import scala.util.control.NoStackTrace
|
||||
import java.util.regex.Pattern
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* Marker trait to show which Messages are automatically handled by Akka
|
||||
|
|
@ -99,19 +100,12 @@ private[akka] case class SelectParent(next: Any) extends SelectionPath
|
|||
* IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated.
|
||||
* For instance, if you try to create an Actor that doesn't extend Actor.
|
||||
*/
|
||||
case class IllegalActorStateException private[akka] (message: String, cause: Throwable = null)
|
||||
extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
case class IllegalActorStateException private[akka] (message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* ActorKilledException is thrown when an Actor receives the akka.actor.Kill message
|
||||
*/
|
||||
case class ActorKilledException private[akka] (message: String, cause: Throwable)
|
||||
extends AkkaException(message, cause)
|
||||
with NoStackTrace {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
case class ActorKilledException private[akka] (message: String) extends AkkaException(message) with NoStackTrace
|
||||
|
||||
/**
|
||||
* An InvalidActorNameException is thrown when you try to convert something, usually a String, to an Actor name
|
||||
|
|
@ -152,9 +146,12 @@ object ActorInitializationException {
|
|||
* @param origCause is the exception which caused the restart in the first place
|
||||
* @param msg is the message which was optionally passed into preRestart()
|
||||
*/
|
||||
case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable, msg: Option[Any])
|
||||
case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable, messageOption: Option[Any])
|
||||
extends ActorInitializationException(actor,
|
||||
"exception in preRestart(" + (if (origCause == null) "null" else origCause.getClass) + ", " + msg.map(_.getClass) + ")", cause)
|
||||
"exception in preRestart(" +
|
||||
(if (originalCause == null) "null" else originalCause.getClass) + ", " +
|
||||
(messageOption match { case Some(m: AnyRef) ⇒ m.getClass; case _ ⇒ "None" }) +
|
||||
")", cause)
|
||||
|
||||
/**
|
||||
* A PostRestartException is thrown when constructor or postRestart() method
|
||||
|
|
@ -164,9 +161,9 @@ case class PreRestartException private[akka] (actor: ActorRef, cause: Throwable,
|
|||
* @param cause is the exception thrown by that actor within preRestart()
|
||||
* @param origCause is the exception which caused the restart in the first place
|
||||
*/
|
||||
case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, origCause: Throwable)
|
||||
case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, originalCause: Throwable)
|
||||
extends ActorInitializationException(actor,
|
||||
"exception post restart (" + (if (origCause == null) "null" else origCause.getClass) + ")", cause)
|
||||
"exception post restart (" + (if (originalCause == null) "null" else originalCause.getClass) + ")", cause)
|
||||
|
||||
/**
|
||||
* This is an extractor for retrieving the original cause (i.e. the first
|
||||
|
|
@ -176,7 +173,7 @@ case class PostRestartException private[akka] (actor: ActorRef, cause: Throwable
|
|||
*/
|
||||
object OriginalRestartException {
|
||||
def unapply(ex: PostRestartException): Option[Throwable] = {
|
||||
def rec(ex: PostRestartException): Option[Throwable] = ex match {
|
||||
@tailrec def rec(ex: PostRestartException): Option[Throwable] = ex match {
|
||||
case PostRestartException(_, _, e: PostRestartException) ⇒ rec(e)
|
||||
case PostRestartException(_, _, e) ⇒ Some(e)
|
||||
}
|
||||
|
|
@ -188,10 +185,7 @@ object OriginalRestartException {
|
|||
* InvalidMessageException is thrown when an invalid message is sent to an Actor;
|
||||
* Currently only `null` is an invalid message.
|
||||
*/
|
||||
case class InvalidMessageException private[akka] (message: String, cause: Throwable = null)
|
||||
extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
case class InvalidMessageException private[akka] (message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* A DeathPactException is thrown by an Actor that receives a Terminated(someActor) message
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ private[akka] trait Cell {
|
|||
/**
|
||||
* Recursively resume this actor and all its children.
|
||||
*/
|
||||
def resume(inResponseToFailure: Throwable): Unit
|
||||
def resume(causedByFailure: Throwable): Unit
|
||||
/**
|
||||
* Restart this actor (will recursively restart or stop all children).
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
|
|||
/*
|
||||
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
|
||||
*/
|
||||
def resume(inResponseToFailure: Throwable): Unit
|
||||
def resume(causedByFailure: Throwable): Unit
|
||||
def suspend(): Unit
|
||||
def restart(cause: Throwable): Unit
|
||||
def stop(): Unit
|
||||
|
|
@ -288,7 +288,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
/**
|
||||
* Resumes a suspended actor.
|
||||
*/
|
||||
override def resume(inResponseToFailure: Throwable): Unit = actorCell.resume(inResponseToFailure)
|
||||
override def resume(causedByFailure: Throwable): Unit = actorCell.resume(causedByFailure)
|
||||
|
||||
/**
|
||||
* Shuts down the actor and its message queue
|
||||
|
|
@ -388,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
|
|||
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
|
||||
|
||||
override def suspend(): Unit = ()
|
||||
override def resume(inResponseToFailure: Throwable): Unit = ()
|
||||
override def resume(causedByFailure: Throwable): Unit = ()
|
||||
override def stop(): Unit = ()
|
||||
override def isTerminated = false
|
||||
|
||||
|
|
|
|||
|
|
@ -265,7 +265,7 @@ abstract class SupervisorStrategy {
|
|||
* is not the currently failing child</b>. Suspend/resume needs to be done in
|
||||
* matching pairs, otherwise actors will wake up too soon or never at all.
|
||||
*/
|
||||
final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = cause)
|
||||
final def resumeChild(child: ActorRef, cause: Throwable): Unit = child.asInstanceOf[InternalActorRef].resume(causedByFailure = cause)
|
||||
|
||||
/**
|
||||
* Restart the given child, possibly suspending it first.
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ private[akka] class RepointableActorRef(
|
|||
|
||||
def suspend(): Unit = underlying.suspend()
|
||||
|
||||
def resume(inResponseToFailure: Throwable): Unit = underlying.resume(inResponseToFailure)
|
||||
def resume(causedByFailure: Throwable): Unit = underlying.resume(causedByFailure)
|
||||
|
||||
def stop(): Unit = underlying.stop()
|
||||
|
||||
|
|
@ -171,7 +171,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
|
|||
|
||||
def system: ActorSystem = systemImpl
|
||||
def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() }
|
||||
def resume(inResponseToFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
|
||||
def resume(causedByFailure: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
|
||||
def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
|
||||
def stop(): Unit = sendSystemMessage(Terminate())
|
||||
def isTerminated: Boolean = false
|
||||
|
|
|
|||
|
|
@ -118,16 +118,16 @@ private[akka] trait Children { this: ActorCell ⇒
|
|||
case _ ⇒ null
|
||||
}
|
||||
|
||||
protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit =
|
||||
protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit =
|
||||
childrenRefs.stats foreach {
|
||||
case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend()
|
||||
case ChildRestartStats(child, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend()
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
protected def resumeChildren(inResponseToFailure: Throwable, perp: ActorRef): Unit =
|
||||
protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit =
|
||||
childrenRefs.stats foreach {
|
||||
case ChildRestartStats(child: InternalActorRef, _, _) ⇒
|
||||
child.resume(if (perp == child) inResponseToFailure else null)
|
||||
child.resume(if (perp == child) causedByFailure else null)
|
||||
}
|
||||
|
||||
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ private[akka] object ChildrenContainer {
|
|||
|
||||
sealed trait SuspendReason
|
||||
case object UserRequest extends SuspendReason
|
||||
// careful with those system messages, all handling to be taking place in ActorCell.scala!
|
||||
case class Recreation(cause: Throwable, var todo: SystemMessage = null) extends SuspendReason
|
||||
case object Termination extends SuspendReason
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
|||
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
final def resume(inResponseToFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
|
||||
final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure))
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
|
||||
} else {
|
||||
// need to keep that suspend counter balanced
|
||||
faultResume(inResponseToFailure = null)
|
||||
faultResume(causedByFailure = null)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -83,21 +83,21 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
/**
|
||||
* Do resume the actor in response to a failure.
|
||||
*
|
||||
* @param inResponseToFailure signifies if it was our own failure which
|
||||
* @param causedByFailure signifies if it was our own failure which
|
||||
* prompted this action.
|
||||
*/
|
||||
protected def faultResume(inResponseToFailure: Throwable): Unit = {
|
||||
if ((actor == null || actor.context == null) && inResponseToFailure != null) {
|
||||
protected def faultResume(causedByFailure: Throwable): Unit = {
|
||||
if ((actor == null || actor.context == null) && causedByFailure != null) {
|
||||
system.eventStream.publish(Error(self.path.toString, clazz(actor),
|
||||
"changing Resume into Restart after " + inResponseToFailure))
|
||||
faultRecreate(inResponseToFailure)
|
||||
"changing Resume into Restart after " + causedByFailure))
|
||||
faultRecreate(causedByFailure)
|
||||
} else {
|
||||
val perp = perpetrator
|
||||
// done always to keep that suspend counter balanced
|
||||
// must happen “atomically”
|
||||
try resumeNonRecursive()
|
||||
finally if (inResponseToFailure != null) clearFailed()
|
||||
resumeChildren(inResponseToFailure, perp)
|
||||
finally if (causedByFailure != null) clearFailed()
|
||||
resumeChildren(causedByFailure, perp)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
case Envelope(Failed(_), child) ⇒ setFailed(child); Set(child)
|
||||
case _ ⇒ setFailed(self); Set.empty
|
||||
}
|
||||
suspendChildren(skip ++ childrenNotToSuspend)
|
||||
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
|
||||
// tell supervisor
|
||||
t match { // Wrap InterruptedExceptions and rethrow
|
||||
case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class Resume(inResponseToFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume
|
||||
private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
|
|||
/**
|
||||
* Resumes processing of `send` actions for the agent.
|
||||
*/
|
||||
def resume(): Unit = updater.resume(inResponseToFailure = null)
|
||||
def resume(): Unit = updater.resume(causedByFailure = null)
|
||||
|
||||
/**
|
||||
* Closes the agents and makes it eligible for garbage collection.
|
||||
|
|
|
|||
|
|
@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
|
||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||
|
||||
def resume(inResponseToFailure: Throwable): Unit = sendSystemMessage(Resume(inResponseToFailure))
|
||||
def resume(causedByFailure: Throwable): Unit = sendSystemMessage(Resume(causedByFailure))
|
||||
|
||||
def stop(): Unit = sendSystemMessage(Terminate())
|
||||
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class CallingThreadDispatcher(
|
|||
if (switched && !wasActive) {
|
||||
runQueue(mbox, queue)
|
||||
}
|
||||
case m ⇒ m.systemEnqueue(actor.self, Resume(inResponseToFailure = null))
|
||||
case m ⇒ m.systemEnqueue(actor.self, Resume(causedByFailure = null))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue