add the ability to configure a handler for MaximumNumberOfRestartsWithinTimeRangeReached to declarative Supervision

This commit is contained in:
ticktock 2011-04-13 21:09:25 -07:00
parent 414122bf6c
commit 41ef2843b1
2 changed files with 13 additions and 13 deletions

View file

@ -4,7 +4,6 @@
package akka.actor package akka.actor
import akka.config.Supervision._
import akka.AkkaException import akka.AkkaException
import akka.util._ import akka.util._
import ReflectiveAccess._ import ReflectiveAccess._
@ -81,7 +80,7 @@ case class SupervisorFactory(val config: SupervisorConfig) {
def newInstance: Supervisor = newInstanceFor(config) def newInstance: Supervisor = newInstanceFor(config)
def newInstanceFor(config: SupervisorConfig): Supervisor = { def newInstanceFor(config: SupervisorConfig): Supervisor = {
val supervisor = new Supervisor(config.restartStrategy) val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler)
supervisor.configure(config) supervisor.configure(config)
supervisor.start supervisor.start
supervisor supervisor
@ -100,13 +99,13 @@ case class SupervisorFactory(val config: SupervisorConfig) {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
sealed class Supervisor(handler: FaultHandlingStrategy) { sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit) {
import Supervisor._ import Supervisor._
private val _childActors = new ConcurrentHashMap[String, List[ActorRef]] private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
private val _childSupervisors = new CopyOnWriteArrayList[Supervisor] private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
private[akka] val supervisor = actorOf(new SupervisorActor(handler)).start() private[akka] val supervisor = actorOf(new SupervisorActor(handler,maxRestartsHandler)).start()
def uuid = supervisor.uuid def uuid = supervisor.uuid
@ -127,7 +126,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
_childActors.values.toArray.toList.asInstanceOf[List[Supervisor]] _childActors.values.toArray.toList.asInstanceOf[List[Supervisor]]
def configure(config: SupervisorConfig): Unit = config match { def configure(config: SupervisorConfig): Unit = config match {
case SupervisorConfig(_, servers) => case SupervisorConfig(_, servers, _) =>
servers.map(server => servers.map(server =>
server match { server match {
case Supervise(actorRef, lifeCycle, registerAsRemoteService) => case Supervise(actorRef, lifeCycle, registerAsRemoteService) =>
@ -143,7 +143,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
supervisor.link(actorRef) supervisor.link(actorRef)
if (registerAsRemoteService) if (registerAsRemoteService)
Actor.remote.register(actorRef) Actor.remote.register(actorRef)
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _,_) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig) val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor) supervisor.link(childSupervisor.supervisor)
_childSupervisors.add(childSupervisor) _childSupervisors.add(childSupervisor)
@ -156,9 +156,10 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) extends Actor { final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit) extends Actor {
self.faultHandler = handler self.faultHandler = handler
override def postStop(): Unit = { override def postStop(): Unit = {
val i = self.linkedActors.values.iterator val i = self.linkedActors.values.iterator
while(i.hasNext) { while(i.hasNext) {
@ -169,11 +170,8 @@ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) exten
} }
def receive = { def receive = {
// FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor case max@MaximumNumberOfRestartsWithinTimeRangeReached(_,_,_,_) => maxRestartsHandler(max)
case MaximumNumberOfRestartsWithinTimeRangeReached(
victim, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) =>
case unknown => throw new SupervisorException( case unknown => throw new SupervisorException(
"SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
} }
} }

View file

@ -4,8 +4,9 @@
package akka.config package akka.config
import akka.actor.{ActorRef}
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.actor.{MaximumNumberOfRestartsWithinTimeRangeReached, ActorRef}
import akka.japi.Procedure
case class RemoteAddress(val hostname: String, val port: Int) case class RemoteAddress(val hostname: String, val port: Int)
@ -21,9 +22,10 @@ object Supervision {
sealed abstract class LifeCycle extends ConfigElement sealed abstract class LifeCycle extends ConfigElement
sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server]) extends Server { case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit = {max=>()}) extends Server {
//Java API //Java API
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList) def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList)
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler:Procedure[MaximumNumberOfRestartsWithinTimeRangeReached]) = this(restartStrategy,worker.toList, {max=>restartHandler.apply(max)})
} }
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server { class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server {