diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index b227a6ffe2..ccd3e54bd4 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -1038,15 +1038,18 @@ class LocalActorRef private[akka] ( } protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + + log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange) + if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal true } else if (withinTimeRange.isEmpty) { // restrict number of restarts maxNrOfRetriesCount < maxNrOfRetries.get } else { // cannot restart more than N within M timerange - val maxRetries = if (maxNrOfRetries.isEmpty) 1 else maxNrOfRetries.get //Default to 1, has to match timerange also - !((maxNrOfRetriesCount >= maxRetries) && - (System.currentTimeMillis - restartsWithinTimeRangeTimestamp < withinTimeRange.get)) + val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp + + msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount) } } @@ -1054,7 +1057,6 @@ class LocalActorRef private[akka] ( if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis if (!canRestart(maxNrOfRetries, withinTimeRange)) { - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1063,6 +1065,7 @@ class LocalActorRef private[akka] ( maxNrOfRetries, withinTimeRange, this, reason) _supervisor.foreach { sup => // can supervisor handle the notification? + val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) else Actor.log.warning( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + @@ -1071,8 +1074,10 @@ class LocalActorRef private[akka] ( stop } else { + _status = ActorRefInternals.BEING_RESTARTED val failedActor = actorInstance.get + guard.withGuard { lifeCycle match { case Temporary => shutDownTemporaryActor(this) @@ -1083,17 +1088,20 @@ class LocalActorRef private[akka] ( restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) + if (isProxyableDispatcher(failedActor)) + restartProxyableDispatcher(failedActor, reason) + else + restartActor(failedActor, reason) - _status = ActorRefInternals.RUNNING - - // update restart parameters - if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0) + if (maxNrOfRetries.isEmpty) restartsWithinTimeRangeTimestamp = System.currentTimeMillis - else if (!maxNrOfRetries.isDefined) + else + if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + maxNrOfRetriesCount += 1 + _status = ActorRefInternals.RUNNING + dispatcher.resume(this) } } } @@ -1242,7 +1250,9 @@ class LocalActorRef private[akka] ( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _status = ActorRefInternals.BEING_RESTARTED + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1261,7 +1271,7 @@ class LocalActorRef private[akka] ( else { lifeCycle match { case Temporary => shutDownTemporaryActor(this) - case _ => + case _ => dispatcher.resume(this) //Resume processing for this actor } } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 63a0044368..213e68b863 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] val active = new Switch(false) val name = "akka:event-driven:dispatcher:" + _name + + //Initialize init def dispatch(invocation: MessageInvocation) = { @@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher( } private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { - if (mbox.dispatcherLock.tryLock()) { + if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { try { executor execute mbox } catch { @@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher( override val toString = getClass.getSimpleName + "[" + name + "]" + def suspend(actorRef: ActorRef) { + log.debug("Suspending %s",actorRef.uuid) + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + log.debug("Resuming %s",actorRef.uuid) + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + registerForExecution(mbox) + } + // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init = { + private[akka] def init { withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity config(this) buildThreadPool @@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox(): Boolean = { - var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = dispatcher.throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke + if (self.suspended.isOn) + true + else { + var nextMessage = self.dequeue + if (nextMessage ne null) { + val throttle = dispatcher.throughput > 0 + var processedMessages = 0 + val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 + val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 + do { + nextMessage.invoke - if (nextMessage.receiver.isBeingRestarted) - return !self.isEmpty + if (throttle) { // Will be elided when false + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || + (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out + return !self.isEmpty + } - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - nextMessage = self.dequeue - } while (nextMessage ne null) + if (self.suspended.isOn) + return true + + nextMessage = self.dequeue + } while (nextMessage ne null) + } + false } - false } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 88635dbeae..2bcc7c489f 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -98,10 +98,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return */ private def processMailbox(mailbox: MessageQueue): Boolean = { + if (mailbox.suspended.isOn) + return false + var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke - if (messageInvocation.receiver.isBeingRestarted) + if (mailbox.suspended.isOn) return false messageInvocation = mailbox.dequeue } @@ -176,6 +179,17 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( uuids.clear } + + def suspend(actorRef: ActorRef) { + getMailbox(actorRef).suspended.switchOn + } + + def resume(actorRef: ActorRef) { + val mbox = getMailbox(actorRef) + mbox.suspended.switchOff + executor execute mbox + } + def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index dabb5a4c2e..4ca63f64f2 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -169,6 +169,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = else new HawtDispatcherMailbox(queue) } + def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend + def resume(actorRef:ActorRef) = mailbox(actorRef).resume + def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef] /** @@ -185,6 +188,9 @@ class HawtDispatcherMailbox(val queue: DispatchQueue) { invocation.invoke } } + + def suspend = queue.suspend + def resume = queue.resume } class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) { @@ -194,6 +200,9 @@ class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatch private def drain_source = source.getData.foreach(_.invoke) + override def suspend = source.suspend + override def resume = source.resume + override def dispatch(invocation: MessageInvocation) { if (getCurrentQueue eq null) { // we are being call from a non hawtdispatch thread, can't aggregate diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index 192313f178..68fbebb0e7 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -5,13 +5,13 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} -import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging} import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import se.scalablesolutions.akka.AkkaException import java.util.{Queue, List} import java.util.concurrent._ import concurrent.forkjoin.LinkedTransferQueue +import se.scalablesolutions.akka.util._ class MessageQueueAppendFailedException(message: String) extends AkkaException(message) @@ -20,6 +20,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m */ trait MessageQueue { val dispatcherLock = new SimpleLock + val suspended = new Switch(false) def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index c9df3f3b87..86e4552151 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -79,6 +79,9 @@ trait MessageDispatcher extends MailboxFactory with Logging { if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } + def suspend(actorRef: ActorRef): Unit + def resume(actorRef: ActorRef): Unit + def canBeShutDown: Boolean = uuids.isEmpty def isShutdown: Boolean diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 6df0695f03..7c37267157 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -135,6 +135,9 @@ class Switch(startAsOn: Boolean = false) { def switchOff(action: => Unit): Boolean = transcend(from = true, action) def switchOn(action: => Unit): Boolean = transcend(from = false,action) + def switchOff: Boolean = switch.compareAndSet(true,false) + def switchOn: Boolean = switch.compareAndSet(false,true) + def ifOnYield[T](action: => T): Option[T] = { if (switch.get) Some(action) diff --git a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala index 7fb9bd6ffe..ea1547f4cf 100644 --- a/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/RestartStrategySpec.scala @@ -221,10 +221,12 @@ class RestartStrategySpec extends JUnitSuite { } @Test - def slaveShouldNotRestartWithinTimeRange = { + def slaveShouldNotRestartWithinsTimeRange = { val boss = actorOf(new Actor{ self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)) - protected def receive = { case _ => () } + protected def receive = { + case m:MaximumNumberOfRestartsWithinTimeRangeReached => log.error(m.toString) + } }).start val restartLatch = new StandardLatch