Adding support for optional maxrestarts and withinTime, closing ticket #346
This commit is contained in:
parent
b924647c30
commit
8e8a9c72ff
6 changed files with 60 additions and 16 deletions
|
|
@ -60,8 +60,8 @@ case object ReceiveTimeout extends LifeCycleMessage
|
|||
|
||||
case class MaximumNumberOfRestartsWithinTimeRangeReached(
|
||||
@BeanProperty val victim: ActorRef,
|
||||
@BeanProperty val maxNrOfRetries: Int,
|
||||
@BeanProperty val withinTimeRange: Int,
|
||||
@BeanProperty val maxNrOfRetries: Option[Int],
|
||||
@BeanProperty val withinTimeRange: Option[Int],
|
||||
@BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
|
||||
|
||||
// Exceptions for Actors
|
||||
|
|
|
|||
|
|
@ -601,9 +601,9 @@ trait ActorRef extends
|
|||
|
||||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
|
||||
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
|
||||
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
|
||||
|
||||
protected[akka] def registerSupervisorAsRemoteActor: Option[String]
|
||||
|
||||
|
|
@ -1037,12 +1037,18 @@ class LocalActorRef private[akka](
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = {
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
|
||||
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around
|
||||
maxNrOfRetriesCount += 1
|
||||
|
||||
val tooManyRestarts = if (maxNrOfRetries.isDefined) {
|
||||
maxNrOfRetriesCount += 1
|
||||
maxNrOfRetriesCount > maxNrOfRetries.get
|
||||
} else false
|
||||
|
||||
val restartingHasExpired = if (withinTimeRange.isDefined)
|
||||
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get
|
||||
else false
|
||||
|
||||
val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries
|
||||
val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange
|
||||
if (tooManyRestarts || restartingHasExpired) {
|
||||
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
|
||||
Actor.log.warning(
|
||||
|
|
@ -1080,7 +1086,7 @@ class LocalActorRef private[akka](
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = {
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
|
||||
linkedActorsAsList.foreach { actorRef =>
|
||||
actorRef.lifeCycle match {
|
||||
// either permanent or none where default is permanent
|
||||
|
|
@ -1412,8 +1418,8 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
protected[akka] def mailbox: AnyRef = unsupported
|
||||
protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported
|
||||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
|
||||
protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported
|
||||
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
|
||||
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
|
||||
|
|
|
|||
|
|
@ -8,8 +8,19 @@ import se.scalablesolutions.akka.actor.{ActorRef}
|
|||
import se.scalablesolutions.akka.dispatch.MessageDispatcher
|
||||
|
||||
sealed abstract class FaultHandlingStrategy
|
||||
case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
|
||||
case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
|
||||
object AllForOneStrategy {
|
||||
def apply(maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
|
||||
AllForOneStrategy(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
case class AllForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
|
||||
|
||||
object OneForOneStrategy {
|
||||
def apply(maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
|
||||
this(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
|
||||
if (withinTimeRange < 0) None else Some(withinTimeRange))
|
||||
}
|
||||
case class OneForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
|
||||
|
||||
/**
|
||||
* Configuration classes - not to be used as messages.
|
||||
|
|
|
|||
|
|
@ -70,5 +70,32 @@ class RestartStrategySpec extends JUnitSuite {
|
|||
}
|
||||
assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
}
|
||||
|
||||
@Test
|
||||
def slaveShouldBeImmortalWithoutMaxRestarts = {
|
||||
|
||||
val boss = actorOf(new Actor{
|
||||
self.trapExit = List(classOf[Throwable])
|
||||
self.faultHandler = Some(OneForOneStrategy(None, None))
|
||||
protected def receive = { case _ => () }
|
||||
}).start
|
||||
|
||||
val countDownLatch = new CountDownLatch(100)
|
||||
|
||||
val slave = actorOf(new Actor{
|
||||
|
||||
protected def receive = {
|
||||
case Crash => throw new Exception("Crashing...")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) = {
|
||||
countDownLatch.countDown
|
||||
}
|
||||
})
|
||||
|
||||
boss.startLink(slave)
|
||||
(1 to 100) foreach { _ => slave ! Crash }
|
||||
assert(countDownLatch.await(120, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -247,8 +247,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
|
|||
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
|
||||
protected[akka] def mailbox: AnyRef = unsupported
|
||||
protected[akka] def mailbox_=(msg: AnyRef):AnyRef = unsupported
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
|
||||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
|
||||
protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported
|
||||
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
SamplePojoImpl.reset
|
||||
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
||||
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
|
||||
link(supervisor, pojo, new OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
|
||||
link(supervisor, pojo, OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
|
||||
pojo.throwException
|
||||
Thread.sleep(500)
|
||||
SimpleJavaPojoImpl._pre should be(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue