Merge branch 'master' of git@github.com:jboner/akka
This commit is contained in:
commit
183fcfbb9a
16 changed files with 339 additions and 632 deletions
|
|
@ -220,6 +220,7 @@ object Actor extends Logging {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Actor extends TransactionManagement with Logging {
|
||||
implicit protected val self: Option[Actor] = Some(this)
|
||||
// Only mutable for RemoteServer in order to maintain identity across nodes
|
||||
private[akka] var _uuid = UUID.newUuid.toString
|
||||
|
||||
|
|
@ -430,10 +431,6 @@ trait Actor extends TransactionManagement with Logging {
|
|||
def start: Actor = synchronized {
|
||||
if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'exit'")
|
||||
if (!_isRunning) {
|
||||
if (messageDispatcher.isShutdown &&
|
||||
messageDispatcher.isInstanceOf[Dispatchers.globalExecutorBasedEventDrivenDispatcher.type]) {
|
||||
messageDispatcher.asInstanceOf[ExecutorBasedEventDrivenDispatcher].init
|
||||
}
|
||||
messageDispatcher.register(this)
|
||||
messageDispatcher.start
|
||||
_isRunning = true
|
||||
|
|
@ -456,7 +453,6 @@ trait Actor extends TransactionManagement with Logging {
|
|||
def stop = synchronized {
|
||||
if (_isRunning) {
|
||||
messageDispatcher.unregister(this)
|
||||
if (messageDispatcher.canBeShutDown) messageDispatcher.shutdown // shut down in the dispatcher's references is zero
|
||||
_isRunning = false
|
||||
_isShutDown = true
|
||||
shutdown
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
|
|||
|
||||
override def unregister(actor: Actor) = synchronized {
|
||||
messageInvokers.remove(actor)
|
||||
super.register(actor)
|
||||
super.unregister(actor)
|
||||
}
|
||||
|
||||
def shutdown = if (active) {
|
||||
|
|
|
|||
|
|
@ -39,7 +39,14 @@ import se.scalablesolutions.akka.actor.Actor
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Dispatchers {
|
||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global")
|
||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
|
||||
override def register(actor : Actor) = {
|
||||
if (isShutdown)
|
||||
init
|
||||
super.register(actor)
|
||||
}
|
||||
}
|
||||
|
||||
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
|
||||
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
|
||||
|
||||
|
|
|
|||
|
|
@ -63,7 +63,11 @@ trait MessageDispatcher extends Logging {
|
|||
def start
|
||||
def shutdown
|
||||
def register(actor: Actor) = references.put(actor.uuid, actor)
|
||||
def unregister(actor: Actor) = references.remove(actor.uuid)
|
||||
def unregister(actor: Actor) = {
|
||||
references.remove(actor.uuid)
|
||||
if (canBeShutDown)
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
def canBeShutDown: Boolean = references.isEmpty
|
||||
def isShutdown: Boolean
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -9,6 +10,7 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
|
|||
|
||||
object state {
|
||||
var s = "NIL"
|
||||
val finished = new CountDownLatch(1)
|
||||
}
|
||||
|
||||
class ReplyActor extends Actor {
|
||||
|
|
@ -25,9 +27,15 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
|
|||
|
||||
def receive = {
|
||||
case "Init" => replyActor ! "Send"
|
||||
case "Reply" => state.s = "Reply"
|
||||
case "Reply" => {
|
||||
state.s = "Reply"
|
||||
state.finished.countDown
|
||||
}
|
||||
case "InitImplicit" => replyActor ! "SendImplicit"
|
||||
case "ReplyImplicit" => state.s = "ReplyImplicit"
|
||||
case "ReplyImplicit" => {
|
||||
state.s = "ReplyImplicit"
|
||||
state.finished.countDown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -40,7 +48,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
|
|||
val senderActor = new SenderActor(replyActor)
|
||||
senderActor.start
|
||||
senderActor ! "Init"
|
||||
Thread.sleep(1000)
|
||||
state.finished.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === state.finished.getCount)
|
||||
assert("Reply" === state.s)
|
||||
}
|
||||
|
||||
|
|
@ -53,7 +62,8 @@ class ActorFireForgetRequestReplyTest extends JUnitSuite {
|
|||
val senderActor = new SenderActor(replyActor)
|
||||
senderActor.start
|
||||
senderActor ! "InitImplicit"
|
||||
Thread.sleep(1000)
|
||||
state.finished.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === state.finished.getCount)
|
||||
assert("ReplyImplicit" === state.s)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import junit.framework.TestCase
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
|
@ -10,15 +10,15 @@ import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
|||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
||||
object Global {
|
||||
var oneWay = "nada"
|
||||
var remoteReply = "nada"
|
||||
val oneWay = new CountDownLatch(1)
|
||||
val remoteReply = new CountDownLatch(1)
|
||||
}
|
||||
class RemoteActorSpecActorUnidirectional extends Actor {
|
||||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
|
||||
def receive = {
|
||||
case "OneWay" =>
|
||||
Global.oneWay = "received"
|
||||
Global.oneWay.countDown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ class RemoteActorSpecActorAsyncSender extends Actor {
|
|||
case Send(actor: Actor) =>
|
||||
actor ! "Hello"
|
||||
case "World" =>
|
||||
Global.remoteReply = "replied"
|
||||
Global.remoteReply.countDown
|
||||
}
|
||||
|
||||
def send(actor: Actor) {
|
||||
|
|
@ -85,8 +85,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite {
|
|||
actor.makeRemote(HOSTNAME, PORT1)
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === Global.oneWay)
|
||||
Global.oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === Global.oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
@ -111,8 +111,8 @@ class ClientInitiatedRemoteActorTest extends JUnitSuite {
|
|||
sender.setReplyToAddress(HOSTNAME, PORT1)
|
||||
sender.start
|
||||
sender.send(actor)
|
||||
Thread.sleep(1000)
|
||||
assert("replied" === Global.remoteReply)
|
||||
Global.remoteReply.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === Global.remoteReply.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
|
@ -22,17 +21,17 @@ class ExecutorBasedEventDrivenDispatcherActorTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test def shouldSendOneWay = {
|
||||
var oneWay = "nada"
|
||||
val oneWay = new CountDownLatch(1)
|
||||
val actor = new Actor {
|
||||
dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(uuid)
|
||||
def receive = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
case "OneWay" => oneWay.countDown
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === oneWay)
|
||||
oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherTest extends JUnitSuite with
|
|||
Thread.sleep(delay)
|
||||
invocationCount += 1
|
||||
finishedCounter.countDown
|
||||
// println(id + " processed " + x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -8,12 +9,15 @@ class ForwardActorTest extends JUnitSuite {
|
|||
|
||||
object ForwardState {
|
||||
var sender: Actor = null
|
||||
var result: String = "nada"
|
||||
val finished = new CountDownLatch(1)
|
||||
}
|
||||
|
||||
class ReceiverActor extends Actor {
|
||||
def receive = {
|
||||
case "SendBang" => ForwardState.sender = sender.get
|
||||
case "SendBang" => {
|
||||
ForwardState.sender = sender.get
|
||||
ForwardState.finished.countDown
|
||||
}
|
||||
case "SendBangBang" => reply("SendBangBang")
|
||||
}
|
||||
}
|
||||
|
|
@ -40,7 +44,10 @@ class ForwardActorTest extends JUnitSuite {
|
|||
class BangBangSenderActor extends Actor {
|
||||
val forwardActor = new ForwardActor
|
||||
forwardActor.start
|
||||
ForwardState.result = (forwardActor !! "SendBangBang").getOrElse("nada")
|
||||
(forwardActor !! "SendBangBang") match {
|
||||
case Some(_) => {ForwardState.finished.countDown}
|
||||
case None => {}
|
||||
}
|
||||
def receive = {
|
||||
case _ => {}
|
||||
}
|
||||
|
|
@ -50,7 +57,8 @@ class ForwardActorTest extends JUnitSuite {
|
|||
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
|
||||
val senderActor = new BangSenderActor
|
||||
senderActor.start
|
||||
Thread.sleep(1000)
|
||||
ForwardState.finished.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === ForwardState.finished.getCount)
|
||||
assert(ForwardState.sender ne null)
|
||||
assert(senderActor === ForwardState.sender)
|
||||
}
|
||||
|
|
@ -59,7 +67,7 @@ class ForwardActorTest extends JUnitSuite {
|
|||
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
|
||||
val senderActor = new BangBangSenderActor
|
||||
senderActor.start
|
||||
Thread.sleep(1000)
|
||||
assert(ForwardState.result === "SendBangBang")
|
||||
ForwardState.finished.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === ForwardState.finished.getCount)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -22,10 +23,13 @@ case class SetRefStateOneWay(key: String)
|
|||
case class SuccessOneWay(key: String, value: String)
|
||||
case class FailureOneWay(key: String, value: String, failer: Actor)
|
||||
|
||||
class InMemStatefulActor extends Actor {
|
||||
class InMemStatefulActor(expectedInvocationCount:Int) extends Actor {
|
||||
def this() = this(0)
|
||||
timeout = 5000
|
||||
makeTransactionRequired
|
||||
|
||||
val notifier = new CountDownLatch(expectedInvocationCount)
|
||||
|
||||
private lazy val mapState = TransactionalState.newMap[String, String]
|
||||
private lazy val vectorState = TransactionalState.newVector[String]
|
||||
private lazy val refState = TransactionalState.newRef[String]
|
||||
|
|
@ -33,46 +37,59 @@ class InMemStatefulActor extends Actor {
|
|||
def receive = {
|
||||
case GetMapState(key) =>
|
||||
reply(mapState.get(key).get)
|
||||
notifier.countDown
|
||||
case GetVectorSize =>
|
||||
reply(vectorState.length.asInstanceOf[AnyRef])
|
||||
notifier.countDown
|
||||
case GetRefState =>
|
||||
reply(refState.get.get)
|
||||
notifier.countDown
|
||||
case SetMapState(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
reply(msg)
|
||||
notifier.countDown
|
||||
case SetVectorState(msg) =>
|
||||
vectorState.add(msg)
|
||||
reply(msg)
|
||||
notifier.countDown
|
||||
case SetRefState(msg) =>
|
||||
refState.swap(msg)
|
||||
reply(msg)
|
||||
notifier.countDown
|
||||
case Success(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
reply(msg)
|
||||
notifier.countDown
|
||||
case Failure(key, msg, failer) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
failer !! "Failure"
|
||||
reply(msg)
|
||||
notifier.countDown
|
||||
|
||||
case SetMapStateOneWay(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
notifier.countDown
|
||||
case SetVectorStateOneWay(msg) =>
|
||||
vectorState.add(msg)
|
||||
notifier.countDown
|
||||
case SetRefStateOneWay(msg) =>
|
||||
refState.swap(msg)
|
||||
notifier.countDown
|
||||
case SuccessOneWay(key, msg) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
notifier.countDown
|
||||
case FailureOneWay(key, msg, failer) =>
|
||||
mapState.put(key, msg)
|
||||
vectorState.add(msg)
|
||||
refState.swap(msg)
|
||||
failer ! "Failure"
|
||||
notifier.countDown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -84,18 +101,18 @@ class InMemFailerActor extends Actor {
|
|||
throw new RuntimeException("expected")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class InMemoryActorTest extends JUnitSuite {
|
||||
import Actor.Sender.Self
|
||||
|
||||
@Test
|
||||
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
stateful ! SetMapStateOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
Thread.sleep(1000)
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
||||
}
|
||||
|
||||
|
|
@ -110,14 +127,14 @@ class InMemoryActorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
val failer = new InMemFailerActor
|
||||
failer.start
|
||||
stateful ! SetMapStateOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
Thread.sleep(1000)
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
||||
}
|
||||
|
||||
|
|
@ -137,12 +154,12 @@ class InMemoryActorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
stateful ! SetVectorStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert(2 === (stateful !! GetVectorSize).get)
|
||||
}
|
||||
|
||||
|
|
@ -157,14 +174,15 @@ class InMemoryActorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
stateful ! SetVectorStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = new InMemFailerActor
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert(1 === (stateful !! GetVectorSize).get)
|
||||
}
|
||||
|
||||
|
|
@ -184,12 +202,12 @@ class InMemoryActorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
stateful ! SetRefStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert("new state" === (stateful !! GetRefState).get)
|
||||
}
|
||||
|
||||
|
|
@ -204,14 +222,15 @@ class InMemoryActorTest extends JUnitSuite {
|
|||
|
||||
@Test
|
||||
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||
val stateful = new InMemStatefulActor
|
||||
val stateful = new InMemStatefulActor(2)
|
||||
stateful.start
|
||||
stateful ! SetRefStateOneWay("init") // set init state
|
||||
Thread.sleep(1000)
|
||||
val failer = new InMemFailerActor
|
||||
failer.start
|
||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||
Thread.sleep(1000)
|
||||
stateful.notifier.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === stateful.notifier.getCount)
|
||||
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -24,17 +23,17 @@ class ReactorBasedSingleThreadEventDrivenDispatcherActorTest extends JUnitSuite
|
|||
}
|
||||
|
||||
@Test def shouldSendOneWay = {
|
||||
var oneWay = "nada"
|
||||
val oneWay = new CountDownLatch(1)
|
||||
val actor = new Actor {
|
||||
dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(uuid)
|
||||
def receive = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
case "OneWay" => oneWay.countDown
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === oneWay)
|
||||
oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
|
|
@ -22,17 +21,17 @@ class ReactorBasedThreadPoolEventDrivenDispatcherActorTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test def shouldSendOneWay = {
|
||||
var oneWay = "nada"
|
||||
val oneWay = new CountDownLatch(1)
|
||||
val actor = new Actor {
|
||||
dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(uuid)
|
||||
def receive = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
case "OneWay" => oneWay.countDown
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === oneWay)
|
||||
oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import _root_.java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
|
||||
import se.scalablesolutions.akka.serialization.BinaryString
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer}
|
||||
|
|
@ -14,7 +15,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
|
||||
object Log {
|
||||
var messageLog: String = ""
|
||||
var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
var oneWayLog: String = ""
|
||||
}
|
||||
|
||||
|
|
@ -22,7 +23,7 @@ object Log {
|
|||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
def receive = {
|
||||
case BinaryString("Ping") =>
|
||||
Log.messageLog += "ping"
|
||||
Log.messageLog.put("ping")
|
||||
reply("pong")
|
||||
|
||||
case OneWay =>
|
||||
|
|
@ -33,7 +34,7 @@ object Log {
|
|||
}
|
||||
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
Log.messageLog += reason.getMessage
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -41,14 +42,14 @@ object Log {
|
|||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
def receive = {
|
||||
case BinaryString("Ping") =>
|
||||
Log.messageLog += "ping"
|
||||
Log.messageLog.put("ping")
|
||||
reply("pong")
|
||||
case BinaryString("Die") =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
Log.messageLog += reason.getMessage
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -56,14 +57,14 @@ object Log {
|
|||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
def receive = {
|
||||
case BinaryString("Ping") =>
|
||||
Log.messageLog += "ping"
|
||||
Log.messageLog.put("ping")
|
||||
reply("pong")
|
||||
case BinaryString("Die") =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
Log.messageLog += reason.getMessage
|
||||
Log.messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -87,7 +88,7 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
var pingpong3: RemotePingPong3Actor = _
|
||||
|
||||
@Test def shouldStartServer = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
|
||||
|
|
@ -97,416 +98,232 @@ class RemoteSupervisorTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test def shouldKillSingleActorOneForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
Log.messageLog
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldCallKillCallSingleActorOneForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
Log.messageLog
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillSingleActorAllForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldCallKillCallSingleActorAllForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
Log.messageLog
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillMultipleActorsOneForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
def tesCallKillCallMultipleActorsOneForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIE") {
|
||||
Log.messageLog
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEpingpingping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillMultipleActorsAllForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
Log.messageLog
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
def tesCallKillCallMultipleActorsAllForOne = {
|
||||
Log.messageLog = ""
|
||||
Log.messageLog.clear
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIE") {
|
||||
Log.messageLog
|
||||
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIEpingpingping") {
|
||||
Log.messageLog
|
||||
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
Log.messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! BinaryString("Die")
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
expect("oneway") {
|
||||
Logg.oneWayLog
|
||||
}
|
||||
pingpong1 ! BinaryString("Die")
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
expect("onewayoneway") {
|
||||
Logg.oneWayLog
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
@Test def shouldOneWayKillSingleActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayCallKillCallSingleActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("ping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayKillMultipleActorsOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
def tesOneWayCallKillCallMultipleActorsOneForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEpingpingping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayKillMultipleActorsAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
|
||||
def tesOneWayCallKillCallMultipleActorsAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
pingpong1 ! BinaryString("Ping")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! BinaryString("Ping")).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIEpingpingping") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
@Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
|
||||
Logg.messageLog = ""
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup.start
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! BinaryString("Die")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
Logg.messageLog
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// =============================================
|
||||
// Creat some supervisors with different configurations
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.{Test, Before, After}
|
||||
|
||||
|
|
@ -14,8 +13,8 @@ object ServerInitiatedRemoteActorTest {
|
|||
var server: RemoteServer = null
|
||||
|
||||
object Global {
|
||||
var oneWay = "nada"
|
||||
var remoteReply = "nada"
|
||||
val oneWay = new CountDownLatch(1)
|
||||
var remoteReply = new CountDownLatch(1)
|
||||
}
|
||||
|
||||
class RemoteActorSpecActorUnidirectional extends Actor {
|
||||
|
|
@ -25,7 +24,7 @@ object ServerInitiatedRemoteActorTest {
|
|||
def receive = {
|
||||
case "OneWay" =>
|
||||
println("================== ONEWAY")
|
||||
Global.oneWay = "received"
|
||||
Global.oneWay.countDown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -47,7 +46,7 @@ object ServerInitiatedRemoteActorTest {
|
|||
case Send(actor: Actor) =>
|
||||
actor ! "Hello"
|
||||
case "World" =>
|
||||
Global.remoteReply = "replied"
|
||||
Global.remoteReply.countDown
|
||||
}
|
||||
|
||||
def send(actor: Actor) {
|
||||
|
|
@ -92,8 +91,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite {
|
|||
5000L,
|
||||
HOSTNAME, PORT)
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === Global.oneWay)
|
||||
Global.oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === Global.oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
@ -120,8 +119,8 @@ class ServerInitiatedRemoteActorTest extends JUnitSuite {
|
|||
sender.setReplyToAddress(HOSTNAME, PORT)
|
||||
sender.start
|
||||
sender.send(actor)
|
||||
Thread.sleep(1000)
|
||||
assert("replied" === Global.remoteReply)
|
||||
Global.remoteReply.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === Global.remoteReply.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import _root_.java.util.concurrent.{TimeUnit, BlockingQueue, LinkedBlockingQueue}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.{OneWay, Die, Ping}
|
||||
|
|
@ -17,15 +18,15 @@ import org.junit.Test
|
|||
class SupervisorTest extends JUnitSuite {
|
||||
import Actor.Sender.Self
|
||||
|
||||
var messageLog: String = ""
|
||||
var oneWayLog: String = ""
|
||||
|
||||
var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
var oneWayLog: BlockingQueue[String] = new LinkedBlockingQueue[String]
|
||||
|
||||
var pingpong1: PingPong1Actor = _
|
||||
var pingpong2: PingPong2Actor = _
|
||||
var pingpong3: PingPong3Actor = _
|
||||
|
||||
@Test def shouldStartServer = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
|
||||
|
|
@ -35,414 +36,265 @@ class SupervisorTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test def shouldKillSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldCallKillCallSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("ping") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
messageLog
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldCallKillCallSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("ping") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
messageLog
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
def tesCallKillCallMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIE") {
|
||||
messageLog
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEpingpingping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldKillMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
messageLog
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
def tesCallKillCallMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIE") {
|
||||
messageLog
|
||||
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("DIE") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong2 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("pong") {
|
||||
(pingpong3 !! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIEpingpingping") {
|
||||
messageLog
|
||||
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
expect("ping") {
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! Die
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
||||
messageLog = ""
|
||||
messageLog.clear
|
||||
val sup = getSingleActorOneForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("oneway") {
|
||||
oneWayLog
|
||||
oneWayLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
pingpong1 ! Die
|
||||
Thread.sleep(500)
|
||||
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
messageLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
pingpong1 ! OneWay
|
||||
Thread.sleep(500)
|
||||
expect("onewayoneway") {
|
||||
oneWayLog
|
||||
|
||||
expect("oneway") {
|
||||
oneWayLog.poll(1, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@Test def shouldOneWayKillSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayCallKillCallSingleActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getSingleActorAllForOneSupervisor
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("ping") {
|
||||
messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIE") {
|
||||
messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingDIEping") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayKillMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong3 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIE") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
|
||||
def tesOneWayCallKillCallMultipleActorsOneForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsOneForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIE") {
|
||||
messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEpingpingping") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
|
||||
@Test def shouldOneWayKillMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
|
||||
def tesOneWayCallKillCallMultipleActorsAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getMultipleActorsAllForOneConf
|
||||
sup.start
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
pingpong1 ! Ping
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingping") {
|
||||
messageLog
|
||||
}
|
||||
intercept[RuntimeException] {
|
||||
pingpong2 ! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIE") {
|
||||
messageLog
|
||||
}
|
||||
expect("pong") {
|
||||
(pingpong1 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong2 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pong") {
|
||||
(pingpong3 ! Ping).getOrElse("nil")
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("pingpingpingDIEDIEDIEpingpingping") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
@Test def shouldNestedSupervisorsTerminateFirstLevelActorAllForOne = {
|
||||
messageLog = ""
|
||||
val sup = getNestedSupervisorsAllForOneConf
|
||||
sup.start
|
||||
intercept[RuntimeException] {
|
||||
pingpong1 !! Die
|
||||
}
|
||||
Thread.sleep(500)
|
||||
expect("DIEDIEDIE") {
|
||||
messageLog
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// =============================================
|
||||
// Creat some supervisors with different configurations
|
||||
|
||||
|
|
@ -547,44 +399,44 @@ class SupervisorTest extends JUnitSuite {
|
|||
class PingPong1Actor extends Actor {
|
||||
def receive = {
|
||||
case Ping =>
|
||||
messageLog += "ping"
|
||||
messageLog.put("ping")
|
||||
reply("pong")
|
||||
|
||||
case OneWay =>
|
||||
oneWayLog += "oneway"
|
||||
oneWayLog.put("oneway")
|
||||
|
||||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
messageLog += reason.getMessage
|
||||
messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class PingPong2Actor extends Actor {
|
||||
def receive = {
|
||||
case Ping =>
|
||||
messageLog += "ping"
|
||||
messageLog.put("ping")
|
||||
reply("pong")
|
||||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
messageLog += reason.getMessage
|
||||
messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
class PingPong3Actor extends Actor {
|
||||
def receive = {
|
||||
case Ping =>
|
||||
messageLog += "ping"
|
||||
messageLog.put("ping")
|
||||
reply("pong")
|
||||
case Die =>
|
||||
throw new RuntimeException("DIE")
|
||||
}
|
||||
|
||||
override protected def postRestart(reason: Throwable) {
|
||||
messageLog += reason.getMessage
|
||||
messageLog.put(reason.getMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
|
|
@ -24,17 +23,17 @@ class ThreadBasedActorTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test def shouldSendOneWay = {
|
||||
var oneWay = "nada"
|
||||
var oneWay = new CountDownLatch(1)
|
||||
val actor = new Actor {
|
||||
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
|
||||
def receive = {
|
||||
case "OneWay" => oneWay = "received"
|
||||
case "OneWay" => oneWay.countDown
|
||||
}
|
||||
}
|
||||
actor.start
|
||||
val result = actor ! "OneWay"
|
||||
Thread.sleep(1000)
|
||||
assert("received" === oneWay)
|
||||
oneWay.await(1, TimeUnit.SECONDS)
|
||||
assert(0 === oneWay.getCount)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue