- better restart strategy test
- make sure actor stops when restart strategy maxes out - nicer pattern matching on lifecycle, making sure lifecycle.get is never called anymore (it sometimes threw NullPointerExceptions) - also applying the defaults in a nicer way
This commit is contained in:
parent
7288e832b3
commit
e91dc5cd45
2 changed files with 44 additions and 48 deletions
|
|
@ -991,7 +991,6 @@ sealed class LocalActorRef private[akka](
|
|||
"\n\tto non-empty list of exception classes - can't proceed " + toString)
|
||||
}
|
||||
} else {
|
||||
if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle
|
||||
notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on
|
||||
}
|
||||
}
|
||||
|
|
@ -1004,6 +1003,7 @@ sealed class LocalActorRef private[akka](
|
|||
val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange
|
||||
|
||||
if (tooManyRestarts || restartingHasExpired) {
|
||||
stop
|
||||
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
|
||||
Actor.log.warning(
|
||||
"Maximum number of restarts [%s] within time range [%s] reached." +
|
||||
|
|
@ -1018,30 +1018,27 @@ sealed class LocalActorRef private[akka](
|
|||
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
|
||||
"\n\tCan't send the message to the supervisor [%s].", sup)
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
_isBeingRestarted = true
|
||||
val failedActor = actorInstance.get
|
||||
guard.withGuard {
|
||||
lifeCycle.get match {
|
||||
case LifeCycle(scope, _, _) => {
|
||||
scope match {
|
||||
case Permanent =>
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
|
||||
Actor.log.debug("Restarting linked actors for actor [%s].", id)
|
||||
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
|
||||
failedActor.preRestart(reason)
|
||||
nullOutActorRefReferencesFor(failedActor)
|
||||
val freshActor = newActor
|
||||
freshActor.init
|
||||
freshActor.initTransactionalState
|
||||
actorInstance.set(freshActor)
|
||||
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
||||
freshActor.postRestart(reason)
|
||||
_isBeingRestarted = false
|
||||
case Temporary => shutDownTemporaryActor(this)
|
||||
}
|
||||
}
|
||||
lifeCycle match {
|
||||
case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(this)
|
||||
case _ =>
|
||||
// eith permanent or none where default is permanent
|
||||
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
|
||||
Actor.log.debug("Restarting linked actors for actor [%s].", id)
|
||||
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
|
||||
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
|
||||
failedActor.preRestart(reason)
|
||||
nullOutActorRefReferencesFor(failedActor)
|
||||
val freshActor = newActor
|
||||
freshActor.init
|
||||
freshActor.initTransactionalState
|
||||
actorInstance.set(freshActor)
|
||||
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
|
||||
freshActor.postRestart(reason)
|
||||
_isBeingRestarted = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1049,14 +1046,9 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = {
|
||||
linkedActorsAsList.foreach { actorRef =>
|
||||
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
|
||||
actorRef.lifeCycle.get match {
|
||||
case LifeCycle(scope, _, _) => {
|
||||
scope match {
|
||||
case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
case Temporary => shutDownTemporaryActor(actorRef)
|
||||
}
|
||||
}
|
||||
actorRef.lifeCycle match {
|
||||
case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(actorRef)
|
||||
case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,25 +28,26 @@ class RestartStrategySpec extends JUnitSuite {
|
|||
}).start
|
||||
|
||||
val restartLatch = new StandardLatch
|
||||
val firstCountDown = new CountDownLatch(2)
|
||||
val secondCountDown = new CountDownLatch(2)
|
||||
val secondRestartLatch = new StandardLatch
|
||||
val countDownLatch = new CountDownLatch(2)
|
||||
|
||||
|
||||
val slave = actorOf(new Actor{
|
||||
self.lifeCycle = Some(LifeCycle(Permanent))
|
||||
// self.lifeCycle = Some(LifeCycle(Permanent))
|
||||
|
||||
protected def receive = {
|
||||
case Ping => {
|
||||
log.info("png")
|
||||
if (firstCountDown.getCount > 0) {
|
||||
firstCountDown.countDown
|
||||
} else {
|
||||
secondCountDown.countDown
|
||||
}
|
||||
}
|
||||
case Ping => countDownLatch.countDown
|
||||
case Crash => throw new Exception("Crashing...")
|
||||
}
|
||||
override def postRestart(reason: Throwable) = restartLatch.open
|
||||
override def postRestart(reason: Throwable) = {
|
||||
restartLatch.open
|
||||
}
|
||||
|
||||
override def shutdown = {
|
||||
if (restartLatch.isOpen) {
|
||||
secondRestartLatch.open
|
||||
}
|
||||
}
|
||||
})
|
||||
boss.startLink(slave)
|
||||
|
||||
|
|
@ -56,16 +57,19 @@ class RestartStrategySpec extends JUnitSuite {
|
|||
|
||||
// test restart and post restart ping
|
||||
assert(restartLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
assert(firstCountDown.await(1, TimeUnit.SECONDS))
|
||||
assert(countDownLatch.await(1, TimeUnit.SECONDS))
|
||||
|
||||
// now crash again... should not restart
|
||||
slave ! Crash
|
||||
|
||||
slave ! Ping // this should fail
|
||||
slave ! Ping // this should fail
|
||||
slave ! Ping // this should fail
|
||||
slave ! Ping // this should fail
|
||||
assert(secondCountDown.await(2, TimeUnit.SECONDS) == false) // should not hold
|
||||
assert(secondRestartLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
val exceptionLatch = new StandardLatch
|
||||
try {
|
||||
slave ! Ping // this should fail
|
||||
} catch {
|
||||
case e => exceptionLatch.open // expected here
|
||||
}
|
||||
assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue