clean up application structure

- introduce common parent for guardian and systemGuardian
- install deathWatch chain to ensure proper shutdown upon guardian’s
  death
- re-install stdout logger before detaching default loggers
- await app termination after running AkkaSpec tests (max 5sec, with
  warning logged if not stopped)
This commit is contained in:
Roland 2011-10-28 17:15:10 +02:00
parent 01d8b00a76
commit d1e0f411ef
7 changed files with 77 additions and 32 deletions

View file

@ -15,6 +15,10 @@ import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec { class RestartStrategySpec extends AkkaSpec {
override def atStartup {
app.mainbus.publish(Mute(EventFilter[Exception]("Crashing...")))
}
object Ping object Ping
object Crash object Crash

View file

@ -5,8 +5,9 @@ package akka
import akka.config._ import akka.config._
import akka.actor._ import akka.actor._
import dispatch._ import akka.dispatch._
import event._ import akka.event._
import akka.util.duration._
import java.net.InetAddress import java.net.InetAddress
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.dispatch.{ Dispatchers, Future } import akka.dispatch.{ Dispatchers, Future }
@ -105,8 +106,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherThroughput = getInt("akka.actor.throughput", 5)
val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout").
map(time Duration(time, DefaultTimeUnit)). map(time Duration(time, DefaultTimeUnit)).getOrElse(1 second)
getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), DefaultTimeUnit) val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), DefaultTimeUnit)
val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), DefaultTimeUnit) val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), DefaultTimeUnit)
@ -181,38 +181,46 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
// TODO think about memory consistency effects when doing funky stuff inside constructor // TODO think about memory consistency effects when doing funky stuff inside constructor
val provider: ActorRefProvider = reflective.createProvider val provider: ActorRefProvider = reflective.createProvider
// TODO make this configurable private class Guardian extends Actor {
protected[akka] val guardian: ActorRef = { def receive = {
import akka.actor.FaultHandlingStrategy._ case Terminated(_) context.self.stop()
provider.actorOf( }
Props(context { case _ }).withFaultHandler(OneForOneStrategy {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}).withDispatcher(dispatcher),
provider.theOneWhoWalksTheBubblesOfSpaceTime,
"ApplicationSupervisor",
true)
} }
private class SystemGuardian extends Actor {
def receive = {
case Terminated(_)
mainbus.stopDefaultLoggers()
context.self.stop()
}
}
private val guardianFaultHandlingStrategy = {
import akka.actor.FaultHandlingStrategy._
OneForOneStrategy {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}
}
private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy)
protected[akka] val systemGuardian: ActorRef = { private val guardianInChief: ActorRef =
import akka.actor.FaultHandlingStrategy._ provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, "GuardianInChief", true)
provider.actorOf(
Props(context { case _ }).withFaultHandler(OneForOneStrategy { protected[akka] val guardian: ActorRef =
case _: ActorKilledException Stop provider.actorOf(guardianProps, guardianInChief, "ApplicationSupervisor", true)
case _: ActorInitializationException Stop
case _: Exception Restart protected[akka] val systemGuardian: ActorRef =
}).withDispatcher(dispatcher), provider.actorOf(guardianProps.withCreator(new SystemGuardian), guardianInChief, "SystemSupervisor", true)
provider.theOneWhoWalksTheBubblesOfSpaceTime,
"SystemSupervisor",
true)
}
// TODO think about memory consistency effects when doing funky stuff inside constructor // TODO think about memory consistency effects when doing funky stuff inside constructor
val deadLetters = new DeadLetterActorRef(this) val deadLetters = new DeadLetterActorRef(this)
val deathWatch = provider.createDeathWatch() val deathWatch = provider.createDeathWatch()
// chain death watchers so that killing guardian stops the application
deathWatch.subscribe(systemGuardian, guardian)
deathWatch.subscribe(guardianInChief, systemGuardian)
// this starts the reaper actor and the user-configured logging subscribers, which are also actors // this starts the reaper actor and the user-configured logging subscribers, which are also actors
mainbus.start(this) mainbus.start(this)
mainbus.startDefaultLoggers(this, AkkaConfig) mainbus.startDefaultLoggers(this, AkkaConfig)
@ -232,7 +240,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
// TODO shutdown all that other stuff, whatever that may be // TODO shutdown all that other stuff, whatever that may be
def stop(): Unit = { def stop(): Unit = {
guardian.stop() guardian.stop()
systemGuardian.stop()
} }
terminationFuture.onComplete(_ dispatcher.shutdown()) terminationFuture.onComplete(_ dispatcher.shutdown())

View file

@ -130,7 +130,7 @@ class DefaultScheduler extends Scheduler {
} }
} }
private[akka] def shutdown() { service.shutdown() } private[akka] def shutdown() { service.shutdownNow() }
} }
private object SchedulerThreadFactory extends ThreadFactory { private object SchedulerThreadFactory extends ThreadFactory {

View file

@ -107,7 +107,7 @@ class Dispatcher(
protected[akka] def shutdown { protected[akka] def shutdown {
val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
if (old ne null) if (old ne null)
old.shutdown() old.shutdownNow()
} }
/** /**

View file

@ -88,6 +88,19 @@ trait LoggingBus extends ActorEventBus {
} }
} }
def stopDefaultLoggers() {
val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) {
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
publish(Info(this, "shutting down: StandardOutLogger started"))
}
for {
logger loggers
if logger != StandardOutLogger
} logger.stop()
publish(Info(this, "all default loggers stopped"))
}
private def addLogger(app: AkkaApplication, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { private def addLogger(app: AkkaApplication, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
val actor = app.systemActorOf(Props(clazz), Props.randomAddress) val actor = app.systemActorOf(Props(clazz), Props.randomAddress)
actor ! InitializeLogger(this) actor ! InitializeLogger(this)

View file

@ -115,7 +115,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {
/** /**
* Returns the key set. * Returns the key set.
*/ */
def keys = scala.collection.JavaConversions.asScalaIterable(container.keySet) def keys = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet)
/** /**
* Disassociates the value of type V from the key of type K * Disassociates the value of type V from the key of type K

View file

@ -10,6 +10,8 @@ import akka.AkkaApplication
import akka.actor.{ Actor, ActorRef, Props } import akka.actor.{ Actor, ActorRef, Props }
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.event.{ Logging, MainBusLogging } import akka.event.{ Logging, MainBusLogging }
import akka.util.duration._
import akka.dispatch.FutureTimeoutException
abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication())
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
@ -22,6 +24,9 @@ abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication())
final override def afterAll { final override def afterAll {
app.stop() app.stop()
try app.terminationFuture.await(5 seconds) catch {
case _: FutureTimeoutException app.log.warning("failed to stop within 5 seconds")
}
atTermination() atTermination()
} }
@ -42,4 +47,20 @@ abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication())
def spawn(body: Unit)(implicit dispatcher: MessageDispatcher) { def spawn(body: Unit)(implicit dispatcher: MessageDispatcher) {
actorOf(Props(ctx { case "go" try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go" actorOf(Props(ctx { case "go" try body finally ctx.self.stop() }).withDispatcher(dispatcher)) ! "go"
} }
}
class AkkaSpecSpec extends WordSpec with MustMatchers {
"An AkkaSpec" must {
"terminate all actors" in {
import AkkaApplication.defaultConfig
val app = AkkaApplication("test", defaultConfig ++ Configuration(
"akka.actor.debug.lifecycle" -> true, "akka.loglevel" -> "DEBUG"))
val spec = new AkkaSpec(app) {
val ref = Seq(testActor, app.actorOf(Props.empty, "name"))
}
spec.ref foreach (_ must not be 'shutdown)
app.stop()
spec.awaitCond(spec.ref forall (_.isShutdown), 2 seconds)
}
}
} }