From a001552cfebbc421c38f2c8ac9abd88790600136 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Oct 2010 18:16:26 +0200 Subject: [PATCH] Rewrote restart code, resetting restarts outside tiem window etc --- .../src/main/scala/actor/ActorRef.scala | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index ccd3e54bd4..0843f421ef 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -21,13 +21,13 @@ import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map => JMap } import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} private[akka] object ActorRefInternals { @@ -644,8 +644,7 @@ class LocalActorRef private[akka] ( private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @volatile private[akka] var _supervisor: Option[ActorRef] = None - @volatile - private var maxNrOfRetriesCount: Int = 0 + private val maxNrOfRetriesCount = new AtomicInteger(0) @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L @volatile @@ -1037,26 +1036,35 @@ class LocalActorRef private[akka] ( } } - protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - - log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange) - - if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - true - } - else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount < maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp - - msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount) - } - } - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { - if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + false + } + else if (withinTimeRange.isEmpty) { // restrict number of restarts + maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.get + } else { // cannot restart more than N within M timerange + val windowStart = restartsWithinTimeRangeTimestamp + val now = System.currentTimeMillis - if (!canRestart(maxNrOfRetries, withinTimeRange)) { + //We are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) + false + else + (now - windowStart) <= withinTimeRange.get + + //The actor is dead if it dies X times within the window of restart + val unrestartable = insideWindow && maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.getOrElse(1) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartsWithinTimeRangeTimestamp = now + + if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired + maxNrOfRetriesCount.set(1) + + unrestartable + } + + if (isUnrestartable) { Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1074,14 +1082,13 @@ class LocalActorRef private[akka] ( stop } else { - - _status = ActorRefInternals.BEING_RESTARTED - val failedActor = actorInstance.get - guard.withGuard { + _status = ActorRefInternals.BEING_RESTARTED lifeCycle match { case Temporary => shutDownTemporaryActor(this) case _ => + val failedActor = actorInstance.get + // either permanent or none where default is permanent Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) @@ -1093,13 +1100,6 @@ class LocalActorRef private[akka] ( else restartActor(failedActor, reason) - if (maxNrOfRetries.isEmpty) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - else - if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - - maxNrOfRetriesCount += 1 _status = ActorRefInternals.RUNNING dispatcher.resume(this) }