diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala index efe362d5c8..1761f0195a 100644 --- a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala @@ -23,6 +23,7 @@ object ActorModelSpec { case class Increment(counter: AtomicLong) extends ActorModelMessage case class Await(latch: CountDownLatch) extends ActorModelMessage case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage + case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage case object Restart extends ActorModelMessage val Ping = "Ping" @@ -45,6 +46,7 @@ object ActorModelSpec { case Forward(to,msg) => ack; to.forward(msg) case CountDown(latch) => ack; latch.countDown() case Increment(count) => ack; count.incrementAndGet() + case CountDownNStop(l)=> ack; l.countDown; self.stop case Restart => ack; throw new Exception("Restart requested") } } @@ -264,6 +266,23 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1,unregisters = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) } + + @Test def dispatcherShouldHandleWavesOfActors { + implicit val dispatcher = newInterceptedDispatcher + + def flood(num: Int) { + val cachedMessage = CountDownNStop(new CountDownLatch(num)) + (1 to num) foreach { + _ => newTestActor.start ! cachedMessage + } + assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns") + } + for(run <- 1 to 3) { + flood(10000) + Thread.sleep(dispatcher.timeoutMs * 2) + assertDispatcher(dispatcher)(starts = run, stops = run) + } + } } class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {