Initial attempt at suspend/resume

This commit is contained in:
Viktor Klang 2010-10-11 16:11:51 +02:00
parent 389a5881d2
commit 7fd6ba8cbf
8 changed files with 99 additions and 38 deletions

View file

@ -1038,15 +1038,18 @@ class LocalActorRef private[akka] (
}
protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange)
if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
true
}
else if (withinTimeRange.isEmpty) { // restrict number of restarts
maxNrOfRetriesCount < maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
val maxRetries = if (maxNrOfRetries.isEmpty) 1 else maxNrOfRetries.get //Default to 1, has to match timerange also
!((maxNrOfRetriesCount >= maxRetries) &&
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp < withinTimeRange.get))
val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp
msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount)
}
}
@ -1054,7 +1057,6 @@ class LocalActorRef private[akka] (
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis
if (!canRestart(maxNrOfRetries, withinTimeRange)) {
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
Actor.log.warning(
"Maximum number of restarts [%s] within time range [%s] reached." +
"\n\tWill *not* restart actor [%s] anymore." +
@ -1063,6 +1065,7 @@ class LocalActorRef private[akka] (
maxNrOfRetries, withinTimeRange, this, reason)
_supervisor.foreach { sup =>
// can supervisor handle the notification?
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
else Actor.log.warning(
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
@ -1071,8 +1074,10 @@ class LocalActorRef private[akka] (
stop
} else {
_status = ActorRefInternals.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
case Temporary => shutDownTemporaryActor(this)
@ -1083,17 +1088,20 @@ class LocalActorRef private[akka] (
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
if (isProxyableDispatcher(failedActor))
restartProxyableDispatcher(failedActor, reason)
else
restartActor(failedActor, reason)
_status = ActorRefInternals.RUNNING
// update restart parameters
if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0)
if (maxNrOfRetries.isEmpty)
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
else if (!maxNrOfRetries.isDefined)
else
if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0)
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
maxNrOfRetriesCount += 1
_status = ActorRefInternals.RUNNING
dispatcher.resume(this)
}
}
}
@ -1242,7 +1250,9 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_status = ActorRefInternals.BEING_RESTARTED
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@ -1261,7 +1271,7 @@ class LocalActorRef private[akka] (
else {
lifeCycle match {
case Temporary => shutDownTemporaryActor(this)
case _ =>
case _ => dispatcher.resume(this) //Resume processing for this actor
}
}
}

View file

@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher(
private[akka] val active = new Switch(false)
val name = "akka:event-driven:dispatcher:" + _name
//Initialize
init
def dispatch(invocation: MessageInvocation) = {
@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher(
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.dispatcherLock.tryLock()) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
executor execute mbox
} catch {
@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher(
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actorRef: ActorRef) {
log.debug("Suspending %s",actorRef.uuid)
getMailbox(actorRef).suspended.switchOn
}
def resume(actorRef: ActorRef) {
log.debug("Resuming %s",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
registerForExecution(mbox)
}
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = {
private[akka] def init {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (self.suspended.isOn)
true
else {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (nextMessage.receiver.isBeingRestarted)
return !self.isEmpty
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
} while (nextMessage ne null)
if (self.suspended.isOn)
return true
nextMessage = self.dequeue
} while (nextMessage ne null)
}
false
}
false
}
}

View file

@ -98,10 +98,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = {
if (mailbox.suspended.isOn)
return false
var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (messageInvocation.receiver.isBeingRestarted)
if (mailbox.suspended.isOn)
return false
messageInvocation = mailbox.dequeue
}
@ -176,6 +179,17 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
uuids.clear
}
def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.switchOn
}
def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
executor execute mbox
}
def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -169,6 +169,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue =
else new HawtDispatcherMailbox(queue)
}
def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef]
/**
@ -185,6 +188,9 @@ class HawtDispatcherMailbox(val queue: DispatchQueue) {
invocation.invoke
}
}
def suspend = queue.suspend
def resume = queue.resume
}
class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) {
@ -194,6 +200,9 @@ class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatch
private def drain_source = source.getData.foreach(_.invoke)
override def suspend = source.suspend
override def resume = source.resume
override def dispatch(invocation: MessageInvocation) {
if (getCurrentQueue eq null) {
// we are being call from a non hawtdispatch thread, can't aggregate

View file

@ -5,13 +5,13 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import se.scalablesolutions.akka.AkkaException
import java.util.{Queue, List}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import se.scalablesolutions.akka.util._
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
@ -20,6 +20,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock
val suspended = new Switch(false)
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def size: Int

View file

@ -79,6 +79,9 @@ trait MessageDispatcher extends MailboxFactory with Logging {
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
}
def suspend(actorRef: ActorRef): Unit
def resume(actorRef: ActorRef): Unit
def canBeShutDown: Boolean = uuids.isEmpty
def isShutdown: Boolean

View file

@ -135,6 +135,9 @@ class Switch(startAsOn: Boolean = false) {
def switchOff(action: => Unit): Boolean = transcend(from = true, action)
def switchOn(action: => Unit): Boolean = transcend(from = false,action)
def switchOff: Boolean = switch.compareAndSet(true,false)
def switchOn: Boolean = switch.compareAndSet(false,true)
def ifOnYield[T](action: => T): Option[T] = {
if (switch.get)
Some(action)

View file

@ -221,10 +221,12 @@ class RestartStrategySpec extends JUnitSuite {
}
@Test
def slaveShouldNotRestartWithinTimeRange = {
def slaveShouldNotRestartWithinsTimeRange = {
val boss = actorOf(new Actor{
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))
protected def receive = { case _ => () }
protected def receive = {
case m:MaximumNumberOfRestartsWithinTimeRangeReached => log.error(m.toString)
}
}).start
val restartLatch = new StandardLatch