diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index b227a6ffe2..5548d030ff 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -21,13 +21,13 @@ import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map => JMap } import java.lang.reflect.Field import scala.reflect.BeanProperty import scala.collection.immutable.Stack +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} private[akka] object ActorRefInternals { @@ -1037,24 +1037,37 @@ class LocalActorRef private[akka] ( } } - protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - true - } - else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount < maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - val maxRetries = if (maxNrOfRetries.isEmpty) 1 else maxNrOfRetries.get //Default to 1, has to match timerange also - !((maxNrOfRetriesCount >= maxRetries) && - (System.currentTimeMillis - restartsWithinTimeRangeTimestamp < withinTimeRange.get)) - } - } - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { - if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + false + } + else if (withinTimeRange.isEmpty) { // restrict number of restarts + maxNrOfRetriesCount += 1 //Increment number of retries + maxNrOfRetriesCount > maxNrOfRetries.get + } else { // cannot restart more than N within M timerange + maxNrOfRetriesCount += 1 //Increment number of retries + val windowStart = restartsWithinTimeRangeTimestamp + val now = System.currentTimeMillis + val retries = maxNrOfRetriesCount + //We are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) + false + else + (now - windowStart) <= withinTimeRange.get - if (!canRestart(maxNrOfRetries, withinTimeRange)) { - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + //The actor is dead if it dies X times within the window of restart + val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartsWithinTimeRangeTimestamp = now + + if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired + maxNrOfRetriesCount = 1 + + unrestartable + } + + if (isUnrestartable) { Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1063,6 +1076,7 @@ class LocalActorRef private[akka] ( maxNrOfRetries, withinTimeRange, this, reason) _supervisor.foreach { sup => // can supervisor handle the notification? + val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) else Actor.log.warning( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + @@ -1071,29 +1085,26 @@ class LocalActorRef private[akka] ( stop } else { - _status = ActorRefInternals.BEING_RESTARTED - val failedActor = actorInstance.get guard.withGuard { + _status = ActorRefInternals.BEING_RESTARTED lifeCycle match { case Temporary => shutDownTemporaryActor(this) case _ => + val failedActor = actorInstance.get + // either permanent or none where default is permanent Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) + if (isProxyableDispatcher(failedActor)) + restartProxyableDispatcher(failedActor, reason) + else + restartActor(failedActor, reason) _status = ActorRefInternals.RUNNING - - // update restart parameters - if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - else if (!maxNrOfRetries.isDefined) - restartsWithinTimeRangeTimestamp = System.currentTimeMillis - maxNrOfRetriesCount += 1 + dispatcher.resume(this) } } } @@ -1242,7 +1253,9 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _status = ActorRefInternals.BEING_RESTARTED + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1261,7 +1274,7 @@ class LocalActorRef private[akka] ( else { lifeCycle match { case Temporary => shutDownTemporaryActor(this) - case _ => + case _ => dispatcher.resume(this) //Resume processing for this actor } } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 63a0044368..213e68b863 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] val active = new Switch(false) val name = "akka:event-driven:dispatcher:" + _name + + //Initialize init def dispatch(invocation: MessageInvocation) = { @@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher( } private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { - if (mbox.dispatcherLock.tryLock()) { + if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { try { executor execute mbox } catch { @@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher( override val toString = getClass.getSimpleName + "[" + name + "]" + def suspend(actorRef: ActorRef) { + log.debug("Suspending %s",actorRef.uuid) + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + log.debug("Resuming %s",actorRef.uuid) + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + registerForExecution(mbox) + } + // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init = { + private[akka] def init { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity config(this) buildThreadPool @@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox(): Boolean = { - var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = dispatcher.throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke + if (self.suspended.isOn) + true + else { + var nextMessage = self.dequeue + if (nextMessage ne null) { + val throttle = dispatcher.throughput > 0 + var processedMessages = 0 + val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 + val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 + do { + nextMessage.invoke - if (nextMessage.receiver.isBeingRestarted) - return !self.isEmpty + if (throttle) { // Will be elided when false + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || + (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out + return !self.isEmpty + } - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - nextMessage = self.dequeue - } while (nextMessage ne null) + if (self.suspended.isOn) + return true + + nextMessage = self.dequeue + } while (nextMessage ne null) + } + false } - false } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 88635dbeae..2bcc7c489f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -98,10 +98,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return */ private def processMailbox(mailbox: MessageQueue): Boolean = { + if (mailbox.suspended.isOn) + return false + var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke - if (messageInvocation.receiver.isBeingRestarted) + if (mailbox.suspended.isOn) return false messageInvocation = mailbox.dequeue } @@ -176,6 +179,17 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( uuids.clear } + + def suspend(actorRef: ActorRef) { + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + executor execute mbox + } + def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index dabb5a4c2e..4ca63f64f2 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -169,6 +169,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = else new HawtDispatcherMailbox(queue) } + def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend + def resume(actorRef:ActorRef) = mailbox(actorRef).resume + def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] /** @@ -185,6 +188,9 @@ class HawtDispatcherMailbox(val queue: DispatchQueue) { invocation.invoke } } + + def suspend = queue.suspend + def resume = queue.resume } class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) { @@ -194,6 +200,9 @@ class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatch private def drain_source = source.getData.foreach(_.invoke) + override def suspend = source.suspend + override def resume = source.resume + override def dispatch(invocation: MessageInvocation) { if (getCurrentQueue eq null) { // we are being call from a non hawtdispatch thread, can't aggregate diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index 192313f178..68fbebb0e7 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -5,13 +5,13 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import se.scalablesolutions.akka.AkkaException import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util._ class MessageQueueAppendFailedException(message: String) extends AkkaException(message) @@ -20,6 +20,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m */ trait MessageQueue { val dispatcherLock = new SimpleLock + val suspended = new Switch(false) def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c9df3f3b87..86e4552151 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -79,6 +79,9 @@ trait MessageDispatcher extends MailboxFactory with Logging { if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } + def suspend(actorRef: ActorRef): Unit + def resume(actorRef: ActorRef): Unit + def canBeShutDown: Boolean = uuids.isEmpty def isShutdown: Boolean diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 6df0695f03..7c37267157 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -135,6 +135,9 @@ class Switch(startAsOn: Boolean = false) { def switchOff(action: => Unit): Boolean = transcend(from = true, action) def switchOn(action: => Unit): Boolean = transcend(from = false,action) + def switchOff: Boolean = switch.compareAndSet(true,false) + def switchOn: Boolean = switch.compareAndSet(false,true) + def ifOnYield[T](action: => T): Option[T] = { if (switch.get) Some(action) diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index 7fb9bd6ffe..887785f568 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -221,23 +221,25 @@ class RestartStrategySpec extends JUnitSuite { } @Test - def slaveShouldNotRestartWithinTimeRange = { + def slaveShouldNotRestartWithinsTimeRange = { + + val restartLatch,stopLatch,maxNoOfRestartsLatch = new StandardLatch + val countDownLatch = new CountDownLatch(2) + val boss = actorOf(new Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) - protected def receive = { case _ => () } + protected def receive = { + case m:MaximumNumberOfRestartsWithinTimeRangeReached => maxNoOfRestartsLatch.open + } }).start - val restartLatch = new StandardLatch - val countDownLatch = new CountDownLatch(3) - val stopLatch = new StandardLatch - - val slave = actorOf(new Actor{ protected def receive = { case Ping => countDownLatch.countDown case Crash => throw new Exception("Crashing...") } + override def postRestart(reason: Throwable) = { restartLatch.open } @@ -266,6 +268,8 @@ class RestartStrategySpec extends JUnitSuite { slave ! Crash assert(stopLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS)) + assert(!slave.isRunning) } }