diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index ece9ec3d61..c4622b081f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -17,11 +17,11 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } "notify with one Terminated message when an Actor is stopped" in { - val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() })) + val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) testActor startsMonitoring terminal - terminal ! "anything" + terminal ! PoisonPill expectTerminationOf(terminal) @@ -30,13 +30,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with all monitors with one Terminated message when an Actor is stopped" in { val monitor1, monitor2 = actorOf(Props(context ⇒ { case t: Terminated ⇒ testActor ! t })) - val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() })) + val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) monitor1 startsMonitoring terminal monitor2 startsMonitoring terminal testActor startsMonitoring terminal - terminal ! "anything" + terminal ! PoisonPill expectTerminationOf(terminal) expectTerminationOf(terminal) @@ -48,8 +48,11 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { - val monitor1, monitor2 = actorOf(Props(context ⇒ { case t: Terminated ⇒ testActor ! t })) - val terminal = actorOf(Props(context ⇒ { case _ ⇒ context.self.stop() })) + val monitor1, monitor2 = actorOf(Props(context ⇒ { + case t: Terminated ⇒ testActor ! t + case "ping" ⇒ context.channel ! "pong" + })) + val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) monitor1 startsMonitoring terminal monitor2 startsMonitoring terminal @@ -57,12 +60,15 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende monitor2 stopsMonitoring terminal - terminal ! "anything" + monitor2 ! "ping" + + expectMsg("pong") //Needs to be here since startsMonitoring and stopsMonitoring are asynchronous + + terminal ! PoisonPill expectTerminationOf(terminal) expectTerminationOf(terminal) - terminal.stop() monitor1.stop() monitor2.stop() } @@ -80,8 +86,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende terminal ! Kill expectTerminationOf(terminal) + terminal.isShutdown must be === true - terminal.stop() supervisor.stop() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a9085d2a81..d4aa3e90e6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -392,31 +392,11 @@ abstract class ActorModelSpec extends AkkaSpec { } catch { case e ⇒ System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) - //app.eventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) } } for (run ← 1 to 3) { flood(40000) - try { - assertDispatcher(dispatcher)(starts = run, stops = run) - } catch { - case e ⇒ - - // FIXME: registry has been removed - // app.registry.local.foreach { - // case actor: LocalActorRef ⇒ - // val cell = actor.underlying - // val mbox = cell.mailbox - // System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled) - // var message = mbox.dequeue() - // while (message ne null) { - // System.err.println("Lingering message for " + cell + " " + message) - // message = mbox.dequeue() - // } - // } - - throw e - } + assertDispatcher(dispatcher)(starts = run, stops = run) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index f38e3a657e..93e7c99e7b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -90,6 +90,8 @@ class BalancingDispatcher( while (messages ne null) { deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue messages = messages.next + if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox + messages = mailBox.systemDrain() } } } diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 33319fbb13..9dd76f5344 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -182,10 +182,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ if (monitored.isShutdown) false else { if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor) - else { - if (monitored.isShutdown) !dissociate(monitored, monitor) - else true - } + else if (monitored.isShutdown) !dissociate(monitored, monitor) else true } case raw: Vector[_] ⇒ val v = raw.asInstanceOf[Vector[ActorRef]] @@ -194,10 +191,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ else { val added = v :+ monitor if (!mappings.replace(monitored, v, added)) associate(monitored, monitor) - else { - if (monitored.isShutdown) !dissociate(monitored, monitor) - else true - } + else if (monitored.isShutdown) !dissociate(monitored, monitor) else true } } } @@ -241,13 +235,11 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ case raw: Vector[_] ⇒ val v = raw.asInstanceOf[Vector[ActorRef]] val removed = v.filterNot(monitor ==) - if (removed eq v) false + if (removed eq raw) false else if (removed.isEmpty) { - if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) - else true + if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true } else { - if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) - else true + if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) else true } } } @@ -263,10 +255,8 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ protected def mapSize: Int def publish(event: Event): Unit = mappings.get(classify(event)) match { - case null ⇒ - case raw: Vector[_] ⇒ - val v = raw.asInstanceOf[Vector[ActorRef]] - v foreach { _ ! event } + case null ⇒ + case raw: Vector[_] ⇒ raw.asInstanceOf[Vector[ActorRef]] foreach { _ ! event } } def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber)