diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index 22abafaccc..388444ea3b 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -4,7 +4,6 @@ package akka.actor -import akka.config.Supervision._ import akka.AkkaException import akka.util._ import ReflectiveAccess._ @@ -81,7 +80,7 @@ case class SupervisorFactory(val config: SupervisorConfig) { def newInstance: Supervisor = newInstanceFor(config) def newInstanceFor(config: SupervisorConfig): Supervisor = { - val supervisor = new Supervisor(config.restartStrategy) + val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler) supervisor.configure(config) supervisor.start supervisor @@ -100,13 +99,13 @@ case class SupervisorFactory(val config: SupervisorConfig) { * * @author Jonas Bonér */ -sealed class Supervisor(handler: FaultHandlingStrategy) { +sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit) { import Supervisor._ private val _childActors = new ConcurrentHashMap[String, List[ActorRef]] private val _childSupervisors = new CopyOnWriteArrayList[Supervisor] - private[akka] val supervisor = actorOf(new SupervisorActor(handler)).start() + private[akka] val supervisor = actorOf(new SupervisorActor(handler,maxRestartsHandler)).start() def uuid = supervisor.uuid @@ -127,7 +126,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { _childActors.values.toArray.toList.asInstanceOf[List[Supervisor]] def configure(config: SupervisorConfig): Unit = config match { - case SupervisorConfig(_, servers) => + case SupervisorConfig(_, servers, _) => + servers.map(server => server match { case Supervise(actorRef, lifeCycle, registerAsRemoteService) => @@ -143,7 +143,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { supervisor.link(actorRef) if (registerAsRemoteService) Actor.remote.register(actorRef) - case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration + case supervisorConfig @ SupervisorConfig(_, _,_) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) _childSupervisors.add(childSupervisor) @@ -156,9 +156,10 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { * * @author Jonas Bonér */ -final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) extends Actor { +final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit) extends Actor { self.faultHandler = handler + override def postStop(): Unit = { val i = self.linkedActors.values.iterator while(i.hasNext) { @@ -169,11 +170,8 @@ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) exten } def receive = { - // FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor - case MaximumNumberOfRestartsWithinTimeRangeReached( - victim, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) => + case max@MaximumNumberOfRestartsWithinTimeRangeReached(_,_,_,_) => maxRestartsHandler(max) case unknown => throw new SupervisorException( "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") } } - diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 9f63c64bc1..da06c7aa44 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -4,8 +4,9 @@ package akka.config -import akka.actor.{ActorRef} import akka.dispatch.MessageDispatcher +import akka.actor.{MaximumNumberOfRestartsWithinTimeRangeReached, ActorRef} +import akka.japi.Procedure case class RemoteAddress(val hostname: String, val port: Int) @@ -21,9 +22,10 @@ object Supervision { sealed abstract class LifeCycle extends ConfigElement sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement - case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server]) extends Server { + case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: MaximumNumberOfRestartsWithinTimeRangeReached => Unit = {max=>()}) extends Server { //Java API def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList) + def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler:Procedure[MaximumNumberOfRestartsWithinTimeRangeReached]) = this(restartStrategy,worker.toList, {max=>restartHandler.apply(max)}) } class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server {