Cleanup BackoffSupervisor (#25898)
* Dedup scheduleRestart in BackoffSupervisor * Use show type names in scope * Finish importing BehaviorInterceptor's types
This commit is contained in:
parent
1c8577b107
commit
fb326b7f01
1 changed files with 13 additions and 19 deletions
|
|
@ -8,7 +8,7 @@ package internal
|
||||||
import java.util.concurrent.ThreadLocalRandom
|
import java.util.concurrent.ThreadLocalRandom
|
||||||
|
|
||||||
import akka.actor.DeadLetterSuppression
|
import akka.actor.DeadLetterSuppression
|
||||||
import akka.actor.typed.BehaviorInterceptor.SignalTarget
|
import akka.actor.typed.BehaviorInterceptor.{ PreStartTarget, ReceiveTarget, SignalTarget }
|
||||||
import akka.actor.typed.SupervisorStrategy._
|
import akka.actor.typed.SupervisorStrategy._
|
||||||
import akka.actor.typed.scaladsl.Behaviors
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
|
|
@ -52,7 +52,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = {
|
override def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = {
|
||||||
try {
|
try {
|
||||||
target.start(ctx)
|
target.start(ctx)
|
||||||
} catch handleExceptionOnStart(ctx)
|
} catch handleExceptionOnStart(ctx)
|
||||||
|
|
@ -71,8 +71,8 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]]
|
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]]
|
||||||
protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]]
|
protected def handleSignalException(ctx: ActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]]
|
||||||
protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]]
|
protected def handleReceiveException(ctx: ActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -80,7 +80,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
||||||
*/
|
*/
|
||||||
private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) {
|
private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) {
|
||||||
|
|
||||||
override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
|
override def aroundReceive(ctx: ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||||
try {
|
try {
|
||||||
target(ctx, msg)
|
target(ctx, msg)
|
||||||
} catch handleReceiveException(ctx, target)
|
} catch handleReceiveException(ctx, target)
|
||||||
|
|
@ -94,9 +94,9 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
|
||||||
// convenience if target not required to handle exception
|
// convenience if target not required to handle exception
|
||||||
protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] =
|
protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] =
|
||||||
handleException(ctx)
|
handleException(ctx)
|
||||||
protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] =
|
protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] =
|
||||||
handleException(ctx)
|
handleException(ctx)
|
||||||
protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] =
|
protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] =
|
||||||
handleException(ctx)
|
handleException(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -126,7 +126,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
||||||
case OptionVal.Some(d) ⇒ d.hasTimeLeft
|
case OptionVal.Some(d) ⇒ d.hasTimeLeft
|
||||||
}
|
}
|
||||||
|
|
||||||
override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = {
|
override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
|
||||||
try {
|
try {
|
||||||
target.start(ctx)
|
target.start(ctx)
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -169,7 +169,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
||||||
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||||
handleException(ctx, () ⇒ target(ctx, PreRestart))
|
handleException(ctx, () ⇒ target(ctx, PreRestart))
|
||||||
}
|
}
|
||||||
override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
override protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||||
handleException(ctx, () ⇒ target.signalRestart(ctx))
|
handleException(ctx, () ⇒ target.signalRestart(ctx))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -191,7 +191,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
|
override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: ReceiveTarget[T]): Behavior[T] = {
|
||||||
try {
|
try {
|
||||||
msg match {
|
msg match {
|
||||||
case ScheduledRestart ⇒
|
case ScheduledRestart ⇒
|
||||||
|
|
@ -203,13 +203,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
||||||
case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts ⇒
|
case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts ⇒
|
||||||
log(ctx, ex)
|
log(ctx, ex)
|
||||||
Behaviors.stopped
|
Behaviors.stopped
|
||||||
case NonFatal(ex: Thr) ⇒
|
case NonFatal(ex: Thr) ⇒ scheduleRestart(ctx, ex)
|
||||||
log(ctx, ex)
|
|
||||||
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
|
|
||||||
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart)
|
|
||||||
restartCount += 1
|
|
||||||
blackhole = true
|
|
||||||
Behaviors.empty
|
|
||||||
}
|
}
|
||||||
case ResetRestartCount(current) ⇒
|
case ResetRestartCount(current) ⇒
|
||||||
if (current == restartCount) {
|
if (current == restartCount) {
|
||||||
|
|
@ -233,7 +227,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
||||||
scheduleRestart(ctx, t)
|
scheduleRestart(ctx, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = {
|
protected def handleReceiveException(ctx: ActorContext[AnyRef], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||||
case NonFatal(t: Thr) ⇒
|
case NonFatal(t: Thr) ⇒
|
||||||
try {
|
try {
|
||||||
target.signalRestart(ctx)
|
target.signalRestart(ctx)
|
||||||
|
|
@ -243,7 +237,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
||||||
scheduleRestart(ctx, t)
|
scheduleRestart(ctx, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = {
|
protected def handleSignalException(ctx: ActorContext[AnyRef], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||||
case NonFatal(t: Thr) ⇒
|
case NonFatal(t: Thr) ⇒
|
||||||
try {
|
try {
|
||||||
target(ctx, PreRestart)
|
target(ctx, PreRestart)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue