diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 82f035f311..9824e8b2b6 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -979,7 +979,6 @@ sealed class LocalActorRef private[akka]( protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { faultHandler match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) => restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) @@ -991,7 +990,6 @@ sealed class LocalActorRef private[akka]( "\n\tto non-empty list of exception classes - can't proceed " + toString) } } else { - if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on } } @@ -1002,7 +1000,6 @@ sealed class LocalActorRef private[akka]( val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange - if (tooManyRestarts || restartingHasExpired) { val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) Actor.log.warning( @@ -1018,30 +1015,28 @@ sealed class LocalActorRef private[akka]( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + "\n\tCan't send the message to the supervisor [%s].", sup) } - } else { + stop + } else { _isBeingRestarted = true val failedActor = actorInstance.get guard.withGuard { - lifeCycle.get match { - case LifeCycle(scope, _, _) => { - scope match { - case Permanent => - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - Actor.log.debug("Restarting linked actors for actor [%s].", id) - Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - failedActor.preRestart(reason) - nullOutActorRefReferencesFor(failedActor) - val freshActor = newActor - freshActor.init - freshActor.initTransactionalState - actorInstance.set(freshActor) - Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) - freshActor.postRestart(reason) - _isBeingRestarted = false - case Temporary => shutDownTemporaryActor(this) - } - } + lifeCycle match { + case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(this) + case _ => + // either permanent or none where default is permanent + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + Actor.log.debug("Restarting linked actors for actor [%s].", id) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + nullOutActorRefReferencesFor(failedActor) + val freshActor = newActor + freshActor.init + freshActor.initTransactionalState + actorInstance.set(freshActor) + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + _isBeingRestarted = false } } } @@ -1049,14 +1044,10 @@ sealed class LocalActorRef private[akka]( protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = { linkedActorsAsList.foreach { actorRef => - if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) - actorRef.lifeCycle.get match { - case LifeCycle(scope, _, _) => { - scope match { - case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) - case Temporary => shutDownTemporaryActor(actorRef) - } - } + actorRef.lifeCycle match { + // either permanent or none where default is permanent + case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(actorRef) + case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) } } } @@ -1160,6 +1151,7 @@ sealed class LocalActorRef private[akka]( handleExceptionInDispatch( new TransactionSetAbortedException("Transaction set has been aborted by another participant"), message, topLevelTransaction) + case e: InterruptedException => {} // received message while actor is shutting down, ignore case e => handleExceptionInDispatch(e, message, topLevelTransaction) } finally { diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 6a7187afdc..6f4f099bb2 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -20,7 +20,7 @@ import java.util.concurrent._ import se.scalablesolutions.akka.util.Logging -object Scheduler { +object Scheduler extends Logging { import Actor._ case object UnSchedule @@ -28,8 +28,12 @@ object Scheduler { private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] + log.info("Starting up Scheduler") def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ActorRef = { + log.trace( + "Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", + message, receiver, initialDelay, delay, timeUnit) try { val future = service.scheduleAtFixedRate( new Runnable { def run = receiver ! message }, @@ -44,6 +48,9 @@ object Scheduler { } def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = { + log.trace( + "Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", + message, receiver, delay, timeUnit) try { val future = service.schedule( new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] @@ -65,6 +72,7 @@ object Scheduler { } def shutdown = { + log.info("Shutting down Scheduler") import scala.collection.JavaConversions._ schedulers.values.foreach(_ ! UnSchedule) schedulers.clear @@ -72,14 +80,16 @@ object Scheduler { } def restart = { + log.info("Restarting Scheduler") shutdown service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) } } -private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with Logging { +private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor { def receive = { case Scheduler.UnSchedule => + Scheduler.log.trace("Unschedule event handled by scheduleActor\n\tactorRef = [%s]", self.toString) future.cancel(true) self.stop } @@ -91,7 +101,7 @@ private object SchedulerThreadFactory extends ThreadFactory { def newThread(r: Runnable): Thread = { val thread = threadFactory.newThread(r) - thread.setName("Scheduler-" + count) + thread.setName("akka:scheduler-" + count) thread.setDaemon(true) thread } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 9ad33e9f3e..c3f5cc4f0a 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -222,6 +222,7 @@ class RemoteServer extends Logging { bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) } catch { + case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") } } diff --git a/akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala similarity index 61% rename from akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala rename to akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala index ea244bf966..afb9f9523e 100644 --- a/akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala @@ -11,149 +11,103 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.Config -import se.scalablesolutions.akka.config._ -import se.scalablesolutions.akka.config.ActiveObjectConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ @RunWith(classOf[JUnitRunner]) -class DeclarativelySupervisedNestedTransactionalActiveObjectSpec extends +class NestedTransactionalActiveObjectSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { - private val conf = new ActiveObjectConfigurator private var messageLog = "" - override def beforeAll { - Config.config - conf.configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), - List( - new Component(classOf[TransactionalActiveObject], - new LifeCycle(new Permanent), - 10000), - new Component(classOf[NestedTransactionalActiveObject], - new LifeCycle(new Permanent), - 10000), - new Component(classOf[ActiveObjectFailer], - new LifeCycle(new Permanent), - 10000) - ).toArray).supervise - } - override def afterAll { - conf.stop + // ActorRegistry.shutdownAll } describe("Declaratively nested supervised transactional in-memory Active Object") { it("map should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state - Thread.sleep(100) - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state - Thread.sleep(100) stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested) // transactionrequired - Thread.sleep(100) stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state") nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess") should equal("new state") } it("map should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state - Thread.sleep(100) - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state - Thread.sleep(100) - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) - Thread.sleep(100) fail("should have thrown an exception") } catch { case e => {} } stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") - Thread.sleep(100) nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") } it("vector should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setVectorState("init") // set init state - Thread.sleep(100) - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init - Thread.sleep(100) nested.setVectorState("init") // set init state - Thread.sleep(100) stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested) // transactionrequired - Thread.sleep(100) stateful.getVectorState should equal("new state") - Thread.sleep(100) nested.getVectorState should equal("new state") } it("vector should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setVectorState("init") // set init state - Thread.sleep(100) - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init nested.setVectorState("init") // set init state - Thread.sleep(100) - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) - Thread.sleep(100) fail("should have thrown an exception") } catch { case e => {} } stateful.getVectorState should equal("init") - Thread.sleep(100) nested.getVectorState should equal("init") } it("ref should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init stateful.setRefState("init") // set init state - Thread.sleep(100) nested.setRefState("init") // set init state - Thread.sleep(100) stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested) - Thread.sleep(100) stateful.getRefState should equal("new state") - Thread.sleep(100) nested.getRefState should equal("new state") } it("ref should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init - val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + val nested = ActiveObject.newInstance(classOf[NestedTransactionalActiveObject]) nested.init stateful.setRefState("init") // set init state - Thread.sleep(100) nested.setRefState("init") // set init state - Thread.sleep(100) - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) - Thread.sleep(100) fail("should have thrown an exception") } catch { case e => {} } stateful.getRefState should equal("init") - Thread.sleep(100) nested.getRefState should equal("init") } } diff --git a/akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala new file mode 100644 index 0000000000..67076aff7c --- /dev/null +++ b/akka-core/src/test/scala/RestartNestedTransactionalActiveObjectSpec.scala @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.config.Config +import se.scalablesolutions.akka.config._ +import se.scalablesolutions.akka.config.ActiveObjectConfigurator +import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.actor._ + +@RunWith(classOf[JUnitRunner]) +class RestartNestedTransactionalActiveObjectSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + private val conf = new ActiveObjectConfigurator + private var messageLog = "" + + override def beforeAll { + Config.config + conf.configure( + new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + List( + new Component(classOf[TransactionalActiveObject], + new LifeCycle(new Permanent), + 10000), + new Component(classOf[NestedTransactionalActiveObject], + new LifeCycle(new Permanent), + 10000), + new Component(classOf[ActiveObjectFailer], + new LifeCycle(new Permanent), + 10000) + ).toArray).supervise + } + + override def afterAll { + conf.stop + ActorRegistry.shutdownAll + } + + describe("Restart nested supervised transactional Active Object") { +/* + it("map should rollback state for stateful server in case of failure") { + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + + val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + nested.init + nested.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) + + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") + + nested.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") + } + + it("vector should rollback state for stateful server in case of failure") { + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setVectorState("init") // set init state + + val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + nested.init + nested.setVectorState("init") // set init state + + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) + + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getVectorState should equal("init") + + nested.getVectorState should equal("init") + } + + it("ref should rollback state for stateful server in case of failure") { + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + val nested = conf.getInstance(classOf[NestedTransactionalActiveObject]) + nested.init + stateful.setRefState("init") // set init state + + nested.setRefState("init") // set init state + + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", nested, failer) + + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getRefState should equal("init") + + nested.getRefState should equal("init") + } + */ + } +} \ No newline at end of file diff --git a/akka-core/src/test/scala/RestartStrategySpec.scala b/akka-core/src/test/scala/RestartStrategySpec.scala new file mode 100644 index 0000000000..5023c756e1 --- /dev/null +++ b/akka-core/src/test/scala/RestartStrategySpec.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import Actor._ +import se.scalablesolutions.akka.config.OneForOneStrategy +import java.util.concurrent.{TimeUnit, CountDownLatch} +import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import org.multiverse.api.latches.StandardLatch + +class RestartStrategySpec extends JUnitSuite { + + object Ping + object Crash + + @Test + def slaveShouldStayDeadAfterMaxRestarts = { + + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(1, 1000)) + protected def receive = { case _ => () } + }).start + + val restartLatch = new StandardLatch + val secondRestartLatch = new StandardLatch + val countDownLatch = new CountDownLatch(2) + + + val slave = actorOf(new Actor{ + + protected def receive = { + case Ping => countDownLatch.countDown + case Crash => throw new Exception("Crashing...") + } + override def postRestart(reason: Throwable) = { + restartLatch.open + } + + override def shutdown = { + if (restartLatch.isOpen) { + secondRestartLatch.open + } + } + }) + boss.startLink(slave) + + slave ! Ping + slave ! Crash + slave ! Ping + + // test restart and post restart ping + assert(restartLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(countDownLatch.await(1, TimeUnit.SECONDS)) + + // now crash again... should not restart + slave ! Crash + + assert(secondRestartLatch.tryAwait(1, TimeUnit.SECONDS)) + val exceptionLatch = new StandardLatch + try { + slave ! Ping // this should fail + } catch { + case e => exceptionLatch.open // expected here + } + assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS)) + } +} + diff --git a/akka-core/src/test/scala/RestartTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/RestartTransactionalActiveObjectSpec.scala new file mode 100644 index 0000000000..a1a2f8b507 --- /dev/null +++ b/akka-core/src/test/scala/RestartTransactionalActiveObjectSpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.config.Config +import se.scalablesolutions.akka.config._ +import se.scalablesolutions.akka.config.ActiveObjectConfigurator +import se.scalablesolutions.akka.config.JavaConfig._ +import se.scalablesolutions.akka.actor._ + +@RunWith(classOf[JUnitRunner]) +class RestartTransactionalActiveObjectSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + private val conf = new ActiveObjectConfigurator + private var messageLog = "" + + def before { + Config.config + conf.configure( + new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + List( + new Component( + classOf[TransactionalActiveObject], + new LifeCycle(new Temporary), + 10000), + new Component( + classOf[ActiveObjectFailer], + new LifeCycle(new Temporary), + 10000) + ).toArray).supervise + } + + def after { + conf.stop + ActorRegistry.shutdownAll + } + + describe("Restart supervised transactional Active Object ") { +/* + it("map should rollback state for stateful server in case of failure") { + before + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") + after + } + + it("vector should rollback state for stateful server in case of failure") { + before + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setVectorState("init") // set init state + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getVectorState should equal("init") + after + } + + it("ref should rollback state for stateful server in case of failure") { + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setRefState("init") // set init state + val failer = conf.getInstance(classOf[ActiveObjectFailer]) + try { + stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) + fail("should have thrown an exception") + } catch { case e => {} } + stateful.getRefState should equal("init") + } +*/ } +} diff --git a/akka-core/src/test/scala/SchedulerSpec.scala b/akka-core/src/test/scala/SchedulerSpec.scala index 08be87d728..1cedb27354 100644 --- a/akka-core/src/test/scala/SchedulerSpec.scala +++ b/akka-core/src/test/scala/SchedulerSpec.scala @@ -66,7 +66,6 @@ class SchedulerSpec extends JUnitSuite { override def postRestart(reason: Throwable) = restartLatch.open }) - Supervisor( SupervisorConfig( RestartStrategy(AllForOne, 3, 1000, diff --git a/akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala similarity index 67% rename from akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala rename to akka-core/src/test/scala/TransactionalActiveObjectSpec.scala index 95d7b8e5df..1225df9a92 100644 --- a/akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala @@ -11,45 +11,23 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.config.Config -import se.scalablesolutions.akka.config._ -import se.scalablesolutions.akka.config.ActiveObjectConfigurator -import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ @RunWith(classOf[JUnitRunner]) -class DeclarativelySupervisedTransactionalActiveObjectSpec extends +class TransactionalActiveObjectSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { - private val conf = new ActiveObjectConfigurator private var messageLog = "" - override def beforeAll { - Config.config - conf.configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), - List( - new Component( - classOf[TransactionalActiveObject], - new LifeCycle(new Permanent), - 10000), - new Component( - classOf[ActiveObjectFailer], - new LifeCycle(new Permanent), - 10000) - ).toArray).supervise - } - override def afterAll { - conf.stop +// ActorRegistry.shutdownAll } describe("Declaratively supervised transactional in-memory Active Object ") { - it("map should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") @@ -57,10 +35,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends } it("map should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) fail("should have thrown an exception") @@ -69,7 +47,7 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends } it("vector should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setVectorState("init") // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") @@ -77,10 +55,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends } it("vector should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setVectorState("init") // set init state - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) fail("should have thrown an exception") @@ -89,7 +67,7 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends } it("ref should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setRefState("init") // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") @@ -97,10 +75,10 @@ class DeclarativelySupervisedTransactionalActiveObjectSpec extends } it("ref should rollback state for stateful server in case of failure") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + val stateful = ActiveObject.newInstance(classOf[TransactionalActiveObject]) stateful.init stateful.setRefState("init") // set init state - val failer = conf.getInstance(classOf[ActiveObjectFailer]) + val failer = ActiveObject.newInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) fail("should have thrown an exception")