add test to ActorModelSpec

dispatcherShouldHandleQueueingFromMultipleThreads tests for possible race
conditions in prohibiting multiple threads running the same actor concurrently
This commit is contained in:
Roland Kuhn 2011-03-06 21:53:09 +01:00
parent 50b2c14235
commit 2deb47f8fa
3 changed files with 25 additions and 3 deletions

View file

@ -26,6 +26,7 @@ object ActorModelSpec {
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Restart extends ActorModelMessage
val Ping = "Ping"
@ -52,6 +53,7 @@ object ActorModelSpec {
case Await(latch) => ack; latch.await(); busy.switchOff()
case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) => ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) => ack; Thread.sleep(time); l.countDown; busy.switchOff()
case Reply(msg) => ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff()
case Forward(to,msg) => ack; to.forward(msg); busy.switchOff()
@ -235,6 +237,26 @@ abstract class ActorModelSpec extends JUnitSuite {
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
}
@Test def dispatcherShouldHandleQueueingFromMultipleThreads {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor
val counter = new CountDownLatch(200)
a.start
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
for (i <- 1 to 10) { start }
assertCountDown(counter, 3000, "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop
}
def spawn(f : => Unit) = {
val thread = new Thread { override def run { f } }
thread.start
thread
}
@Test def dispatcherShouldProcessMessagesInParallel: Unit = {
implicit val dispatcher = newInterceptedDispatcher
val a, b = newTestActor.start
@ -296,7 +318,7 @@ abstract class ActorModelSpec extends JUnitSuite {
}
for(run <- 1 to 3) {
flood(10000)
await(dispatcher.stops.get == run)(withinMs = 10000)
await(dispatcher.stops.get == run)(withinMs = 11000)
assertDispatcher(dispatcher)(starts = run, stops = run)
}
}

View file

@ -101,7 +101,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
private[akka] override def shutdown {}
private[akka] override def timeoutMs = 0L
private[akka] override def timeoutMs = 100L
override def suspend(actor: ActorRef) {
getMailbox(actor).suspended.switchOn

View file

@ -2,7 +2,7 @@ package akka.testkit
import akka.actor.dispatch.ActorModelSpec
class CallingThreadDispatcherTest extends ActorModelSpec {
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor
override def dispatcherShouldProcessMessagesInParallel {}