diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 6a6fc5368b..da0502f000 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -220,6 +220,7 @@ object Actor extends Logging {
* @author Jonas Bonér
*/
trait Actor extends TransactionManagement with Logging {
+ implicit protected val self: Option[Actor] = Some(this)
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@@ -430,10 +431,6 @@ trait Actor extends TransactionManagement with Logging {
def start: Actor = synchronized {
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
if (!_isRunning) {
- if (messageDispatcher.isShutdown &&
- messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
- messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
- }
messageDispatcher.register(this)
messageDispatcher.start
_isRunning = true
@@ -456,7 +453,6 @@ trait Actor extends TransactionManagement with Logging {
def stop = synchronized {
if (_isRunning) {
messageDispatcher.unregister(this)
- if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
_isRunning = false
_isShutDown = true
shutdown
diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
index 332798154a..3727e8ca92 100644
--- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala
@@ -25,7 +25,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
override def unregister(actor: Actor) = synchronized {
messageInvokers.remove(actor)
- super.register(actor)
+ super.unregister(actor)
}
def shutdown = if (active) {
diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala
index 554be1106b..49d9c624b6 100644
--- a/akka-core/src/main/scala/dispatch/Dispatchers.scala
+++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala
@@ -39,7 +39,14 @@ import se.scalablesolutions.akka.actor.Actor
* @author Jonas Bonér
*/
object Dispatchers {
- object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global")
+ object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
+ override def register(actor : Actor) = {
+ if (isShutdown)
+ init
+ super.register(actor)
+ }
+ }
+
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index 627d27aeac..8aee0075ad 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -63,7 +63,11 @@ trait MessageDispatcher extends Logging {
def start
def shutdown
def register(actor: Actor) = references.put(actor.uuid, actor)
- def unregister(actor: Actor) = references.remove(actor.uuid)
+ def unregister(actor: Actor) = {
+ references.remove(actor.uuid)
+ if (canBeShutDown)
+ shutdown // shut down in the dispatcher's references is zero
+ }
def canBeShutDown: Boolean = references.isEmpty
def isShutdown: Boolean
}
diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala
index 8aab34f495..5906468048 100644
--- a/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala
+++ b/akka-core/src/test/scala/ActorFireForgetRequestReplyTest.scala
@@ -1,5 +1,6 @@
package se.scalablesolutions.akka.actor
+import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -9,6 +10,7 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
object state {
var s = "NIL"
+ val finished = new CountDownLatch(1)
}
class ReplyActor extends Actor {
@@ -25,9 +27,15 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
def receive = {
case "Init" => replyActor ! "Send"
- case "Reply" => state.s = "Reply"
+ case "Reply" => {
+ state.s = "Reply"
+ state.finished.countDown
+ }
case "InitImplicit" => replyActor ! "SendImplicit"
- case "ReplyImplicit" => state.s = "ReplyImplicit"
+ case "ReplyImplicit" => {
+ state.s = "ReplyImplicit"
+ state.finished.countDown
+ }
}
}
@@ -40,7 +48,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
val senderActor = new SenderActor(replyActor)
senderActor.start
senderActor ! "Init"
- Thread.sleep(1000)
+ state.finished.await(1, TimeUnit.SECONDS)
+ assert(0 === state.finished.getCount)
assert("Reply" === state.s)
}
@@ -53,7 +62,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
val senderActor = new SenderActor(replyActor)
senderActor.start
senderActor ! "InitImplicit"
- Thread.sleep(1000)
+ state.finished.await(1, TimeUnit.SECONDS)
+ assert(0 === state.finished.getCount)
assert("ReplyImplicit" === state.s)
}
}
diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
index 95ae42de30..2df6ea4531 100644
--- a/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
+++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorTest.scala
@@ -1,6 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import junit.framework.TestCase
import org.scalatest.junit.JUnitSuite
@@ -10,15 +10,15 @@ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
object Global {
- var oneWay = "nada"
- var remoteReply = "nada"
+ val oneWay = new CountDownLatch(1)
+ val remoteReply = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "OneWay" =>
- Global.oneWay = "received"
+ Global.oneWay.countDown
}
}
@@ -38,7 +38,7 @@ class RemoteActorSpecActorAsyncSender extends Actor {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
- Global.remoteReply = "replied"
+ Global.remoteReply.countDown
}
def send(actor: Actor) {
@@ -85,8 +85,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite {
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === Global.oneWay)
+ Global.oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === Global.oneWay.getCount)
actor.stop
}
@@ -111,8 +111,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite {
sender.setReplyToAddress(HOSTNAME, PORT1)
sender.start
sender.send(actor)
- Thread.sleep(1000)
- assert("replied" === Global.remoteReply)
+ Global.remoteReply.await(1, TimeUnit.SECONDS)
+ assert(0 === Global.remoteReply.getCount)
actor.stop
}
diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala
index 7fb91fd49d..693802c82d 100644
--- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala
+++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorTest.scala
@@ -1,7 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
@@ -22,17 +21,17 @@ class ExecutorBasedEventDrivenDispatcherActorTest extends JUnitSuite {
}
@Test def shouldSendOneWay = {
- var oneWay = "nada"
+ val oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
def receive = {
- case "OneWay" => oneWay = "received"
+ case "OneWay" => oneWay.countDown
}
}
actor.start
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === oneWay)
+ oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === oneWay.getCount)
actor.stop
}
diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
index ecc911734a..cee012d49f 100644
--- a/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
+++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenWorkStealingDispatcherTest.scala
@@ -22,7 +22,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
Thread.sleep(delay)
invocationCount += 1
finishedCounter.countDown
-// println(id + " processed " + x)
}
}
}
diff --git a/akka-core/src/test/scala/ForwardActor.scala b/akka-core/src/test/scala/ForwardActor.scala
index ff493c80e8..a477c5ecf8 100644
--- a/akka-core/src/test/scala/ForwardActor.scala
+++ b/akka-core/src/test/scala/ForwardActor.scala
@@ -1,5 +1,6 @@
package se.scalablesolutions.akka.actor
+import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -8,12 +9,15 @@ class ForwardActorTest extends JUnitSuite {
object ForwardState {
var sender: Actor = null
- var result: String = "nada"
+ val finished = new CountDownLatch(1)
}
class ReceiverActor extends Actor {
def receive = {
- case "SendBang" => ForwardState.sender = sender.get
+ case "SendBang" => {
+ ForwardState.sender = sender.get
+ ForwardState.finished.countDown
+ }
case "SendBangBang" => reply("SendBangBang")
}
}
@@ -40,7 +44,10 @@ class ForwardActorTest extends JUnitSuite {
class BangBangSenderActor extends Actor {
val forwardActor = new ForwardActor
forwardActor.start
- ForwardState.result = (forwardActor !! "SendBangBang").getOrElse("nada")
+ (forwardActor !! "SendBangBang") match {
+ case Some(_) => {ForwardState.finished.countDown}
+ case None => {}
+ }
def receive = {
case _ => {}
}
@@ -50,7 +57,8 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
val senderActor = new BangSenderActor
senderActor.start
- Thread.sleep(1000)
+ ForwardState.finished.await(1, TimeUnit.SECONDS)
+ assert(0 === ForwardState.finished.getCount)
assert(ForwardState.sender ne null)
assert(senderActor === ForwardState.sender)
}
@@ -59,7 +67,7 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
val senderActor = new BangBangSenderActor
senderActor.start
- Thread.sleep(1000)
- assert(ForwardState.result === "SendBangBang")
+ ForwardState.finished.await(1, TimeUnit.SECONDS)
+ assert(0 === ForwardState.finished.getCount)
}
}
diff --git a/akka-core/src/test/scala/InMemoryActorTest.scala b/akka-core/src/test/scala/InMemoryActorTest.scala
index 5692d7b01f..d88a568db9 100644
--- a/akka-core/src/test/scala/InMemoryActorTest.scala
+++ b/akka-core/src/test/scala/InMemoryActorTest.scala
@@ -1,5 +1,6 @@
package se.scalablesolutions.akka.actor
+import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -22,10 +23,13 @@ case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
-class InMemStatefulActor extends Actor {
+class InMemStatefulActor(expectedInvocationCount:Int) extends Actor {
+ def this() = this(0)
timeout = 5000
makeTransactionRequired
+ val notifier = new CountDownLatch(expectedInvocationCount)
+
private lazy val mapState = TransactionalState.newMap[String, String]
private lazy val vectorState = TransactionalState.newVector[String]
private lazy val refState = TransactionalState.newRef[String]
@@ -33,46 +37,59 @@ class InMemStatefulActor extends Actor {
def receive = {
case GetMapState(key) =>
reply(mapState.get(key).get)
+ notifier.countDown
case GetVectorSize =>
reply(vectorState.length.asInstanceOf[AnyRef])
+ notifier.countDown
case GetRefState =>
reply(refState.get.get)
+ notifier.countDown
case SetMapState(key, msg) =>
mapState.put(key, msg)
reply(msg)
+ notifier.countDown
case SetVectorState(msg) =>
vectorState.add(msg)
reply(msg)
+ notifier.countDown
case SetRefState(msg) =>
refState.swap(msg)
reply(msg)
+ notifier.countDown
case Success(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
reply(msg)
+ notifier.countDown
case Failure(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer !! "Failure"
reply(msg)
+ notifier.countDown
case SetMapStateOneWay(key, msg) =>
mapState.put(key, msg)
+ notifier.countDown
case SetVectorStateOneWay(msg) =>
vectorState.add(msg)
+ notifier.countDown
case SetRefStateOneWay(msg) =>
refState.swap(msg)
+ notifier.countDown
case SuccessOneWay(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
+ notifier.countDown
case FailureOneWay(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer ! "Failure"
+ notifier.countDown
}
}
@@ -84,18 +101,18 @@ class InMemFailerActor extends Actor {
throw new RuntimeException("expected")
}
}
-
+
class InMemoryActorTest extends JUnitSuite {
import Actor.Sender.Self
@Test
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
- Thread.sleep(1000)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@@ -110,14 +127,14 @@ class InMemoryActorTest extends JUnitSuite {
@Test
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
val failer = new InMemFailerActor
failer.start
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
- Thread.sleep(1000)
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@@ -137,12 +154,12 @@ class InMemoryActorTest extends JUnitSuite {
@Test
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
- Thread.sleep(1000)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert(2 === (stateful !! GetVectorSize).get)
}
@@ -157,14 +174,15 @@ class InMemoryActorTest extends JUnitSuite {
@Test
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
stateful ! SetVectorStateOneWay("init") // set init state
Thread.sleep(1000)
val failer = new InMemFailerActor
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert(1 === (stateful !! GetVectorSize).get)
}
@@ -184,12 +202,12 @@ class InMemoryActorTest extends JUnitSuite {
@Test
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
- Thread.sleep(1000)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert("new state" === (stateful !! GetRefState).get)
}
@@ -204,14 +222,15 @@ class InMemoryActorTest extends JUnitSuite {
@Test
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
- val stateful = new InMemStatefulActor
+ val stateful = new InMemStatefulActor(2)
stateful.start
stateful ! SetRefStateOneWay("init") // set init state
Thread.sleep(1000)
val failer = new InMemFailerActor
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
- Thread.sleep(1000)
+ stateful.notifier.await(1, TimeUnit.SECONDS)
+ assert(0 === stateful.notifier.getCount)
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
}
diff --git a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala
index f0c3f0cdf7..51711d4ef0 100644
--- a/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala
+++ b/akka-core/src/test/scala/ReactorBasedSingleThreadEventDrivenDispatcherActorTest.scala
@@ -1,7 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -24,17 +23,17 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorTest extends JUnitSuite
}
@Test def shouldSendOneWay = {
- var oneWay = "nada"
+ val oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid)
def receive = {
- case "OneWay" => oneWay = "received"
+ case "OneWay" => oneWay.countDown
}
}
actor.start
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === oneWay)
+ oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === oneWay.getCount)
actor.stop
}
diff --git a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala
index 99c6d378f0..4cfaeae328 100644
--- a/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala
+++ b/akka-core/src/test/scala/ReactorBasedThreadPoolEventDrivenDispatcherActorTest.scala
@@ -1,7 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
@@ -22,17 +21,17 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorTest extends JUnitSuite {
}
@Test def shouldSendOneWay = {
- var oneWay = "nada"
+ val oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
def receive = {
- case "OneWay" => oneWay = "received"
+ case "OneWay" => oneWay.countDown
}
}
actor.start
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === oneWay)
+ oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === oneWay.getCount)
actor.stop
}
diff --git a/akka-core/src/test/scala/RemoteSupervisorTest.scala b/akka-core/src/test/scala/RemoteSupervisorTest.scala
index 409e5ddfa7..933b418445 100644
--- a/akka-core/src/test/scala/RemoteSupervisorTest.scala
+++ b/akka-core/src/test/scala/RemoteSupervisorTest.scala
@@ -4,6 +4,7 @@
package se.scalablesolutions.akka.actor
+import _root_.java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import se.scalablesolutions.akka.serialization.BinaryString
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer}
@@ -14,7 +15,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
object Log {
- var messageLog: String = ""
+ var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
var oneWayLog: String = ""
}
@@ -22,7 +23,7 @@ object Log {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case BinaryString("Ping") =>
- Log.messageLog += "ping"
+ Log.messageLog.put("ping")
reply("pong")
case OneWay =>
@@ -33,7 +34,7 @@ object Log {
}
override protected def postRestart(reason: Throwable) {
- Log.messageLog += reason.getMessage
+ Log.messageLog.put(reason.getMessage)
}
}
@@ -41,14 +42,14 @@ object Log {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case BinaryString("Ping") =>
- Log.messageLog += "ping"
+ Log.messageLog.put("ping")
reply("pong")
case BinaryString("Die") =>
throw new RuntimeException("DIE")
}
override protected def postRestart(reason: Throwable) {
- Log.messageLog += reason.getMessage
+ Log.messageLog.put(reason.getMessage)
}
}
@@ -56,14 +57,14 @@ object Log {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case BinaryString("Ping") =>
- Log.messageLog += "ping"
+ Log.messageLog.put("ping")
reply("pong")
case BinaryString("Die") =>
throw new RuntimeException("DIE")
}
override protected def postRestart(reason: Throwable) {
- Log.messageLog += reason.getMessage
+ Log.messageLog.put(reason.getMessage)
}
}
@@ -87,7 +88,7 @@ class RemoteSupervisorTest extends JUnitSuite {
var pingpong3: RemotePingPong3Actor = _
@Test def shouldStartServer = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
@@ -97,416 +98,232 @@ class RemoteSupervisorTest extends JUnitSuite {
}
@Test def shouldKillSingleActorOneForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong1 !! BinaryString("Die")
}
- Thread.sleep(500)
expect("DIE") {
- Log.messageLog
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldCallKillCallSingleActorOneForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("ping") {
- Log.messageLog
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! BinaryString("Die")
}
- Thread.sleep(500)
- expect("pingDIE") {
- Log.messageLog
+
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingDIEping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillSingleActorAllForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong1 !! BinaryString("Die")
}
- Thread.sleep(500)
+
expect("DIE") {
- Log.messageLog
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldCallKillCallSingleActorAllForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("ping") {
- Log.messageLog
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! BinaryString("Die")
}
- Thread.sleep(500)
- expect("pingDIE") {
- Log.messageLog
+
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingDIEping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillMultipleActorsOneForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getMultipleActorsOneForOneConf
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong3 !! BinaryString("Die")
}
- Thread.sleep(500)
+
expect("DIE") {
- Log.messageLog
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
def tesCallKillCallMultipleActorsOneForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getMultipleActorsOneForOneConf
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! BinaryString("Die")
}
- Thread.sleep(500)
- expect("pingpingpingDIE") {
- Log.messageLog
+
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingpingDIEpingpingping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillMultipleActorsAllForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getMultipleActorsAllForOneConf
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong2 !! BinaryString("Die")
}
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- Log.messageLog
+
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
def tesCallKillCallMultipleActorsAllForOne = {
- Log.messageLog = ""
+ Log.messageLog.clear
val sup = getMultipleActorsAllForOneConf
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! BinaryString("Die")
}
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIE") {
- Log.messageLog
+
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIEpingpingping") {
- Log.messageLog
+
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ Log.messageLog.poll(1, TimeUnit.SECONDS)
}
}
- /*
- @Test def shouldOneWayKillSingleActorOneForOne = {
- Logg.messageLog = ""
- val sup = getSingleActorOneForOneSupervisor
- sup.start
- Thread.sleep(500)
- pingpong1 ! BinaryString("Die")
- Thread.sleep(500)
- expect("DIE") {
- Logg.messageLog
- }
- }
-
- @Test def shouldOneWayCallKillCallSingleActorOneForOne = {
- Logg.messageLog = ""
- val sup = getSingleActorOneForOneSupervisor
- sup.start
- Thread.sleep(500)
- pingpong1 ! OneWay
- Thread.sleep(500)
- expect("oneway") {
- Logg.oneWayLog
- }
- pingpong1 ! BinaryString("Die")
- Thread.sleep(500)
- expect("DIE") {
- Logg.messageLog
- }
- pingpong1 ! OneWay
- Thread.sleep(500)
- expect("onewayoneway") {
- Logg.oneWayLog
- }
- }
-*/
-
- /*
- @Test def shouldOneWayKillSingleActorAllForOne = {
- Logg.messageLog = ""
- val sup = getSingleActorAllForOneSupervisor
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong1 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("DIE") {
- Logg.messageLog
- }
- }
-
- @Test def shouldOneWayCallKillCallSingleActorAllForOne = {
- Logg.messageLog = ""
- val sup = getSingleActorAllForOneSupervisor
- sup.start
- Thread.sleep(500)
- expect("pong") {
- (pingpong1 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("ping") {
- Logg.messageLog
- }
- intercept[RuntimeException] {
- pingpong1 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("pingDIE") {
- Logg.messageLog
- }
- expect("pong") {
- (pingpong1 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingDIEping") {
- Logg.messageLog
- }
- }
-
- @Test def shouldOneWayKillMultipleActorsOneForOne = {
- Logg.messageLog = ""
- val sup = getMultipleActorsOneForOneConf
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong3 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("DIE") {
- Logg.messageLog
- }
- }
-
- def tesOneWayCallKillCallMultipleActorsOneForOne = {
- Logg.messageLog = ""
- val sup = getMultipleActorsOneForOneConf
- sup.start
- Thread.sleep(500)
- expect("pong") {
- (pingpong1 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingping") {
- Logg.messageLog
- }
- intercept[RuntimeException] {
- pingpong2 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("pingpingpingDIE") {
- Logg.messageLog
- }
- expect("pong") {
- (pingpong1 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingpingDIEpingpingping") {
- Logg.messageLog
- }
- }
-
- @Test def shouldOneWayKillMultipleActorsAllForOne = {
- Logg.messageLog = ""
- val sup = getMultipleActorsAllForOneConf
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong2 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- Logg.messageLog
- }
- }
-
- def tesOneWayCallKillCallMultipleActorsAllForOne = {
- Logg.messageLog = ""
- val sup = getMultipleActorsAllForOneConf
- sup.start
- Thread.sleep(500)
- expect("pong") {
- pingpong1 ! BinaryString("Ping")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingping") {
- Logg.messageLog
- }
- intercept[RuntimeException] {
- pingpong2 ! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIE") {
- Logg.messageLog
- }
- expect("pong") {
- (pingpong1 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! BinaryString("Ping")).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIEpingpingping") {
- Logg.messageLog
- }
- }
- */
-
- /*
- @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
- Logg.messageLog = ""
- val sup = getNestedSupervisorsAllForOneConf
- sup.start
- intercept[RuntimeException] {
- pingpong1 !! BinaryString("Die")
- }
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- Logg.messageLog
- }
- }
-*/
-
// =============================================
// Creat some supervisors with different configurations
diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala
index 3fd540e542..64ff11bdd3 100644
--- a/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala
+++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorTest.scala
@@ -1,7 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
@@ -14,8 +13,8 @@ object ServerInitiatedRemoteActorTest {
var server: RemoteServer = null
object Global {
- var oneWay = "nada"
- var remoteReply = "nada"
+ val oneWay = new CountDownLatch(1)
+ var remoteReply = new CountDownLatch(1)
}
class RemoteActorSpecActorUnidirectional extends Actor {
@@ -25,7 +24,7 @@ object ServerInitiatedRemoteActorTest {
def receive = {
case "OneWay" =>
println("================== ONEWAY")
- Global.oneWay = "received"
+ Global.oneWay.countDown
}
}
@@ -47,7 +46,7 @@ object ServerInitiatedRemoteActorTest {
case Send(actor: Actor) =>
actor ! "Hello"
case "World" =>
- Global.remoteReply = "replied"
+ Global.remoteReply.countDown
}
def send(actor: Actor) {
@@ -92,8 +91,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite {
5000L,
HOSTNAME, PORT)
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === Global.oneWay)
+ Global.oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === Global.oneWay.getCount)
actor.stop
}
@@ -120,8 +119,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite {
sender.setReplyToAddress(HOSTNAME, PORT)
sender.start
sender.send(actor)
- Thread.sleep(1000)
- assert("replied" === Global.remoteReply)
+ Global.remoteReply.await(1, TimeUnit.SECONDS)
+ assert(0 === Global.remoteReply.getCount)
actor.stop
}
diff --git a/akka-core/src/test/scala/SupervisorTest.scala b/akka-core/src/test/scala/SupervisorTest.scala
index 41e743533c..b46af44781 100644
--- a/akka-core/src/test/scala/SupervisorTest.scala
+++ b/akka-core/src/test/scala/SupervisorTest.scala
@@ -4,6 +4,7 @@
package se.scalablesolutions.akka.actor
+import _root_.java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.{OneWay, Die, Ping}
@@ -17,15 +18,15 @@ import org.junit.Test
class SupervisorTest extends JUnitSuite {
import Actor.Sender.Self
- var messageLog: String = ""
- var oneWayLog: String = ""
-
+ var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
+ var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
+
var pingpong1: PingPong1Actor = _
var pingpong2: PingPong2Actor = _
var pingpong3: PingPong3Actor = _
@Test def shouldStartServer = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
@@ -35,414 +36,265 @@ class SupervisorTest extends JUnitSuite {
}
@Test def shouldKillSingleActorOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong1 !! Die
}
- Thread.sleep(500)
+
expect("DIE") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldCallKillCallSingleActorOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("ping") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! Die
}
- Thread.sleep(500)
- expect("pingDIE") {
- messageLog
+
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingDIEping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillSingleActorAllForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong1 !! Die
}
- Thread.sleep(500)
+
expect("DIE") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldCallKillCallSingleActorAllForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorAllForOneSupervisor
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("ping") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! Die
}
- Thread.sleep(500)
- expect("pingDIE") {
- messageLog
+
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingDIEping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillMultipleActorsOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getMultipleActorsOneForOneConf
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong3 !! Die
}
- Thread.sleep(500)
+
expect("DIE") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
def tesCallKillCallMultipleActorsOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getMultipleActorsOneForOneConf
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! Die
}
- Thread.sleep(500)
- expect("pingpingpingDIE") {
- messageLog
+
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingpingDIEpingpingping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldKillMultipleActorsAllForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getMultipleActorsAllForOneConf
sup.start
- Thread.sleep(500)
intercept[RuntimeException] {
pingpong2 !! Die
}
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- messageLog
+
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
def tesCallKillCallMultipleActorsAllForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getMultipleActorsAllForOneConf
sup.start
- Thread.sleep(500)
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! Die
}
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIE") {
- messageLog
+
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("DIE") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong2 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
+
expect("pong") {
(pingpong3 !! Ping).getOrElse("nil")
}
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIEpingpingping") {
- messageLog
+
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
+ }
+ expect("ping") {
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldOneWayKillSingleActorOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
pingpong1 ! Die
- Thread.sleep(500)
+
expect("DIE") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
}
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
- messageLog = ""
+ messageLog.clear
val sup = getSingleActorOneForOneSupervisor
sup.start
- Thread.sleep(500)
pingpong1 ! OneWay
- Thread.sleep(500)
+
expect("oneway") {
- oneWayLog
+ oneWayLog.poll(1, TimeUnit.SECONDS)
}
pingpong1 ! Die
- Thread.sleep(500)
+
expect("DIE") {
- messageLog
+ messageLog.poll(1, TimeUnit.SECONDS)
}
pingpong1 ! OneWay
- Thread.sleep(500)
- expect("onewayoneway") {
- oneWayLog
+
+ expect("oneway") {
+ oneWayLog.poll(1, TimeUnit.SECONDS)
}
}
- /*
- @Test def shouldOneWayKillSingleActorAllForOne = {
- messageLog = ""
- val sup = getSingleActorAllForOneSupervisor
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong1 ! Die
- }
- Thread.sleep(500)
- expect("DIE") {
- messageLog
- }
- }
-
- @Test def shouldOneWayCallKillCallSingleActorAllForOne = {
- messageLog = ""
- val sup = getSingleActorAllForOneSupervisor
- sup.start
- Thread.sleep(500)
- expect("pong") {
- (pingpong1 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("ping") {
- messageLog
- }
- intercept[RuntimeException] {
- pingpong1 ! Die
- }
- Thread.sleep(500)
- expect("pingDIE") {
- messageLog
- }
- expect("pong") {
- (pingpong1 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingDIEping") {
- messageLog
- }
- }
-
- @Test def shouldOneWayKillMultipleActorsOneForOne = {
- messageLog = ""
- val sup = getMultipleActorsOneForOneConf
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong3 ! Die
- }
- Thread.sleep(500)
- expect("DIE") {
- messageLog
- }
- }
-
- def tesOneWayCallKillCallMultipleActorsOneForOne = {
- messageLog = ""
- val sup = getMultipleActorsOneForOneConf
- sup.start
- Thread.sleep(500)
- expect("pong") {
- (pingpong1 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingping") {
- messageLog
- }
- intercept[RuntimeException] {
- pingpong2 ! Die
- }
- Thread.sleep(500)
- expect("pingpingpingDIE") {
- messageLog
- }
- expect("pong") {
- (pingpong1 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingpingDIEpingpingping") {
- messageLog
- }
- }
-
- @Test def shouldOneWayKillMultipleActorsAllForOne = {
- messageLog = ""
- val sup = getMultipleActorsAllForOneConf
- sup.start
- Thread.sleep(500)
- intercept[RuntimeException] {
- pingpong2 ! Die
- }
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- messageLog
- }
- }
-
- def tesOneWayCallKillCallMultipleActorsAllForOne = {
- messageLog = ""
- val sup = getMultipleActorsAllForOneConf
- sup.start
- Thread.sleep(500)
- expect("pong") {
- pingpong1 ! Ping
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingping") {
- messageLog
- }
- intercept[RuntimeException] {
- pingpong2 ! Die
- }
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIE") {
- messageLog
- }
- expect("pong") {
- (pingpong1 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong2 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pong") {
- (pingpong3 ! Ping).getOrElse("nil")
- }
- Thread.sleep(500)
- expect("pingpingpingDIEDIEDIEpingpingping") {
- messageLog
- }
- }
- */
-
- /*
- @Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
- messageLog = ""
- val sup = getNestedSupervisorsAllForOneConf
- sup.start
- intercept[RuntimeException] {
- pingpong1 !! Die
- }
- Thread.sleep(500)
- expect("DIEDIEDIE") {
- messageLog
- }
- }
-*/
-
// =============================================
// Creat some supervisors with different configurations
@@ -547,44 +399,44 @@ class SupervisorTest extends JUnitSuite {
class PingPong1Actor extends Actor {
def receive = {
case Ping =>
- messageLog += "ping"
+ messageLog.put("ping")
reply("pong")
case OneWay =>
- oneWayLog += "oneway"
+ oneWayLog.put("oneway")
case Die =>
throw new RuntimeException("DIE")
}
override protected def postRestart(reason: Throwable) {
- messageLog += reason.getMessage
+ messageLog.put(reason.getMessage)
}
}
class PingPong2Actor extends Actor {
def receive = {
case Ping =>
- messageLog += "ping"
+ messageLog.put("ping")
reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
override protected def postRestart(reason: Throwable) {
- messageLog += reason.getMessage
+ messageLog.put(reason.getMessage)
}
}
class PingPong3Actor extends Actor {
def receive = {
case Ping =>
- messageLog += "ping"
+ messageLog.put("ping")
reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
override protected def postRestart(reason: Throwable) {
- messageLog += reason.getMessage
+ messageLog.put(reason.getMessage)
}
}
}
diff --git a/akka-core/src/test/scala/ThreadBasedActorTest.scala b/akka-core/src/test/scala/ThreadBasedActorTest.scala
index 403dbc0683..902ef78364 100644
--- a/akka-core/src/test/scala/ThreadBasedActorTest.scala
+++ b/akka-core/src/test/scala/ThreadBasedActorTest.scala
@@ -1,7 +1,6 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -24,17 +23,17 @@ class ThreadBasedActorTest extends JUnitSuite {
}
@Test def shouldSendOneWay = {
- var oneWay = "nada"
+ var oneWay = new CountDownLatch(1)
val actor = new Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
- case "OneWay" => oneWay = "received"
+ case "OneWay" => oneWay.countDown
}
}
actor.start
val result = actor ! "OneWay"
- Thread.sleep(1000)
- assert("received" === oneWay)
+ oneWay.await(1, TimeUnit.SECONDS)
+ assert(0 === oneWay.getCount)
actor.stop
}