Refining Supervision API and remove AllForOne, OneForOne and replace with AllForOneStrategy, OneForOneStrategy etc
This commit is contained in:
parent
3af056f137
commit
470cd00ca6
23 changed files with 123 additions and 129 deletions
|
|
@ -81,11 +81,7 @@ object SupervisorFactory {
|
||||||
|
|
||||||
private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig): FaultHandlingStrategy =
|
private[akka] def retrieveFaultHandlerAndTrapExitsFrom(config: SupervisorConfig): FaultHandlingStrategy =
|
||||||
config match {
|
config match {
|
||||||
case SupervisorConfig(RestartStrategy(scheme, maxNrOfRetries, timeRange, trapExceptions), _) =>
|
case SupervisorConfig(faultHandler, _) => faultHandler
|
||||||
scheme match {
|
|
||||||
case a:AllForOne => AllForOneStrategy(trapExceptions.toList,maxNrOfRetries, timeRange)
|
|
||||||
case o:OneForOne => OneForOneStrategy(trapExceptions.toList,maxNrOfRetries, timeRange)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,12 @@
|
||||||
|
|
||||||
package se.scalablesolutions.akka.config
|
package se.scalablesolutions.akka.config
|
||||||
|
|
||||||
import se.scalablesolutions.akka.config.Supervision. {SuperviseTypedActor, RestartStrategy}
|
import se.scalablesolutions.akka.config.Supervision. {SuperviseTypedActor, FaultHandlingStrategy}
|
||||||
|
|
||||||
private[akka] trait TypedActorConfiguratorBase {
|
private[akka] trait TypedActorConfiguratorBase {
|
||||||
def getExternalDependency[T](clazz: Class[T]): T
|
def getExternalDependency[T](clazz: Class[T]): T
|
||||||
|
|
||||||
def configure(restartStrategy: RestartStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase
|
def configure(restartStrategy: FaultHandlingStrategy, components: List[SuperviseTypedActor]): TypedActorConfiguratorBase
|
||||||
|
|
||||||
def inject: TypedActorConfiguratorBase
|
def inject: TypedActorConfiguratorBase
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,34 +18,71 @@ object Supervision {
|
||||||
sealed abstract class ConfigElement
|
sealed abstract class ConfigElement
|
||||||
|
|
||||||
abstract class Server extends ConfigElement
|
abstract class Server extends ConfigElement
|
||||||
abstract class FailOverScheme extends ConfigElement
|
|
||||||
sealed abstract class LifeCycle extends ConfigElement
|
sealed abstract class LifeCycle extends ConfigElement
|
||||||
|
sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]]) extends ConfigElement
|
||||||
|
|
||||||
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server {
|
case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server]) extends Server {
|
||||||
//Java API
|
//Java API
|
||||||
def this(restartStrategy: RestartStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList)
|
def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy,worker.toList)
|
||||||
}
|
}
|
||||||
|
|
||||||
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
|
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val remoteAddress: Option[RemoteAddress]) extends Server {
|
||||||
val remoteAddress: Option[RemoteAddress] = Option(_remoteAddress)
|
//Java API
|
||||||
|
def this(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) =
|
||||||
|
this(actorRef, lifeCycle, Option(remoteAddress))
|
||||||
}
|
}
|
||||||
|
|
||||||
object Supervise {
|
object Supervise {
|
||||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
|
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
|
||||||
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null)
|
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, None)
|
||||||
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
|
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
|
||||||
}
|
}
|
||||||
|
|
||||||
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int, trapExceptions: Array[Class[_ <: Throwable]]) extends ConfigElement
|
object AllForOneStrategy {
|
||||||
|
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
||||||
object RestartStrategy {
|
new AllForOneStrategy(trapExit,
|
||||||
def apply(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int, trapExceptions: List[Class[_ <: Throwable]]) =
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
new RestartStrategy(scheme,maxNrOfRetries,withinTimeRange,trapExceptions.toArray)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//Java API
|
case class AllForOneStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||||
class AllForOne extends FailOverScheme
|
maxNrOfRetries: Option[Int] = None,
|
||||||
class OneForOne extends FailOverScheme
|
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) {
|
||||||
|
def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit,
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
|
||||||
|
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit.toList,
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
|
||||||
|
def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]],
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
}
|
||||||
|
|
||||||
|
object OneForOneStrategy {
|
||||||
|
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
||||||
|
new OneForOneStrategy(trapExit,
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
}
|
||||||
|
|
||||||
|
case class OneForOneStrategy(override val trapExit: List[Class[_ <: Throwable]],
|
||||||
|
maxNrOfRetries: Option[Int] = None,
|
||||||
|
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy(trapExit) {
|
||||||
|
def this(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit,
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
|
||||||
|
def this(trapExit: Array[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit.toList,
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
|
||||||
|
def this(trapExit: java.util.List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
||||||
|
this(trapExit.toArray.toList.asInstanceOf[List[Class[_ <: Throwable]]],
|
||||||
|
if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||||
|
}
|
||||||
|
|
||||||
|
case object NoFaultHandlingStrategy extends FaultHandlingStrategy(Nil)
|
||||||
|
|
||||||
//Scala API
|
//Scala API
|
||||||
case object Permanent extends LifeCycle
|
case object Permanent extends LifeCycle
|
||||||
|
|
@ -57,9 +94,8 @@ object Supervision {
|
||||||
def temporary() = Temporary
|
def temporary() = Temporary
|
||||||
def undefinedLifeCycle = UndefinedLifeCycle
|
def undefinedLifeCycle = UndefinedLifeCycle
|
||||||
|
|
||||||
//Scala API
|
//Java API
|
||||||
object AllForOne extends AllForOne { def apply() = this }
|
def noFaultHandlingStrategy = NoFaultHandlingStrategy
|
||||||
object OneForOne extends OneForOne { def apply() = this }
|
|
||||||
|
|
||||||
case class SuperviseTypedActor(_intf: Class[_],
|
case class SuperviseTypedActor(_intf: Class[_],
|
||||||
val target: Class[_],
|
val target: Class[_],
|
||||||
|
|
@ -118,44 +154,4 @@ object Supervision {
|
||||||
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
|
||||||
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
this(null: Class[_], target, lifeCycle, timeout, transactionRequired, dispatcher, remoteAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
sealed abstract class FaultHandlingStrategy {
|
|
||||||
def trapExit: List[Class[_ <: Throwable]]
|
|
||||||
}
|
|
||||||
|
|
||||||
object AllForOneStrategy {
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
new AllForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
new AllForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]],
|
|
||||||
maxNrOfRetries: Option[Int] = None,
|
|
||||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
|
||||||
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
object OneForOneStrategy {
|
|
||||||
def apply(trapExit: List[Class[_ <: Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
new OneForOneStrategy(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def apply(trapExit: Array[Class[Throwable]], maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
new OneForOneStrategy(trapExit.toList,maxNrOfRetries,withinTimeRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]],
|
|
||||||
maxNrOfRetries: Option[Int] = None,
|
|
||||||
withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy {
|
|
||||||
def this(trapExit: List[Class[_ <: Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
this(trapExit, if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), if (withinTimeRange < 0) None else Some(withinTimeRange))
|
|
||||||
def this(trapExit: Array[Class[Throwable]],maxNrOfRetries: Int, withinTimeRange: Int) =
|
|
||||||
this(trapExit.toList,maxNrOfRetries,withinTimeRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
case object NoFaultHandlingStrategy extends FaultHandlingStrategy {
|
|
||||||
def trapExit: List[Class[_ <: Throwable]] = Nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
@ -3,6 +3,7 @@ package se.scalablesolutions.akka.config;
|
||||||
import se.scalablesolutions.akka.actor.ActorRef;
|
import se.scalablesolutions.akka.actor.ActorRef;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static se.scalablesolutions.akka.config.Supervision.*;
|
import static se.scalablesolutions.akka.config.Supervision.*;
|
||||||
|
|
@ -15,6 +16,6 @@ public class SupervisionConfig {
|
||||||
targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",9999)));
|
targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",9999)));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SupervisorConfig(new RestartStrategy(new OneForOne(),50,1000,new Class[]{ Exception.class}), targets.toArray(new Server[0]));
|
return new SupervisorConfig(new OneForOneStrategy(new Class[] { Exception.class },50,1000), targets.toArray(new Server[0]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.actor
|
||||||
import org.scalatest.WordSpec
|
import org.scalatest.WordSpec
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||||
import se.scalablesolutions.akka.config.Supervision.{RestartStrategy, SupervisorConfig, OneForOne, Supervise, Permanent}
|
import se.scalablesolutions.akka.config.Supervision.{SupervisorConfig, OneForOneStrategy, Supervise, Permanent}
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
|
|
@ -57,7 +57,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
val sup = Supervisor(
|
val sup = Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(new OneForOne, 3, 5000, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]),3, 5000),
|
||||||
Supervise(actor1, Permanent) ::
|
Supervise(actor1, Permanent) ::
|
||||||
Supervise(actor2, Permanent) ::
|
Supervise(actor2, Permanent) ::
|
||||||
Supervise(actor3, Permanent) ::
|
Supervise(actor3, Permanent) ::
|
||||||
|
|
|
||||||
|
|
@ -501,7 +501,7 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
temporaryActor,
|
temporaryActor,
|
||||||
Temporary)
|
Temporary)
|
||||||
|
|
@ -513,7 +513,7 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -525,7 +525,7 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -539,7 +539,7 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -561,7 +561,7 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -583,13 +583,13 @@ class SupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 5000, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
::
|
::
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 5000, Nil),
|
AllForOneStrategy(Nil, 3, 5000),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong2,
|
pingpong2,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
|
||||||
|
|
@ -110,8 +110,7 @@ class SchedulerSpec extends JUnitSuite {
|
||||||
|
|
||||||
Supervisor(
|
Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 1000,
|
AllForOneStrategy(List(classOf[Exception]), 3, 1000),
|
||||||
List(classOf[Exception])),
|
|
||||||
Supervise(
|
Supervise(
|
||||||
actor,
|
actor,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
|
||||||
|
|
@ -241,8 +241,7 @@ object Cluster extends Cluster with Logging {
|
||||||
|
|
||||||
private[akka] def createSupervisor(actor: ActorRef): Option[Supervisor] =
|
private[akka] def createSupervisor(actor: ActorRef): Option[Supervisor] =
|
||||||
Some(Supervisor(
|
Some(Supervisor(
|
||||||
SupervisorConfig(
|
SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 5, 1000),
|
||||||
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
|
|
||||||
Supervise(actor, Permanent) :: Nil)))
|
Supervise(actor, Permanent) :: Nil)))
|
||||||
|
|
||||||
private[this] def clusterActor = if (clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor])
|
private[this] def clusterActor = if (clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor])
|
||||||
|
|
|
||||||
|
|
@ -480,7 +480,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -496,7 +496,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -517,7 +517,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -548,7 +548,7 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
@ -577,13 +577,13 @@ class RemoteSupervisorSpec extends JUnitSuite {
|
||||||
|
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong1,
|
pingpong1,
|
||||||
Permanent)
|
Permanent)
|
||||||
::
|
::
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(AllForOne, 3, 100, List(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
pingpong2,
|
pingpong2,
|
||||||
Permanent)
|
Permanent)
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ class RemoteTypedActorSpec extends
|
||||||
server.start("localhost", 9995)
|
server.start("localhost", 9995)
|
||||||
Config.config
|
Config.config
|
||||||
conf.configure(
|
conf.configure(
|
||||||
new RestartStrategy(AllForOne(), 3, 5000, List(classOf[Exception]).toArray),
|
new AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
List(
|
List(
|
||||||
new SuperviseTypedActor(
|
new SuperviseTypedActor(
|
||||||
classOf[RemoteTypedActorOne],
|
classOf[RemoteTypedActorOne],
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ public class Boot {
|
||||||
public final static TypedActorConfigurator configurator = new TypedActorConfigurator();
|
public final static TypedActorConfigurator configurator = new TypedActorConfigurator();
|
||||||
static {
|
static {
|
||||||
configurator.configure(
|
configurator.configure(
|
||||||
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
|
new OneForOneStrategy(new Class[]{Exception.class}, 3, 5000),
|
||||||
new SuperviseTypedActor[] {
|
new SuperviseTypedActor[] {
|
||||||
new SuperviseTypedActor(
|
new SuperviseTypedActor(
|
||||||
SimpleService.class,
|
SimpleService.class,
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import org.atmosphere.jersey.Broadcastable
|
||||||
class Boot {
|
class Boot {
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
Supervise(
|
Supervise(
|
||||||
actorOf[SimpleServiceActor],
|
actorOf[SimpleServiceActor],
|
||||||
Permanent) ::
|
Permanent) ::
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||||
class Boot {
|
class Boot {
|
||||||
val factory = SupervisorFactory(
|
val factory = SupervisorFactory(
|
||||||
SupervisorConfig(
|
SupervisorConfig(
|
||||||
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
|
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||||
// Dummy implementations of all authentication actors
|
// Dummy implementations of all authentication actors
|
||||||
// see akka.conf to enable one of these for the AkkaSecurityFilterFactory
|
// see akka.conf to enable one of these for the AkkaSecurityFilterFactory
|
||||||
Supervise(
|
Supervise(
|
||||||
|
|
|
||||||
|
|
@ -47,12 +47,17 @@ class SupervisionBeanDefinitionParser extends AbstractSingleBeanDefinitionParser
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def parseRestartStrategy(element: Element, builder: BeanDefinitionBuilder) {
|
private[akka] def parseRestartStrategy(element: Element, builder: BeanDefinitionBuilder) {
|
||||||
val failover = if (mandatory(element, FAILOVER) == "AllForOne") AllForOne else OneForOne
|
val failover = mandatory(element, FAILOVER)
|
||||||
val timeRange = mandatory(element, TIME_RANGE).toInt
|
val timeRange = mandatory(element, TIME_RANGE).toInt
|
||||||
val retries = mandatory(element, RETRIES).toInt
|
val retries = mandatory(element, RETRIES).toInt
|
||||||
val trapExitsElement = mandatoryElement(element, TRAP_EXISTS_TAG)
|
val trapExitsElement = mandatoryElement(element, TRAP_EXISTS_TAG)
|
||||||
val trapExceptions = parseTrapExits(trapExitsElement)
|
val trapExceptions = parseTrapExits(trapExitsElement)
|
||||||
val restartStrategy = new RestartStrategy(failover, retries, timeRange, trapExceptions)
|
|
||||||
|
val restartStrategy = failover match {
|
||||||
|
case "AllForOne" => new AllForOneStrategy(trapExceptions, retries, timeRange)
|
||||||
|
case "OneForOne" => new OneForOneStrategy(trapExceptions, retries, timeRange)
|
||||||
|
case _ => new OneForOneStrategy(trapExceptions, retries, timeRange) //Default to OneForOne
|
||||||
|
}
|
||||||
builder.addPropertyValue("restartStrategy", restartStrategy)
|
builder.addPropertyValue("restartStrategy", restartStrategy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import se.scalablesolutions.akka.config.{TypedActorConfigurator, RemoteAddress}
|
||||||
* @author michaelkober
|
* @author michaelkober
|
||||||
*/
|
*/
|
||||||
class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
class SupervisionFactoryBean extends AbstractFactoryBean[AnyRef] {
|
||||||
@BeanProperty var restartStrategy: RestartStrategy = _
|
@BeanProperty var restartStrategy: FaultHandlingStrategy = _
|
||||||
@BeanProperty var supervised: List[ActorProperties] = _
|
@BeanProperty var supervised: List[ActorProperties] = _
|
||||||
@BeanProperty var typed: String = ""
|
@BeanProperty var typed: String = ""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import ScalaDom._
|
||||||
|
|
||||||
import org.w3c.dom.Element
|
import org.w3c.dom.Element
|
||||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||||
import se.scalablesolutions.akka.config.Supervision. {RestartStrategy, AllForOne}
|
import se.scalablesolutions.akka.config.Supervision. {FaultHandlingStrategy, AllForOneStrategy}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for SupervisionBeanDefinitionParser
|
* Test for SupervisionBeanDefinitionParser
|
||||||
|
|
@ -35,13 +35,11 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
||||||
|
|
||||||
it("should parse the supervisor restart strategy") {
|
it("should parse the supervisor restart strategy") {
|
||||||
parser.parseSupervisor(createSupervisorElement, builder);
|
parser.parseSupervisor(createSupervisorElement, builder);
|
||||||
val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[RestartStrategy]
|
val strategy = builder.getBeanDefinition.getPropertyValues.getPropertyValue("restartStrategy").getValue.asInstanceOf[FaultHandlingStrategy]
|
||||||
assert(strategy ne null)
|
assert(strategy ne null)
|
||||||
assert(strategy.scheme match {
|
assert(strategy.isInstanceOf[AllForOneStrategy])
|
||||||
case AllForOne => true
|
expect(3) { strategy.asInstanceOf[AllForOneStrategy].maxNrOfRetries.get }
|
||||||
case _ => false })
|
expect(1000) { strategy.asInstanceOf[AllForOneStrategy].withinTimeRange.get }
|
||||||
expect(3) { strategy.maxNrOfRetries }
|
|
||||||
expect(1000) { strategy.withinTimeRange }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
it("should parse the supervised typed actors") {
|
it("should parse the supervised typed actors") {
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ private[akka] class Foo
|
||||||
@RunWith(classOf[JUnitRunner])
|
@RunWith(classOf[JUnitRunner])
|
||||||
class SupervisionFactoryBeanTest extends Spec with ShouldMatchers {
|
class SupervisionFactoryBeanTest extends Spec with ShouldMatchers {
|
||||||
|
|
||||||
val restartStrategy = new RestartStrategy(AllForOne(), 3, 1000, Array(classOf[Throwable]))
|
val faultHandlingStrategy = new AllForOneStrategy(List(classOf[Exception]), 3, 1000)
|
||||||
val typedActors = List(createTypedActorProperties("se.scalablesolutions.akka.spring.Foo", "1000"))
|
val typedActors = List(createTypedActorProperties("se.scalablesolutions.akka.spring.Foo", "1000"))
|
||||||
|
|
||||||
private def createTypedActorProperties(target: String, timeout: String) : ActorProperties = {
|
private def createTypedActorProperties(target: String, timeout: String) : ActorProperties = {
|
||||||
|
|
@ -28,8 +28,8 @@ class SupervisionFactoryBeanTest extends Spec with ShouldMatchers {
|
||||||
describe("A SupervisionFactoryBean") {
|
describe("A SupervisionFactoryBean") {
|
||||||
val bean = new SupervisionFactoryBean
|
val bean = new SupervisionFactoryBean
|
||||||
it("should have java getters and setters for all properties") {
|
it("should have java getters and setters for all properties") {
|
||||||
bean.setRestartStrategy(restartStrategy)
|
bean.setRestartStrategy(faultHandlingStrategy)
|
||||||
assert(bean.getRestartStrategy == restartStrategy)
|
assert(bean.getRestartStrategy == faultHandlingStrategy)
|
||||||
bean.setSupervised(typedActors)
|
bean.setSupervised(typedActors)
|
||||||
assert(bean.getSupervised == typedActors)
|
assert(bean.getSupervised == typedActors)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -731,8 +731,8 @@ object TypedActor extends Logging {
|
||||||
private[akka] def returnsFuture_?(methodRtti: MethodRtti): Boolean =
|
private[akka] def returnsFuture_?(methodRtti: MethodRtti): Boolean =
|
||||||
classOf[Future[_]].isAssignableFrom(methodRtti.getMethod.getReturnType)
|
classOf[Future[_]].isAssignableFrom(methodRtti.getMethod.getReturnType)
|
||||||
|
|
||||||
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
|
private[akka] def supervise(faultHandlingStrategy: FaultHandlingStrategy, components: List[Supervise]): Supervisor =
|
||||||
Supervisor(SupervisorConfig(restartStrategy, components))
|
Supervisor(SupervisorConfig(faultHandlingStrategy, components))
|
||||||
|
|
||||||
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
def isJoinPointAndOneWay(message: Any): Boolean = if (isJoinPoint(message))
|
||||||
isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti])
|
isOneWay(message.asInstanceOf[JoinPoint].getRtti.asInstanceOf[MethodRtti])
|
||||||
|
|
|
||||||
|
|
@ -43,9 +43,9 @@ class TypedActorConfigurator {
|
||||||
*/
|
*/
|
||||||
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head
|
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz).head
|
||||||
|
|
||||||
def configure(restartStrategy: RestartStrategy, components: Array[SuperviseTypedActor]): TypedActorConfigurator = {
|
def configure(faultHandlingStrategy: FaultHandlingStrategy, components: Array[SuperviseTypedActor]): TypedActorConfigurator = {
|
||||||
INSTANCE.configure(
|
INSTANCE.configure(
|
||||||
restartStrategy,
|
faultHandlingStrategy,
|
||||||
components.toList.asInstanceOf[scala.List[SuperviseTypedActor]])
|
components.toList.asInstanceOf[scala.List[SuperviseTypedActor]])
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import com.google.inject._
|
||||||
private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBase with Logging {
|
private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBase with Logging {
|
||||||
private var injector: Injector = _
|
private var injector: Injector = _
|
||||||
private var supervisor: Option[Supervisor] = None
|
private var supervisor: Option[Supervisor] = None
|
||||||
private var restartStrategy: RestartStrategy = _
|
private var faultHandlingStrategy: FaultHandlingStrategy = NoFaultHandlingStrategy
|
||||||
private var components: List[SuperviseTypedActor] = _
|
private var components: List[SuperviseTypedActor] = _
|
||||||
private var supervised: List[Supervise] = Nil
|
private var supervised: List[Supervise] = Nil
|
||||||
private var bindings: List[DependencyBinding] = Nil
|
private var bindings: List[DependencyBinding] = Nil
|
||||||
|
|
@ -68,9 +68,9 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
||||||
else c.target
|
else c.target
|
||||||
}
|
}
|
||||||
|
|
||||||
override def configure(restartStrategy: RestartStrategy, components: List[SuperviseTypedActor]):
|
override def configure(faultHandlingStrategy: FaultHandlingStrategy, components: List[SuperviseTypedActor]):
|
||||||
TypedActorConfiguratorBase = synchronized {
|
TypedActorConfiguratorBase = synchronized {
|
||||||
this.restartStrategy = restartStrategy
|
this.faultHandlingStrategy = faultHandlingStrategy
|
||||||
this.components = components.toArray.toList.asInstanceOf[List[SuperviseTypedActor]]
|
this.components = components.toArray.toList.asInstanceOf[List[SuperviseTypedActor]]
|
||||||
bindings = for (component <- this.components) yield {
|
bindings = for (component <- this.components) yield {
|
||||||
newDelegatingProxy(component)
|
newDelegatingProxy(component)
|
||||||
|
|
@ -144,7 +144,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
||||||
|
|
||||||
override def supervise: TypedActorConfiguratorBase = synchronized {
|
override def supervise: TypedActorConfiguratorBase = synchronized {
|
||||||
if (injector eq null) inject
|
if (injector eq null) inject
|
||||||
supervisor = Some(TypedActor.supervise(restartStrategy, supervised))
|
supervisor = Some(TypedActor.supervise(faultHandlingStrategy, supervised))
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -173,7 +173,7 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
|
||||||
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
|
typedActorRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, SuperviseTypedActor]]
|
||||||
methodToUriRegistry = new HashMap[Method, String]
|
methodToUriRegistry = new HashMap[Method, String]
|
||||||
injector = null
|
injector = null
|
||||||
restartStrategy = null
|
faultHandlingStrategy = NoFaultHandlingStrategy
|
||||||
}
|
}
|
||||||
|
|
||||||
def stop = synchronized {
|
def stop = synchronized {
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ class RestartTransactionalTypedActorSpec extends
|
||||||
def before {
|
def before {
|
||||||
Config.config
|
Config.config
|
||||||
conf.configure(
|
conf.configure(
|
||||||
new RestartStrategy(AllForOne, 3, 5000, Array(classOf[Exception])),
|
AllForOneStrategy(List(classOf[Exception]), 3, 5000),
|
||||||
List(
|
List(
|
||||||
new SuperviseTypedActor(
|
new SuperviseTypedActor(
|
||||||
classOf[TransactionalTypedActor],
|
classOf[TransactionalTypedActor],
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ class TypedActorGuiceConfiguratorSpec extends
|
||||||
conf.addExternalGuiceModule(new AbstractModule {
|
conf.addExternalGuiceModule(new AbstractModule {
|
||||||
def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)
|
def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)
|
||||||
}).configure(
|
}).configure(
|
||||||
new RestartStrategy(AllForOne(), 3, 5000, Array(classOf[Exception])),
|
AllForOneStrategy(classOf[Exception] :: Nil, 3, 5000),
|
||||||
List(
|
List(
|
||||||
new SuperviseTypedActor(
|
new SuperviseTypedActor(
|
||||||
classOf[Foo],
|
classOf[Foo],
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
||||||
var conf2: TypedActorConfigurator = _
|
var conf2: TypedActorConfigurator = _
|
||||||
|
|
||||||
override protected def beforeAll() = {
|
override protected def beforeAll() = {
|
||||||
val strategy = new RestartStrategy(AllForOne(), 3, 1000, Array(classOf[Exception]))
|
val strategy = AllForOneStrategy(classOf[Exception] :: Nil, 3, 1000)
|
||||||
val comp3 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], permanent(), 1000)
|
val comp3 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], permanent(), 1000)
|
||||||
val comp4 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], temporary(), 1000)
|
val comp4 = new SuperviseTypedActor(classOf[SamplePojo], classOf[SamplePojoImpl], temporary(), 1000)
|
||||||
conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
|
conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
|
||||||
|
|
@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
||||||
SamplePojoImpl.reset
|
SamplePojoImpl.reset
|
||||||
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
||||||
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
||||||
link(supervisor, pojo, OneForOneStrategy(Array(classOf[Throwable]), 3, 2000))
|
link(supervisor, pojo, OneForOneStrategy(classOf[Throwable] :: Nil, 3, 2000))
|
||||||
pojo.throwException
|
pojo.throwException
|
||||||
Thread.sleep(500)
|
Thread.sleep(500)
|
||||||
SimpleJavaPojoImpl._pre should be(true)
|
SimpleJavaPojoImpl._pre should be(true)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue