From 8e8a9c72ffd47056db4adfec3f131feeba7ac8bf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 16 Sep 2010 17:09:16 +0200 Subject: [PATCH] Adding support for optional maxrestarts and withinTime, closing ticket #346 --- akka-actor/src/main/scala/actor/Actor.scala | 4 +-- .../src/main/scala/actor/ActorRef.scala | 24 ++++++++++------- .../main/scala/config/SupervisionConfig.scala | 15 +++++++++-- .../supervisor/RestartStrategySpec.scala | 27 +++++++++++++++++++ .../main/scala/component/ActorComponent.scala | 4 +-- .../typed-actor/TypedActorLifecycleSpec.scala | 2 +- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 872997c760..d232ca2a77 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -60,8 +60,8 @@ case object ReceiveTimeout extends LifeCycleMessage case class MaximumNumberOfRestartsWithinTimeRangeReached( @BeanProperty val victim: ActorRef, - @BeanProperty val maxNrOfRetries: Int, - @BeanProperty val withinTimeRange: Int, + @BeanProperty val maxNrOfRetries: Option[Int], + @BeanProperty val withinTimeRange: Option[Int], @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage // Exceptions for Actors diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index a29be88a60..4905e62670 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -601,9 +601,9 @@ trait ActorRef extends protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit protected[akka] def registerSupervisorAsRemoteActor: Option[String] @@ -1037,12 +1037,18 @@ class LocalActorRef private[akka]( } } - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = { + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around - maxNrOfRetriesCount += 1 + + val tooManyRestarts = if (maxNrOfRetries.isDefined) { + maxNrOfRetriesCount += 1 + maxNrOfRetriesCount > maxNrOfRetries.get + } else false + + val restartingHasExpired = if (withinTimeRange.isDefined) + (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get + else false - val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries - val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange if (tooManyRestarts || restartingHasExpired) { val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) Actor.log.warning( @@ -1080,7 +1086,7 @@ class LocalActorRef private[akka]( } } - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = { + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { linkedActorsAsList.foreach { actorRef => actorRef.lifeCycle match { // either permanent or none where default is permanent @@ -1412,8 +1418,8 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported diff --git a/akka-actor/src/main/scala/config/SupervisionConfig.scala b/akka-actor/src/main/scala/config/SupervisionConfig.scala index 2f25f4ed33..d85001b5ca 100644 --- a/akka-actor/src/main/scala/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/config/SupervisionConfig.scala @@ -8,8 +8,19 @@ import se.scalablesolutions.akka.actor.{ActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy -case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy -case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy +object AllForOneStrategy { + def apply(maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy = + AllForOneStrategy(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) +} +case class AllForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy + +object OneForOneStrategy { + def apply(maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy = + this(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries), + if (withinTimeRange < 0) None else Some(withinTimeRange)) +} +case class OneForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy /** * Configuration classes - not to be used as messages. diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index 234a0bd25d..b9fa238963 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -70,5 +70,32 @@ class RestartStrategySpec extends JUnitSuite { } assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS)) } + + @Test + def slaveShouldBeImmortalWithoutMaxRestarts = { + + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(None, None)) + protected def receive = { case _ => () } + }).start + + val countDownLatch = new CountDownLatch(100) + + val slave = actorOf(new Actor{ + + protected def receive = { + case Crash => throw new Exception("Crashing...") + } + + override def postRestart(reason: Throwable) = { + countDownLatch.countDown + } + }) + + boss.startLink(slave) + (1 to 100) foreach { _ => slave ! Crash } + assert(countDownLatch.await(120, TimeUnit.SECONDS)) + } } diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index a5d56dd9dc..6c1c5902fa 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -247,8 +247,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(msg: AnyRef):AnyRef = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index 9a21af06da..052f4cc7de 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft SamplePojoImpl.reset val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl]) - link(supervisor, pojo, new OneForOneStrategy(3, 2000), Array(classOf[Throwable])) + link(supervisor, pojo, OneForOneStrategy(3, 2000), Array(classOf[Throwable])) pojo.throwException Thread.sleep(500) SimpleJavaPojoImpl._pre should be(true)