From c3381d8149347620fe019668c42b4a03ec38b6dd Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Thu, 1 Apr 2010 15:15:44 +0200 Subject: [PATCH 1/2] Improved unit test performance by replacing Thread.sleep with more clever approaches (CountDownLatch, BlockingQueue and others). Here and there Thread.sleep could also simply be removed. --- .../ActorFireForgetRequestReplyTest.scala | 18 +- .../ClientInitiatedRemoteActorTest.scala | 18 +- ...rBasedEventDrivenDispatcherActorTest.scala | 11 +- ...ventDrivenWorkStealingDispatcherTest.scala | 1 - akka-core/src/test/scala/ForwardActor.scala | 20 +- .../src/test/scala/InMemoryActorTest.scala | 55 ++- ...ThreadEventDrivenDispatcherActorTest.scala | 11 +- ...adPoolEventDrivenDispatcherActorTest.scala | 11 +- .../src/test/scala/RemoteSupervisorTest.scala | 389 +++++------------- .../ServerInitiatedRemoteActorTest.scala | 19 +- akka-core/src/test/scala/SupervisorTest.scala | 384 ++++++----------- .../src/test/scala/ThreadBasedActorTest.scala | 11 +- 12 files changed, 324 insertions(+), 624 deletions(-) diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala index 8aab34f495..5906468048 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala @@ -1,5 +1,6 @@ package se.scalablesolutions.akka.actor +import java.util.concurrent.{TimeUnit, CountDownLatch} import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -9,6 +10,7 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite { object state { var s = "NIL" + val finished = new CountDownLatch(1) } class ReplyActor extends Actor { @@ -25,9 +27,15 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite { def receive = { case "Init" => replyActor ! "Send" - case "Reply" => state.s = "Reply" + case "Reply" => { + state.s = "Reply" + state.finished.countDown + } case "InitImplicit" => replyActor ! "SendImplicit" - case "ReplyImplicit" => state.s = "ReplyImplicit" + case "ReplyImplicit" => { + state.s = "ReplyImplicit" + state.finished.countDown + } } } @@ -40,7 +48,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite { val senderActor = new SenderActor(replyActor) senderActor.start senderActor ! "Init" - Thread.sleep(1000) + state.finished.await(1, TimeUnit.SECONDS) + assert(0 === state.finished.getCount) assert("Reply" === state.s) } @@ -53,7 +62,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite { val senderActor = new SenderActor(replyActor) senderActor.start senderActor ! "InitImplicit" - Thread.sleep(1000) + state.finished.await(1, TimeUnit.SECONDS) + assert(0 === state.finished.getCount) assert("ReplyImplicit" === state.s) } } diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala index 95ae42de30..2df6ea4531 100644 --- a/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala +++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala @@ -1,6 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit +import java.util.concurrent.{CountDownLatch, TimeUnit} import junit.framework.TestCase import org.scalatest.junit.JUnitSuite @@ -10,15 +10,15 @@ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.dispatch.Dispatchers object Global { - var oneWay = "nada" - var remoteReply = "nada" + val oneWay = new CountDownLatch(1) + val remoteReply = new CountDownLatch(1) } class RemoteActorSpecActorUnidirectional extends Actor { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { case "OneWay" => - Global.oneWay = "received" + Global.oneWay.countDown } } @@ -38,7 +38,7 @@ class RemoteActorSpecActorAsyncSender extends Actor { case Send(actor: Actor) => actor ! "Hello" case "World" => - Global.remoteReply = "replied" + Global.remoteReply.countDown } def send(actor: Actor) { @@ -85,8 +85,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite { actor.makeRemote(HOSTNAME, PORT1) actor.start val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === Global.oneWay) + Global.oneWay.await(1, TimeUnit.SECONDS) + assert(0 === Global.oneWay.getCount) actor.stop } @@ -111,8 +111,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite { sender.setReplyToAddress(HOSTNAME, PORT1) sender.start sender.send(actor) - Thread.sleep(1000) - assert("replied" === Global.remoteReply) + Global.remoteReply.await(1, TimeUnit.SECONDS) + assert(0 === Global.remoteReply.getCount) actor.stop } diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala index 7fb91fd49d..693802c82d 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala @@ -1,7 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit - +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.Test import se.scalablesolutions.akka.dispatch.Dispatchers @@ -22,17 +21,17 @@ class ExecutorBasedEventDrivenDispatcherActorTest extends JUnitSuite { } @Test def shouldSendOneWay = { - var oneWay = "nada" + val oneWay = new CountDownLatch(1) val actor = new Actor { dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid) def receive = { - case "OneWay" => oneWay = "received" + case "OneWay" => oneWay.countDown } } actor.start val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === oneWay) + oneWay.await(1, TimeUnit.SECONDS) + assert(0 === oneWay.getCount) actor.stop } diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala index ecc911734a..cee012d49f 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala @@ -22,7 +22,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with Thread.sleep(delay) invocationCount += 1 finishedCounter.countDown -// println(id + " processed " + x) } } } diff --git a/akka-core/src/test/scala/ForwardActor.scala b/akka-core/src/test/scala/ForwardActor.scala index ff493c80e8..a477c5ecf8 100644 --- a/akka-core/src/test/scala/ForwardActor.scala +++ b/akka-core/src/test/scala/ForwardActor.scala @@ -1,5 +1,6 @@ package se.scalablesolutions.akka.actor +import java.util.concurrent.{TimeUnit, CountDownLatch} import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -8,12 +9,15 @@ class ForwardActorTest extends JUnitSuite { object ForwardState { var sender: Actor = null - var result: String = "nada" + val finished = new CountDownLatch(1) } class ReceiverActor extends Actor { def receive = { - case "SendBang" => ForwardState.sender = sender.get + case "SendBang" => { + ForwardState.sender = sender.get + ForwardState.finished.countDown + } case "SendBangBang" => reply("SendBangBang") } } @@ -40,7 +44,10 @@ class ForwardActorTest extends JUnitSuite { class BangBangSenderActor extends Actor { val forwardActor = new ForwardActor forwardActor.start - ForwardState.result = (forwardActor !! "SendBangBang").getOrElse("nada") + (forwardActor !! "SendBangBang") match { + case Some(_) => {ForwardState.finished.countDown} + case None => {} + } def receive = { case _ => {} } @@ -50,7 +57,8 @@ class ForwardActorTest extends JUnitSuite { def shouldForwardActorReferenceWhenInvokingForwardOnBang = { val senderActor = new BangSenderActor senderActor.start - Thread.sleep(1000) + ForwardState.finished.await(1, TimeUnit.SECONDS) + assert(0 === ForwardState.finished.getCount) assert(ForwardState.sender ne null) assert(senderActor === ForwardState.sender) } @@ -59,7 +67,7 @@ class ForwardActorTest extends JUnitSuite { def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = { val senderActor = new BangBangSenderActor senderActor.start - Thread.sleep(1000) - assert(ForwardState.result === "SendBangBang") + ForwardState.finished.await(1, TimeUnit.SECONDS) + assert(0 === ForwardState.finished.getCount) } } diff --git a/akka-core/src/test/scala/InMemoryActorTest.scala b/akka-core/src/test/scala/InMemoryActorTest.scala index 5692d7b01f..d88a568db9 100644 --- a/akka-core/src/test/scala/InMemoryActorTest.scala +++ b/akka-core/src/test/scala/InMemoryActorTest.scala @@ -1,5 +1,6 @@ package se.scalablesolutions.akka.actor +import java.util.concurrent.{TimeUnit, CountDownLatch} import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -22,10 +23,13 @@ case class SetRefStateOneWay(key: String) case class SuccessOneWay(key: String, value: String) case class FailureOneWay(key: String, value: String, failer: Actor) -class InMemStatefulActor extends Actor { +class InMemStatefulActor(expectedInvocationCount:Int) extends Actor { + def this() = this(0) timeout = 5000 makeTransactionRequired + val notifier = new CountDownLatch(expectedInvocationCount) + private lazy val mapState = TransactionalState.newMap[String, String] private lazy val vectorState = TransactionalState.newVector[String] private lazy val refState = TransactionalState.newRef[String] @@ -33,46 +37,59 @@ class InMemStatefulActor extends Actor { def receive = { case GetMapState(key) => reply(mapState.get(key).get) + notifier.countDown case GetVectorSize => reply(vectorState.length.asInstanceOf[AnyRef]) + notifier.countDown case GetRefState => reply(refState.get.get) + notifier.countDown case SetMapState(key, msg) => mapState.put(key, msg) reply(msg) + notifier.countDown case SetVectorState(msg) => vectorState.add(msg) reply(msg) + notifier.countDown case SetRefState(msg) => refState.swap(msg) reply(msg) + notifier.countDown case Success(key, msg) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) reply(msg) + notifier.countDown case Failure(key, msg, failer) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) failer !! "Failure" reply(msg) + notifier.countDown case SetMapStateOneWay(key, msg) => mapState.put(key, msg) + notifier.countDown case SetVectorStateOneWay(msg) => vectorState.add(msg) + notifier.countDown case SetRefStateOneWay(msg) => refState.swap(msg) + notifier.countDown case SuccessOneWay(key, msg) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) + notifier.countDown case FailureOneWay(key, msg, failer) => mapState.put(key, msg) vectorState.add(msg) refState.swap(msg) failer ! "Failure" + notifier.countDown } } @@ -84,18 +101,18 @@ class InMemFailerActor extends Actor { throw new RuntimeException("expected") } } - + class InMemoryActorTest extends JUnitSuite { import Actor.Sender.Self @Test def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state - Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get) } @@ -110,14 +127,14 @@ class InMemoryActorTest extends JUnitSuite { @Test def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start val failer = new InMemFailerActor failer.start stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state - Thread.sleep(1000) stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state } @@ -137,12 +154,12 @@ class InMemoryActorTest extends JUnitSuite { @Test def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start stateful ! SetVectorStateOneWay("init") // set init state - Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert(2 === (stateful !! GetVectorSize).get) } @@ -157,14 +174,15 @@ class InMemoryActorTest extends JUnitSuite { @Test def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start stateful ! SetVectorStateOneWay("init") // set init state Thread.sleep(1000) val failer = new InMemFailerActor failer.start stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert(1 === (stateful !! GetVectorSize).get) } @@ -184,12 +202,12 @@ class InMemoryActorTest extends JUnitSuite { @Test def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start stateful ! SetRefStateOneWay("init") // set init state - Thread.sleep(1000) stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert("new state" === (stateful !! GetRefState).get) } @@ -204,14 +222,15 @@ class InMemoryActorTest extends JUnitSuite { @Test def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = { - val stateful = new InMemStatefulActor + val stateful = new InMemStatefulActor(2) stateful.start stateful ! SetRefStateOneWay("init") // set init state Thread.sleep(1000) val failer = new InMemFailerActor failer.start stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method - Thread.sleep(1000) + stateful.notifier.await(1, TimeUnit.SECONDS) + assert(0 === stateful.notifier.getCount) assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state } diff --git a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala index f0c3f0cdf7..51711d4ef0 100644 --- a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala +++ b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala @@ -1,7 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit - +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -24,17 +23,17 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorTest extends JUnitSuite } @Test def shouldSendOneWay = { - var oneWay = "nada" + val oneWay = new CountDownLatch(1) val actor = new Actor { dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid) def receive = { - case "OneWay" => oneWay = "received" + case "OneWay" => oneWay.countDown } } actor.start val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === oneWay) + oneWay.await(1, TimeUnit.SECONDS) + assert(0 === oneWay.getCount) actor.stop } diff --git a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala index 99c6d378f0..4cfaeae328 100644 --- a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala +++ b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala @@ -1,7 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit - +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.Test import se.scalablesolutions.akka.dispatch.Dispatchers @@ -22,17 +21,17 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorTest extends JUnitSuite { } @Test def shouldSendOneWay = { - var oneWay = "nada" + val oneWay = new CountDownLatch(1) val actor = new Actor { dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid) def receive = { - case "OneWay" => oneWay = "received" + case "OneWay" => oneWay.countDown } } actor.start val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === oneWay) + oneWay.await(1, TimeUnit.SECONDS) + assert(0 === oneWay.getCount) actor.stop } diff --git a/akka-core/src/test/scala/RemoteSupervisorTest.scala b/akka-core/src/test/scala/RemoteSupervisorTest.scala index 409e5ddfa7..933b418445 100644 --- a/akka-core/src/test/scala/RemoteSupervisorTest.scala +++ b/akka-core/src/test/scala/RemoteSupervisorTest.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.actor +import _root_.java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue} import se.scalablesolutions.akka.serialization.BinaryString import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer} @@ -14,7 +15,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test object Log { - var messageLog: String = "" + var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] var oneWayLog: String = "" } @@ -22,7 +23,7 @@ object Log { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { case BinaryString("Ping") => - Log.messageLog += "ping" + Log.messageLog.put("ping") reply("pong") case OneWay => @@ -33,7 +34,7 @@ object Log { } override protected def postRestart(reason: Throwable) { - Log.messageLog += reason.getMessage + Log.messageLog.put(reason.getMessage) } } @@ -41,14 +42,14 @@ object Log { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { case BinaryString("Ping") => - Log.messageLog += "ping" + Log.messageLog.put("ping") reply("pong") case BinaryString("Die") => throw new RuntimeException("DIE") } override protected def postRestart(reason: Throwable) { - Log.messageLog += reason.getMessage + Log.messageLog.put(reason.getMessage) } } @@ -56,14 +57,14 @@ object Log { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { case BinaryString("Ping") => - Log.messageLog += "ping" + Log.messageLog.put("ping") reply("pong") case BinaryString("Die") => throw new RuntimeException("DIE") } override protected def postRestart(reason: Throwable) { - Log.messageLog += reason.getMessage + Log.messageLog.put(reason.getMessage) } } @@ -87,7 +88,7 @@ class RemoteSupervisorTest extends JUnitSuite { var pingpong3: RemotePingPong3Actor = _ @Test def shouldStartServer = { - Log.messageLog = "" + Log.messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start @@ -97,416 +98,232 @@ class RemoteSupervisorTest extends JUnitSuite { } @Test def shouldKillSingleActorOneForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! BinaryString("Die") } - Thread.sleep(500) expect("DIE") { - Log.messageLog + Log.messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorOneForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("ping") { - Log.messageLog + Log.messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong1 !! BinaryString("Die") } - Thread.sleep(500) - expect("pingDIE") { - Log.messageLog + + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingDIEping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillSingleActorAllForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! BinaryString("Die") } - Thread.sleep(500) + expect("DIE") { - Log.messageLog + Log.messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorAllForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("ping") { - Log.messageLog + Log.messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong1 !! BinaryString("Die") } - Thread.sleep(500) - expect("pingDIE") { - Log.messageLog + + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingDIEping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsOneForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getMultipleActorsOneForOneConf sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong3 !! BinaryString("Die") } - Thread.sleep(500) + expect("DIE") { - Log.messageLog + Log.messageLog.poll(1, TimeUnit.SECONDS) } } def tesCallKillCallMultipleActorsOneForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getMultipleActorsOneForOneConf sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong2 !! BinaryString("Die") } - Thread.sleep(500) - expect("pingpingpingDIE") { - Log.messageLog + + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingpingDIEpingpingping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsAllForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getMultipleActorsAllForOneConf sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong2 !! BinaryString("Die") } - Thread.sleep(500) - expect("DIEDIEDIE") { - Log.messageLog + + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } } def tesCallKillCallMultipleActorsAllForOne = { - Log.messageLog = "" + Log.messageLog.clear val sup = getMultipleActorsAllForOneConf sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong2 !! BinaryString("Die") } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIE") { - Log.messageLog + + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! BinaryString("Ping")).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIEpingpingping") { - Log.messageLog + + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + Log.messageLog.poll(1, TimeUnit.SECONDS) } } - /* - @Test def shouldOneWayKillSingleActorOneForOne = { - Logg.messageLog = "" - val sup = getSingleActorOneForOneSupervisor - sup.start - Thread.sleep(500) - pingpong1 ! BinaryString("Die") - Thread.sleep(500) - expect("DIE") { - Logg.messageLog - } - } - - @Test def shouldOneWayCallKillCallSingleActorOneForOne = { - Logg.messageLog = "" - val sup = getSingleActorOneForOneSupervisor - sup.start - Thread.sleep(500) - pingpong1 ! OneWay - Thread.sleep(500) - expect("oneway") { - Logg.oneWayLog - } - pingpong1 ! BinaryString("Die") - Thread.sleep(500) - expect("DIE") { - Logg.messageLog - } - pingpong1 ! OneWay - Thread.sleep(500) - expect("onewayoneway") { - Logg.oneWayLog - } - } -*/ - - /* - @Test def shouldOneWayKillSingleActorAllForOne = { - Logg.messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong1 ! BinaryString("Die") - } - Thread.sleep(500) - expect("DIE") { - Logg.messageLog - } - } - - @Test def shouldOneWayCallKillCallSingleActorAllForOne = { - Logg.messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup.start - Thread.sleep(500) - expect("pong") { - (pingpong1 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("ping") { - Logg.messageLog - } - intercept[RuntimeException] { - pingpong1 ! BinaryString("Die") - } - Thread.sleep(500) - expect("pingDIE") { - Logg.messageLog - } - expect("pong") { - (pingpong1 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pingDIEping") { - Logg.messageLog - } - } - - @Test def shouldOneWayKillMultipleActorsOneForOne = { - Logg.messageLog = "" - val sup = getMultipleActorsOneForOneConf - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong3 ! BinaryString("Die") - } - Thread.sleep(500) - expect("DIE") { - Logg.messageLog - } - } - - def tesOneWayCallKillCallMultipleActorsOneForOne = { - Logg.messageLog = "" - val sup = getMultipleActorsOneForOneConf - sup.start - Thread.sleep(500) - expect("pong") { - (pingpong1 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingping") { - Logg.messageLog - } - intercept[RuntimeException] { - pingpong2 ! BinaryString("Die") - } - Thread.sleep(500) - expect("pingpingpingDIE") { - Logg.messageLog - } - expect("pong") { - (pingpong1 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingpingDIEpingpingping") { - Logg.messageLog - } - } - - @Test def shouldOneWayKillMultipleActorsAllForOne = { - Logg.messageLog = "" - val sup = getMultipleActorsAllForOneConf - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong2 ! BinaryString("Die") - } - Thread.sleep(500) - expect("DIEDIEDIE") { - Logg.messageLog - } - } - - def tesOneWayCallKillCallMultipleActorsAllForOne = { - Logg.messageLog = "" - val sup = getMultipleActorsAllForOneConf - sup.start - Thread.sleep(500) - expect("pong") { - pingpong1 ! BinaryString("Ping") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingping") { - Logg.messageLog - } - intercept[RuntimeException] { - pingpong2 ! BinaryString("Die") - } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIE") { - Logg.messageLog - } - expect("pong") { - (pingpong1 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! BinaryString("Ping")).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIEpingpingping") { - Logg.messageLog - } - } - */ - - /* - @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = { - Logg.messageLog = "" - val sup = getNestedSupervisorsAllForOneConf - sup.start - intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") - } - Thread.sleep(500) - expect("DIEDIEDIE") { - Logg.messageLog - } - } -*/ - // ============================================= // Creat some supervisors with different configurations diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala index 3fd540e542..64ff11bdd3 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala @@ -1,7 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit - +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.{Test, Before, After} @@ -14,8 +13,8 @@ object ServerInitiatedRemoteActorTest { var server: RemoteServer = null object Global { - var oneWay = "nada" - var remoteReply = "nada" + val oneWay = new CountDownLatch(1) + var remoteReply = new CountDownLatch(1) } class RemoteActorSpecActorUnidirectional extends Actor { @@ -25,7 +24,7 @@ object ServerInitiatedRemoteActorTest { def receive = { case "OneWay" => println("================== ONEWAY") - Global.oneWay = "received" + Global.oneWay.countDown } } @@ -47,7 +46,7 @@ object ServerInitiatedRemoteActorTest { case Send(actor: Actor) => actor ! "Hello" case "World" => - Global.remoteReply = "replied" + Global.remoteReply.countDown } def send(actor: Actor) { @@ -92,8 +91,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite { 5000L, HOSTNAME, PORT) val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === Global.oneWay) + Global.oneWay.await(1, TimeUnit.SECONDS) + assert(0 === Global.oneWay.getCount) actor.stop } @@ -120,8 +119,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite { sender.setReplyToAddress(HOSTNAME, PORT) sender.start sender.send(actor) - Thread.sleep(1000) - assert("replied" === Global.remoteReply) + Global.remoteReply.await(1, TimeUnit.SECONDS) + assert(0 === Global.remoteReply.getCount) actor.stop } diff --git a/akka-core/src/test/scala/SupervisorTest.scala b/akka-core/src/test/scala/SupervisorTest.scala index 41e743533c..b46af44781 100644 --- a/akka-core/src/test/scala/SupervisorTest.scala +++ b/akka-core/src/test/scala/SupervisorTest.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.actor +import _root_.java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.{OneWay, Die, Ping} @@ -17,15 +18,15 @@ import org.junit.Test class SupervisorTest extends JUnitSuite { import Actor.Sender.Self - var messageLog: String = "" - var oneWayLog: String = "" - + var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] + var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String] + var pingpong1: PingPong1Actor = _ var pingpong2: PingPong2Actor = _ var pingpong3: PingPong3Actor = _ @Test def shouldStartServer = { - messageLog = "" + messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start @@ -35,414 +36,265 @@ class SupervisorTest extends JUnitSuite { } @Test def shouldKillSingleActorOneForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! Die } - Thread.sleep(500) + expect("DIE") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorOneForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("ping") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong1 !! Die } - Thread.sleep(500) - expect("pingDIE") { - messageLog + + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingDIEping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillSingleActorAllForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong1 !! Die } - Thread.sleep(500) + expect("DIE") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorAllForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorAllForOneSupervisor sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("ping") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong1 !! Die } - Thread.sleep(500) - expect("pingDIE") { - messageLog + + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingDIEping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsOneForOne = { - messageLog = "" + messageLog.clear val sup = getMultipleActorsOneForOneConf sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong3 !! Die } - Thread.sleep(500) + expect("DIE") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } } def tesCallKillCallMultipleActorsOneForOne = { - messageLog = "" + messageLog.clear val sup = getMultipleActorsOneForOneConf sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong2 !! Die } - Thread.sleep(500) - expect("pingpingpingDIE") { - messageLog + + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingpingDIEpingpingping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsAllForOne = { - messageLog = "" + messageLog.clear val sup = getMultipleActorsAllForOneConf sup.start - Thread.sleep(500) intercept[RuntimeException] { pingpong2 !! Die } - Thread.sleep(500) - expect("DIEDIEDIE") { - messageLog + + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) } } def tesCallKillCallMultipleActorsAllForOne = { - messageLog = "" + messageLog.clear val sup = getMultipleActorsAllForOneConf sup.start - Thread.sleep(500) expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } intercept[RuntimeException] { pingpong2 !! Die } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIE") { - messageLog + + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(1, TimeUnit.SECONDS) } expect("pong") { (pingpong1 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong2 !! Ping).getOrElse("nil") } - Thread.sleep(500) + expect("pong") { (pingpong3 !! Ping).getOrElse("nil") } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIEpingpingping") { - messageLog + + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldOneWayKillSingleActorOneForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) pingpong1 ! Die - Thread.sleep(500) + expect("DIE") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } } @Test def shouldOneWayCallKillCallSingleActorOneForOne = { - messageLog = "" + messageLog.clear val sup = getSingleActorOneForOneSupervisor sup.start - Thread.sleep(500) pingpong1 ! OneWay - Thread.sleep(500) + expect("oneway") { - oneWayLog + oneWayLog.poll(1, TimeUnit.SECONDS) } pingpong1 ! Die - Thread.sleep(500) + expect("DIE") { - messageLog + messageLog.poll(1, TimeUnit.SECONDS) } pingpong1 ! OneWay - Thread.sleep(500) - expect("onewayoneway") { - oneWayLog + + expect("oneway") { + oneWayLog.poll(1, TimeUnit.SECONDS) } } - /* - @Test def shouldOneWayKillSingleActorAllForOne = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong1 ! Die - } - Thread.sleep(500) - expect("DIE") { - messageLog - } - } - - @Test def shouldOneWayCallKillCallSingleActorAllForOne = { - messageLog = "" - val sup = getSingleActorAllForOneSupervisor - sup.start - Thread.sleep(500) - expect("pong") { - (pingpong1 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("ping") { - messageLog - } - intercept[RuntimeException] { - pingpong1 ! Die - } - Thread.sleep(500) - expect("pingDIE") { - messageLog - } - expect("pong") { - (pingpong1 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pingDIEping") { - messageLog - } - } - - @Test def shouldOneWayKillMultipleActorsOneForOne = { - messageLog = "" - val sup = getMultipleActorsOneForOneConf - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong3 ! Die - } - Thread.sleep(500) - expect("DIE") { - messageLog - } - } - - def tesOneWayCallKillCallMultipleActorsOneForOne = { - messageLog = "" - val sup = getMultipleActorsOneForOneConf - sup.start - Thread.sleep(500) - expect("pong") { - (pingpong1 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingping") { - messageLog - } - intercept[RuntimeException] { - pingpong2 ! Die - } - Thread.sleep(500) - expect("pingpingpingDIE") { - messageLog - } - expect("pong") { - (pingpong1 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingpingDIEpingpingping") { - messageLog - } - } - - @Test def shouldOneWayKillMultipleActorsAllForOne = { - messageLog = "" - val sup = getMultipleActorsAllForOneConf - sup.start - Thread.sleep(500) - intercept[RuntimeException] { - pingpong2 ! Die - } - Thread.sleep(500) - expect("DIEDIEDIE") { - messageLog - } - } - - def tesOneWayCallKillCallMultipleActorsAllForOne = { - messageLog = "" - val sup = getMultipleActorsAllForOneConf - sup.start - Thread.sleep(500) - expect("pong") { - pingpong1 ! Ping - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingping") { - messageLog - } - intercept[RuntimeException] { - pingpong2 ! Die - } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIE") { - messageLog - } - expect("pong") { - (pingpong1 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong2 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pong") { - (pingpong3 ! Ping).getOrElse("nil") - } - Thread.sleep(500) - expect("pingpingpingDIEDIEDIEpingpingping") { - messageLog - } - } - */ - - /* - @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = { - messageLog = "" - val sup = getNestedSupervisorsAllForOneConf - sup.start - intercept[RuntimeException] { - pingpong1 !! Die - } - Thread.sleep(500) - expect("DIEDIEDIE") { - messageLog - } - } -*/ - // ============================================= // Creat some supervisors with different configurations @@ -547,44 +399,44 @@ class SupervisorTest extends JUnitSuite { class PingPong1Actor extends Actor { def receive = { case Ping => - messageLog += "ping" + messageLog.put("ping") reply("pong") case OneWay => - oneWayLog += "oneway" + oneWayLog.put("oneway") case Die => throw new RuntimeException("DIE") } override protected def postRestart(reason: Throwable) { - messageLog += reason.getMessage + messageLog.put(reason.getMessage) } } class PingPong2Actor extends Actor { def receive = { case Ping => - messageLog += "ping" + messageLog.put("ping") reply("pong") case Die => throw new RuntimeException("DIE") } override protected def postRestart(reason: Throwable) { - messageLog += reason.getMessage + messageLog.put(reason.getMessage) } } class PingPong3Actor extends Actor { def receive = { case Ping => - messageLog += "ping" + messageLog.put("ping") reply("pong") case Die => throw new RuntimeException("DIE") } override protected def postRestart(reason: Throwable) { - messageLog += reason.getMessage + messageLog.put(reason.getMessage) } } } diff --git a/akka-core/src/test/scala/ThreadBasedActorTest.scala b/akka-core/src/test/scala/ThreadBasedActorTest.scala index 403dbc0683..902ef78364 100644 --- a/akka-core/src/test/scala/ThreadBasedActorTest.scala +++ b/akka-core/src/test/scala/ThreadBasedActorTest.scala @@ -1,7 +1,6 @@ package se.scalablesolutions.akka.actor -import java.util.concurrent.TimeUnit - +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -24,17 +23,17 @@ class ThreadBasedActorTest extends JUnitSuite { } @Test def shouldSendOneWay = { - var oneWay = "nada" + var oneWay = new CountDownLatch(1) val actor = new Actor { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { - case "OneWay" => oneWay = "received" + case "OneWay" => oneWay.countDown } } actor.start val result = actor ! "OneWay" - Thread.sleep(1000) - assert("received" === oneWay) + oneWay.await(1, TimeUnit.SECONDS) + assert(0 === oneWay.getCount) actor.stop } From a23cc9b230d4a46f37975065493a908ffcd9e11a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 1 Apr 2010 22:34:16 +0200 Subject: [PATCH 2/2] Minor cleanups and fixing super.unregister --- akka-core/src/main/scala/actor/Actor.scala | 5 ----- .../AbstractReactorBasedEventDrivenDispatcher.scala | 2 +- akka-core/src/main/scala/dispatch/Dispatchers.scala | 9 ++++++++- akka-core/src/main/scala/dispatch/Reactor.scala | 6 +++++- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index e0c6de0f5c..4a52da74b6 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -433,10 +433,6 @@ trait Actor extends TransactionManagement with Logging { def start: Actor = synchronized { if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'") if (!_isRunning) { - if (messageDispatcher.isShutdown && - messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) { - messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init - } messageDispatcher.register(this) messageDispatcher.start _isRunning = true @@ -459,7 +455,6 @@ trait Actor extends TransactionManagement with Logging { def stop = synchronized { if (_isRunning) { messageDispatcher.unregister(this) - if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero _isRunning = false _isShutDown = true shutdown diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 332798154a..3727e8ca92 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -25,7 +25,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten override def unregister(actor: Actor) = synchronized { messageInvokers.remove(actor) - super.register(actor) + super.unregister(actor) } def shutdown = if (active) { diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 554be1106b..49d9c624b6 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -39,7 +39,14 @@ import se.scalablesolutions.akka.actor.Actor * @author Jonas Bonér */ object Dispatchers { - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { + override def register(actor : Actor) = { + if (isShutdown) + init + super.register(actor) + } + } + object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global") object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global") diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index 627d27aeac..8aee0075ad 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -63,7 +63,11 @@ trait MessageDispatcher extends Logging { def start def shutdown def register(actor: Actor) = references.put(actor.uuid, actor) - def unregister(actor: Actor) = references.remove(actor.uuid) + def unregister(actor: Actor) = { + references.remove(actor.uuid) + if (canBeShutDown) + shutdown // shut down in the dispatcher's references is zero + } def canBeShutDown: Boolean = references.isEmpty def isShutdown: Boolean }