From 02a5cd081c630c37af805b2ba7e4f63685ec43bb Mon Sep 17 00:00:00 2001 From: Roland Date: Sat, 12 Nov 2011 22:37:12 +0100 Subject: [PATCH] remove ActorRef from Failed/ChildTerminated and make some warnings nicer - the same information is transmitted as sender, hence enabling ChildTerminated to become a singleton - make lastSender accessible in TestKit (needed now for DeathWatchSpec) - fix nasty infinite loop when logging at the wrong moment during shutdown --- .../scala/akka/actor/DeathWatchSpec.scala | 12 +++++------ .../src/main/scala/akka/actor/Actor.scala | 9 ++++---- .../src/main/scala/akka/actor/ActorCell.scala | 21 ++++++++++--------- .../scala/akka/actor/ActorRefProvider.scala | 6 +++--- .../main/scala/akka/actor/ActorSystem.scala | 4 ++-- .../main/scala/akka/actor/FaultHandling.scala | 21 +++++++++---------- .../akka/dispatch/AbstractDispatcher.scala | 4 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 6 +++--- .../src/main/scala/akka/event/Logging.scala | 6 +++++- .../testkit/CallingThreadDispatcher.scala | 2 +- .../akka/testkit/TestEventListener.scala | 2 +- .../src/main/scala/akka/testkit/TestKit.scala | 2 ++ .../test/scala/akka/testkit/AkkaSpec.scala | 3 ++- config/akka-reference.conf | 8 ++++--- 14 files changed, 57 insertions(+), 49 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 9e407c97bf..860554a008 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -107,9 +107,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende case class FF(fail: Failed) val supervisor = actorOf(Props[Supervisor] .withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) { - override def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = { - testActor ! FF(fail) - super.handleFailure(fail, stats, children) + override def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]) = { + testActor.tell(FF(Failed(cause)), child) + super.handleFailure(child, cause, stats, children) } })) @@ -119,9 +119,9 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { - case FF(Failed(`failed`, _: ActorKilledException)) ⇒ 1 - case FF(Failed(`brother`, DeathPactException(`failed`))) ⇒ 2 - case Terminated(`brother`) ⇒ 3 + case FF(Failed(_: ActorKilledException)) if lastSender eq failed ⇒ 1 + case FF(Failed(DeathPactException(`failed`))) if lastSender eq brother ⇒ 2 + case Terminated(`brother`) ⇒ 3 } testActor must not be 'shutdown result must be(Seq(1, 2, 3)) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 88dfcbfec6..6c50a68b70 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -52,10 +52,9 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) } -case class Failed(@BeanProperty actor: ActorRef, - @BeanProperty cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful +case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful -case class ChildTerminated(@BeanProperty child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful +case object ChildTerminated extends AutoReceivedMessage with PossiblyHarmful case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful @@ -419,8 +418,8 @@ trait Actor { msg match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) case RevertHotSwap ⇒ unbecome() - case f: Failed ⇒ context.handleFailure(f) - case ct: ChildTerminated ⇒ context.handleChildTerminated(ct.child) + case Failed(cause) ⇒ context.handleFailure(sender, cause) + case ChildTerminated ⇒ context.handleChildTerminated(sender) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2aa059bca5..cdf235572e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -39,7 +39,7 @@ trait ActorContext extends ActorRefFactory with TypedActorFactory { def dispatcher: MessageDispatcher - def handleFailure(fail: Failed): Unit + def handleFailure(child: ActorRef, cause: Throwable): Unit def handleChildTerminated(child: ActorRef): Unit @@ -134,7 +134,8 @@ private[akka] class ActorCell( else childrenRefs.get(name) } - final def tell(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, sender)) + final def tell(message: Any, sender: ActorRef): Unit = + dispatcher.dispatch(this, Envelope(message, if (sender eq null) app.deadLetters else sender)) final def sender: ActorRef = currentMessage match { case null ⇒ app.deadLetters @@ -175,7 +176,7 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - parent ! Failed(self, ActorInitializationException(self, "exception during creation", e)) + parent.tell(Failed(ActorInitializationException(self, "exception during creation", e)), self) } } @@ -206,7 +207,7 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - parent ! Failed(self, ActorInitializationException(self, "exception during re-creation", e)) + parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self) } } @@ -236,7 +237,7 @@ private[akka] class ActorCell( } } finally { try { - parent ! ChildTerminated(self) + parent.tell(ChildTerminated, self) app.deathWatch.publish(Terminated(self)) } finally { currentMessage = null @@ -302,11 +303,11 @@ private[akka] class ActorCell( if (e.isInstanceOf[InterruptedException]) { val ex = ActorInterruptedException(e) props.faultHandler.handleSupervisorFailing(self, children) - parent ! Failed(self, ex) + parent.tell(Failed(ex), self) throw e //Re-throw InterruptedExceptions as expected } else { props.faultHandler.handleSupervisorFailing(self, children) - parent ! Failed(self, e) + parent.tell(Failed(e), self) } } finally { checkReceiveTimeout // Reschedule receive timeout @@ -320,9 +321,9 @@ private[akka] class ActorCell( } } - final def handleFailure(fail: Failed): Unit = childrenStats.get(fail.actor) match { - case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, childrenStats)) throw fail.cause - case None ⇒ app.eventStream.publish(Warning(self, "dropping " + fail + " from unknown child")) + final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match { + case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause + case None ⇒ app.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child")) } final def handleChildTerminated(child: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 7a9fbf0a38..44672463be 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -132,9 +132,9 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { def isShutdown = stopped override def tell(msg: Any, sender: ActorRef): Unit = msg match { - case Failed(child, ex) ⇒ child.stop() - case ChildTerminated(child) ⇒ terminationFuture.completeWithResult(ActorSystem.Stopped) - case _ ⇒ log.error(this + " received unexpected message " + msg) + case Failed(ex) ⇒ sender.stop() + case ChildTerminated ⇒ terminationFuture.completeWithResult(ActorSystem.Stopped) + case _ ⇒ log.error(this + " received unexpected message " + msg) } protected[akka] override def sendSystemMessage(message: SystemMessage) { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4e18d07c38..1ceb357c76 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -105,7 +105,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false) val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false) val FsmDebugEvent = getBool("akka.actor.debug.fsm", false) - val DebugMainBus = getBool("akka.actor.debug.eventStream", false) + val DebugEventStream = getBool("akka.actor.debug.event-stream", false) val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). @@ -153,7 +153,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF }) // this provides basic logging (to stdout) until .start() is called below - val eventStream = new EventStream(DebugMainBus) + val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(AkkaConfig) val log = new BusLogging(eventStream, this) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index e112525af3..90da6dd10a 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -118,7 +118,7 @@ abstract class FaultHandlingStrategy { /** * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. */ - def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) @@ -133,13 +133,12 @@ abstract class FaultHandlingStrategy { /** * Returns whether it processed the failure or not */ - def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = { - val cause = fail.cause + def handleFailure(child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = { val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { - case Resume ⇒ fail.actor.resume(); true - case Restart ⇒ processFailure(true, fail, stats, children); true - case Stop ⇒ processFailure(false, fail, stats, children); true + case Resume ⇒ child.resume(); true + case Restart ⇒ processFailure(true, child, cause, stats, children); true + case Stop ⇒ processFailure(false, child, cause, stats, children); true case Escalate ⇒ false } } @@ -192,10 +191,10 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, //TODO optimization to drop all children here already? } - def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_._2.requestRestartPermission(retriesWindow))) - children.foreach(_._1.restart(fail.cause)) + children.foreach(_._1.restart(cause)) else children.foreach(_._1.stop()) } @@ -246,11 +245,11 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} - def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { + def processFailure(restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) - fail.actor.restart(fail.cause) + child.restart(cause) else - fail.actor.stop() //TODO optimization to drop child here already? + child.stop() //TODO optimization to drop child here already? } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 3359251658..d7945f3409 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -196,8 +196,8 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { protected[akka] def unregister(actor: ActorCell) { _actors.decrementAndGet() val mailBox = actor.mailbox - mailBox.becomeClosed() - actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here + actor.mailbox = deadLetterMailbox + mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up cleanUpMailboxFor(actor, mailBox) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 741e6546e1..007f117bc3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -173,7 +173,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag var processedMessages = 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 do { - if (debug) println(actor + " processing message " + nextMessage) + if (debug) println(actor.self + " processing message " + nextMessage) actor invoke nextMessage processAllSystemMessages() //After we're done, process all system messages @@ -197,7 +197,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag var nextMessage = systemDrain() try { while (nextMessage ne null) { - if (debug) println(actor + " processing system message " + nextMessage) + if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs + "/" + actor.childrenStats) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! @@ -245,7 +245,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) - if (Mailbox.debug) println(actor + " having enqueued " + message) + if (Mailbox.debug) println(actor.self + " having enqueued " + message) val head = systemQueueGet /* * this write is safely published by the compareAndSet contained within diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 4bc22d62c5..e1c8412df4 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -124,7 +124,11 @@ trait LoggingBus extends ActorEventBus { for { logger ← loggers if logger != StandardOutLogger - } logger.stop() + } { + // this is very necessary, else you get infinite loop with DeadLetter + unsubscribe(logger) + logger.stop() + } publish(Info(this, "all default loggers stopped")) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 781289edea..58ad446728 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -208,7 +208,7 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr } if (handle ne null) { try { - if (Mailbox.debug) println(mbox.actor + " processing message " + handle) + if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle) mbox.actor.invoke(handle) true } catch { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 2e6c58ce81..0e6ea66de5 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -449,7 +449,7 @@ class TestEventListener extends Logging.DefaultLogger { case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter case event: LogEvent ⇒ if (!filter(event)) print(event) - case DeadLetter(msg: SystemMessage, null, rcp) ⇒ + case DeadLetter(msg: SystemMessage, _, rcp) ⇒ val event = Warning(rcp, "received dead system message: " + msg) if (!filter(event)) print(event) case DeadLetter(msg, snd, rcp) ⇒ diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index f82dab3183..1a124beb2c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -95,6 +95,8 @@ class TestKit(_app: ActorSystem) { private val queue = new LinkedBlockingDeque[Message]() private[akka] var lastMessage: Message = NullMessage + def lastSender = lastMessage.sender + /** * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 86faf1df28..74a9e57d3b 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -55,7 +55,8 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { "terminate all actors" in { import ActorSystem.defaultConfig val app = ActorSystem("test", defaultConfig ++ Configuration( - "akka.actor.debug.lifecycle" -> true, "akka.loglevel" -> "DEBUG")) + "akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true, + "akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG")) val spec = new AkkaSpec(app) { val ref = Seq(testActor, app.actorOf(Props.empty, "name")) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 7059e588a6..0aa5a8d8b3 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -140,9 +140,11 @@ akka { } debug { - receive = off # enable function of Actor.loggable(), which is to log any received message at DEBUG level - autoreceive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) - lifecycle = off # enable DEBUG logging of actor lifecycle changes + receive = off # enable function of Actor.loggable(), which is to log any received message at DEBUG level + autoreceive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) + lifecycle = off # enable DEBUG logging of actor lifecycle changes + fsm = off # enable DEBUG logging of all LoggingFSMs for events, transitions and timers + event-stream = off # enable DEBUG logging of subscription changes on the eventStream } mailbox {