Merge pull request #26092 from dwijnand/give-BackoffSupervisor-an-O-outer-msg-type-param
Give BackoffSupervisor an "O" outer msg type param
This commit is contained in:
commit
3916ef382e
1 changed files with 12 additions and 12 deletions
|
|
@ -33,7 +33,7 @@ import scala.util.control.NonFatal
|
|||
case r: Stop ⇒
|
||||
Behaviors.intercept[T, T](new StopSupervisor(initialBehavior, r))(initialBehavior)
|
||||
case r: Backoff ⇒
|
||||
Behaviors.intercept[AnyRef, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior).asInstanceOf[Behavior[T]]
|
||||
Behaviors.intercept[T, T](new BackoffSupervisor(initialBehavior, r))(initialBehavior)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -175,14 +175,14 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
|||
}
|
||||
}
|
||||
|
||||
private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[AnyRef, T, Thr](b) {
|
||||
private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], b: Backoff) extends AbstractSupervisor[O, T, Thr](b) {
|
||||
|
||||
import BackoffSupervisor._
|
||||
|
||||
var blackhole = false
|
||||
var restartCount: Int = 0
|
||||
|
||||
override def aroundSignal(ctx: ActorContext[AnyRef], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
override def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
if (blackhole) {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self))
|
||||
|
|
@ -192,12 +192,12 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
}
|
||||
}
|
||||
|
||||
override def aroundReceive(ctx: ActorContext[AnyRef], msg: AnyRef, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: ActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
try {
|
||||
msg match {
|
||||
msg.asInstanceOf[Any] match {
|
||||
case ScheduledRestart ⇒
|
||||
blackhole = false
|
||||
ctx.asScala.scheduleOnce(b.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount))
|
||||
ctx.asScala.scheduleOnce(b.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount))
|
||||
try {
|
||||
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]]))
|
||||
} catch {
|
||||
|
|
@ -214,7 +214,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
case _ ⇒
|
||||
if (blackhole) {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self))
|
||||
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(msg, ctx.asScala.self.unsafeUpcast[Any]))
|
||||
Behaviors.same
|
||||
} else {
|
||||
target(ctx, msg.asInstanceOf[T])
|
||||
|
|
@ -223,12 +223,12 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
} catch handleReceiveException(ctx, target)
|
||||
}
|
||||
|
||||
protected def handleExceptionOnStart(ctx: ActorContext[AnyRef]): Catcher[Behavior[T]] = {
|
||||
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
protected def handleReceiveException(ctx: ActorContext[AnyRef], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
protected def handleReceiveException(ctx: ActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
try {
|
||||
target.signalRestart(ctx)
|
||||
|
|
@ -238,7 +238,7 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
protected def handleSignalException(ctx: ActorContext[AnyRef], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
protected def handleSignalException(ctx: ActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
try {
|
||||
target(ctx, PreRestart)
|
||||
|
|
@ -248,10 +248,10 @@ private class BackoffSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
private def scheduleRestart(ctx: ActorContext[AnyRef], reason: Throwable): Behavior[T] = {
|
||||
private def scheduleRestart(ctx: ActorContext[O], reason: Throwable): Behavior[T] = {
|
||||
log(ctx, reason)
|
||||
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
|
||||
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart)
|
||||
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart)
|
||||
restartCount += 1
|
||||
blackhole = true
|
||||
Behaviors.empty
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue