From 2deb47f8fa1816e5c8a24b26702b998bbb173f5e Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 6 Mar 2011 21:53:09 +0100 Subject: [PATCH] add test to ActorModelSpec dispatcherShouldHandleQueueingFromMultipleThreads tests for possible race conditions in prohibiting multiple threads running the same actor concurrently --- .../scala/akka/dispatch/ActorModelSpec.scala | 24 ++++++++++++++++++- .../testkit/CallingThreadDispatcher.scala | 2 +- ...=> CallingThreadDispatcherModelSpec.scala} | 2 +- 3 files changed, 25 insertions(+), 3 deletions(-) rename akka-testkit/src/test/scala/akka/testkit/{CallingThreadDispatcherSpec.scala => CallingThreadDispatcherModelSpec.scala} (81%) diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index e006ba4198..86820a51cb 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -26,6 +26,7 @@ object ActorModelSpec { case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage case class Wait(time: Long) extends ActorModelMessage + case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage case object Restart extends ActorModelMessage val Ping = "Ping" @@ -52,6 +53,7 @@ object ActorModelSpec { case Await(latch) => ack; latch.await(); busy.switchOff() case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff() case Wait(time) => ack; Thread.sleep(time); busy.switchOff() + case WaitAck(time, l) => ack; Thread.sleep(time); l.countDown; busy.switchOff() case Reply(msg) => ack; self.reply(msg); busy.switchOff() case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff() case Forward(to,msg) => ack; to.forward(msg); busy.switchOff() @@ -235,6 +237,26 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) } + @Test def dispatcherShouldHandleQueueingFromMultipleThreads { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor + val counter = new CountDownLatch(200) + a.start + + def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } + for (i <- 1 to 10) { start } + assertCountDown(counter, 3000, "Should process 200 messages") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) + + a.stop + } + + def spawn(f : => Unit) = { + val thread = new Thread { override def run { f } } + thread.start + thread + } + @Test def dispatcherShouldProcessMessagesInParallel: Unit = { implicit val dispatcher = newInterceptedDispatcher val a, b = newTestActor.start @@ -296,7 +318,7 @@ abstract class ActorModelSpec extends JUnitSuite { } for(run <- 1 to 3) { flood(10000) - await(dispatcher.stops.get == run)(withinMs = 10000) + await(dispatcher.stops.get == run)(withinMs = 11000) assertDispatcher(dispatcher)(starts = run, stops = run) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index afafd284a5..3a96e3ae4f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -101,7 +101,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa private[akka] override def shutdown {} - private[akka] override def timeoutMs = 0L + private[akka] override def timeoutMs = 100L override def suspend(actor: ActorRef) { getMailbox(actor).suspended.switchOn diff --git a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala similarity index 81% rename from akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala rename to akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index c438a8c3d7..4645b26d1a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -2,7 +2,7 @@ package akka.testkit import akka.actor.dispatch.ActorModelSpec -class CallingThreadDispatcherTest extends ActorModelSpec { +class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor override def dispatcherShouldProcessMessagesInParallel {}