Fixing a race in DeathWatchSpec

This commit is contained in:
Viktor Klang 2011-10-19 14:26:53 +02:00
parent 0dc3c5ad3d
commit 2d4251fcee
4 changed files with 25 additions and 47 deletions

View file

@ -17,11 +17,11 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
"notify with one Terminated message when an Actor is stopped" in { "notify with one Terminated message when an Actor is stopped" in {
val terminal = actorOf(Props(context { case _ context.self.stop() })) val terminal = actorOf(Props(context { case _ }))
testActor startsMonitoring terminal testActor startsMonitoring terminal
terminal ! "anything" terminal ! PoisonPill
expectTerminationOf(terminal) expectTerminationOf(terminal)
@ -30,13 +30,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"notify with all monitors with one Terminated message when an Actor is stopped" in { "notify with all monitors with one Terminated message when an Actor is stopped" in {
val monitor1, monitor2 = actorOf(Props(context { case t: Terminated testActor ! t })) val monitor1, monitor2 = actorOf(Props(context { case t: Terminated testActor ! t }))
val terminal = actorOf(Props(context { case _ context.self.stop() })) val terminal = actorOf(Props(context { case _ }))
monitor1 startsMonitoring terminal monitor1 startsMonitoring terminal
monitor2 startsMonitoring terminal monitor2 startsMonitoring terminal
testActor startsMonitoring terminal testActor startsMonitoring terminal
terminal ! "anything" terminal ! PoisonPill
expectTerminationOf(terminal) expectTerminationOf(terminal)
expectTerminationOf(terminal) expectTerminationOf(terminal)
@ -48,8 +48,11 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
} }
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in { "notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
val monitor1, monitor2 = actorOf(Props(context { case t: Terminated testActor ! t })) val monitor1, monitor2 = actorOf(Props(context {
val terminal = actorOf(Props(context { case _ context.self.stop() })) case t: Terminated testActor ! t
case "ping" context.channel ! "pong"
}))
val terminal = actorOf(Props(context { case _ }))
monitor1 startsMonitoring terminal monitor1 startsMonitoring terminal
monitor2 startsMonitoring terminal monitor2 startsMonitoring terminal
@ -57,12 +60,15 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
monitor2 stopsMonitoring terminal monitor2 stopsMonitoring terminal
terminal ! "anything" monitor2 ! "ping"
expectMsg("pong") //Needs to be here since startsMonitoring and stopsMonitoring are asynchronous
terminal ! PoisonPill
expectTerminationOf(terminal) expectTerminationOf(terminal)
expectTerminationOf(terminal) expectTerminationOf(terminal)
terminal.stop()
monitor1.stop() monitor1.stop()
monitor2.stop() monitor2.stop()
} }
@ -80,8 +86,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
terminal ! Kill terminal ! Kill
expectTerminationOf(terminal) expectTerminationOf(terminal)
terminal.isShutdown must be === true
terminal.stop()
supervisor.stop() supervisor.stop()
} }
} }

View file

@ -392,31 +392,11 @@ abstract class ActorModelSpec extends AkkaSpec {
} catch { } catch {
case e case e
System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num)
//app.eventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount())
} }
} }
for (run 1 to 3) { for (run 1 to 3) {
flood(40000) flood(40000)
try { assertDispatcher(dispatcher)(starts = run, stops = run)
assertDispatcher(dispatcher)(starts = run, stops = run)
} catch {
case e
// FIXME: registry has been removed
// app.registry.local.foreach {
// case actor: LocalActorRef
// val cell = actor.underlying
// val mbox = cell.mailbox
// System.err.println("Left in the registry: " + actor.address + " => " + cell + " => " + mbox.hasMessages + " " + mbox.hasSystemMessages + " " + mbox.numberOfMessages + " " + mbox.isScheduled)
// var message = mbox.dequeue()
// while (message ne null) {
// System.err.println("Lingering message for " + cell + " " + message)
// message = mbox.dequeue()
// }
// }
throw e
}
} }
} }

View file

@ -90,6 +90,8 @@ class BalancingDispatcher(
while (messages ne null) { while (messages ne null) {
deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue
messages = messages.next messages = messages.next
if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox
messages = mailBox.systemDrain()
} }
} }
} }

View file

@ -182,10 +182,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
if (monitored.isShutdown) false if (monitored.isShutdown) false
else { else {
if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor) if (mappings.putIfAbsent(monitored, Vector(monitor)) ne null) associate(monitored, monitor)
else { else if (monitored.isShutdown) !dissociate(monitored, monitor) else true
if (monitored.isShutdown) !dissociate(monitored, monitor)
else true
}
} }
case raw: Vector[_] case raw: Vector[_]
val v = raw.asInstanceOf[Vector[ActorRef]] val v = raw.asInstanceOf[Vector[ActorRef]]
@ -194,10 +191,7 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
else { else {
val added = v :+ monitor val added = v :+ monitor
if (!mappings.replace(monitored, v, added)) associate(monitored, monitor) if (!mappings.replace(monitored, v, added)) associate(monitored, monitor)
else { else if (monitored.isShutdown) !dissociate(monitored, monitor) else true
if (monitored.isShutdown) !dissociate(monitored, monitor)
else true
}
} }
} }
} }
@ -241,13 +235,11 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
case raw: Vector[_] case raw: Vector[_]
val v = raw.asInstanceOf[Vector[ActorRef]] val v = raw.asInstanceOf[Vector[ActorRef]]
val removed = v.filterNot(monitor ==) val removed = v.filterNot(monitor ==)
if (removed eq v) false if (removed eq raw) false
else if (removed.isEmpty) { else if (removed.isEmpty) {
if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) if (!mappings.remove(monitored, v)) dissociate(monitored, monitor) else true
else true
} else { } else {
if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) if (!mappings.replace(monitored, v, removed)) dissociate(monitored, monitor) else true
else true
} }
} }
} }
@ -263,10 +255,8 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒
protected def mapSize: Int protected def mapSize: Int
def publish(event: Event): Unit = mappings.get(classify(event)) match { def publish(event: Event): Unit = mappings.get(classify(event)) match {
case null case null
case raw: Vector[_] case raw: Vector[_] raw.asInstanceOf[Vector[ActorRef]] foreach { _ ! event }
val v = raw.asInstanceOf[Vector[ActorRef]]
v foreach { _ ! event }
} }
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber) def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber)