diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 6021ecf8cc..459e479ebe 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -15,6 +15,10 @@ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RestartStrategySpec extends AkkaSpec { + override def atStartup { + app.mainbus.publish(Mute(EventFilter[Exception]("Crashing..."))) + } + object Ping object Crash diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 837960229d..a47a375e21 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -5,8 +5,9 @@ package akka import akka.config._ import akka.actor._ -import dispatch._ -import event._ +import akka.dispatch._ +import akka.event._ +import akka.util.duration._ import java.net.InetAddress import com.eaio.uuid.UUID import akka.dispatch.{ Dispatchers, Future } @@ -105,8 +106,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). - map(time ⇒ Duration(time, DefaultTimeUnit)). - getOrElse(Duration(1000, TimeUnit.MILLISECONDS)) + map(time ⇒ Duration(time, DefaultTimeUnit)).getOrElse(1 second) val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), DefaultTimeUnit) val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), DefaultTimeUnit) @@ -181,38 +181,46 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO think about memory consistency effects when doing funky stuff inside constructor val provider: ActorRefProvider = reflective.createProvider - // TODO make this configurable - protected[akka] val guardian: ActorRef = { - import akka.actor.FaultHandlingStrategy._ - provider.actorOf( - Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - }).withDispatcher(dispatcher), - provider.theOneWhoWalksTheBubblesOfSpaceTime, - "ApplicationSupervisor", - true) + private class Guardian extends Actor { + def receive = { + case Terminated(_) ⇒ context.self.stop() + } } + private class SystemGuardian extends Actor { + def receive = { + case Terminated(_) ⇒ + mainbus.stopDefaultLoggers() + context.self.stop() + } + } + private val guardianFaultHandlingStrategy = { + import akka.actor.FaultHandlingStrategy._ + OneForOneStrategy { + case _: ActorKilledException ⇒ Stop + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart + } + } + private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy) - protected[akka] val systemGuardian: ActorRef = { - import akka.actor.FaultHandlingStrategy._ - provider.actorOf( - Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - }).withDispatcher(dispatcher), - provider.theOneWhoWalksTheBubblesOfSpaceTime, - "SystemSupervisor", - true) - } + private val guardianInChief: ActorRef = + provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, "GuardianInChief", true) + + protected[akka] val guardian: ActorRef = + provider.actorOf(guardianProps, guardianInChief, "ApplicationSupervisor", true) + + protected[akka] val systemGuardian: ActorRef = + provider.actorOf(guardianProps.withCreator(new SystemGuardian), guardianInChief, "SystemSupervisor", true) // TODO think about memory consistency effects when doing funky stuff inside constructor val deadLetters = new DeadLetterActorRef(this) val deathWatch = provider.createDeathWatch() + // chain death watchers so that killing guardian stops the application + deathWatch.subscribe(systemGuardian, guardian) + deathWatch.subscribe(guardianInChief, systemGuardian) + // this starts the reaper actor and the user-configured logging subscribers, which are also actors mainbus.start(this) mainbus.startDefaultLoggers(this, AkkaConfig) @@ -232,7 +240,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO shutdown all that other stuff, whatever that may be def stop(): Unit = { guardian.stop() - systemGuardian.stop() } terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 5d64eec5f1..ae9b86695d 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -130,7 +130,7 @@ class DefaultScheduler extends Scheduler { } } - private[akka] def shutdown() { service.shutdown() } + private[akka] def shutdown() { service.shutdownNow() } } private object SchedulerThreadFactory extends ThreadFactory { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 3e9411d593..f288150265 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -107,7 +107,7 @@ class Dispatcher( protected[akka] def shutdown { val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) if (old ne null) - old.shutdown() + old.shutdownNow() } /** diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 59932879f1..82a4df5d00 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -88,6 +88,19 @@ trait LoggingBus extends ActorEventBus { } } + def stopDefaultLoggers() { + val level = _logLevel // volatile access before reading loggers + if (!(loggers contains StandardOutLogger)) { + AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) + publish(Info(this, "shutting down: StandardOutLogger started")) + } + for { + logger ← loggers + if logger != StandardOutLogger + } logger.stop() + publish(Info(this, "all default loggers stopped")) + } + private def addLogger(app: AkkaApplication, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { val actor = app.systemActorOf(Props(clazz), Props.randomAddress) actor ! InitializeLogger(this) diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index f79ea921dd..f91c20228c 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -115,7 +115,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) { /** * Returns the key set. */ - def keys = scala.collection.JavaConversions.asScalaIterable(container.keySet) + def keys = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet) /** * Disassociates the value of type V from the key of type K diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 22f114b109..4264427e0d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -10,6 +10,8 @@ import akka.AkkaApplication import akka.actor.{ Actor, ActorRef, Props } import akka.dispatch.MessageDispatcher import akka.event.{ Logging, MainBusLogging } +import akka.util.duration._ +import akka.dispatch.FutureTimeoutException abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { @@ -22,6 +24,9 @@ abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) final override def afterAll { app.stop() + try app.terminationFuture.await(5 seconds) catch { + case _: FutureTimeoutException ⇒ app.log.warning("failed to stop within 5 seconds") + } atTermination() } @@ -42,4 +47,20 @@ abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) def spawn(body: ⇒ Unit)(implicit dispatcher: MessageDispatcher) { actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go" } +} + +class AkkaSpecSpec extends WordSpec with MustMatchers { + "An AkkaSpec" must { + "terminate all actors" in { + import AkkaApplication.defaultConfig + val app = AkkaApplication("test", defaultConfig ++ Configuration( + "akka.actor.debug.lifecycle" -> true, "akka.loglevel" -> "DEBUG")) + val spec = new AkkaSpec(app) { + val ref = Seq(testActor, app.actorOf(Props.empty, "name")) + } + spec.ref foreach (_ must not be 'shutdown) + app.stop() + spec.awaitCond(spec.ref forall (_.isShutdown), 2 seconds) + } + } } \ No newline at end of file