diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 32dd70f425..cec0fb4c2d 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -317,7 +317,7 @@ object ActiveObject { throw new IllegalStateException("Can't link when the supervisor is not an active object")) val supervisedActor = actorFor(supervised).getOrElse( throw new IllegalStateException("Can't link when the supervised is not an active object")) - supervisorActor !! Link(supervisedActor) + supervisorActor.link(supervisedActor) } /** @@ -334,7 +334,7 @@ object ActiveObject { throw new IllegalStateException("Can't link when the supervised is not an active object")) supervisorActor.trapExit = trapExceptions.toList supervisorActor.faultHandler = Some(handler) - supervisorActor !! Link(supervisedActor) + supervisorActor.link(supervisedActor) } /** @@ -347,7 +347,7 @@ object ActiveObject { throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) val supervisedActor = actorFor(supervised).getOrElse( throw new IllegalStateException("Can't unlink when the supervised is not an active object")) - supervisorActor !! Unlink(supervisedActor) + supervisorActor.unlink(supervisedActor) } /** diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index b23891b9bc..b66131123d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -6,24 +6,8 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.dispatch._ import se.scalablesolutions.akka.config.Config._ -import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.stm.Transaction.Global._ -import se.scalablesolutions.akka.stm.TransactionManagement._ -import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol} -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} -import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} - -import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.commitbarriers.CountDownCommitBarrier - -import jsr166x.{Deque, ConcurrentLinkedDeque} - -import java.net.InetSocketAddress -import java.util.concurrent.locks.{Lock, ReentrantLock} -import java.util.{HashSet => JHashSet} +import se.scalablesolutions.akka.util.Logging /* // FIXME add support for ActorWithNestedReceive @@ -365,6 +349,7 @@ trait Actor extends Logging { case HotSwap(code) => self.hotswap = code case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) + case Link(child) => self.link(child) case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 7a92fcfe57..db2d0ed030 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -185,7 +185,7 @@ trait ActorRef extends TransactionManagement { /** * Holds the hot swapped partial function. */ - protected[akka] var hotswap: Option[Actor.Receive] = None // FIXME: _hotswap should be a stack + protected[akka] var hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack /** * User overridable callback/setting. @@ -1074,7 +1074,7 @@ sealed class LocalActorRef private[akka]( } protected[akka] def restart(reason: Throwable): Unit = { - _isBeingRestarted = true + //_isBeingRestarted = true Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) restartLinkedActors(reason) val failedActor = actorInstance.get @@ -1084,7 +1084,6 @@ sealed class LocalActorRef private[akka]( failedActor.preRestart(reason) val freshActor = newActor freshActor.synchronized { - initializeActorInstance freshActor.init freshActor.initTransactionalState actorInstance.set(freshActor) @@ -1139,7 +1138,7 @@ sealed class LocalActorRef private[akka]( protected[akka] def linkedActorsAsList: List[ActorRef] = linkedActors.values.toArray.toList.asInstanceOf[List[ActorRef]] - private def initializeActorInstance = if (!isRunning || isBeingRestarted) { + private def initializeActorInstance = if (!isRunning) { dispatcher.register(this) dispatcher.start actor.init // run actor init and initTransactionalState callbacks diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 0f61a7249f..3f05f18548 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -9,8 +9,11 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.remote.RemoteServer import Actor._ + import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} +class SupervisorException private[akka](message: String) extends RuntimeException(message) + /** * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class. * These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor' @@ -31,9 +34,12 @@ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} * Nil)) * * - * You can use the declaratively created Supervisor to link and unlink child children - * dynamically using the 'link' and 'unlink' methods. - * + * You dynamically link and unlink child children using the 'link' and 'unlink' methods. + *
+ * supervisor.link(child) + * supervisor.unlink(child) + *+ * * @author Jonas Bonér */ object Supervisor { @@ -47,12 +53,6 @@ object Supervisor { * Here is a sample on how to use the programmatic API (note that the supervisor is automatically started): *
* val supervisor = SupervisorActor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable])) - * - * // link and unlink child actors dynamically - * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically - * supervisor ! Unlink(child2) - * supervisor ! UnlinkAndStop(child3) - ** * Here is a sample on how to use the declarative API: *
@@ -66,15 +66,13 @@ object Supervisor {
* mySecondActor,
* LifeCycle(Permanent)) ::
* Nil))
- *
- * // link and unlink child actors dynamically
- * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically
- * supervisor ! Unlink(child2)
- * supervisor ! UnlinkAndStop(child3)
*
- *
- * You can use the declaratively created Supervisor to link and unlink child children
- * dynamically using the 'link' and 'unlink' methods.
+ *
+ * You dynamically link and unlink child children using the 'link' and 'unlink' methods.
+ * + * supervisor.link(child) + * supervisor.unlink(child) + ** * @author Jonas Bonér */ @@ -163,6 +161,7 @@ class SupervisorFactory private[akka] (val config: SupervisorConfig) extends Log sealed class Supervisor private[akka] ( handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) extends Configurator { + import Supervisor._ private val childActors = new ConcurrentHashMap[String, List[ActorRef]] private val childSupervisors = new CopyOnWriteArrayList[Supervisor] @@ -177,9 +176,9 @@ sealed class Supervisor private[akka] ( def shutdown: Unit = supervisor.stop - def link(child: ActorRef) = supervisor ! Link(child) + def link(child: ActorRef) = supervisor.link(child) - def unlink(child: ActorRef) = supervisor ! Unlink(child) + def unlink(child: ActorRef) = supervisor.unlink(child) // FIXME recursive search + do not fix if we remove feature that Actors can be RESTful usin Jersey annotations def getInstance[T](clazz: Class[T]): List[T] = childActors.get(clazz.getName).asInstanceOf[List[T]] @@ -204,7 +203,7 @@ sealed class Supervisor private[akka] ( } childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = Some(lifeCycle) - supervisor ! Link(actorRef) + supervisor.link(actorRef) remoteAddress.foreach { address => RemoteServer .actorsFor(RemoteServer.Address(address.hostname, address.port)) .actors.put(actorRef.id, actorRef) @@ -212,7 +211,7 @@ sealed class Supervisor private[akka] ( case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) - supervisor ! Link(childSupervisor.supervisor) + supervisor.link(childSupervisor.supervisor) childSupervisors.add(childSupervisor) }) } @@ -225,11 +224,14 @@ sealed class Supervisor private[akka] ( * Here is a sample on how to use it: *
* val supervisor = Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange), Array(classOf[Throwable])) - * supervisor ! Link(child1) // starts the actor if not started yet, starts and links atomically - * supervisor ! Unlink(child2) - * supervisor ! UnlinkAndStop(child3) ** + * You dynamically link and unlink child children using the 'link' and 'unlink' methods. + *
+ * supervisor.link(child) + * supervisor.unlink(child) + *+ * * @author Jonas Bonér */ final class SupervisorActor private[akka] ( @@ -242,11 +244,8 @@ final class SupervisorActor private[akka] ( override def shutdown: Unit = shutdownLinkedActors def receive = { - case Link(child) => startLink(child) - case Unlink(child) => unlink(child) - case UnlinkAndStop(child) => unlink(child); child.stop - case unknown => throw new IllegalArgumentException( - "Supervisor can only respond to 'Link' and 'Unlink' messages. Unknown message [" + unknown + "]") + case unknown => throw new SupervisorException( + "SupervisorActor can not respond to messages. Unknown message [" + unknown + "]") } } diff --git a/akka-core/src/test/scala/InMemoryActorSpec.scala b/akka-core/src/test/scala/InMemoryActorSpec.scala index 46ddf5f237..433e959a73 100644 --- a/akka-core/src/test/scala/InMemoryActorSpec.scala +++ b/akka-core/src/test/scala/InMemoryActorSpec.scala @@ -7,24 +7,27 @@ import org.junit.Test import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector} import Actor._ -case class GetMapState(key: String) -case object GetVectorState -case object GetVectorSize -case object GetRefState +object InMemoryActorSpec { + case class GetMapState(key: String) + case object GetVectorState + case object GetVectorSize + case object GetRefState -case class SetMapState(key: String, value: String) -case class SetVectorState(key: String) -case class SetRefState(key: String) -case class Success(key: String, value: String) -case class Failure(key: String, value: String, failer: ActorRef) + case class SetMapState(key: String, value: String) + case class SetVectorState(key: String) + case class SetRefState(key: String) + case class Success(key: String, value: String) + case class Failure(key: String, value: String, failer: ActorRef) -case class SetMapStateOneWay(key: String, value: String) -case class SetVectorStateOneWay(key: String) -case class SetRefStateOneWay(key: String) -case class SuccessOneWay(key: String, value: String) -case class FailureOneWay(key: String, value: String, failer: ActorRef) + case class SetMapStateOneWay(key: String, value: String) + case class SetVectorStateOneWay(key: String) + case class SetRefStateOneWay(key: String) + case class SuccessOneWay(key: String, value: String) + case class FailureOneWay(key: String, value: String, failer: ActorRef) -case object GetNotifier + case object GetNotifier +} +import InMemoryActorSpec._ class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor { def this() = this(0) diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala index acb8486aff..52d531dadf 100644 --- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala +++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala @@ -16,19 +16,23 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test object Log { - var messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] - var oneWayLog: String = "" + val messageLog: BlockingQueue[String] = new LinkedBlockingQueue[String] + val oneWayLog = new LinkedBlockingQueue[String] + + def clearMessageLogs { + messageLog.clear + oneWayLog.clear + } } @serializable class RemotePingPong1Actor extends Actor { -// self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case BinaryString("Ping") => Log.messageLog.put("ping") self.reply("pong") case OneWay => - Log.oneWayLog += "oneway" + Log.oneWayLog.put("oneway") case BinaryString("Die") => throw new RuntimeException("DIE") @@ -85,260 +89,365 @@ class RemoteSupervisorSpec extends JUnitSuite { var pingpong2: ActorRef = _ var pingpong3: ActorRef = _ + import Log._ + @Test def shouldStartServer = { Log.messageLog.clear val sup = getSingleActorAllForOneSupervisor - sup.start expect("pong") { (pingpong1 !! BinaryString("Ping")).getOrElse("nil") } } + @Test def shouldStartServerForNestedSupervisorHierarchy = { + clearMessageLogs + val sup = getNestedSupervisorsAllForOneConf + sup.start + + expect("pong") { + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + } @Test def shouldKillSingleActorOneForOne = { - Log.messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") + pingpong1 !! (BinaryString("Die"), 5000) } + expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorOneForOne = { - Log.messageLog.clear + clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") + pingpong1 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @Test def shouldKillSingleActorAllForOne = { - Log.messageLog.clear + clearMessageLogs val sup = getSingleActorAllForOneSupervisor - sup.start + intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") + pingpong1 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @Test def shouldCallKillCallSingleActorAllForOne = { - Log.messageLog.clear + clearMessageLogs val sup = getSingleActorAllForOneSupervisor - sup.start expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } - intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") + pingpong1 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } - expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsOneForOne1 = { - Log.messageLog.clear + clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + intercept[RuntimeException] { - pingpong1 !! BinaryString("Die") + pingpong1 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } -/* + /* + // Uncomment when the same test passes in SupervisorSpec - pending bug @Test def shouldKillMultipleActorsOneForOne2 = { - Log.messageLog.clear + clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + intercept[RuntimeException] { - pingpong3 !! BinaryString("Die") + pingpong3 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } */ - def tesCallKillCallMultipleActorsOneForOne = { - Log.messageLog.clear + @Test def shouldKillCallMultipleActorsOneForOne = { + clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! BinaryString("Ping")).getOrElse("nil") + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! BinaryString("Ping")).getOrElse("nil") + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! BinaryString("Die") + pingpong2 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! BinaryString("Ping")).getOrElse("nil") + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! BinaryString("Ping")).getOrElse("nil") + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } @Test def shouldKillMultipleActorsAllForOne = { - Log.messageLog.clear + clearMessageLogs val sup = getMultipleActorsAllForOneConf - sup.start + intercept[RuntimeException] { - pingpong2 !! BinaryString("Die") + pingpong2 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } - def tesCallKillCallMultipleActorsAllForOne = { - Log.messageLog.clear + @Test def shouldCallKillCallMultipleActorsAllForOne = { + clearMessageLogs val sup = getMultipleActorsAllForOneConf - sup.start + expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! BinaryString("Ping")).getOrElse("nil") + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! BinaryString("Ping")).getOrElse("nil") + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } intercept[RuntimeException] { - pingpong2 !! BinaryString("Die") + pingpong2 !! (BinaryString("Die"), 5000) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("DIE") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("pong") { - (pingpong1 !! BinaryString("Ping")).getOrElse("nil") + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong2 !! BinaryString("Ping")).getOrElse("nil") + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("pong") { - (pingpong3 !! BinaryString("Ping")).getOrElse("nil") + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } expect("ping") { - Log.messageLog.poll(5, TimeUnit.SECONDS) + messageLog.poll(5, TimeUnit.SECONDS) } } + /* + + @Test def shouldOneWayKillSingleActorOneForOne = { + clearMessageLogs + val sup = getSingleActorOneForOneSupervisor + + pingpong1 ! BinaryString("Die") + + expect("DIE") { + messageLog.poll(5, TimeUnit.SECONDS) + } + } + + @Test def shouldOneWayCallKillCallSingleActorOneForOne = { + clearMessageLogs + val sup = getSingleActorOneForOneSupervisor + + pingpong1 ! OneWay + + expect("oneway") { + oneWayLog.poll(5, TimeUnit.SECONDS) + } + pingpong1 ! BinaryString("Die") + + expect("DIE") { + messageLog.poll(5, TimeUnit.SECONDS) + } + pingpong1 ! OneWay + + expect("oneway") { + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } + + @Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = { + clearMessageLogs + val sup = getNestedSupervisorsAllForOneConf + + + expect("pong") { + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("pong") { + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("pong") { + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + intercept[RuntimeException] { + pingpong2 !! (BinaryString("Die"), 5000) + } + + expect("DIE") { + messageLog.poll(5 , TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("DIE") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("pong") { + (pingpong1 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("pong") { + (pingpong2 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("pong") { + (pingpong3 !! (BinaryString("Ping"), 5000)).getOrElse("nil") + } + + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + expect("ping") { + messageLog.poll(5, TimeUnit.SECONDS) + } + } + */ // ============================================= // Creat some supervisors with different configurations diff --git a/akka-core/src/test/scala/SupervisorSpec.scala b/akka-core/src/test/scala/SupervisorSpec.scala index 7fb97ae40d..73b763a227 100644 --- a/akka-core/src/test/scala/SupervisorSpec.scala +++ b/akka-core/src/test/scala/SupervisorSpec.scala @@ -11,7 +11,7 @@ import Actor._ import org.scalatest.junit.JUnitSuite import org.junit.Test -import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, BlockingQueue, LinkedBlockingQueue} +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} object SupervisorSpec { var messageLog = new LinkedBlockingQueue[String] @@ -24,8 +24,7 @@ object SupervisorSpec { class PingPong1Actor extends Actor { import self._ - dispatcher = Dispatchers.newThreadBasedDispatcher(self) - timeout = 1000 + //dispatcher = Dispatchers.newThreadBasedDispatcher(self) def receive = { case Ping => messageLog.put("ping") @@ -35,40 +34,44 @@ object SupervisorSpec { oneWayLog.put("oneway") case Die => + println("******************** GOT DIE 1") throw new RuntimeException("DIE") } override def postRestart(reason: Throwable) { + println("******************** restart 1") messageLog.put(reason.getMessage) } } class PingPong2Actor extends Actor { import self._ - timeout = 1000 def receive = { case Ping => messageLog.put("ping") reply("pong") case Die => + println("******************** GOT DIE 2") throw new RuntimeException("DIE") } override def postRestart(reason: Throwable) { + println("******************** restart 2") messageLog.put(reason.getMessage) } } class PingPong3Actor extends Actor { import self._ - timeout = 1000 def receive = { case Ping => messageLog.put("ping") reply("pong") case Die => + println("******************** GOT DIE 3") throw new RuntimeException("DIE") } override def postRestart(reason: Throwable) { + println("******************** restart 3") messageLog.put(reason.getMessage) } } @@ -107,7 +110,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillSingleActorOneForOne = { clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + intercept[RuntimeException] { pingpong1 !! (Die, 5000) } @@ -120,7 +123,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldCallKillCallSingleActorOneForOne = { clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + expect("pong") { (pingpong1 !! (Ping, 5000)).getOrElse("nil") } @@ -147,7 +150,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillSingleActorAllForOne = { clearMessageLogs val sup = getSingleActorAllForOneSupervisor - sup.start + intercept[RuntimeException] { pingpong1 !! (Die, 5000) } @@ -160,7 +163,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldCallKillCallSingleActorAllForOne = { clearMessageLogs val sup = getSingleActorAllForOneSupervisor - sup.start + expect("pong") { (pingpong1 !! (Ping, 5000)).getOrElse("nil") } @@ -187,7 +190,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillMultipleActorsOneForOne1 = { clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + intercept[RuntimeException] { pingpong1 !! (Die, 5000) } @@ -200,7 +203,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillMultipleActorsOneForOne2 = { clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + intercept[RuntimeException] { pingpong3 !! (Die, 5000) } @@ -213,7 +216,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillCallMultipleActorsOneForOne = { clearMessageLogs val sup = getMultipleActorsOneForOneConf - sup.start + expect("pong") { (pingpong1 !! (Ping, 5000)).getOrElse("nil") } @@ -268,7 +271,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldKillMultipleActorsAllForOne = { clearMessageLogs val sup = getMultipleActorsAllForOneConf - sup.start + intercept[RuntimeException] { pingpong2 !! (Die, 5000) } @@ -287,7 +290,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldCallKillCallMultipleActorsAllForOne = { clearMessageLogs val sup = getMultipleActorsAllForOneConf - sup.start + expect("pong") { (pingpong1 !! (Ping, 5000)).getOrElse("nil") } @@ -348,7 +351,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldOneWayKillSingleActorOneForOne = { clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + pingpong1 ! Die expect("DIE") { @@ -359,7 +362,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldOneWayCallKillCallSingleActorOneForOne = { clearMessageLogs val sup = getSingleActorOneForOneSupervisor - sup.start + pingpong1 ! OneWay expect("oneway") { @@ -380,7 +383,7 @@ class SupervisorSpec extends JUnitSuite { @Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = { clearMessageLogs val sup = getNestedSupervisorsAllForOneConf - sup.start + expect("pong") { (pingpong1 !! (Ping, 5000)).getOrElse("nil") @@ -497,7 +500,7 @@ class SupervisorSpec extends JUnitSuite { SupervisorConfig( RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), Supervise( - pingpong3, + pingpong1, LifeCycle(Permanent)) :: Supervise( @@ -505,7 +508,7 @@ class SupervisorSpec extends JUnitSuite { LifeCycle(Permanent)) :: Supervise( - pingpong1, + pingpong3, LifeCycle(Permanent)) :: Nil)) }