remove ActorRef from Failed/ChildTerminated and make some warnings nicer

- the same information is transmitted as sender, hence enabling
  ChildTerminated to become a singleton
- make lastSender accessible in TestKit (needed now for DeathWatchSpec)
- fix nasty infinite loop when logging at the wrong moment during
  shutdown
This commit is contained in:
Roland 2011-11-12 22:37:12 +01:00
parent 1ba168774f
commit 02a5cd081c
14 changed files with 57 additions and 49 deletions

View file

@ -107,9 +107,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
case class FF(fail: Failed)
val supervisor = actorOf(Props[Supervisor]
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
override def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = {
testActor ! FF(fail)
super.handleFailure(fail, stats, children)
override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = {
testActor.tell(FF(Failed(cause)), child)
super.handleFailure(child, cause, stats, children)
}
}))
@ -119,9 +119,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
failed ! Kill
val result = receiveWhile(3 seconds, messages = 3) {
case FF(Failed(`failed`, _: ActorKilledException)) 1
case FF(Failed(`brother`, DeathPactException(`failed`))) 2
case Terminated(`brother`) 3
case FF(Failed(_: ActorKilledException)) if lastSender eq failed 1
case FF(Failed(DeathPactException(`failed`))) if lastSender eq brother 2
case Terminated(`brother`) 3
}
testActor must not be 'shutdown
result must be(Seq(1, 2, 3))

View file

@ -52,10 +52,9 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
}
case class Failed(@BeanProperty actor: ActorRef,
@BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
case class ChildTerminated(@BeanProperty child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
case object ChildTerminated extends AutoReceivedMessage with PossiblyHarmful
case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful
@ -419,8 +418,8 @@ trait Actor {
msg match {
case HotSwap(code, discardOld) become(code(self), discardOld)
case RevertHotSwap unbecome()
case f: Failed context.handleFailure(f)
case ct: ChildTerminated context.handleChildTerminated(ct.child)
case Failed(cause) context.handleFailure(sender, cause)
case ChildTerminated context.handleChildTerminated(sender)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
}

View file

@ -39,7 +39,7 @@ trait ActorContext extends ActorRefFactory with TypedActorFactory {
def dispatcher: MessageDispatcher
def handleFailure(fail: Failed): Unit
def handleFailure(child: ActorRef, cause: Throwable): Unit
def handleChildTerminated(child: ActorRef): Unit
@ -134,7 +134,8 @@ private[akka] class ActorCell(
else childrenRefs.get(name)
}
final def tell(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, sender))
final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) app.deadLetters else sender))
final def sender: ActorRef = currentMessage match {
case null app.deadLetters
@ -175,7 +176,7 @@ private[akka] class ActorCell(
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
parent ! Failed(self, ActorInitializationException(self, "exception during creation", e))
parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self)
}
}
@ -206,7 +207,7 @@ private[akka] class ActorCell(
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
parent ! Failed(self, ActorInitializationException(self, "exception during re-creation", e))
parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self)
}
}
@ -236,7 +237,7 @@ private[akka] class ActorCell(
}
} finally {
try {
parent ! ChildTerminated(self)
parent.tell(ChildTerminated, self)
app.deathWatch.publish(Terminated(self))
} finally {
currentMessage = null
@ -302,11 +303,11 @@ private[akka] class ActorCell(
if (e.isInstanceOf[InterruptedException]) {
val ex = ActorInterruptedException(e)
props.faultHandler.handleSupervisorFailing(self, children)
parent ! Failed(self, ex)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
} else {
props.faultHandler.handleSupervisorFailing(self, children)
parent ! Failed(self, e)
parent.tell(Failed(e), self)
}
} finally {
checkReceiveTimeout // Reschedule receive timeout
@ -320,9 +321,9 @@ private[akka] class ActorCell(
}
}
final def handleFailure(fail: Failed): Unit = childrenStats.get(fail.actor) match {
case Some(stats) if (!props.faultHandler.handleFailure(fail, stats, childrenStats)) throw fail.cause
case None app.eventStream.publish(Warning(self, "dropping " + fail + " from unknown child"))
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match {
case Some(stats) if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause
case None app.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child"))
}
final def handleChildTerminated(child: ActorRef): Unit = {

View file

@ -132,9 +132,9 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
def isShutdown = stopped
override def tell(msg: Any, sender: ActorRef): Unit = msg match {
case Failed(child, ex) child.stop()
case ChildTerminated(child) terminationFuture.completeWithResult(ActorSystem.Stopped)
case _ log.error(this + " received unexpected message " + msg)
case Failed(ex) sender.stop()
case ChildTerminated terminationFuture.completeWithResult(ActorSystem.Stopped)
case _ log.error(this + " received unexpected message " + msg)
}
protected[akka] override def sendSystemMessage(message: SystemMessage) {

View file

@ -105,7 +105,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false)
val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false)
val FsmDebugEvent = getBool("akka.actor.debug.fsm", false)
val DebugMainBus = getBool("akka.actor.debug.eventStream", false)
val DebugEventStream = getBool("akka.actor.debug.event-stream", false)
val DispatcherThroughput = getInt("akka.actor.throughput", 5)
val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout").
@ -153,7 +153,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
})
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugMainBus)
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(AkkaConfig)
val log = new BusLogging(eventStream, this)

View file

@ -118,7 +118,7 @@ abstract class FaultHandlingStrategy {
/**
* This method is called to act on the failure of a child: restart if the flag is true, stop otherwise.
*/
def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
if (children.nonEmpty)
@ -133,13 +133,12 @@ abstract class FaultHandlingStrategy {
/**
* Returns whether it processed the failure or not
*/
def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = {
val cause = fail.cause
def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = {
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
action match {
case Resume fail.actor.resume(); true
case Restart processFailure(true, fail, stats, children); true
case Stop processFailure(false, fail, stats, children); true
case Resume child.resume(); true
case Restart processFailure(true, child, cause, stats, children); true
case Stop processFailure(false, child, cause, stats, children); true
case Escalate false
}
}
@ -192,10 +191,10 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider,
//TODO optimization to drop all children here already?
}
def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = {
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = {
if (children.nonEmpty) {
if (restart && children.forall(_._2.requestRestartPermission(retriesWindow)))
children.foreach(_._1.restart(fail.cause))
children.foreach(_._1.restart(cause))
else
children.foreach(_._1.stop())
}
@ -246,11 +245,11 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider,
def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {}
def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = {
def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow))
fail.actor.restart(fail.cause)
child.restart(cause)
else
fail.actor.stop() //TODO optimization to drop child here already?
child.stop() //TODO optimization to drop child here already?
}
}

View file

@ -196,8 +196,8 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
protected[akka] def unregister(actor: ActorCell) {
_actors.decrementAndGet()
val mailBox = actor.mailbox
mailBox.becomeClosed()
actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here
actor.mailbox = deadLetterMailbox
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
cleanUpMailboxFor(actor, mailBox)
}

View file

@ -173,7 +173,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
do {
if (debug) println(actor + " processing message " + nextMessage)
if (debug) println(actor.self + " processing message " + nextMessage)
actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages
@ -197,7 +197,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
if (debug) println(actor + " processing system message " + nextMessage)
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs + "/" + actor.childrenStats)
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// dont ever execute normal message when system message present!
@ -245,7 +245,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor + " having enqueued " + message)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within

View file

@ -124,7 +124,11 @@ trait LoggingBus extends ActorEventBus {
for {
logger loggers
if logger != StandardOutLogger
} logger.stop()
} {
// this is very necessary, else you get infinite loop with DeadLetter
unsubscribe(logger)
logger.stop()
}
publish(Info(this, "all default loggers stopped"))
}

View file

@ -208,7 +208,7 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr
}
if (handle ne null) {
try {
if (Mailbox.debug) println(mbox.actor + " processing message " + handle)
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
mbox.actor.invoke(handle)
true
} catch {

View file

@ -449,7 +449,7 @@ class TestEventListener extends Logging.DefaultLogger {
case Mute(filters) filters foreach addFilter
case UnMute(filters) filters foreach removeFilter
case event: LogEvent if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, null, rcp)
case DeadLetter(msg: SystemMessage, _, rcp)
val event = Warning(rcp, "received dead system message: " + msg)
if (!filter(event)) print(event)
case DeadLetter(msg, snd, rcp)

View file

@ -95,6 +95,8 @@ class TestKit(_app: ActorSystem) {
private val queue = new LinkedBlockingDeque[Message]()
private[akka] var lastMessage: Message = NullMessage
def lastSender = lastMessage.sender
/**
* ActorRef of the test actor. Access is provided to enable e.g.
* registration as message target.

View file

@ -55,7 +55,8 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
"terminate all actors" in {
import ActorSystem.defaultConfig
val app = ActorSystem("test", defaultConfig ++ Configuration(
"akka.actor.debug.lifecycle" -> true, "akka.loglevel" -> "DEBUG"))
"akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true,
"akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG"))
val spec = new AkkaSpec(app) {
val ref = Seq(testActor, app.actorOf(Props.empty, "name"))
}

View file

@ -140,9 +140,11 @@ akka {
}
debug {
receive = off # enable function of Actor.loggable(), which is to log any received message at DEBUG level
autoreceive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like)
lifecycle = off # enable DEBUG logging of actor lifecycle changes
receive = off # enable function of Actor.loggable(), which is to log any received message at DEBUG level
autoreceive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like)
lifecycle = off # enable DEBUG logging of actor lifecycle changes
fsm = off # enable DEBUG logging of all LoggingFSMs for events, transitions and timers
event-stream = off # enable DEBUG logging of subscription changes on the eventStream
}
mailbox {