Merge branch 'ticket_345'
Conflicts: akka-core/src/test/scala/SchedulerSpec.scala
This commit is contained in:
commit
20464a3d20
9 changed files with 346 additions and 132 deletions
|
|
@ -979,7 +979,6 @@ sealed class LocalActorRef private[akka](
|
|||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
|
||||
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
|
||||
faultHandler match {
|
||||
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
|
||||
case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
|
||||
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
|
||||
|
||||
|
|
@ -991,7 +990,6 @@ sealed class LocalActorRef private[akka](
|
|||
"\n\tto non-empty list of exception classes - can't proceed " + toString)
|
||||
}
|
||||
} else {
|
||||
if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle
|
||||
notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on
|
||||
}
|
||||
}
|
||||
|
|
@ -1002,7 +1000,6 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries
|
||||
val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange
|
||||
|
||||
if (tooManyRestarts || restartingHasExpired) {
|
||||
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
|
||||
Actor.log.warning(
|
||||
|
|
@ -1018,17 +1015,18 @@ sealed class LocalActorRef private[akka](
|
|||
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
|
||||
"\n\tCan't send the message to the supervisor [%s].", sup)
|
||||
}
|
||||
stop
|
||||
} else {
|
||||
_isBeingRestarted = true
|
||||
val failedActor = actorInstance.get
|
||||
guard.withGuard {
|
||||
lifeCycle.get match {
|
||||
case LifeCycle(scope, _, _) => {
|
||||
scope match {
|
||||
case Permanent =>
|
||||
lifeCycle match {
|
||||
case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(this)
|
||||
case _ =>
|
||||
// either permanent or none where default is permanent
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
|
||||
Actor.log.debug("Restarting linked actors for actor [%s].", id)
|
||||
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
|
||||
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
|
||||
failedActor.preRestart(reason)
|
||||
nullOutActorRefReferencesFor(failedActor)
|
||||
|
|
@ -1039,9 +1037,6 @@ sealed class LocalActorRef private[akka](
|
|||
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
||||
freshActor.postRestart(reason)
|
||||
_isBeingRestarted = false
|
||||
case Temporary => shutDownTemporaryActor(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1049,14 +1044,10 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = {
|
||||
linkedActorsAsList.foreach { actorRef =>
|
||||
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
|
||||
actorRef.lifeCycle.get match {
|
||||
case LifeCycle(scope, _, _) => {
|
||||
scope match {
|
||||
case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
case Temporary => shutDownTemporaryActor(actorRef)
|
||||
}
|
||||
}
|
||||
actorRef.lifeCycle match {
|
||||
// either permanent or none where default is permanent
|
||||
case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(actorRef)
|
||||
case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1160,6 +1151,7 @@ sealed class LocalActorRef private[akka](
|
|||
handleExceptionInDispatch(
|
||||
new TransactionSetAbortedException("Transaction set has been aborted by another participant"),
|
||||
message, topLevelTransaction)
|
||||
case e: InterruptedException => {} // received message while actor is shutting down, ignore
|
||||
case e =>
|
||||
handleExceptionInDispatch(e, message, topLevelTransaction)
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import java.util.concurrent._
|
|||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
object Scheduler {
|
||||
object Scheduler extends Logging {
|
||||
import Actor._
|
||||
|
||||
case object UnSchedule
|
||||
|
|
@ -28,8 +28,12 @@ object Scheduler {
|
|||
|
||||
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
|
||||
log.info("Starting up Scheduler")
|
||||
|
||||
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ActorRef = {
|
||||
log.trace(
|
||||
"Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
|
||||
message, receiver, initialDelay, delay, timeUnit)
|
||||
try {
|
||||
val future = service.scheduleAtFixedRate(
|
||||
new Runnable { def run = receiver ! message },
|
||||
|
|
@ -44,6 +48,9 @@ object Scheduler {
|
|||
}
|
||||
|
||||
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = {
|
||||
log.trace(
|
||||
"Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
|
||||
message, receiver, delay, timeUnit)
|
||||
try {
|
||||
val future = service.schedule(
|
||||
new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
|
||||
|
|
@ -65,6 +72,7 @@ object Scheduler {
|
|||
}
|
||||
|
||||
def shutdown = {
|
||||
log.info("Shutting down Scheduler")
|
||||
import scala.collection.JavaConversions._
|
||||
schedulers.values.foreach(_ ! UnSchedule)
|
||||
schedulers.clear
|
||||
|
|
@ -72,14 +80,16 @@ object Scheduler {
|
|||
}
|
||||
|
||||
def restart = {
|
||||
log.info("Restarting Scheduler")
|
||||
shutdown
|
||||
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
|
||||
}
|
||||
}
|
||||
|
||||
private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with Logging {
|
||||
private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor {
|
||||
def receive = {
|
||||
case Scheduler.UnSchedule =>
|
||||
Scheduler.log.trace("Unschedule event handled by scheduleActor\n\tactorRef = [%s]", self.toString)
|
||||
future.cancel(true)
|
||||
self.stop
|
||||
}
|
||||
|
|
@ -91,7 +101,7 @@ private object SchedulerThreadFactory extends ThreadFactory {
|
|||
|
||||
def newThread(r: Runnable): Thread = {
|
||||
val thread = threadFactory.newThread(r)
|
||||
thread.setName("Scheduler-" + count)
|
||||
thread.setName("akka:scheduler-" + count)
|
||||
thread.setDaemon(true)
|
||||
thread
|
||||
}
|
||||
|
|
|
|||
|
|
@ -222,6 +222,7 @@ class RemoteServer extends Logging {
|
|||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
} catch {
|
||||
case e: java.nio.channels.ClosedChannelException => {}
|
||||
case e => log.warning("Could not close remote server channel in a graceful way")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,149 +11,103 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class DeclarativelySupervisedNestedTransactionalActiveObjectSpec extends
|
||||
class NestedTransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
private val conf = new ActiveObjectConfigurator
|
||||
private var messageLog = ""
|
||||
|
||||
override def beforeAll {
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[NestedTransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Permanent),
|
||||
10000)
|
||||
).toArray).supervise
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
conf.stop
|
||||
// ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
describe("Declaratively nested supervised transactional in-memory Active Object") {
|
||||
|
||||
it("map should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested) // transactionrequired
|
||||
Thread.sleep(100)
|
||||
stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state")
|
||||
nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state")
|
||||
}
|
||||
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
|
||||
Thread.sleep(100)
|
||||
nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
|
||||
}
|
||||
|
||||
it("vector should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
Thread.sleep(100)
|
||||
nested.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested) // transactionrequired
|
||||
Thread.sleep(100)
|
||||
stateful.getVectorState should equal("new state")
|
||||
Thread.sleep(100)
|
||||
nested.getVectorState should equal("new state")
|
||||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setVectorState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getVectorState should equal("init")
|
||||
Thread.sleep(100)
|
||||
nested.getVectorState should equal("init")
|
||||
}
|
||||
|
||||
it("ref should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
stateful.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
nested.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested)
|
||||
Thread.sleep(100)
|
||||
stateful.getRefState should equal("new state")
|
||||
Thread.sleep(100)
|
||||
nested.getRefState should equal("new state")
|
||||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
stateful.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
nested.setRefState("init") // set init state
|
||||
Thread.sleep(100)
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
Thread.sleep(100)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getRefState should equal("init")
|
||||
Thread.sleep(100)
|
||||
nested.getRefState should equal("init")
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.Assertions
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RestartNestedTransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
private val conf = new ActiveObjectConfigurator
|
||||
private var messageLog = ""
|
||||
|
||||
override def beforeAll {
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[NestedTransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Permanent),
|
||||
10000)
|
||||
).toArray).supervise
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
conf.stop
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
describe("Restart nested supervised transactional Active Object") {
|
||||
/*
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
|
||||
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
|
||||
|
||||
nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
|
||||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
nested.setVectorState("init") // set init state
|
||||
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getVectorState should equal("init")
|
||||
|
||||
nested.getVectorState should equal("init")
|
||||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
val nested = conf.getInstance(classOf[NestedTransactionalActiveObject])
|
||||
nested.init
|
||||
stateful.setRefState("init") // set init state
|
||||
|
||||
nested.setRefState("init") // set init state
|
||||
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer)
|
||||
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getRefState should equal("init")
|
||||
|
||||
nested.getRefState should equal("init")
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
74
akka-core/src/test/scala/RestartStrategySpec.scala
Normal file
74
akka-core/src/test/scala/RestartStrategySpec.scala
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import Actor._
|
||||
import se.scalablesolutions.akka.config.OneForOneStrategy
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle}
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
class RestartStrategySpec extends JUnitSuite {
|
||||
|
||||
object Ping
|
||||
object Crash
|
||||
|
||||
@Test
|
||||
def slaveShouldStayDeadAfterMaxRestarts = {
|
||||
|
||||
val boss = actorOf(new Actor{
|
||||
self.trapExit = List(classOf[Throwable])
|
||||
self.faultHandler = Some(OneForOneStrategy(1, 1000))
|
||||
protected def receive = { case _ => () }
|
||||
}).start
|
||||
|
||||
val restartLatch = new StandardLatch
|
||||
val secondRestartLatch = new StandardLatch
|
||||
val countDownLatch = new CountDownLatch(2)
|
||||
|
||||
|
||||
val slave = actorOf(new Actor{
|
||||
|
||||
protected def receive = {
|
||||
case Ping => countDownLatch.countDown
|
||||
case Crash => throw new Exception("Crashing...")
|
||||
}
|
||||
override def postRestart(reason: Throwable) = {
|
||||
restartLatch.open
|
||||
}
|
||||
|
||||
override def shutdown = {
|
||||
if (restartLatch.isOpen) {
|
||||
secondRestartLatch.open
|
||||
}
|
||||
}
|
||||
})
|
||||
boss.startLink(slave)
|
||||
|
||||
slave ! Ping
|
||||
slave ! Crash
|
||||
slave ! Ping
|
||||
|
||||
// test restart and post restart ping
|
||||
assert(restartLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
assert(countDownLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
// now crash again... should not restart
|
||||
slave ! Crash
|
||||
|
||||
assert(secondRestartLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
val exceptionLatch = new StandardLatch
|
||||
try {
|
||||
slave ! Ping // this should fail
|
||||
} catch {
|
||||
case e => exceptionLatch.open // expected here
|
||||
}
|
||||
assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.Assertions
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RestartTransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
private val conf = new ActiveObjectConfigurator
|
||||
private var messageLog = ""
|
||||
|
||||
def before {
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(
|
||||
classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Temporary),
|
||||
10000),
|
||||
new Component(
|
||||
classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Temporary),
|
||||
10000)
|
||||
).toArray).supervise
|
||||
}
|
||||
|
||||
def after {
|
||||
conf.stop
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
describe("Restart supervised transactional Active Object ") {
|
||||
/*
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
before
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init")
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
|
||||
after
|
||||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
before
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getVectorState should equal("init")
|
||||
after
|
||||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setRefState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
} catch { case e => {} }
|
||||
stateful.getRefState should equal("init")
|
||||
}
|
||||
*/ }
|
||||
}
|
||||
|
|
@ -66,7 +66,6 @@ class SchedulerSpec extends JUnitSuite {
|
|||
|
||||
override def postRestart(reason: Throwable) = restartLatch.open
|
||||
})
|
||||
|
||||
Supervisor(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(AllForOne, 3, 1000,
|
||||
|
|
|
|||
|
|
@ -11,45 +11,23 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.config.Config
|
||||
import se.scalablesolutions.akka.config._
|
||||
import se.scalablesolutions.akka.config.ActiveObjectConfigurator
|
||||
import se.scalablesolutions.akka.config.JavaConfig._
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
||||
class TransactionalActiveObjectSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
private val conf = new ActiveObjectConfigurator
|
||||
private var messageLog = ""
|
||||
|
||||
override def beforeAll {
|
||||
Config.config
|
||||
conf.configure(
|
||||
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
|
||||
List(
|
||||
new Component(
|
||||
classOf[TransactionalActiveObject],
|
||||
new LifeCycle(new Permanent),
|
||||
10000),
|
||||
new Component(
|
||||
classOf[ActiveObjectFailer],
|
||||
new LifeCycle(new Permanent),
|
||||
10000)
|
||||
).toArray).supervise
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
conf.stop
|
||||
// ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
describe("Declaratively supervised transactional in-memory Active Object ") {
|
||||
|
||||
it("map should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init")
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -57,10 +35,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
|||
}
|
||||
|
||||
it("map should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init")
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -69,7 +47,7 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
|||
}
|
||||
|
||||
it("vector should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -77,10 +55,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
|||
}
|
||||
|
||||
it("vector should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setVectorState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
|
|
@ -89,7 +67,7 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
|||
}
|
||||
|
||||
it("ref should not rollback state for stateful server in case of success") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setRefState("init") // set init state
|
||||
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
|
||||
|
|
@ -97,10 +75,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends
|
|||
}
|
||||
|
||||
it("ref should rollback state for stateful server in case of failure") {
|
||||
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
|
||||
val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject])
|
||||
stateful.init
|
||||
stateful.setRefState("init") // set init state
|
||||
val failer = conf.getInstance(classOf[ActiveObjectFailer])
|
||||
val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer])
|
||||
try {
|
||||
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
|
||||
fail("should have thrown an exception")
|
||||
Loading…
Add table
Add a link
Reference in a new issue