From fb326b7f01bde9024d5c29a0f812128e2e003ac9 Mon Sep 17 00:00:00 2001 From: Dale Wijnand <344610+dwijnand@users.noreply.github.com> Date: Wed, 14 Nov 2018 17:04:59 +0000 Subject: [PATCH] Cleanup BackoffSupervisor (#25898) * Dedup scheduleRestart in BackoffSupervisor * Use show type names in scope * Finish importing BehaviorInterceptor's types --- .../actor/typed/internal/Supervision.scala | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index a98eebd34c..18dae2a8b8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -8,7 +8,7 @@ package internal import java.util.concurrent.ThreadLocalRandom import akka.actor.DeadLetterSuppression -import akka.actor.typed.BehaviorInterceptor.SignalTarget +import akka.actor.typed.BehaviorInterceptor.{ PreStartTarget, ReceiveTarget, SignalTarget } import akka.actor.typed.SupervisorStrategy._ import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi @@ -52,7 +52,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe } } - override def aroundStart(ctx: ActorContext[O], target: BehaviorInterceptor.PreStartTarget[I]): Behavior[I] = { + override def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) } catch handleExceptionOnStart(ctx) @@ -71,8 +71,8 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe } protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]] - protected def handleSignalException(ctx: ActorContext[O], target: BehaviorInterceptor.SignalTarget[I]): Catcher[Behavior[I]] - protected def handleReceiveException(ctx: ActorContext[O], target: BehaviorInterceptor.ReceiveTarget[I]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: ActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: ActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]] } /** @@ -80,7 +80,7 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe */ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) { - override def aroundReceive(ctx: ActorContext[T], msg: T, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { try { target(ctx, msg) } catch handleReceiveException(ctx, target) @@ -94,9 +94,9 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super // convenience if target not required to handle exception protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleSignalException(ctx: ActorContext[T], target: BehaviorInterceptor.SignalTarget[T]): Catcher[Behavior[T]] = + protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = + protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = handleException(ctx) } @@ -126,7 +126,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat case OptionVal.Some(d) ⇒ d.hasTimeLeft } - override def aroundStart(ctx: ActorContext[T], target: BehaviorInterceptor.PreStartTarget[T]): Behavior[T] = { + override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = { try { target.start(ctx) } catch { @@ -169,7 +169,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, () ⇒ target(ctx, PreRestart)) } - override protected def handleReceiveException(ctx: ActorContext[T], target: BehaviorInterceptor.ReceiveTarget[T]): Catcher[Behavior[T]] = { + override protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, () ⇒ target.signalRestart(ctx)) } } @@ -191,7 +191,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior } } - override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: ReceiveTarget[T]): Behavior[T] = { try { msg match { case ScheduledRestart ⇒ @@ -203,13 +203,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts ⇒ log(ctx, ex) Behaviors.stopped - case NonFatal(ex: Thr) ⇒ - log(ctx, ex) - val restartDelay = BackoffSupervisor.calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor) - ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart) - restartCount += 1 - blackhole = true - Behaviors.empty + case NonFatal(ex: Thr) ⇒ scheduleRestart(ctx, ex) } case ResetRestartCount(current) ⇒ if (current == restartCount) { @@ -233,7 +227,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior scheduleRestart(ctx, t) } - protected def handleReceiveException(ctx: akka.actor.typed.ActorContext[AnyRef], target: BehaviorInterceptor.ReceiveTarget[T]): util.control.Exception.Catcher[akka.actor.typed.Behavior[T]] = { + protected def handleReceiveException(ctx: ActorContext[AnyRef], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ try { target.signalRestart(ctx) @@ -243,7 +237,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior scheduleRestart(ctx, t) } - protected def handleSignalException(ctx: ActorContext[AnyRef], target: BehaviorInterceptor.SignalTarget[T]): Catcher[akka.actor.typed.Behavior[T]] = { + protected def handleSignalException(ctx: ActorContext[AnyRef], target: SignalTarget[T]): Catcher[Behavior[T]] = { case NonFatal(t: Thr) ⇒ try { target(ctx, PreRestart)