parent
c981ba774f
commit
91f666f2b7
16 changed files with 200 additions and 133 deletions
|
|
@ -62,7 +62,7 @@ public class ActorCompile {
|
|||
});
|
||||
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
|
||||
Behavior<MyMsg> actor10 =
|
||||
Behaviors.receive((context, message) -> stopped(actor4), (context, signal) -> same());
|
||||
Behaviors.receive((context, message) -> stopped(() -> {}), (context, signal) -> same());
|
||||
|
||||
ActorSystem<MyMsg> system = ActorSystem.create(actor1, "Sys");
|
||||
|
||||
|
|
|
|||
|
|
@ -62,11 +62,9 @@ public class GracefulStopDocTest {
|
|||
// perform graceful stop, executing cleanup before final system termination
|
||||
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
||||
return Behaviors.stopped(
|
||||
Behaviors.receiveSignal(
|
||||
(_ctx, PostStop) -> {
|
||||
() -> {
|
||||
context.getSystem().log().info("Cleanup!");
|
||||
return Behaviors.same();
|
||||
}));
|
||||
});
|
||||
})
|
||||
.onSignal(
|
||||
PostStop.class,
|
||||
|
|
|
|||
|
|
@ -272,7 +272,7 @@ object BehaviorSpec {
|
|||
"must stop" in {
|
||||
val Setup(testkit, _, aux) = mkCtx()
|
||||
testkit.run(Stop)
|
||||
testkit.currentBehavior should be(Behavior.StoppedBehavior)
|
||||
Behavior.isAlive(testkit.currentBehavior) should be(false)
|
||||
checkAux(Stop, aux)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -329,6 +329,37 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
|
|||
probe.expectMessageType[B]
|
||||
}
|
||||
|
||||
"intercept PostStop" in {
|
||||
val probe = TestProbe[String]()
|
||||
val postStopInterceptor = new BehaviorInterceptor[String, String] {
|
||||
def aroundReceive(ctx: TypedActorContext[String],
|
||||
msg: String,
|
||||
target: ReceiveTarget[String]): Behavior[String] = {
|
||||
target(ctx, msg)
|
||||
}
|
||||
def aroundSignal(ctx: TypedActorContext[String],
|
||||
signal: Signal,
|
||||
target: SignalTarget[String]): Behavior[String] = {
|
||||
signal match {
|
||||
case PostStop =>
|
||||
probe.ref ! "interceptor-post-stop"
|
||||
}
|
||||
target(ctx, signal)
|
||||
}
|
||||
}
|
||||
|
||||
val ref = spawn(Behaviors.intercept(postStopInterceptor)(Behaviors.receiveMessage[String] { _ =>
|
||||
Behaviors.stopped { () =>
|
||||
probe.ref ! "callback-post-stop"
|
||||
}
|
||||
}))
|
||||
|
||||
ref ! "stop"
|
||||
probe.awaitAssert {
|
||||
probe.expectMessage("interceptor-post-stop") // previous behavior when stopping get the signal
|
||||
probe.expectMessage("callback-post-stop")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
import akka.actor.DeadLetter
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.TimerScheduler
|
||||
|
|
@ -75,10 +77,10 @@ class TimerSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
}
|
||||
.receiveSignal {
|
||||
case (context, PreRestart) =>
|
||||
case (_, PreRestart) =>
|
||||
monitor ! GotPreRestart(timer.isTimerActive("T"))
|
||||
Behaviors.same
|
||||
case (context, PostStop) =>
|
||||
case (_, PostStop) =>
|
||||
monitor ! GotPostStop(timer.isTimerActive("T"))
|
||||
Behaviors.same
|
||||
}
|
||||
|
|
@ -316,5 +318,23 @@ class TimerSpec extends ScalaTestWithActorTestKit("""
|
|||
if (elements.count(_.getClassName == "TimerInterceptor") > 1)
|
||||
fail(s"Stack contains TimerInterceptor more than once: \n${elements.mkString("\n\t")}")
|
||||
}
|
||||
|
||||
"not leak timers when PostStop is used" in {
|
||||
val probe = TestProbe[Any]()
|
||||
val ref = spawn(Behaviors.withTimers[String] { timers =>
|
||||
Behaviors.setup { _ =>
|
||||
timers.startPeriodicTimer("test", "test", 250.millis)
|
||||
Behaviors.receive { (context, message) =>
|
||||
Behaviors.stopped(() => context.log.info(s"stopping"))
|
||||
}
|
||||
}
|
||||
})
|
||||
EventFilter.info("stopping").intercept {
|
||||
ref ! "stop"
|
||||
}
|
||||
probe.expectTerminated(ref)
|
||||
system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter])
|
||||
probe.expectNoMessage(1.second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe
|
|||
val ex = new TestException("boom")
|
||||
val behavior = Behaviors.setup[Any] { context =>
|
||||
val child = context.spawn(Behaviors
|
||||
.supervise(Behaviors.receive[Any]((context, message) => {
|
||||
.supervise(Behaviors.receive[Any]((_, _) => {
|
||||
throw ex
|
||||
}))
|
||||
.onFailure[Throwable](SupervisorStrategy.stop),
|
||||
|
|
@ -127,7 +127,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe
|
|||
context.watch(child)
|
||||
|
||||
Behaviors
|
||||
.receive[Any] { (context, message) =>
|
||||
.receive[Any] { (_, message) =>
|
||||
child ! message
|
||||
Behaviors.same
|
||||
}
|
||||
|
|
@ -153,8 +153,8 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe
|
|||
val probe = TestProbe[Any]()
|
||||
val ex = new TestException("boom")
|
||||
val grossoBosso =
|
||||
spawn(
|
||||
Behaviors.setup[Any] { context =>
|
||||
spawn(Behaviors.setup[Any] {
|
||||
context =>
|
||||
val middleManagement = context.spawn(Behaviors.setup[Any] { context =>
|
||||
val sixPackJoe = context.spawn(Behaviors.receive[Any]((context, message) => throw ex), "joe")
|
||||
context.watch(sixPackJoe)
|
||||
|
|
@ -178,8 +178,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe
|
|||
Behaviors.stopped
|
||||
}
|
||||
|
||||
},
|
||||
"grosso-bosso")
|
||||
}, "grosso-bosso")
|
||||
|
||||
EventFilter[TestException](occurrences = 1).intercept {
|
||||
EventFilter[DeathPactException](occurrences = 1).intercept {
|
||||
|
|
|
|||
|
|
@ -32,14 +32,11 @@ final class GracefulStopSpec extends ScalaTestWithActorTestKit with WordSpecLike
|
|||
Behaviors.stopped
|
||||
}, "child2")
|
||||
|
||||
Behaviors.stopped {
|
||||
Behaviors.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
Behaviors.stopped { () =>
|
||||
// cleanup function body
|
||||
probe.ref ! "parent-done"
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
spawn(behavior)
|
||||
|
|
@ -54,13 +51,9 @@ final class GracefulStopSpec extends ScalaTestWithActorTestKit with WordSpecLike
|
|||
val behavior =
|
||||
Behaviors.setup[akka.NotUsed] { _ =>
|
||||
// do not spawn any children
|
||||
Behaviors.stopped {
|
||||
Behaviors.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
Behaviors.stopped { () =>
|
||||
// cleanup function body
|
||||
probe.ref ! Done
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,10 +5,9 @@
|
|||
package akka.actor.typed.scaladsl
|
||||
|
||||
import scala.concurrent.Promise
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.TestException
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.BehaviorInterceptor
|
||||
|
|
@ -21,22 +20,52 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
|
||||
"Stopping an actor" should {
|
||||
|
||||
"execute the post stop" in {
|
||||
val sawSignal = Promise[Done]()
|
||||
"execute the post stop when stopping after setup" in {
|
||||
val probe = TestProbe[Done]()
|
||||
spawn(Behaviors.setup[AnyRef] { _ =>
|
||||
Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] {
|
||||
case (context, PostStop) =>
|
||||
sawSignal.success(Done)
|
||||
Behaviors.empty
|
||||
Behaviors.stopped { () =>
|
||||
probe.ref ! Done
|
||||
}
|
||||
})
|
||||
probe.expectMessage(Done)
|
||||
}
|
||||
|
||||
"execute the post stop" in {
|
||||
val probe = TestProbe[Done]()
|
||||
val ref = spawn(Behaviors.receiveMessage[String] {
|
||||
case "stop" =>
|
||||
Behaviors.stopped { () =>
|
||||
probe.ref ! Done
|
||||
}
|
||||
})
|
||||
sawSignal.future.futureValue should ===(Done)
|
||||
ref ! "stop"
|
||||
probe.expectMessage(Done)
|
||||
}
|
||||
|
||||
"signal PostStop and then execute the post stop" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref = spawn(
|
||||
Behaviors
|
||||
.receiveMessage[String] {
|
||||
case "stop" =>
|
||||
Behaviors.stopped { () =>
|
||||
probe.ref ! "callback"
|
||||
}
|
||||
}
|
||||
.receiveSignal {
|
||||
case (_, PostStop) =>
|
||||
probe.ref ! "signal"
|
||||
Behaviors.same
|
||||
})
|
||||
ref ! "stop"
|
||||
probe.expectMessage("signal")
|
||||
probe.expectMessage("callback")
|
||||
}
|
||||
|
||||
// #25082
|
||||
"execute the post stop when wrapped" in {
|
||||
val sawSignal = Promise[Done]()
|
||||
val ref = spawn(Behaviors.setup[AnyRef] { _ =>
|
||||
val probe = TestProbe[Done]()
|
||||
spawn(Behaviors.setup[AnyRef] { _ =>
|
||||
Behaviors.intercept(new BehaviorInterceptor[AnyRef, AnyRef] {
|
||||
override def aroundReceive(context: typed.TypedActorContext[AnyRef],
|
||||
message: AnyRef,
|
||||
|
|
@ -49,42 +78,23 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
target: SignalTarget[AnyRef]): Behavior[AnyRef] = {
|
||||
target(context, signal)
|
||||
}
|
||||
})(Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] {
|
||||
case (context, PostStop) =>
|
||||
sawSignal.success(Done)
|
||||
Behaviors.empty
|
||||
}))
|
||||
})(Behaviors.stopped { () =>
|
||||
probe.ref ! Done
|
||||
})
|
||||
ref ! "stopit"
|
||||
sawSignal.future.futureValue should ===(Done)
|
||||
})
|
||||
probe.expectMessage(Done)
|
||||
}
|
||||
|
||||
// #25096
|
||||
"execute the post stop early" in {
|
||||
val sawSignal = Promise[Done]()
|
||||
spawn(Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] {
|
||||
case (context, PostStop) =>
|
||||
sawSignal.success(Done)
|
||||
Behaviors.empty
|
||||
}))
|
||||
|
||||
sawSignal.future.futureValue should ===(Done)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"PostStop" should {
|
||||
"immediately throw when a deferred behavior (setup) is passed in as postStop" in {
|
||||
val ex = intercept[IllegalArgumentException] {
|
||||
Behaviors.stopped(
|
||||
// illegal:
|
||||
Behaviors.setup[String] { _ =>
|
||||
throw TestException("boom!")
|
||||
val probe = TestProbe[Done]()
|
||||
spawn(Behaviors.stopped { () =>
|
||||
probe.ref ! Done
|
||||
})
|
||||
|
||||
probe.expectMessage(Done)
|
||||
}
|
||||
|
||||
ex.getMessage should include("Behavior used as `postStop` behavior in Stopped(...) was a deferred one ")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,12 +39,8 @@ object GracefulStopDocSpec {
|
|||
context.log.info("Initiating graceful shutdown...")
|
||||
// perform graceful stop, executing cleanup before final system termination
|
||||
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
||||
Behaviors.stopped {
|
||||
Behaviors.receiveSignal {
|
||||
case (context, PostStop) =>
|
||||
Behaviors.stopped { () =>
|
||||
cleanup(context.system.log)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,12 +4,13 @@
|
|||
|
||||
package akka.actor.typed
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.typed.internal.BehaviorImpl
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior
|
||||
import akka.actor.typed.internal.WrappingBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior
|
||||
|
||||
import akka.util.{ LineNumbers, OptionVal }
|
||||
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
||||
|
|
@ -63,7 +64,7 @@ abstract class Behavior[T] { behavior =>
|
|||
* when `unhandled` is returned.
|
||||
*
|
||||
* @param that the fallback `Behavior`
|
||||
*/
|
||||
**/
|
||||
final def orElse(that: Behavior[T]): Behavior[T] = Behavior.DeferredBehavior[T] { ctx =>
|
||||
new OrElseBehavior[T](Behavior.start(this, ctx), Behavior.start(that, ctx))
|
||||
}
|
||||
|
|
@ -176,8 +177,8 @@ object Behavior {
|
|||
* given `postStop` behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
def stopped[T](postStop: Behavior[T]): Behavior[T] =
|
||||
new StoppedBehavior(OptionVal.Some(postStop))
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] =
|
||||
new StoppedBehavior[T](OptionVal.Some((_: TypedActorContext[T]) => postStop()))
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
|
|
@ -266,22 +267,19 @@ object Behavior {
|
|||
* INTERNAL API: When the cell is stopping this behavior is used, so
|
||||
* that PostStop can be sent to previous behavior from `finishTerminate`.
|
||||
*/
|
||||
private[akka] class StoppedBehavior[T](val postStop: OptionVal[Behavior[T]]) extends Behavior[T] {
|
||||
validatePostStop(postStop)
|
||||
private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
|
||||
extends Behavior[T] {
|
||||
|
||||
@throws[IllegalArgumentException]
|
||||
private final def validatePostStop(postStop: OptionVal[Behavior[T]]): Unit = {
|
||||
def onPostStop(ctx: TypedActorContext[T]): Unit = {
|
||||
postStop match {
|
||||
case OptionVal.Some(b: DeferredBehavior[_]) =>
|
||||
throw new IllegalArgumentException(
|
||||
s"Behavior used as `postStop` behavior in Stopped(...) was a deferred one [${b.toString}], which is not supported (it would never be evaluated).")
|
||||
case _ => // all good
|
||||
case OptionVal.Some(callback) => callback(ctx)
|
||||
case OptionVal.None =>
|
||||
}
|
||||
}
|
||||
|
||||
override def toString = "Stopped" + {
|
||||
postStop match {
|
||||
case OptionVal.Some(_) => "(postStop)"
|
||||
case OptionVal.Some(callback) => s"(${LineNumbers(callback)})"
|
||||
case _ => "()"
|
||||
}
|
||||
}
|
||||
|
|
@ -420,7 +418,9 @@ object Behavior {
|
|||
case d: DeferredBehavior[_] =>
|
||||
throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
|
||||
case IgnoreBehavior => Behavior.same[T]
|
||||
case s: StoppedBehavior[T] => s
|
||||
case s: StoppedBehavior[T] =>
|
||||
if (msg == PostStop) s.onPostStop(ctx)
|
||||
s
|
||||
case f: FailedBehavior => f
|
||||
case EmptyBehavior => Behavior.unhandled[T]
|
||||
case ext: ExtensibleBehavior[T] =>
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
override def start(ctx: TypedActorContext[_]): Behavior[I] = {
|
||||
Behavior.start[I](nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]])
|
||||
}
|
||||
override def toString: String = s"PreStartTarget($nestedBehavior)"
|
||||
}
|
||||
|
||||
private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] {
|
||||
|
|
@ -52,11 +53,14 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
|
||||
override def signalRestart(ctx: TypedActorContext[_]): Unit =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], PreRestart)
|
||||
|
||||
override def toString: String = s"ReceiveTarget($nestedBehavior)"
|
||||
}
|
||||
|
||||
private val signalTarget = new SignalTarget[I] {
|
||||
override def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[I] =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], signal)
|
||||
override def toString: String = s"SignalTarget($nestedBehavior)"
|
||||
}
|
||||
|
||||
// invoked pre-start to start/de-duplicate the initial behavior stack
|
||||
|
|
|
|||
|
|
@ -12,6 +12,9 @@ import akka.annotation.InternalApi
|
|||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Note that this is a `Signal` poison pill, not a universal poison pill like the untyped actor one.
|
||||
* This requires special handling on the receiving side where it is used (for example with the interceptor below).
|
||||
*/
|
||||
@InternalApi private[akka] sealed abstract class PoisonPill extends Signal
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ package adapter
|
|||
import java.lang.reflect.InvocationTargetException
|
||||
|
||||
import akka.actor.ActorInitializationException
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.Behavior.StoppedBehavior
|
||||
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.Failure
|
||||
|
|
@ -18,7 +20,6 @@ import scala.util.control.Exception.Catcher
|
|||
|
||||
import akka.{ actor => untyped }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -110,22 +111,12 @@ import akka.util.OptionVal
|
|||
if (Behavior.isUnhandled(b)) unhandled(msg)
|
||||
else {
|
||||
b match {
|
||||
case s: StoppedBehavior[T] =>
|
||||
// use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior
|
||||
// until Terminate is received, i.e until postStop is invoked, and there PostStop
|
||||
// will be signaled to the previous/postStop behavior
|
||||
s.postStop match {
|
||||
case OptionVal.None =>
|
||||
// use previous as the postStop behavior
|
||||
behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior))
|
||||
case OptionVal.Some(postStop) =>
|
||||
// use the given postStop behavior, but canonicalize it
|
||||
behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx)))
|
||||
}
|
||||
context.stop(self)
|
||||
case f: FailedBehavior =>
|
||||
// For the parent untyped supervisor to pick up the exception
|
||||
throw TypedActorFailedException(f.cause)
|
||||
case stopped: StoppedBehavior[T] =>
|
||||
behavior = new ComposedStoppingBehavior[T](behavior, stopped)
|
||||
context.stop(self)
|
||||
case _ =>
|
||||
behavior = Behavior.canonicalize(b, behavior, ctx)
|
||||
}
|
||||
|
|
@ -225,11 +216,6 @@ import akka.util.OptionVal
|
|||
case null => // skip PostStop
|
||||
case _: DeferredBehavior[_] =>
|
||||
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
|
||||
case s: StoppedBehavior[_] =>
|
||||
s.postStop match {
|
||||
case OptionVal.Some(postStop) => Behavior.interpretSignal(postStop, ctx, PostStop)
|
||||
case OptionVal.None => // no postStop behavior defined
|
||||
}
|
||||
case b => Behavior.interpretSignal(b, ctx, PostStop)
|
||||
}
|
||||
|
||||
|
|
@ -287,3 +273,27 @@ private[typed] class GuardianActorAdapter[T](_initialBehavior: Behavior[T]) exte
|
|||
case object Start
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[typed] final class ComposedStoppingBehavior[T](lastBehavior: Behavior[T],
|
||||
stopBehavior: StoppedBehavior[T])
|
||||
extends ExtensibleBehavior[T] {
|
||||
override def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
throw new IllegalStateException("Stopping, should never receieve a message")
|
||||
override def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] = {
|
||||
if (msg != PostStop)
|
||||
throw new IllegalArgumentException(
|
||||
s"The ComposedStoppingBehavior should only ever receive a PostStop signal, but received $msg")
|
||||
// first pass the signal to the previous behavior, so that it and potential interceptors
|
||||
// will get the PostStop signal, unless it is deferred, we don't start a behavior while stopping
|
||||
lastBehavior match {
|
||||
case _: DeferredBehavior[_] => // no starting of behaviors on actor stop
|
||||
case nonDeferred => Behavior.interpretSignal(nonDeferred, ctx, PostStop)
|
||||
}
|
||||
// and then to the potential stop hook, which can have a call back or not
|
||||
stopBehavior.onPostStop(ctx)
|
||||
Behavior.empty
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import java.util.function.{ Function => JFunction }
|
|||
import akka.actor.typed._
|
||||
import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor }
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.japi.function.{ Function2 => JapiFunction2 }
|
||||
import akka.japi.function.{ Effect, Function2 => JapiFunction2 }
|
||||
import akka.japi.pf.PFBuilder
|
||||
import akka.util.unused
|
||||
|
||||
|
|
@ -63,7 +63,7 @@ object Behaviors {
|
|||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* The `PostStop` signal that results from stopping this actor will be passed to the
|
||||
* current behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
|
|
@ -74,11 +74,11 @@ object Behaviors {
|
|||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* given `postStop` behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
* The `PostStop` signal that results from stopping this actor will first be passed to the
|
||||
* current behavior and then the provided `postStop` callback will be invoked.
|
||||
* All other messages and signals will effectively be ignored.
|
||||
*/
|
||||
def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop)
|
||||
def stopped[T](postStop: Effect): Behavior[T] = Behavior.stopped(postStop.apply _)
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
|
|
@ -282,6 +282,7 @@ object Behaviors {
|
|||
* Support for scheduled `self` messages in an actor.
|
||||
* It takes care of the lifecycle of the timers such as cancelling them when the actor
|
||||
* is restarted or stopped.
|
||||
*
|
||||
* @see [[TimerScheduler]]
|
||||
*/
|
||||
def withTimers[T](factory: akka.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] =
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ object Behaviors {
|
|||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* The `PostStop` signal that results from stopping this actor will be passed to the
|
||||
* current behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
*/
|
||||
|
|
@ -62,11 +62,11 @@ object Behaviors {
|
|||
* shall terminate voluntarily. If this actor has created child actors then
|
||||
* these will be stopped as part of the shutdown procedure.
|
||||
*
|
||||
* The PostStop signal that results from stopping this actor will be passed to the
|
||||
* given `postStop` behavior. All other messages and signals will effectively be
|
||||
* ignored.
|
||||
* The `PostStop` signal that results from stopping this actor will first be passed to the
|
||||
* current behavior and then the provided `postStop` callback will be invoked.
|
||||
* All other messages and signals will effectively be ignored.
|
||||
*/
|
||||
def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop)
|
||||
def stopped[T](postStop: () => Unit): Behavior[T] = Behavior.stopped(postStop)
|
||||
|
||||
/**
|
||||
* A behavior that treats every incoming message as unhandled.
|
||||
|
|
@ -215,6 +215,7 @@ object Behaviors {
|
|||
* Support for scheduled `self` messages in an actor.
|
||||
* It takes care of the lifecycle of the timers such as cancelling them when the actor
|
||||
* is restarted or stopped.
|
||||
*
|
||||
* @see [[TimerScheduler]]
|
||||
*/
|
||||
def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] =
|
||||
|
|
|
|||
|
|
@ -125,18 +125,19 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
try {
|
||||
eventSourcedSetup.onSignal(signal)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
case NonFatal(ex) =>
|
||||
ctx.asScala.log.error(ex, s"Error while processing signal [{}]", signal)
|
||||
}
|
||||
nextBehavior
|
||||
}
|
||||
override def toString: String = "onStopInterceptor"
|
||||
}
|
||||
val widened = RequestingRecoveryPermit(eventSourcedSetup).widen[Any] {
|
||||
case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res)
|
||||
case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res)
|
||||
case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted
|
||||
case internal: InternalProtocol ⇒ internal // such as RecoveryTickEvent
|
||||
case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd)
|
||||
case res: JournalProtocol.Response => InternalProtocol.JournalResponse(res)
|
||||
case res: SnapshotProtocol.Response => InternalProtocol.SnapshotterResponse(res)
|
||||
case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted
|
||||
case internal: InternalProtocol => internal // such as RecoveryTickEvent
|
||||
case cmd: Command @unchecked => InternalProtocol.IncomingCommand(cmd)
|
||||
}
|
||||
Behaviors.intercept(onStopInterceptor)(widened).narrow[Command]
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue