Fixed supervision bugs

This commit is contained in:
Jonas Bonér 2010-05-18 10:47:23 +02:00
parent 844fa2db06
commit b079a128b7
6 changed files with 161 additions and 170 deletions

View file

@ -76,6 +76,7 @@ case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case object Kill extends LifeCycleMessage
// Exceptions for Actors
class ActorStartException private[akka](message: String) extends RuntimeException(message)
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
class ActorInitializationException private[akka](message: String) extends RuntimeException(message)
@ -281,8 +282,8 @@ trait Actor extends Logging {
* </pre>
*/
val self: ActorRef = optionSelf.get
self.id = getClass.getName
import self._
/**
* User overridable callback/setting.

View file

@ -101,9 +101,8 @@ trait ActorRef extends TransactionManagement {
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = UUID.newUuid.toString
@volatile protected[this] var _isRunning = false
@volatile protected[this] var _isSuspended = true
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isKilled = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var startOnCreation = false
@ -213,9 +212,9 @@ trait ActorRef extends TransactionManagement {
protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt }
/**
* Is the actor killed?
* Is the actor being restarted?
*/
def isKilled: Boolean = _isKilled
def isBeingRestarted: Boolean = _isBeingRestarted
/**
* Is the actor running?
@ -252,9 +251,9 @@ trait ActorRef extends TransactionManagement {
* <p/>
*/
def !(message: Any)(implicit sender: Option[ActorRef] = None) = {
if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (isRunning) postMessageToMailbox(message, sender)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
@ -270,7 +269,6 @@ trait ActorRef extends TransactionManagement {
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !![T](message: Any, timeout: Long): Option[T] = {
if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
val isActiveObject = message.isInstanceOf[Invocation]
@ -287,9 +285,8 @@ trait ActorRef extends TransactionManagement {
if (future.exception.isDefined) throw future.exception.get._2
else future.result
}
else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
} else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
@ -318,9 +315,8 @@ trait ActorRef extends TransactionManagement {
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!![T](message: Any): Future[T] = {
if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
else throw new IllegalStateException(
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@ -330,14 +326,13 @@ trait ActorRef extends TransactionManagement {
* Works with '!', '!!' and '!!!'.
*/
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (isRunning) {
sender.get.replyTo match {
case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef))
case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future))
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
@ -643,8 +638,8 @@ sealed class LocalActorRef private[akka](
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
if (!isRunning) _dispatcher = md
else throw new IllegalArgumentException(
if (!isRunning || isBeingRestarted) _dispatcher = md
else throw new ActorInitializationException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
@ -657,21 +652,20 @@ sealed class LocalActorRef private[akka](
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit =
if (isRunning) throw new IllegalStateException(
if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port))
else throw new ActorInitializationException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else makeRemote(new InetSocketAddress(hostname, port))
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = guard.withGuard {
if (isRunning) throw new IllegalStateException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
if (!isRunning || isBeingRestarted) {
_remoteAddress = Some(address)
RemoteClient.register(address.getHostName, address.getPort, uuid)
homeAddress = (RemoteServer.HOSTNAME, RemoteServer.PORT)
}
} else throw new ActorInitializationException(
"Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
}
/**
@ -683,9 +677,9 @@ sealed class LocalActorRef private[akka](
* </pre>
*/
def makeTransactionRequired = guard.withGuard {
if (isRunning) throw new IllegalArgumentException(
if (!isRunning || isBeingRestarted) isTransactor = true
else throw new ActorInitializationException(
"Can not make actor transaction required after it has been started")
else isTransactor = true
}
/**
@ -704,7 +698,7 @@ sealed class LocalActorRef private[akka](
* Starts up the actor and its message queue.
*/
def start: ActorRef = guard.withGuard {
if (isShutdown) throw new IllegalStateException(
if (isShutdown) throw new ActorStartException(
"Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
if (!isInInitialization) initializeActorInstance
@ -726,7 +720,7 @@ sealed class LocalActorRef private[akka](
remoteAddress.foreach(address => RemoteClient.unregister(
address.getHostName, address.getPort, uuid))
RemoteNode.unregister(this)
}
} else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
/**
@ -915,9 +909,8 @@ sealed class LocalActorRef private[akka](
val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get)
if (dispatcher.usesActorMailbox) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
}
else invocation.send
invocation.send
} else invocation.send
}
}
@ -990,7 +983,7 @@ sealed class LocalActorRef private[akka](
"No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
_isKilled = true
_isBeingRestarted = true
Actor.log.error(e, "Could not invoke actor [%s]", toString)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
@ -1038,6 +1031,7 @@ sealed class LocalActorRef private[akka](
} catch {
case e: IllegalStateException => {}
case e =>
_isBeingRestarted = true
// abort transaction set
if (isTransactionSetInScope) try {
getTransactionSetInScope.abort
@ -1080,6 +1074,7 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def restart(reason: Throwable): Unit = {
_isBeingRestarted = true
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
restartLinkedActors(reason)
val failedActor = actorInstance.get
@ -1087,18 +1082,16 @@ sealed class LocalActorRef private[akka](
Actor.log.debug("Restarting linked actors for actor [%s].", id)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
failedActor.shutdown
_isRunning = false
val freshActor = newActor
freshActor.synchronized {
initializeActorInstance
freshActor.init
freshActor.initTransactionalState
_isRunning = true
actorInstance.set(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
_isKilled = false
_isBeingRestarted = false
}
}
@ -1146,7 +1139,7 @@ sealed class LocalActorRef private[akka](
protected[akka] def linkedActorsAsList: List[ActorRef] =
linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]]
private def initializeActorInstance = if (!isRunning) {
private def initializeActorInstance = if (!isRunning || isBeingRestarted) {
dispatcher.register(this)
dispatcher.start
actor.init // run actor init and initTransactionalState callbacks

View file

@ -11,8 +11,7 @@ import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.remote.RemoteServer
import Actor._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
/**
* Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class.
@ -132,7 +131,6 @@ object SupervisorFactory {
}
}
/**
* For internal use only.
*
@ -146,7 +144,7 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log
def newInstanceFor(config: SupervisorConfig): Supervisor = {
val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config)
val supervisor = new Supervisor(handler, trapExits)
supervisor.configure(config, this)
supervisor.configure(config)
supervisor.start
supervisor
}
@ -168,8 +166,9 @@ sealed class Supervisor private[akka] (
handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]])
extends Configurator {
private val children = new ConcurrentHashMap[String, List[ActorRef]]
private val supervisor = SupervisorActor(handler, trapExceptions)
private val childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = SupervisorActor(handler, trapExceptions)
def uuid = supervisor.uuid
@ -184,42 +183,39 @@ sealed class Supervisor private[akka] (
def unlink(child: ActorRef) = supervisor ! Unlink(child)
def getInstance[T](clazz: Class[T]): List[T] = children.get(clazz.getName).asInstanceOf[List[T]]
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def getInstance[T](clazz: Class[T]): List[T] = childActors.get(clazz.getName).asInstanceOf[List[T]]
def getComponentInterfaces: List[Class[_]] =
children.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass)
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def getComponentInterfaces: List[Class[_]] =
childActors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass)
def isDefined(clazz: Class[_]): Boolean = children.containsKey(clazz.getName)
// FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations
def isDefined(clazz: Class[_]): Boolean = childActors.containsKey(clazz.getName)
def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
def configure(config: SupervisorConfig): Unit = config match {
case SupervisorConfig(_, servers) =>
servers.map(server =>
server match {
case Supervise(actorRef, lifeCycle, remoteAddress) =>
val className = actorRef.actor.getClass.getName
val currentActors = {
val list = children.get(className)
val list = childActors.get(className)
if (list eq null) List[ActorRef]()
else list
}
children.put(className, actorRef :: currentActors)
childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = Some(lifeCycle)
supervisor ! Link(actorRef)
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
.actors.put(actorRef.id, actorRef))
remoteAddress.foreach { address => RemoteServer
.actorsFor(RemoteServer.Address(address.hostname, address.port))
.actors.put(actorRef.id, actorRef)
}
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = SupervisorActor(supervisorConfig)
childSupervisor.lifeCycle = Some(LifeCycle(Permanent))
val className = childSupervisor.uuid
val currentSupervisors = {
val list = children.get(className)
if (list eq null) List[ActorRef]()
else list
}
children.put(className, childSupervisor :: currentSupervisors)
supervisor ! Link(childSupervisor)
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor ! Link(childSupervisor.supervisor)
childSupervisors.add(childSupervisor)
})
}
}
@ -240,18 +236,18 @@ sealed class Supervisor private[akka] (
*/
final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]])
extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
self.trapExit = trapExceptions
self.faultHandler = Some(handler)
trapExceptions: List[Class[_ <: Throwable]]) extends Actor {
import self._
// dispatcher = Dispatchers.newThreadBasedDispatcher(self)
trapExit = trapExceptions
faultHandler = Some(handler)
override def shutdown: Unit = self.shutdownLinkedActors
override def shutdown: Unit = shutdownLinkedActors
def receive = {
case Link(child) => self.startLink(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop
case Link(child) => startLink(child)
case Unlink(child) => unlink(child)
case UnlinkAndStop(child) => unlink(child); child.stop
case unknown => throw new IllegalArgumentException(
"Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]")
}

View file

@ -40,7 +40,6 @@ object Log {
}
@serializable class RemotePingPong2Actor extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case BinaryString("Ping") =>
Log.messageLog.put("ping")
@ -55,7 +54,6 @@ object Log {
}
@serializable class RemotePingPong3Actor extends Actor {
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case BinaryString("Ping") =>
Log.messageLog.put("ping")

View file

@ -7,12 +7,12 @@ import org.junit.{Test, Before, After}
import scala.reflect.BeanInfo
@BeanInfo
//@BeanInfo
case class Foo(foo: String) {
def this() = this(null)
}
@BeanInfo
//@BeanInfo
case class MyMessage(val id: String, val value: Tuple2[String, Int]) {
private def this() = this(null, null)
}

View file

@ -23,11 +23,13 @@ object SupervisorSpec {
}
class PingPong1Actor extends Actor {
self.timeout = 1000
import self._
dispatcher = Dispatchers.newThreadBasedDispatcher(self)
timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
self.reply("pong")
reply("pong")
case OneWay =>
oneWayLog.put("oneway")
@ -41,11 +43,12 @@ object SupervisorSpec {
}
class PingPong2Actor extends Actor {
self.timeout = 1000
import self._
timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
self.reply("pong")
reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
@ -55,11 +58,12 @@ object SupervisorSpec {
}
class PingPong3Actor extends Actor {
self.timeout = 1000
import self._
timeout = 1000
def receive = {
case Ping =>
messageLog.put("ping")
self.reply("pong")
reply("pong")
case Die =>
throw new RuntimeException("DIE")
}
@ -86,7 +90,7 @@ class SupervisorSpec extends JUnitSuite {
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
}
@ -96,7 +100,7 @@ class SupervisorSpec extends JUnitSuite {
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
}
@ -105,11 +109,11 @@ class SupervisorSpec extends JUnitSuite {
val sup = getSingleActorOneForOneSupervisor
sup.start
intercept[RuntimeException] {
pingpong1 !! (Die, 100)
pingpong1 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -118,25 +122,25 @@ class SupervisorSpec extends JUnitSuite {
val sup = getSingleActorOneForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! (Die, 100)
pingpong1 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -145,11 +149,11 @@ class SupervisorSpec extends JUnitSuite {
val sup = getSingleActorAllForOneSupervisor
sup.start
intercept[RuntimeException] {
pingpong1 !! (Die, 100)
pingpong1 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -158,25 +162,25 @@ class SupervisorSpec extends JUnitSuite {
val sup = getSingleActorAllForOneSupervisor
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong1 !! (Die, 100)
pingpong1 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -185,79 +189,79 @@ class SupervisorSpec extends JUnitSuite {
val sup = getMultipleActorsOneForOneConf
sup.start
intercept[RuntimeException] {
pingpong1 !! (Die, 100)
pingpong1 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
/*
@Test def shouldKillMultipleActorsOneForOne2 = {
clearMessageLogs
val sup = getMultipleActorsOneForOneConf
sup.start
intercept[RuntimeException] {
pingpong3 !! (Die, 100)
pingpong3 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
*/
@Test def shouldKillCallMultipleActorsOneForOne = {
clearMessageLogs
val sup = getMultipleActorsOneForOneConf
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! (Die, 100)
pingpong2 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -266,17 +270,17 @@ class SupervisorSpec extends JUnitSuite {
val sup = getMultipleActorsAllForOneConf
sup.start
intercept[RuntimeException] {
pingpong2 !! (Die, 100)
pingpong2 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -285,59 +289,59 @@ class SupervisorSpec extends JUnitSuite {
val sup = getMultipleActorsAllForOneConf
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! (Die, 100)
pingpong2 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -348,7 +352,7 @@ class SupervisorSpec extends JUnitSuite {
pingpong1 ! Die
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
@ -359,83 +363,82 @@ class SupervisorSpec extends JUnitSuite {
pingpong1 ! OneWay
expect("oneway") {
oneWayLog.poll(1, TimeUnit.SECONDS)
oneWayLog.poll(5, TimeUnit.SECONDS)
}
pingpong1 ! Die
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
pingpong1 ! OneWay
expect("oneway") {
oneWayLog.poll(1, TimeUnit.SECONDS)
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
/*
@Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = {
clearMessageLogs
val sup = getNestedSupervisorsAllForOneConf
sup.start
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
intercept[RuntimeException] {
pingpong2 !! (Die, 100)
pingpong2 !! (Die, 5000)
}
expect("DIE") {
messageLog.poll(1 , TimeUnit.SECONDS)
messageLog.poll(5 , TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("DIE") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("pong") {
(pingpong1 !! (Ping, 100)).getOrElse("nil")
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong2 !! (Ping, 100)).getOrElse("nil")
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
}
expect("pong") {
(pingpong3 !! (Ping, 100)).getOrElse("nil")
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
expect("ping") {
messageLog.poll(1, TimeUnit.SECONDS)
messageLog.poll(5, TimeUnit.SECONDS)
}
}
*/
// =============================================
// Create some supervisors with different configurations
@ -444,7 +447,7 @@ class SupervisorSpec extends JUnitSuite {
Supervisor(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -456,7 +459,7 @@ class SupervisorSpec extends JUnitSuite {
Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -470,7 +473,7 @@ class SupervisorSpec extends JUnitSuite {
Supervisor(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
@ -492,9 +495,9 @@ class SupervisorSpec extends JUnitSuite {
Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
pingpong3,
LifeCycle(Permanent))
::
Supervise(
@ -502,7 +505,7 @@ class SupervisorSpec extends JUnitSuite {
LifeCycle(Permanent))
::
Supervise(
pingpong3,
pingpong1,
LifeCycle(Permanent))
:: Nil))
}
@ -514,13 +517,13 @@ class SupervisorSpec extends JUnitSuite {
Supervisor(
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
Supervise(
pingpong1,
LifeCycle(Permanent))
::
SupervisorConfig(
RestartStrategy(AllForOne, 3, 100, Nil),
RestartStrategy(AllForOne, 3, 5000, Nil),
Supervise(
pingpong2,
LifeCycle(Permanent))