diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 594c2972db..09b340a3b9 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -76,6 +76,7 @@ case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage // Exceptions for Actors +class ActorStartException private[akka](message: String) extends RuntimeException(message) class ActorKilledException private[akka](message: String) extends RuntimeException(message) class ActorInitializationException private[akka](message: String) extends RuntimeException(message) @@ -281,8 +282,8 @@ trait Actor extends Logging { * */ val self: ActorRef = optionSelf.get - self.id = getClass.getName + import self._ /** * User overridable callback/setting. diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 6971250381..dfd587648b 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -101,9 +101,8 @@ trait ActorRef extends TransactionManagement { // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = UUID.newUuid.toString @volatile protected[this] var _isRunning = false - @volatile protected[this] var _isSuspended = true @volatile protected[this] var _isShutDown = false - @volatile protected[akka] var _isKilled = false + @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) @volatile protected[akka] var startOnCreation = false @@ -213,9 +212,9 @@ trait ActorRef extends TransactionManagement { protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } /** - * Is the actor killed? + * Is the actor being restarted? */ - def isKilled: Boolean = _isKilled + def isBeingRestarted: Boolean = _isBeingRestarted /** * Is the actor running? @@ -252,9 +251,9 @@ trait ActorRef extends TransactionManagement { *

*/ def !(message: Any)(implicit sender: Option[ActorRef] = None) = { - if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (isRunning) postMessageToMailbox(message, sender) - else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -270,7 +269,6 @@ trait ActorRef extends TransactionManagement { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !![T](message: Any, timeout: Long): Option[T] = { - if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (isRunning) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] @@ -287,9 +285,8 @@ trait ActorRef extends TransactionManagement { if (future.exception.isDefined) throw future.exception.get._2 else future.result - } - else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -318,9 +315,8 @@ trait ActorRef extends TransactionManagement { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !!![T](message: Any): Future[T] = { - if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) - else throw new IllegalStateException( + else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -330,14 +326,13 @@ trait ActorRef extends TransactionManagement { * Works with '!', '!!' and '!!!'. */ def forward(message: Any)(implicit sender: Some[ActorRef]) = { - if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (isRunning) { sender.get.replyTo match { case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef)) case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") } - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -643,8 +638,8 @@ sealed class LocalActorRef private[akka]( * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { - if (!isRunning) _dispatcher = md - else throw new IllegalArgumentException( + if (!isRunning || isBeingRestarted) _dispatcher = md + else throw new ActorInitializationException( "Can not swap dispatcher for " + toString + " after it has been started") } @@ -657,21 +652,20 @@ sealed class LocalActorRef private[akka]( * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ def makeRemote(hostname: String, port: Int): Unit = - if (isRunning) throw new IllegalStateException( + if (!isRunning || isBeingRestarted) makeRemote(new InetSocketAddress(hostname, port)) + else throw new ActorInitializationException( "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else makeRemote(new InetSocketAddress(hostname, port)) /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ def makeRemote(address: InetSocketAddress): Unit = guard.withGuard { - if (isRunning) throw new IllegalStateException( - "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else { + if (!isRunning || isBeingRestarted) { _remoteAddress = Some(address) RemoteClient.register(address.getHostName, address.getPort, uuid) homeAddress = (RemoteServer.HOSTNAME, RemoteServer.PORT) - } + } else throw new ActorInitializationException( + "Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") } /** @@ -683,9 +677,9 @@ sealed class LocalActorRef private[akka]( * */ def makeTransactionRequired = guard.withGuard { - if (isRunning) throw new IllegalArgumentException( + if (!isRunning || isBeingRestarted) isTransactor = true + else throw new ActorInitializationException( "Can not make actor transaction required after it has been started") - else isTransactor = true } /** @@ -704,7 +698,7 @@ sealed class LocalActorRef private[akka]( * Starts up the actor and its message queue. */ def start: ActorRef = guard.withGuard { - if (isShutdown) throw new IllegalStateException( + if (isShutdown) throw new ActorStartException( "Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!isRunning) { if (!isInInitialization) initializeActorInstance @@ -726,7 +720,7 @@ sealed class LocalActorRef private[akka]( remoteAddress.foreach(address => RemoteClient.unregister( address.getHostName, address.getPort, uuid)) RemoteNode.unregister(this) - } + } else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } /** @@ -915,9 +909,8 @@ sealed class LocalActorRef private[akka]( val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) if (dispatcher.usesActorMailbox) { _mailbox.add(invocation) - if (_isSuspended) invocation.send - } - else invocation.send + invocation.send + } else invocation.send } } @@ -990,7 +983,7 @@ sealed class LocalActorRef private[akka]( "No handler matching message [" + message + "] in " + toString) } catch { case e => - _isKilled = true + _isBeingRestarted = true Actor.log.error(e, "Could not invoke actor [%s]", toString) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) @@ -1038,6 +1031,7 @@ sealed class LocalActorRef private[akka]( } catch { case e: IllegalStateException => {} case e => + _isBeingRestarted = true // abort transaction set if (isTransactionSetInScope) try { getTransactionSetInScope.abort @@ -1080,6 +1074,7 @@ sealed class LocalActorRef private[akka]( } protected[akka] def restart(reason: Throwable): Unit = { + _isBeingRestarted = true Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) restartLinkedActors(reason) val failedActor = actorInstance.get @@ -1087,18 +1082,16 @@ sealed class LocalActorRef private[akka]( Actor.log.debug("Restarting linked actors for actor [%s].", id) Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) failedActor.preRestart(reason) - failedActor.shutdown - _isRunning = false val freshActor = newActor freshActor.synchronized { + initializeActorInstance freshActor.init freshActor.initTransactionalState - _isRunning = true actorInstance.set(freshActor) Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) freshActor.postRestart(reason) } - _isKilled = false + _isBeingRestarted = false } } @@ -1146,7 +1139,7 @@ sealed class LocalActorRef private[akka]( protected[akka] def linkedActorsAsList: List[ActorRef] = linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] - private def initializeActorInstance = if (!isRunning) { + private def initializeActorInstance = if (!isRunning || isBeingRestarted) { dispatcher.register(this) dispatcher.start actor.init // run actor init and initTransactionalState callbacks diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 4d2fd49541..fa4da120c8 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -11,8 +11,7 @@ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.remote.RemoteServer import Actor._ - -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} /** * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class. @@ -132,7 +131,6 @@ object SupervisorFactory { } } - /** * For internal use only. * @@ -146,7 +144,7 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log def newInstanceFor(config: SupervisorConfig): Supervisor = { val (handler, trapExits) = SupervisorFactory.retrieveFaultHandlerAndTrapExitsFrom(config) val supervisor = new Supervisor(handler, trapExits) - supervisor.configure(config, this) + supervisor.configure(config) supervisor.start supervisor } @@ -168,8 +166,9 @@ sealed class Supervisor private[akka] ( handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) extends Configurator { - private val children = new ConcurrentHashMap[String, List[ActorRef]] - private val supervisor = SupervisorActor(handler, trapExceptions) + private val childActors = new ConcurrentHashMap[String, List[ActorRef]] + private val childSupervisors = new CopyOnWriteArrayList[Supervisor] + private[akka] val supervisor = SupervisorActor(handler, trapExceptions) def uuid = supervisor.uuid @@ -184,42 +183,39 @@ sealed class Supervisor private[akka] ( def unlink(child: ActorRef) = supervisor ! Unlink(child) - def getInstance[T](clazz: Class[T]): List[T] = children.get(clazz.getName).asInstanceOf[List[T]] + // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations + def getInstance[T](clazz: Class[T]): List[T] = childActors.get(clazz.getName).asInstanceOf[List[T]] - def getComponentInterfaces: List[Class[_]] = - children.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass) + // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations + def getComponentInterfaces: List[Class[_]] = + childActors.values.toArray.toList.asInstanceOf[List[List[AnyRef]]].flatten.map(_.getClass) - def isDefined(clazz: Class[_]): Boolean = children.containsKey(clazz.getName) + // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations + def isDefined(clazz: Class[_]): Boolean = childActors.containsKey(clazz.getName) - def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match { + def configure(config: SupervisorConfig): Unit = config match { case SupervisorConfig(_, servers) => servers.map(server => server match { case Supervise(actorRef, lifeCycle, remoteAddress) => val className = actorRef.actor.getClass.getName val currentActors = { - val list = children.get(className) + val list = childActors.get(className) if (list eq null) List[ActorRef]() else list } - children.put(className, actorRef :: currentActors) + childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = Some(lifeCycle) supervisor ! Link(actorRef) - remoteAddress.foreach(address => RemoteServer.actorsFor( - RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorRef.id, actorRef)) + remoteAddress.foreach { address => RemoteServer + .actorsFor(RemoteServer.Address(address.hostname, address.port)) + .actors.put(actorRef.id, actorRef) + } - case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration - val childSupervisor = SupervisorActor(supervisorConfig) - childSupervisor.lifeCycle = Some(LifeCycle(Permanent)) - val className = childSupervisor.uuid - val currentSupervisors = { - val list = children.get(className) - if (list eq null) List[ActorRef]() - else list - } - children.put(className, childSupervisor :: currentSupervisors) - supervisor ! Link(childSupervisor) + case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration + val childSupervisor = Supervisor(supervisorConfig) + supervisor ! Link(childSupervisor.supervisor) + childSupervisors.add(childSupervisor) }) } } @@ -240,18 +236,18 @@ sealed class Supervisor private[akka] ( */ final class SupervisorActor private[akka] ( handler: FaultHandlingStrategy, - trapExceptions: List[Class[_ <: Throwable]]) - extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) - self.trapExit = trapExceptions - self.faultHandler = Some(handler) + trapExceptions: List[Class[_ <: Throwable]]) extends Actor { + import self._ +// dispatcher = Dispatchers.newThreadBasedDispatcher(self) + trapExit = trapExceptions + faultHandler = Some(handler) - override def shutdown: Unit = self.shutdownLinkedActors + override def shutdown: Unit = shutdownLinkedActors def receive = { - case Link(child) => self.startLink(child) - case Unlink(child) => self.unlink(child) - case UnlinkAndStop(child) => self.unlink(child); child.stop + case Link(child) => startLink(child) + case Unlink(child) => unlink(child) + case UnlinkAndStop(child) => unlink(child); child.stop case unknown => throw new IllegalArgumentException( "Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]") } diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala index 1ec5f6c48c..9cc2d13441 100644 --- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala +++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala @@ -40,7 +40,6 @@ object Log { } @serializable class RemotePingPong2Actor extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") @@ -55,7 +54,6 @@ object Log { } @serializable class RemotePingPong3Actor extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") diff --git a/akka-core/src/test/scala/SerializerSpec.scala b/akka-core/src/test/scala/SerializerSpec.scala index 0efc05192c..0dda4565f9 100644 --- a/akka-core/src/test/scala/SerializerSpec.scala +++ b/akka-core/src/test/scala/SerializerSpec.scala @@ -7,12 +7,12 @@ import org.junit.{Test, Before, After} import scala.reflect.BeanInfo -@BeanInfo +//@BeanInfo case class Foo(foo: String) { def this() = this(null) } -@BeanInfo +//@BeanInfo case class MyMessage(val id: String, val value: Tuple2[String, Int]) { private def this() = this(null, null) } diff --git a/akka-core/src/test/scala/SupervisorSpec.scala b/akka-core/src/test/scala/SupervisorSpec.scala index 23f33d9214..7fb97ae40d 100644 --- a/akka-core/src/test/scala/SupervisorSpec.scala +++ b/akka-core/src/test/scala/SupervisorSpec.scala @@ -23,11 +23,13 @@ object SupervisorSpec { } class PingPong1Actor extends Actor { - self.timeout = 1000 + import self._ + dispatcher = Dispatchers.newThreadBasedDispatcher(self) + timeout = 1000 def receive = { case Ping => messageLog.put("ping") - self.reply("pong") + reply("pong") case OneWay => oneWayLog.put("oneway") @@ -41,11 +43,12 @@ object SupervisorSpec { } class PingPong2Actor extends Actor { - self.timeout = 1000 + import self._ + timeout = 1000 def receive = { case Ping => messageLog.put("ping") - self.reply("pong") + reply("pong") case Die => throw new RuntimeException("DIE") } @@ -55,11 +58,12 @@ object SupervisorSpec { } class PingPong3Actor extends Actor { - self.timeout = 1000 + import self._ + timeout = 1000 def receive = { case Ping => messageLog.put("ping") - self.reply("pong") + reply("pong") case Die => throw new RuntimeException("DIE") } @@ -86,7 +90,7 @@ class SupervisorSpec extends JUnitSuite { sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } } @@ -96,7 +100,7 @@ class SupervisorSpec extends JUnitSuite { sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } } @@ -105,11 +109,11 @@ class SupervisorSpec extends JUnitSuite { val sup = getSingleActorOneForOneSupervisor sup.start intercept[RuntimeException] { - pingpong1 !! (Die, 100) + pingpong1 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -118,25 +122,25 @@ class SupervisorSpec extends JUnitSuite { val sup = getSingleActorOneForOneSupervisor sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong1 !! (Die, 100) + pingpong1 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -145,11 +149,11 @@ class SupervisorSpec extends JUnitSuite { val sup = getSingleActorAllForOneSupervisor sup.start intercept[RuntimeException] { - pingpong1 !! (Die, 100) + pingpong1 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -158,25 +162,25 @@ class SupervisorSpec extends JUnitSuite { val sup = getSingleActorAllForOneSupervisor sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong1 !! (Die, 100) + pingpong1 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -185,79 +189,79 @@ class SupervisorSpec extends JUnitSuite { val sup = getMultipleActorsOneForOneConf sup.start intercept[RuntimeException] { - pingpong1 !! (Die, 100) + pingpong1 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } -/* + @Test def shouldKillMultipleActorsOneForOne2 = { clearMessageLogs val sup = getMultipleActorsOneForOneConf sup.start intercept[RuntimeException] { - pingpong3 !! (Die, 100) + pingpong3 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } -*/ + @Test def shouldKillCallMultipleActorsOneForOne = { clearMessageLogs val sup = getMultipleActorsOneForOneConf sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! (Die, 100) + pingpong2 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -266,17 +270,17 @@ class SupervisorSpec extends JUnitSuite { val sup = getMultipleActorsAllForOneConf sup.start intercept[RuntimeException] { - pingpong2 !! (Die, 100) + pingpong2 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -285,59 +289,59 @@ class SupervisorSpec extends JUnitSuite { val sup = getMultipleActorsAllForOneConf sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! (Die, 100) + pingpong2 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -348,7 +352,7 @@ class SupervisorSpec extends JUnitSuite { pingpong1 ! Die expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @@ -359,83 +363,82 @@ class SupervisorSpec extends JUnitSuite { pingpong1 ! OneWay expect("oneway") { - oneWayLog.poll(1, TimeUnit.SECONDS) + oneWayLog.poll(5, TimeUnit.SECONDS) } pingpong1 ! Die expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } pingpong1 ! OneWay expect("oneway") { - oneWayLog.poll(1, TimeUnit.SECONDS) + oneWayLog.poll(5, TimeUnit.SECONDS) } } -/* @Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = { clearMessageLogs val sup = getNestedSupervisorsAllForOneConf sup.start expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! (Die, 100) + pingpong2 !! (Die, 5000) } expect("DIE") { - messageLog.poll(1 , TimeUnit.SECONDS) + messageLog.poll(5 , TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! (Ping, 100)).getOrElse("nil") + (pingpong1 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! (Ping, 100)).getOrElse("nil") + (pingpong2 !! (Ping, 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! (Ping, 100)).getOrElse("nil") + (pingpong3 !! (Ping, 5000)).getOrElse("nil") } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - messageLog.poll(1, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } -*/ + // ============================================= // Create some supervisors with different configurations @@ -444,7 +447,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), Supervise( pingpong1, LifeCycle(Permanent)) @@ -456,7 +459,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), Supervise( pingpong1, LifeCycle(Permanent)) @@ -470,7 +473,7 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), Supervise( pingpong1, LifeCycle(Permanent)) @@ -492,9 +495,9 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), Supervise( - pingpong1, + pingpong3, LifeCycle(Permanent)) :: Supervise( @@ -502,7 +505,7 @@ class SupervisorSpec extends JUnitSuite { LifeCycle(Permanent)) :: Supervise( - pingpong3, + pingpong1, LifeCycle(Permanent)) :: Nil)) } @@ -514,13 +517,13 @@ class SupervisorSpec extends JUnitSuite { Supervisor( SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])), + RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])), Supervise( pingpong1, LifeCycle(Permanent)) :: SupervisorConfig( - RestartStrategy(AllForOne, 3, 100, Nil), + RestartStrategy(AllForOne, 3, 5000, Nil), Supervise( pingpong2, LifeCycle(Permanent))