Rewrote restart code, resetting restarts outside tiem window etc
This commit is contained in:
parent
7fd6ba8cbf
commit
a001552cfe
1 changed files with 32 additions and 32 deletions
|
|
@ -21,13 +21,13 @@ import org.multiverse.api.exceptions.DeadTransactionException
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
|
||||
import java.util.{ Map => JMap }
|
||||
import java.lang.reflect.Field
|
||||
|
||||
import scala.reflect.BeanProperty
|
||||
import scala.collection.immutable.Stack
|
||||
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
|
||||
|
||||
private[akka] object ActorRefInternals {
|
||||
|
||||
|
|
@ -644,8 +644,7 @@ class LocalActorRef private[akka] (
|
|||
private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
@volatile
|
||||
private[akka] var _supervisor: Option[ActorRef] = None
|
||||
@volatile
|
||||
private var maxNrOfRetriesCount: Int = 0
|
||||
private val maxNrOfRetriesCount = new AtomicInteger(0)
|
||||
@volatile
|
||||
private var restartsWithinTimeRangeTimestamp: Long = 0L
|
||||
@volatile
|
||||
|
|
@ -1037,26 +1036,35 @@ class LocalActorRef private[akka] (
|
|||
}
|
||||
}
|
||||
|
||||
protected[akka] def canRestart(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
|
||||
|
||||
log.debug("CanRestart: [%s max / %s current] [%s < %s]",maxNrOfRetries, maxNrOfRetriesCount, System.currentTimeMillis - restartsWithinTimeRangeTimestamp, withinTimeRange)
|
||||
|
||||
if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
|
||||
true
|
||||
}
|
||||
else if (withinTimeRange.isEmpty) { // restrict number of restarts
|
||||
maxNrOfRetriesCount < maxNrOfRetries.get
|
||||
} else { // cannot restart more than N within M timerange
|
||||
val msElapsed = System.currentTimeMillis - restartsWithinTimeRangeTimestamp
|
||||
|
||||
msElapsed < withinTimeRange.get && (maxNrOfRetries.isEmpty || maxNrOfRetries.get > maxNrOfRetriesCount)
|
||||
}
|
||||
}
|
||||
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
|
||||
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis
|
||||
val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
|
||||
false
|
||||
}
|
||||
else if (withinTimeRange.isEmpty) { // restrict number of restarts
|
||||
maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.get
|
||||
} else { // cannot restart more than N within M timerange
|
||||
val windowStart = restartsWithinTimeRangeTimestamp
|
||||
val now = System.currentTimeMillis
|
||||
|
||||
if (!canRestart(maxNrOfRetries, withinTimeRange)) {
|
||||
//We are within the time window if it isn't the first restart, or if the window hasn't closed
|
||||
val insideWindow = if (windowStart == 0)
|
||||
false
|
||||
else
|
||||
(now - windowStart) <= withinTimeRange.get
|
||||
|
||||
//The actor is dead if it dies X times within the window of restart
|
||||
val unrestartable = insideWindow && maxNrOfRetriesCount.incrementAndGet > maxNrOfRetries.getOrElse(1)
|
||||
|
||||
if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window
|
||||
restartsWithinTimeRangeTimestamp = now
|
||||
|
||||
if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired
|
||||
maxNrOfRetriesCount.set(1)
|
||||
|
||||
unrestartable
|
||||
}
|
||||
|
||||
if (isUnrestartable) {
|
||||
Actor.log.warning(
|
||||
"Maximum number of restarts [%s] within time range [%s] reached." +
|
||||
"\n\tWill *not* restart actor [%s] anymore." +
|
||||
|
|
@ -1074,14 +1082,13 @@ class LocalActorRef private[akka] (
|
|||
|
||||
stop
|
||||
} else {
|
||||
|
||||
_status = ActorRefInternals.BEING_RESTARTED
|
||||
val failedActor = actorInstance.get
|
||||
|
||||
guard.withGuard {
|
||||
_status = ActorRefInternals.BEING_RESTARTED
|
||||
lifeCycle match {
|
||||
case Temporary => shutDownTemporaryActor(this)
|
||||
case _ =>
|
||||
val failedActor = actorInstance.get
|
||||
|
||||
// either permanent or none where default is permanent
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
Actor.log.debug("Restarting linked actors for actor [%s].", id)
|
||||
|
|
@ -1093,13 +1100,6 @@ class LocalActorRef private[akka] (
|
|||
else
|
||||
restartActor(failedActor, reason)
|
||||
|
||||
if (maxNrOfRetries.isEmpty)
|
||||
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
|
||||
else
|
||||
if (maxNrOfRetriesCount != 0 && maxNrOfRetriesCount % maxNrOfRetries.get == 0)
|
||||
restartsWithinTimeRangeTimestamp = System.currentTimeMillis
|
||||
|
||||
maxNrOfRetriesCount += 1
|
||||
_status = ActorRefInternals.RUNNING
|
||||
dispatcher.resume(this)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue