From ae3768c2664ffaa9be091b017bdc05b43158a3dd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Oct 2010 16:11:51 +0200 Subject: [PATCH 1/4] Initial attempt at suspend/resume --- .../src/main/scala/actor/ActorRef.scala | 36 +++++++---- .../ExecutorBasedEventDrivenDispatcher.scala | 61 ++++++++++++------- ...sedEventDrivenWorkStealingDispatcher.scala | 16 ++++- .../main/scala/dispatch/HawtDispatcher.scala | 9 +++ .../main/scala/dispatch/MailboxHandling.scala | 3 +- .../main/scala/dispatch/MessageHandling.scala | 3 + akka-actor/src/main/scala/util/LockUtil.scala | 3 + .../supervisor/RestartStrategySpec.scala | 6 +- 8 files changed, 99 insertions(+), 38 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index b227a6ffe2..ccd3e54bd4 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1038,15 +1038,18 @@ class LocalActorRef private[akka] ( } protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + + log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange) + if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal true } else if (withinTimeRange.isEmpty) { // restrict number of restarts maxNrOfRetriesCount < maxNrOfRetries.get } else { // cannot restart more than N within M timerange - val maxRetries = if (maxNrOfRetries.isEmpty) 1 else maxNrOfRetries.get //Default to 1, has to match timerange also - !((maxNrOfRetriesCount >= maxRetries) && - (System.currentTimeMillis - restartsWithinTimeRangeTimestamp < withinTimeRange.get)) + val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp + + msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount) } } @@ -1054,7 +1057,6 @@ class LocalActorRef private[akka] ( if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis if (!canRestart(maxNrOfRetries, withinTimeRange)) { - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1063,6 +1065,7 @@ class LocalActorRef private[akka] ( maxNrOfRetries, withinTimeRange, this, reason) _supervisor.foreach { sup => // can supervisor handle the notification? + val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) else Actor.log.warning( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + @@ -1071,8 +1074,10 @@ class LocalActorRef private[akka] ( stop } else { + _status = ActorRefInternals.BEING_RESTARTED val failedActor = actorInstance.get + guard.withGuard { lifeCycle match { case Temporary => shutDownTemporaryActor(this) @@ -1083,17 +1088,20 @@ class LocalActorRef private[akka] ( restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) + if (isProxyableDispatcher(failedActor)) + restartProxyableDispatcher(failedActor, reason) + else + restartActor(failedActor, reason) - _status = ActorRefInternals.RUNNING - - // update restart parameters - if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0) + if (maxNrOfRetries.isEmpty) restartsWithinTimeRangeTimestamp = System.currentTimeMillis - else if (!maxNrOfRetries.isDefined) + else + if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + maxNrOfRetriesCount += 1 + _status = ActorRefInternals.RUNNING + dispatcher.resume(this) } } } @@ -1242,7 +1250,9 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _status = ActorRefInternals.BEING_RESTARTED + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1261,7 +1271,7 @@ class LocalActorRef private[akka] ( else { lifeCycle match { case Temporary => shutDownTemporaryActor(this) - case _ => + case _ => dispatcher.resume(this) //Resume processing for this actor } } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 63a0044368..213e68b863 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] val active = new Switch(false) val name = "akka:event-driven:dispatcher:" + _name + + //Initialize init def dispatch(invocation: MessageInvocation) = { @@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher( } private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { - if (mbox.dispatcherLock.tryLock()) { + if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { try { executor execute mbox } catch { @@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher( override val toString = getClass.getSimpleName + "[" + name + "]" + def suspend(actorRef: ActorRef) { + log.debug("Suspending %s",actorRef.uuid) + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + log.debug("Resuming %s",actorRef.uuid) + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + registerForExecution(mbox) + } + // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init = { + private[akka] def init { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity config(this) buildThreadPool @@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox(): Boolean = { - var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = dispatcher.throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke + if (self.suspended.isOn) + true + else { + var nextMessage = self.dequeue + if (nextMessage ne null) { + val throttle = dispatcher.throughput > 0 + var processedMessages = 0 + val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 + val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 + do { + nextMessage.invoke - if (nextMessage.receiver.isBeingRestarted) - return !self.isEmpty + if (throttle) { // Will be elided when false + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || + (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out + return !self.isEmpty + } - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - nextMessage = self.dequeue - } while (nextMessage ne null) + if (self.suspended.isOn) + return true + + nextMessage = self.dequeue + } while (nextMessage ne null) + } + false } - false } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 88635dbeae..2bcc7c489f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -98,10 +98,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return */ private def processMailbox(mailbox: MessageQueue): Boolean = { + if (mailbox.suspended.isOn) + return false + var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke - if (messageInvocation.receiver.isBeingRestarted) + if (mailbox.suspended.isOn) return false messageInvocation = mailbox.dequeue } @@ -176,6 +179,17 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( uuids.clear } + + def suspend(actorRef: ActorRef) { + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + executor execute mbox + } + def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index dabb5a4c2e..4ca63f64f2 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -169,6 +169,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = else new HawtDispatcherMailbox(queue) } + def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend + def resume(actorRef:ActorRef) = mailbox(actorRef).resume + def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] /** @@ -185,6 +188,9 @@ class HawtDispatcherMailbox(val queue: DispatchQueue) { invocation.invoke } } + + def suspend = queue.suspend + def resume = queue.resume } class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) { @@ -194,6 +200,9 @@ class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatch private def drain_source = source.getData.foreach(_.invoke) + override def suspend = source.suspend + override def resume = source.resume + override def dispatch(invocation: MessageInvocation) { if (getCurrentQueue eq null) { // we are being call from a non hawtdispatch thread, can't aggregate diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index 192313f178..68fbebb0e7 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -5,13 +5,13 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import se.scalablesolutions.akka.AkkaException import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util._ class MessageQueueAppendFailedException(message: String) extends AkkaException(message) @@ -20,6 +20,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m */ trait MessageQueue { val dispatcherLock = new SimpleLock + val suspended = new Switch(false) def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c9df3f3b87..86e4552151 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -79,6 +79,9 @@ trait MessageDispatcher extends MailboxFactory with Logging { if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } + def suspend(actorRef: ActorRef): Unit + def resume(actorRef: ActorRef): Unit + def canBeShutDown: Boolean = uuids.isEmpty def isShutdown: Boolean diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 6df0695f03..7c37267157 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -135,6 +135,9 @@ class Switch(startAsOn: Boolean = false) { def switchOff(action: => Unit): Boolean = transcend(from = true, action) def switchOn(action: => Unit): Boolean = transcend(from = false,action) + def switchOff: Boolean = switch.compareAndSet(true,false) + def switchOn: Boolean = switch.compareAndSet(false,true) + def ifOnYield[T](action: => T): Option[T] = { if (switch.get) Some(action) diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index 7fb9bd6ffe..ea1547f4cf 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -221,10 +221,12 @@ class RestartStrategySpec extends JUnitSuite { } @Test - def slaveShouldNotRestartWithinTimeRange = { + def slaveShouldNotRestartWithinsTimeRange = { val boss = actorOf(new Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) - protected def receive = { case _ => () } + protected def receive = { + case m:MaximumNumberOfRestartsWithinTimeRangeReached => log.error(m.toString) + } }).start val restartLatch = new StandardLatch From 6b9a895dca95836cb1068092795b77143214b661 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Oct 2010 18:16:26 +0200 Subject: [PATCH 2/4] Rewrote restart code, resetting restarts outside tiem window etc --- .../src/main/scala/actor/ActorRef.scala | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index ccd3e54bd4..0843f421ef 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -21,13 +21,13 @@ import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map => JMap } import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} private[akka] object ActorRefInternals { @@ -644,8 +644,7 @@ class LocalActorRef private[akka] ( private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @volatile private[akka] var _supervisor: Option[ActorRef] = None - @volatile - private var maxNrOfRetriesCount: Int = 0 + private val maxNrOfRetriesCount = new AtomicInteger(0) @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L @volatile @@ -1037,26 +1036,35 @@ class LocalActorRef private[akka] ( } } - protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - - log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange) - - if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - true - } - else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount < maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp - - msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount) - } - } - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { - if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + false + } + else if (withinTimeRange.isEmpty) { // restrict number of restarts + maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.get + } else { // cannot restart more than N within M timerange + val windowStart = restartsWithinTimeRangeTimestamp + val now = System.currentTimeMillis - if (!canRestart(maxNrOfRetries, withinTimeRange)) { + //We are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) + false + else + (now - windowStart) <= withinTimeRange.get + + //The actor is dead if it dies X times within the window of restart + val unrestartable = insideWindow && maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.getOrElse(1) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartsWithinTimeRangeTimestamp = now + + if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired + maxNrOfRetriesCount.set(1) + + unrestartable + } + + if (isUnrestartable) { Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1074,14 +1082,13 @@ class LocalActorRef private[akka] ( stop } else { - - _status = ActorRefInternals.BEING_RESTARTED - val failedActor = actorInstance.get - guard.withGuard { + _status = ActorRefInternals.BEING_RESTARTED lifeCycle match { case Temporary => shutDownTemporaryActor(this) case _ => + val failedActor = actorInstance.get + // either permanent or none where default is permanent Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) @@ -1093,13 +1100,6 @@ class LocalActorRef private[akka] ( else restartActor(failedActor, reason) - if (maxNrOfRetries.isEmpty) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - else - if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - - maxNrOfRetriesCount += 1 _status = ActorRefInternals.RUNNING dispatcher.resume(this) } From b42dfbc777ada4eb5a0b20f385e015fa2407518a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Oct 2010 18:41:43 +0200 Subject: [PATCH 3/4] Tuned test to work, also fixed a bug in the restart logic --- akka-actor/src/main/scala/actor/ActorRef.scala | 4 ++-- .../actor/supervisor/RestartStrategySpec.scala | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 0843f421ef..3c112ce2ff 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1045,7 +1045,7 @@ class LocalActorRef private[akka] ( } else { // cannot restart more than N within M timerange val windowStart = restartsWithinTimeRangeTimestamp val now = System.currentTimeMillis - + val retries = maxNrOfRetriesCount.incrementAndGet //We are within the time window if it isn't the first restart, or if the window hasn't closed val insideWindow = if (windowStart == 0) false @@ -1053,7 +1053,7 @@ class LocalActorRef private[akka] ( (now - windowStart) <= withinTimeRange.get //The actor is dead if it dies X times within the window of restart - val unrestartable = insideWindow && maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.getOrElse(1) + val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window restartsWithinTimeRangeTimestamp = now diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index ea1547f4cf..887785f568 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -222,24 +222,24 @@ class RestartStrategySpec extends JUnitSuite { @Test def slaveShouldNotRestartWithinsTimeRange = { + + val restartLatch,stopLatch,maxNoOfRestartsLatch = new StandardLatch + val countDownLatch = new CountDownLatch(2) + val boss = actorOf(new Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) protected def receive = { - case m:MaximumNumberOfRestartsWithinTimeRangeReached => log.error(m.toString) + case m:MaximumNumberOfRestartsWithinTimeRangeReached => maxNoOfRestartsLatch.open } }).start - val restartLatch = new StandardLatch - val countDownLatch = new CountDownLatch(3) - val stopLatch = new StandardLatch - - val slave = actorOf(new Actor{ protected def receive = { case Ping => countDownLatch.countDown case Crash => throw new Exception("Crashing...") } + override def postRestart(reason: Throwable) = { restartLatch.open } @@ -268,6 +268,8 @@ class RestartStrategySpec extends JUnitSuite { slave ! Crash assert(stopLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS)) + assert(!slave.isRunning) } } From fdbfbe35d98b33de04c3df99011a5397d67e814d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Oct 2010 19:15:14 +0200 Subject: [PATCH 4/4] Switching to volatile int instead of AtomicInteger until ticket 384 is done --- akka-actor/src/main/scala/actor/ActorRef.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 3c112ce2ff..5548d030ff 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -644,7 +644,8 @@ class LocalActorRef private[akka] ( private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @volatile private[akka] var _supervisor: Option[ActorRef] = None - private val maxNrOfRetriesCount = new AtomicInteger(0) + @volatile + private var maxNrOfRetriesCount: Int = 0 @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L @volatile @@ -1041,11 +1042,13 @@ class LocalActorRef private[akka] ( false } else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.get + maxNrOfRetriesCount += 1 //Increment number of retries + maxNrOfRetriesCount > maxNrOfRetries.get } else { // cannot restart more than N within M timerange + maxNrOfRetriesCount += 1 //Increment number of retries val windowStart = restartsWithinTimeRangeTimestamp val now = System.currentTimeMillis - val retries = maxNrOfRetriesCount.incrementAndGet + val retries = maxNrOfRetriesCount //We are within the time window if it isn't the first restart, or if the window hasn't closed val insideWindow = if (windowStart == 0) false @@ -1059,7 +1062,7 @@ class LocalActorRef private[akka] ( restartsWithinTimeRangeTimestamp = now if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired - maxNrOfRetriesCount.set(1) + maxNrOfRetriesCount = 1 unrestartable }