Update supervisor spec
This commit is contained in:
parent
832568a152
commit
a4a4395d28
5 changed files with 292 additions and 513 deletions
|
|
@ -4,64 +4,48 @@
|
||||||
|
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
|
import org.scalatest.WordSpec
|
||||||
|
import org.scalatest.matchers.MustMatchers
|
||||||
|
import org.scalatest.BeforeAndAfterEach
|
||||||
|
|
||||||
|
import akka.testing._
|
||||||
|
import akka.testing.Testing.{testMillis, sleepFor}
|
||||||
|
import akka.util.duration._
|
||||||
import akka.config.Supervision._
|
import akka.config.Supervision._
|
||||||
import akka.{OneWay, Die, Ping}
|
import akka.{Die, Ping}
|
||||||
import Actor._
|
import Actor._
|
||||||
|
|
||||||
import org.scalatest.junit.JUnitSuite
|
|
||||||
import org.junit.Test
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.util.concurrent. {CountDownLatch, TimeUnit, LinkedBlockingQueue}
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
|
|
||||||
|
|
||||||
object SupervisorSpec {
|
object SupervisorSpec {
|
||||||
|
val Timeout = 5 seconds
|
||||||
|
val TimeoutMillis = testMillis(Timeout).toInt
|
||||||
|
|
||||||
|
// =====================================================
|
||||||
|
// Message logs
|
||||||
|
// =====================================================
|
||||||
|
|
||||||
|
val PingMessage = "ping"
|
||||||
|
val PongMessage = "pong"
|
||||||
|
val ExceptionMessage = "Expected exception; to test fault-tolerance"
|
||||||
|
|
||||||
var messageLog = new LinkedBlockingQueue[String]
|
var messageLog = new LinkedBlockingQueue[String]
|
||||||
var oneWayLog = new LinkedBlockingQueue[String]
|
|
||||||
|
|
||||||
def clearMessageLogs {
|
def messageLogPoll = messageLog.poll(Timeout.length, Timeout.unit)
|
||||||
messageLog.clear
|
|
||||||
oneWayLog.clear
|
|
||||||
}
|
|
||||||
|
|
||||||
class PingPong1Actor extends Actor {
|
// =====================================================
|
||||||
import self._
|
// Actors
|
||||||
|
// =====================================================
|
||||||
|
|
||||||
|
class PingPongActor extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case Ping =>
|
case Ping =>
|
||||||
messageLog.put("ping")
|
messageLog.put(PingMessage)
|
||||||
reply("pong")
|
self.reply_?(PongMessage)
|
||||||
|
|
||||||
case OneWay =>
|
|
||||||
oneWayLog.put("oneway")
|
|
||||||
|
|
||||||
case Die =>
|
case Die =>
|
||||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
throw new RuntimeException(ExceptionMessage)
|
||||||
}
|
|
||||||
override def postRestart(reason: Throwable) {
|
|
||||||
messageLog.put(reason.getMessage)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class PingPong2Actor extends Actor {
|
|
||||||
import self._
|
|
||||||
def receive = {
|
|
||||||
case Ping =>
|
|
||||||
messageLog.put("ping")
|
|
||||||
reply("pong")
|
|
||||||
case Die =>
|
|
||||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
|
||||||
}
|
|
||||||
override def postRestart(reason: Throwable) {
|
|
||||||
messageLog.put(reason.getMessage)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class PingPong3Actor extends Actor {
|
|
||||||
import self._
|
|
||||||
def receive = {
|
|
||||||
case Ping =>
|
|
||||||
messageLog.put("ping")
|
|
||||||
reply("pong")
|
|
||||||
case Die =>
|
|
||||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postRestart(reason: Throwable) {
|
override def postRestart(reason: Throwable) {
|
||||||
|
|
@ -69,490 +53,74 @@ object SupervisorSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TemporaryActor extends Actor {
|
class TemporaryActor extends PingPongActor {
|
||||||
import self._
|
self.lifeCycle = Temporary
|
||||||
lifeCycle = Temporary
|
|
||||||
def receive = {
|
|
||||||
case Ping =>
|
|
||||||
messageLog.put("ping")
|
|
||||||
reply("pong")
|
|
||||||
case Die =>
|
|
||||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
|
||||||
}
|
|
||||||
|
|
||||||
override def postRestart(reason: Throwable) {
|
|
||||||
messageLog.put(reason.getMessage)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class Master extends Actor {
|
class Master extends Actor {
|
||||||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 1000)
|
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, testMillis(1 second).toInt)
|
||||||
|
|
||||||
val temp = self.spawnLink[TemporaryActor]
|
val temp = self.spawnLink[TemporaryActor]
|
||||||
|
|
||||||
override def receive = {
|
override def receive = {
|
||||||
case Die => temp !! (Die, 5000)
|
case Die => temp !! (Die, TimeoutMillis)
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
class SupervisorSpec extends JUnitSuite {
|
|
||||||
import SupervisorSpec._
|
|
||||||
|
|
||||||
var pingpong1: ActorRef = _
|
|
||||||
var pingpong2: ActorRef = _
|
|
||||||
var pingpong3: ActorRef = _
|
|
||||||
var temporaryActor: ActorRef = _
|
|
||||||
|
|
||||||
@Test def shoulNotRestartProgrammaticallyLinkedTemporaryActor = {
|
|
||||||
clearMessageLogs
|
|
||||||
val master = actorOf[Master].start
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
master !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(1000)
|
|
||||||
assert(messageLog.size === 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shoulNotRestartTemporaryActor = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getTemporaryActorAllForOneSupervisor
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
temporaryActor !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(1000)
|
|
||||||
assert(messageLog.size === 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldStartServerForNestedSupervisorHierarchy = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getNestedSupervisorsAllForOneConf
|
|
||||||
sup.start
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def shouldKillSingleActorOneForOne = {
|
// =====================================================
|
||||||
clearMessageLogs
|
// Creating actors and supervisors
|
||||||
val sup = getSingleActorOneForOneSupervisor
|
// =====================================================
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
def temporaryActorAllForOne = {
|
||||||
pingpong1 !! (Die, 5000)
|
val temporaryActor = actorOf[TemporaryActor].start
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
val supervisor = Supervisor(
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldCallKillCallSingleActorOneForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getSingleActorOneForOneSupervisor
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong1 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldKillSingleActorAllForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getSingleActorAllForOneSupervisor
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong1 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldCallKillCallSingleActorAllForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getSingleActorAllForOneSupervisor
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong1 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldKillMultipleActorsOneForOne1 = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getMultipleActorsOneForOneConf
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong1 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldKillMultipleActorsOneForOne2 = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getMultipleActorsOneForOneConf
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong3 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldKillCallMultipleActorsOneForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getMultipleActorsOneForOneConf
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong2 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldKillMultipleActorsAllForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getMultipleActorsAllForOneConf
|
|
||||||
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong2 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldCallKillCallMultipleActorsAllForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getMultipleActorsAllForOneConf
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong2 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldOneWayKillSingleActorOneForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getSingleActorOneForOneSupervisor
|
|
||||||
|
|
||||||
pingpong1 ! Die
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldOneWayCallKillCallSingleActorOneForOne = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getSingleActorOneForOneSupervisor
|
|
||||||
|
|
||||||
pingpong1 ! OneWay
|
|
||||||
|
|
||||||
expect("oneway") {
|
|
||||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
pingpong1 ! Die
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
pingpong1 ! OneWay
|
|
||||||
|
|
||||||
expect("oneway") {
|
|
||||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldRestartKilledActorsForNestedSupervisorHierarchy = {
|
|
||||||
clearMessageLogs
|
|
||||||
val sup = getNestedSupervisorsAllForOneConf
|
|
||||||
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
intercept[RuntimeException] {
|
|
||||||
pingpong2 !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5 , TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("Expected exception; to test fault-tolerance") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong1 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong2 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(pingpong3 !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
expect("ping") {
|
|
||||||
messageLog.poll(5, TimeUnit.SECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test def shouldAttemptRestartWhenExceptionDuringRestart {
|
|
||||||
val inits = new AtomicInteger(0)
|
|
||||||
val dyingActor = actorOf(new Actor {
|
|
||||||
self.lifeCycle = Permanent
|
|
||||||
inits.incrementAndGet
|
|
||||||
|
|
||||||
if (!(inits.get % 2 != 0))
|
|
||||||
throw new IllegalStateException("Don't wanna!")
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case Ping => self.reply_?("pong")
|
|
||||||
case Die => throw new Exception("expected")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
val supervisor =
|
|
||||||
Supervisor(
|
|
||||||
SupervisorConfig(
|
|
||||||
OneForOneStrategy(classOf[Exception] :: Nil,3,10000),
|
|
||||||
Supervise(dyingActor,Permanent) :: Nil))
|
|
||||||
|
|
||||||
intercept[Exception] {
|
|
||||||
dyingActor !! (Die, 5000)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect("pong") {
|
|
||||||
(dyingActor !! (Ping, 5000)).getOrElse("nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(3) { inits.get }
|
|
||||||
supervisor.shutdown
|
|
||||||
}
|
|
||||||
|
|
||||||
// =============================================
|
|
||||||
// Create some supervisors with different configurations
|
|
||||||
|
|
||||||
def getTemporaryActorAllForOneSupervisor: Supervisor = {
|
|
||||||
temporaryActor = actorOf[TemporaryActor].start
|
|
||||||
|
|
||||||
Supervisor(
|
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
temporaryActor,
|
temporaryActor,
|
||||||
Temporary)
|
Temporary)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(temporaryActor, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def getSingleActorAllForOneSupervisor: Supervisor = {
|
def singleActorAllForOne = {
|
||||||
pingpong1 = actorOf[PingPong1Actor].start
|
val pingpong = actorOf[PingPongActor].start
|
||||||
|
|
||||||
Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong,
|
||||||
Permanent)
|
Permanent)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(pingpong, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def getSingleActorOneForOneSupervisor: Supervisor = {
|
def singleActorOneForOne = {
|
||||||
pingpong1 = actorOf[PingPong1Actor].start
|
val pingpong = actorOf[PingPongActor].start
|
||||||
|
|
||||||
Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong,
|
||||||
Permanent)
|
Permanent)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(pingpong, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMultipleActorsAllForOneConf: Supervisor = {
|
def multipleActorsAllForOne = {
|
||||||
pingpong1 = actorOf[PingPong1Actor].start
|
val pingpong1 = actorOf[PingPongActor].start
|
||||||
pingpong2 = actorOf[PingPong2Actor].start
|
val pingpong2 = actorOf[PingPongActor].start
|
||||||
pingpong3 = actorOf[PingPong3Actor].start
|
val pingpong3 = actorOf[PingPongActor].start
|
||||||
|
|
||||||
Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -565,16 +133,18 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
pingpong3,
|
pingpong3,
|
||||||
Permanent)
|
Permanent)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMultipleActorsOneForOneConf: Supervisor = {
|
def multipleActorsOneForOne = {
|
||||||
pingpong1 = actorOf[PingPong1Actor].start
|
val pingpong1 = actorOf[PingPongActor].start
|
||||||
pingpong2 = actorOf[PingPong2Actor].start
|
val pingpong2 = actorOf[PingPongActor].start
|
||||||
pingpong3 = actorOf[PingPong3Actor].start
|
val pingpong3 = actorOf[PingPongActor].start
|
||||||
|
|
||||||
Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -587,22 +157,24 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
pingpong3,
|
pingpong3,
|
||||||
Permanent)
|
Permanent)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def getNestedSupervisorsAllForOneConf: Supervisor = {
|
def nestedSupervisorsAllForOne = {
|
||||||
pingpong1 = actorOf[PingPong1Actor].start
|
val pingpong1 = actorOf[PingPongActor]
|
||||||
pingpong2 = actorOf[PingPong2Actor].start
|
val pingpong2 = actorOf[PingPongActor]
|
||||||
pingpong3 = actorOf[PingPong3Actor].start
|
val pingpong3 = actorOf[PingPongActor]
|
||||||
|
|
||||||
Supervisor(
|
val supervisor = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
::
|
::
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
AllForOneStrategy(Nil, 3, 5000),
|
AllForOneStrategy(Nil, 3, TimeoutMillis),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong2,
|
pingpong2,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -612,5 +184,204 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
Permanent)
|
Permanent)
|
||||||
:: Nil)
|
:: Nil)
|
||||||
:: Nil))
|
:: Nil))
|
||||||
|
|
||||||
|
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
||||||
|
import SupervisorSpec._
|
||||||
|
|
||||||
|
override def beforeEach() = {
|
||||||
|
messageLog.clear
|
||||||
|
}
|
||||||
|
|
||||||
|
def ping(pingPongActor: ActorRef) = {
|
||||||
|
(pingPongActor !! (Ping, TimeoutMillis)).getOrElse("nil") must be (PongMessage)
|
||||||
|
messageLogPoll must be (PingMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
def kill(pingPongActor: ActorRef) = {
|
||||||
|
intercept[RuntimeException] { pingPongActor !! (Die, TimeoutMillis) }
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
"A supervisor" must {
|
||||||
|
|
||||||
|
"not restart programmatically linked temporary actor" in {
|
||||||
|
val master = actorOf[Master].start
|
||||||
|
|
||||||
|
intercept[RuntimeException] {
|
||||||
|
master !! (Die, TimeoutMillis)
|
||||||
|
}
|
||||||
|
|
||||||
|
sleepFor(1 second)
|
||||||
|
messageLog.size must be (0)
|
||||||
|
}
|
||||||
|
|
||||||
|
"not restart temporary actor" in {
|
||||||
|
val (temporaryActor, supervisor) = temporaryActorAllForOne
|
||||||
|
|
||||||
|
intercept[RuntimeException] {
|
||||||
|
temporaryActor !! (Die, TimeoutMillis)
|
||||||
|
}
|
||||||
|
|
||||||
|
sleepFor(1 second)
|
||||||
|
messageLog.size must be (0)
|
||||||
|
}
|
||||||
|
|
||||||
|
"start server for nested supervisor hierarchy" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = nestedSupervisorsAllForOne
|
||||||
|
ping(actor1)
|
||||||
|
}
|
||||||
|
|
||||||
|
"kill single actor OneForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorOneForOne
|
||||||
|
kill(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
"call-kill-call single actor OneForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorOneForOne
|
||||||
|
ping(actor)
|
||||||
|
kill(actor)
|
||||||
|
ping(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
"kill single actor AllForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorAllForOne
|
||||||
|
kill(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
"call-kill-call single actor AllForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorAllForOne
|
||||||
|
ping(actor)
|
||||||
|
kill(actor)
|
||||||
|
ping(actor)
|
||||||
|
}
|
||||||
|
|
||||||
|
"kill multiple actors OneForOne 1" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne
|
||||||
|
kill(actor1)
|
||||||
|
}
|
||||||
|
|
||||||
|
"kill multiple actors OneForOne 2" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne
|
||||||
|
kill(actor3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"call-kill-call multiple actors OneForOne" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = multipleActorsOneForOne
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
|
||||||
|
kill(actor2)
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"kill multiple actors AllForOne" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = multipleActorsAllForOne
|
||||||
|
|
||||||
|
kill(actor2)
|
||||||
|
|
||||||
|
// and two more exception messages
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
"call-kill-call multiple actors AllForOne" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = multipleActorsAllForOne
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
|
||||||
|
kill(actor2)
|
||||||
|
|
||||||
|
// and two more exception messages
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"one-way kill single actor OneForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorOneForOne
|
||||||
|
|
||||||
|
actor ! Die
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
"one-way call-kill-call single actor OneForOne" in {
|
||||||
|
val (actor, supervisor) = singleActorOneForOne
|
||||||
|
|
||||||
|
actor ! Ping
|
||||||
|
messageLogPoll must be (PingMessage)
|
||||||
|
|
||||||
|
actor ! Die
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
|
||||||
|
actor ! Ping
|
||||||
|
messageLogPoll must be (PingMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
"restart killed actors in nested superviser hierarchy" in {
|
||||||
|
val (actor1, actor2, actor3, supervisor) = nestedSupervisorsAllForOne
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
|
||||||
|
kill(actor2)
|
||||||
|
|
||||||
|
// and two more exception messages
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
messageLogPoll must be (ExceptionMessage)
|
||||||
|
|
||||||
|
ping(actor1)
|
||||||
|
ping(actor2)
|
||||||
|
ping(actor3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"must attempt restart when exception during restart" in {
|
||||||
|
val inits = new AtomicInteger(0)
|
||||||
|
|
||||||
|
val dyingActor = actorOf(new Actor {
|
||||||
|
self.lifeCycle = Permanent
|
||||||
|
inits.incrementAndGet
|
||||||
|
|
||||||
|
if (inits.get % 2 == 0) throw new IllegalStateException("Don't wanna!")
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case Ping => self.reply_?(PongMessage)
|
||||||
|
case Die => throw new Exception("expected")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
val supervisor =
|
||||||
|
Supervisor(
|
||||||
|
SupervisorConfig(
|
||||||
|
OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000),
|
||||||
|
Supervise(dyingActor, Permanent) :: Nil))
|
||||||
|
|
||||||
|
intercept[Exception] {
|
||||||
|
dyingActor !! (Die, TimeoutMillis)
|
||||||
|
}
|
||||||
|
|
||||||
|
// give time for restart
|
||||||
|
sleepFor(3 seconds)
|
||||||
|
|
||||||
|
(dyingActor !! (Ping, TimeoutMillis)).getOrElse("nil") must be (PongMessage)
|
||||||
|
|
||||||
|
inits.get must be (3)
|
||||||
|
|
||||||
|
supervisor.shutdown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,5 +25,9 @@ object Testing {
|
||||||
def testTime(t: Float): Float = (timeFactor * t).toFloat
|
def testTime(t: Float): Float = (timeFactor * t).toFloat
|
||||||
def testTime(t: Double): Double = timeFactor * t
|
def testTime(t: Double): Double = timeFactor * t
|
||||||
|
|
||||||
|
def testSeconds(duration: Duration) = testTime(duration.toSeconds)
|
||||||
|
def testMillis(duration: Duration) = testTime(duration.toMillis)
|
||||||
|
def testNanos(duration: Duration) = testTime(duration.toNanos)
|
||||||
|
|
||||||
def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
|
def sleepFor(duration: Duration) = Thread.sleep(testTime(duration.toMillis))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import akka.actor.UntypedActor;
|
||||||
import akka.stm.Ref;
|
import akka.stm.Ref;
|
||||||
|
|
||||||
public class UntypedCoordinatedCounter extends UntypedActor {
|
public class UntypedCoordinatedCounter extends UntypedActor {
|
||||||
private Ref<Integer> count = new Ref(0);
|
private Ref<Integer> count = new Ref<Integer>(0);
|
||||||
|
|
||||||
private void increment() {
|
private void increment() {
|
||||||
System.out.println("incrementing");
|
System.out.println("incrementing");
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class UntypedCoordinatedCounter extends UntypedActor {
|
public class UntypedCoordinatedCounter extends UntypedActor {
|
||||||
private String name;
|
private String name;
|
||||||
private Ref<Integer> count = new Ref(0);
|
private Ref<Integer> count = new Ref<Integer>(0);
|
||||||
private TransactionFactory txFactory = new TransactionFactoryBuilder()
|
private TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
.setTimeout(new FiniteDuration(3, TimeUnit.SECONDS))
|
.setTimeout(new FiniteDuration(3, TimeUnit.SECONDS))
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,10 @@ import akka.config.Supervision._
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
import akka.config.TypedActorConfigurator
|
import akka.config.TypedActorConfigurator
|
||||||
|
|
||||||
|
import akka.testing._
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Martin Krasser
|
* @author Martin Krasser
|
||||||
*/
|
*/
|
||||||
|
|
@ -107,7 +111,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
||||||
}
|
}
|
||||||
|
|
||||||
// allow some time for the actor to be stopped
|
// allow some time for the actor to be stopped
|
||||||
Thread.sleep(3000)
|
Testing.sleepFor(3 seconds)
|
||||||
|
|
||||||
val second = conf.getInstance(classOf[TypedActorFailer])
|
val second = conf.getInstance(classOf[TypedActorFailer])
|
||||||
first should be (second)
|
first should be (second)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue