Merge branch 'ticket462'
commit 236ff9d125
8 changed files with 126 additions and 60 deletions
@@ -21,13 +21,13 @@ import org.multiverse.api.exceptions.DeadTransactionException

import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map => JMap }
import java.lang.reflect.Field

import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

private[akka] object ActorRefInternals {
@@ -1037,24 +1037,37 @@ class LocalActorRef private[akka] (
}
}

protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
true
}
else if (withinTimeRange.isEmpty) { // restrict number of restarts
maxNrOfRetriesCount < maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
val maxRetries = if (maxNrOfRetries.isEmpty) 1 else maxNrOfRetries.get //Default to 1, has to match timerange also
!((maxNrOfRetriesCount >= maxRetries) &&
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp < withinTimeRange.get))
}
}

protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis
val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
false
}
else if (withinTimeRange.isEmpty) { // restrict number of restarts
maxNrOfRetriesCount += 1 //Increment number of retries
maxNrOfRetriesCount > maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
maxNrOfRetriesCount += 1 //Increment number of retries
val windowStart = restartsWithinTimeRangeTimestamp
val now = System.currentTimeMillis
val retries = maxNrOfRetriesCount
//We are within the time window if it isn't the first restart, or if the window hasn't closed
val insideWindow = if (windowStart == 0)
false
else
(now - windowStart) <= withinTimeRange.get

if (!canRestart(maxNrOfRetries, withinTimeRange)) {
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
//The actor is dead if it dies X times within the window of restart
val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1)

if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window
restartsWithinTimeRangeTimestamp = now

if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired
maxNrOfRetriesCount = 1

unrestartable
}

if (isUnrestartable) {
Actor.log.warning(
"Maximum number of restarts [%s] within time range [%s] reached." +
"\n\tWill *not* restart actor [%s] anymore." +
@@ -1063,6 +1076,7 @@ class LocalActorRef private[akka] (
maxNrOfRetries, withinTimeRange, this, reason)
_supervisor.foreach { sup =>
// can supervisor handle the notification?
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
else Actor.log.warning(
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
@@ -1071,29 +1085,26 @@ class LocalActorRef private[akka] (

stop
} else {
_status = ActorRefInternals.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
_status = ActorRefInternals.BEING_RESTARTED
lifeCycle match {
case Temporary => shutDownTemporaryActor(this)
case _ =>
val failedActor = actorInstance.get

// either permanent or none where default is permanent
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)

Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
if (isProxyableDispatcher(failedActor))
restartProxyableDispatcher(failedActor, reason)
else
restartActor(failedActor, reason)

_status = ActorRefInternals.RUNNING

// update restart parameters
if (maxNrOfRetries.isDefined && maxNrOfRetriesCount % maxNrOfRetries.get == 0 && maxNrOfRetriesCount != 0)
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
else if (!maxNrOfRetries.isDefined)
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
maxNrOfRetriesCount += 1
dispatcher.resume(this)
}
}
}
@@ -1242,7 +1253,9 @@ class LocalActorRef private[akka] (
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)

_status = ActorRefInternals.BEING_RESTARTED
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)

// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@@ -1261,7 +1274,7 @@ class LocalActorRef private[akka] (
else {
lifeCycle match {
case Temporary => shutDownTemporaryActor(this)
case _ =>
case _ => dispatcher.resume(this) //Resume processing for this actor
}
}
}
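The restart bookkeeping in the LocalActorRef hunks above reduces to a retry counter plus a window-start timestamp: a restart is allowed unless the retry count has already reached maxNrOfRetries while the current failure still falls inside withinTimeRange of the first one. The following is only a minimal, self-contained sketch of that rule; RestartWindow, recordRestart and the demo object are made-up names for illustration, not the actual LocalActorRef members.

// Illustrative sketch of the canRestart counting rule above; hypothetical names.
final class RestartWindow(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
  private var retries = 0
  private var windowStart = 0L

  def canRestart(now: Long = System.currentTimeMillis): Boolean =
    (maxNrOfRetries, withinTimeRange) match {
      case (None, None)      => true               // immortal: always restartable
      case (Some(max), None) => retries < max      // only a retry cap
      case (maybeMax, Some(window)) =>             // cap applies within a time window
        val max = maybeMax.getOrElse(1)
        !(retries >= max && (now - windowStart) < window)
    }

  def recordRestart(now: Long = System.currentTimeMillis): Unit = {
    if (retries == 0) windowStart = now            // first failure opens the window
    retries += 1
    if (withinTimeRange.exists(w => now - windowStart > w)) {
      windowStart = now                            // window expired: start a fresh one
      retries = 1
    }
  }
}

object RestartWindowDemo extends App {
  val w = new RestartWindow(maxNrOfRetries = Some(2), withinTimeRange = Some(1000))
  for (i <- 1 to 3) {
    println("attempt " + i + " canRestart=" + w.canRestart())
    w.recordRestart()
  }
  // prints true, true, false: the third crash inside one second exceeds the cap of 2
}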
@@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher(
private[akka] val active = new Switch(false)

val name = "akka:event-driven:dispatcher:" + _name

//Initialize
init

def dispatch(invocation: MessageInvocation) = {
@@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher(
}

private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.dispatcherLock.tryLock()) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
executor execute mbox
} catch {
@@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher(

override val toString = getClass.getSimpleName + "[" + name + "]"

def suspend(actorRef: ActorRef) {
log.debug("Suspending %s",actorRef.uuid)
getMailbox(actorRef).suspended.switchOn
}

def resume(actorRef: ActorRef) {
log.debug("Resuming %s",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
registerForExecution(mbox)
}

// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = {
private[akka] def init {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
@@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke
if (self.suspended.isOn)
true
else {
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = dispatcher.throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
do {
nextMessage.invoke

if (nextMessage.receiver.isBeingRestarted)
return !self.isEmpty
if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}

if (throttle) { // Will be elided when false
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) ||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
} while (nextMessage ne null)
if (self.suspended.isOn)
return true

nextMessage = self.dequeue
} while (nextMessage ne null)
}
false
}
false
}
}
@@ -98,10 +98,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* @return
*/
private def processMailbox(mailbox: MessageQueue): Boolean = {
if (mailbox.suspended.isOn)
return false

var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) {
messageInvocation.invoke
if (messageInvocation.receiver.isBeingRestarted)
if (mailbox.suspended.isOn)
return false
messageInvocation = mailbox.dequeue
}
@@ -176,6 +179,17 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
uuids.clear
}

def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.switchOn
}

def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef)
mbox.suspended.switchOff
executor execute mbox
}

def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
@@ -169,6 +169,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue =
else new HawtDispatcherMailbox(queue)
}

def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume

def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef]

/**
@@ -185,6 +188,9 @@ class HawtDispatcherMailbox(val queue: DispatchQueue) {
invocation.invoke
}
}

def suspend = queue.suspend
def resume = queue.resume
}

class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatcherMailbox(queue) {
@@ -194,6 +200,9 @@ class AggregatingHawtDispatcherMailbox(queue:DispatchQueue) extends HawtDispatch

private def drain_source = source.getData.foreach(_.invoke)

override def suspend = source.suspend
override def resume = source.resume

override def dispatch(invocation: MessageInvocation) {
if (getCurrentQueue eq null) {
// we are being call from a non hawtdispatch thread, can't aggregate
@@ -5,13 +5,13 @@
package se.scalablesolutions.akka.dispatch

import se.scalablesolutions.akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import se.scalablesolutions.akka.util.{SimpleLock, Duration, HashCode, Logging}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import se.scalablesolutions.akka.AkkaException

import java.util.{Queue, List}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import se.scalablesolutions.akka.util._

class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
@@ -20,6 +20,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock
val suspended = new Switch(false)
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def size: Int
@@ -79,6 +79,9 @@ trait MessageDispatcher extends MailboxFactory with Logging {
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
}

def suspend(actorRef: ActorRef): Unit
def resume(actorRef: ActorRef): Unit

def canBeShutDown: Boolean = uuids.isEmpty

def isShutdown: Boolean
@@ -135,6 +135,9 @@ class Switch(startAsOn: Boolean = false) {
def switchOff(action: => Unit): Boolean = transcend(from = true, action)
def switchOn(action: => Unit): Boolean = transcend(from = false,action)

def switchOff: Boolean = switch.compareAndSet(true,false)
def switchOn: Boolean = switch.compareAndSet(false,true)

def ifOnYield[T](action: => T): Option[T] = {
if (switch.get)
Some(action)
@@ -221,23 +221,25 @@ class RestartStrategySpec extends JUnitSuite {
}

@Test
def slaveShouldNotRestartWithinTimeRange = {
def slaveShouldNotRestartWithinsTimeRange = {

val restartLatch,stopLatch,maxNoOfRestartsLatch = new StandardLatch
val countDownLatch = new CountDownLatch(2)

val boss = actorOf(new Actor{
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))
protected def receive = { case _ => () }
protected def receive = {
case m:MaximumNumberOfRestartsWithinTimeRangeReached => maxNoOfRestartsLatch.open
}
}).start

val restartLatch = new StandardLatch
val countDownLatch = new CountDownLatch(3)
val stopLatch = new StandardLatch

val slave = actorOf(new Actor{

protected def receive = {
case Ping => countDownLatch.countDown
case Crash => throw new Exception("Crashing...")
}

override def postRestart(reason: Throwable) = {
restartLatch.open
}
@@ -266,6 +268,8 @@ class RestartStrategySpec extends JUnitSuite {
slave ! Crash
assert(stopLatch.tryAwait(1, TimeUnit.SECONDS))

assert(maxNoOfRestartsLatch.tryAwait(1,TimeUnit.SECONDS))

assert(!slave.isRunning)
}
}
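For reference, the suspend/resume mechanism the dispatcher hunks above thread through every mailbox is essentially a boolean switch that the processing loop consults before and between messages: suspend flips it on, resume flips it off and reschedules the mailbox. Below is only a tiny self-contained sketch of that pattern, assuming made-up names (SketchMailbox, process) and using AtomicBoolean as a stand-in for Akka's Switch.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the Switch-guarded mailbox pattern in the diff above.
final class SketchMailbox {
  val suspended = new AtomicBoolean(false)   // plays the role of `suspended = new Switch(false)`
  private val queue = new ConcurrentLinkedQueue[String]

  def enqueue(msg: String): Unit = queue.offer(msg)

  // Drain the queue, bailing out early if the mailbox is (or becomes) suspended,
  // like the `self.suspended.isOn` checks in processMailbox above.
  // Returns true if messages may remain unprocessed.
  def process(handler: String => Unit): Boolean = {
    if (suspended.get) return true
    var next = queue.poll()
    while (next ne null) {
      handler(next)
      if (suspended.get) return !queue.isEmpty
      next = queue.poll()
    }
    false
  }
}

object SuspendResumeDemo extends App {
  val mbox = new SketchMailbox
  mbox.enqueue("ping"); mbox.enqueue("pong")

  mbox.suspended.set(true)                   // like dispatcher.suspend(actorRef)
  println(mbox.process(println))             // true: nothing handled while suspended

  mbox.suspended.set(false)                  // like dispatcher.resume(actorRef)
  println(mbox.process(println))             // handles ping and pong, then prints false
}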