diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index bf230d0e7b..5464b4856b 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -62,7 +62,7 @@ public class ActorCompile { }); Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); Behavior actor10 = - Behaviors.receive((context, message) -> stopped(actor4), (context, signal) -> same()); + Behaviors.receive((context, message) -> stopped(() -> {}), (context, signal) -> same()); ActorSystem system = ActorSystem.create(actor1, "Sys"); diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java index e02f64ee72..78494746dd 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java @@ -62,11 +62,9 @@ public class GracefulStopDocTest { // perform graceful stop, executing cleanup before final system termination // behavior executing cleanup is passed as a parameter to Actor.stopped return Behaviors.stopped( - Behaviors.receiveSignal( - (_ctx, PostStop) -> { - context.getSystem().log().info("Cleanup!"); - return Behaviors.same(); - })); + () -> { + context.getSystem().log().info("Cleanup!"); + }); }) .onSignal( PostStop.class, diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 43c0a986e4..8a321ec413 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -272,7 +272,7 @@ object BehaviorSpec { "must stop" in { val Setup(testkit, _, aux) = mkCtx() testkit.run(Stop) - testkit.currentBehavior should be(Behavior.StoppedBehavior) + Behavior.isAlive(testkit.currentBehavior) should be(false) checkAux(Stop, aux) } } @@ -567,7 +567,7 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy SBehaviors.same case Stop => SBehaviors.stopped case _: AuxPing => SBehaviors.unhandled - }), + }), fs((_, sig) => { monitor ! ReceivedSignal(sig) SBehaviors.same @@ -600,7 +600,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { SBehaviors.same case Stop => SBehaviors.stopped case _: AuxPing => SBehaviors.unhandled - }) + }) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index 5674156b8f..eab4e2dac2 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -329,6 +329,37 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe.expectMessageType[B] } + "intercept PostStop" in { + val probe = TestProbe[String]() + val postStopInterceptor = new BehaviorInterceptor[String, String] { + def aroundReceive(ctx: TypedActorContext[String], + msg: String, + target: ReceiveTarget[String]): Behavior[String] = { + target(ctx, msg) + } + def aroundSignal(ctx: TypedActorContext[String], + signal: Signal, + target: SignalTarget[String]): Behavior[String] = { + signal match { + case PostStop => + probe.ref ! "interceptor-post-stop" + } + target(ctx, signal) + } + } + + val ref = spawn(Behaviors.intercept(postStopInterceptor)(Behaviors.receiveMessage[String] { _ => + Behaviors.stopped { () => + probe.ref ! "callback-post-stop" + } + })) + + ref ! "stop" + probe.awaitAssert { + probe.expectMessage("interceptor-post-stop") // previous behavior when stopping get the signal + probe.expectMessage("callback-post-stop") + } + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index 7845231387..92b9624682 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import scala.util.control.NoStackTrace + +import akka.actor.DeadLetter import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.TimerScheduler @@ -75,10 +77,10 @@ class TimerSpec extends ScalaTestWithActorTestKit(""" } } .receiveSignal { - case (context, PreRestart) => + case (_, PreRestart) => monitor ! GotPreRestart(timer.isTimerActive("T")) Behaviors.same - case (context, PostStop) => + case (_, PostStop) => monitor ! GotPostStop(timer.isTimerActive("T")) Behaviors.same } @@ -316,5 +318,23 @@ class TimerSpec extends ScalaTestWithActorTestKit(""" if (elements.count(_.getClassName == "TimerInterceptor") > 1) fail(s"Stack contains TimerInterceptor more than once: \n${elements.mkString("\n\t")}") } + + "not leak timers when PostStop is used" in { + val probe = TestProbe[Any]() + val ref = spawn(Behaviors.withTimers[String] { timers => + Behaviors.setup { _ => + timers.startPeriodicTimer("test", "test", 250.millis) + Behaviors.receive { (context, message) => + Behaviors.stopped(() => context.log.info(s"stopping")) + } + } + }) + EventFilter.info("stopping").intercept { + ref ! "stop" + } + probe.expectTerminated(ref) + system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter]) + probe.expectNoMessage(1.second) + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index 9b12181d67..0ab9ddf73a 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -119,7 +119,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe val ex = new TestException("boom") val behavior = Behaviors.setup[Any] { context => val child = context.spawn(Behaviors - .supervise(Behaviors.receive[Any]((context, message) => { + .supervise(Behaviors.receive[Any]((_, _) => { throw ex })) .onFailure[Throwable](SupervisorStrategy.stop), @@ -127,7 +127,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe context.watch(child) Behaviors - .receive[Any] { (context, message) => + .receive[Any] { (_, message) => child ! message Behaviors.same } @@ -153,8 +153,8 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe val probe = TestProbe[Any]() val ex = new TestException("boom") val grossoBosso = - spawn( - Behaviors.setup[Any] { context => + spawn(Behaviors.setup[Any] { + context => val middleManagement = context.spawn(Behaviors.setup[Any] { context => val sixPackJoe = context.spawn(Behaviors.receive[Any]((context, message) => throw ex), "joe") context.watch(sixPackJoe) @@ -178,8 +178,7 @@ class WatchSpec extends ScalaTestWithActorTestKit(WatchSpec.config) with WordSpe Behaviors.stopped } - }, - "grosso-bosso") + }, "grosso-bosso") EventFilter[TestException](occurrences = 1).intercept { EventFilter[DeathPactException](occurrences = 1).intercept { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala index 592f16f4ae..8376401d4f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/GracefulStopSpec.scala @@ -32,14 +32,11 @@ final class GracefulStopSpec extends ScalaTestWithActorTestKit with WordSpecLike Behaviors.stopped }, "child2") - Behaviors.stopped { - Behaviors.receiveSignal { - case (_, PostStop) => - // cleanup function body - probe.ref ! "parent-done" - Behaviors.same - } + Behaviors.stopped { () => + // cleanup function body + probe.ref ! "parent-done" } + } spawn(behavior) @@ -54,13 +51,9 @@ final class GracefulStopSpec extends ScalaTestWithActorTestKit with WordSpecLike val behavior = Behaviors.setup[akka.NotUsed] { _ => // do not spawn any children - Behaviors.stopped { - Behaviors.receiveSignal { - case (_, PostStop) => - // cleanup function body - probe.ref ! Done - Behaviors.same - } + Behaviors.stopped { () => + // cleanup function body + probe.ref ! Done } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala index a405693bbf..6b791b1ba0 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala @@ -5,10 +5,9 @@ package akka.actor.typed.scaladsl import scala.concurrent.Promise - import akka.Done -import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor @@ -21,22 +20,52 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike { "Stopping an actor" should { - "execute the post stop" in { - val sawSignal = Promise[Done]() + "execute the post stop when stopping after setup" in { + val probe = TestProbe[Done]() spawn(Behaviors.setup[AnyRef] { _ => - Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] { - case (context, PostStop) => - sawSignal.success(Done) - Behaviors.empty - }) + Behaviors.stopped { () => + probe.ref ! Done + } }) - sawSignal.future.futureValue should ===(Done) + probe.expectMessage(Done) + } + + "execute the post stop" in { + val probe = TestProbe[Done]() + val ref = spawn(Behaviors.receiveMessage[String] { + case "stop" => + Behaviors.stopped { () => + probe.ref ! Done + } + }) + ref ! "stop" + probe.expectMessage(Done) + } + + "signal PostStop and then execute the post stop" in { + val probe = TestProbe[String]() + val ref = spawn( + Behaviors + .receiveMessage[String] { + case "stop" => + Behaviors.stopped { () => + probe.ref ! "callback" + } + } + .receiveSignal { + case (_, PostStop) => + probe.ref ! "signal" + Behaviors.same + }) + ref ! "stop" + probe.expectMessage("signal") + probe.expectMessage("callback") } // #25082 "execute the post stop when wrapped" in { - val sawSignal = Promise[Done]() - val ref = spawn(Behaviors.setup[AnyRef] { _ => + val probe = TestProbe[Done]() + spawn(Behaviors.setup[AnyRef] { _ => Behaviors.intercept(new BehaviorInterceptor[AnyRef, AnyRef] { override def aroundReceive(context: typed.TypedActorContext[AnyRef], message: AnyRef, @@ -49,42 +78,23 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike { target: SignalTarget[AnyRef]): Behavior[AnyRef] = { target(context, signal) } - })(Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] { - case (context, PostStop) => - sawSignal.success(Done) - Behaviors.empty - })) + })(Behaviors.stopped { () => + probe.ref ! Done + }) }) - ref ! "stopit" - sawSignal.future.futureValue should ===(Done) + probe.expectMessage(Done) } // #25096 "execute the post stop early" in { - val sawSignal = Promise[Done]() - spawn(Behaviors.stopped[AnyRef](Behaviors.receiveSignal[AnyRef] { - case (context, PostStop) => - sawSignal.success(Done) - Behaviors.empty - })) + val probe = TestProbe[Done]() + spawn(Behaviors.stopped { () => + probe.ref ! Done + }) - sawSignal.future.futureValue should ===(Done) + probe.expectMessage(Done) } } - "PostStop" should { - "immediately throw when a deferred behavior (setup) is passed in as postStop" in { - val ex = intercept[IllegalArgumentException] { - Behaviors.stopped( - // illegal: - Behaviors.setup[String] { _ => - throw TestException("boom!") - }) - } - - ex.getMessage should include("Behavior used as `postStop` behavior in Stopped(...) was a deferred one ") - } - } - } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala index 346fcc1672..ec978e8c74 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala @@ -39,12 +39,8 @@ object GracefulStopDocSpec { context.log.info("Initiating graceful shutdown...") // perform graceful stop, executing cleanup before final system termination // behavior executing cleanup is passed as a parameter to Actor.stopped - Behaviors.stopped { - Behaviors.receiveSignal { - case (context, PostStop) => - cleanup(context.system.log) - Behaviors.same - } + Behaviors.stopped { () => + cleanup(context.system.log) } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index a17c2b2dc9..edc7e4cc06 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -4,12 +4,13 @@ package akka.actor.typed +import scala.annotation.tailrec + import akka.actor.InvalidMessageException import akka.actor.typed.internal.BehaviorImpl - -import scala.annotation.tailrec -import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior import akka.actor.typed.internal.WrappingBehavior +import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior + import akka.util.{ LineNumbers, OptionVal } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.actor.typed.scaladsl.{ ActorContext => SAC } @@ -63,7 +64,7 @@ abstract class Behavior[T] { behavior => * when `unhandled` is returned. * * @param that the fallback `Behavior` - */ + **/ final def orElse(that: Behavior[T]): Behavior[T] = Behavior.DeferredBehavior[T] { ctx => new OrElseBehavior[T](Behavior.start(this, ctx), Behavior.start(that, ctx)) } @@ -176,8 +177,8 @@ object Behavior { * given `postStop` behavior. All other messages and signals will effectively be * ignored. */ - def stopped[T](postStop: Behavior[T]): Behavior[T] = - new StoppedBehavior(OptionVal.Some(postStop)) + def stopped[T](postStop: () => Unit): Behavior[T] = + new StoppedBehavior[T](OptionVal.Some((_: TypedActorContext[T]) => postStop())) /** * A behavior that treats every incoming message as unhandled. @@ -266,23 +267,20 @@ object Behavior { * INTERNAL API: When the cell is stopping this behavior is used, so * that PostStop can be sent to previous behavior from `finishTerminate`. */ - private[akka] class StoppedBehavior[T](val postStop: OptionVal[Behavior[T]]) extends Behavior[T] { - validatePostStop(postStop) + private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit]) + extends Behavior[T] { - @throws[IllegalArgumentException] - private final def validatePostStop(postStop: OptionVal[Behavior[T]]): Unit = { + def onPostStop(ctx: TypedActorContext[T]): Unit = { postStop match { - case OptionVal.Some(b: DeferredBehavior[_]) => - throw new IllegalArgumentException( - s"Behavior used as `postStop` behavior in Stopped(...) was a deferred one [${b.toString}], which is not supported (it would never be evaluated).") - case _ => // all good + case OptionVal.Some(callback) => callback(ctx) + case OptionVal.None => } } override def toString = "Stopped" + { postStop match { - case OptionVal.Some(_) => "(postStop)" - case _ => "()" + case OptionVal.Some(callback) => s"(${LineNumbers(callback)})" + case _ => "()" } } } @@ -419,10 +417,12 @@ object Behavior { throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") case d: DeferredBehavior[_] => throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") - case IgnoreBehavior => Behavior.same[T] - case s: StoppedBehavior[T] => s - case f: FailedBehavior => f - case EmptyBehavior => Behavior.unhandled[T] + case IgnoreBehavior => Behavior.same[T] + case s: StoppedBehavior[T] => + if (msg == PostStop) s.onPostStop(ctx) + s + case f: FailedBehavior => f + case EmptyBehavior => Behavior.unhandled[T] case ext: ExtensibleBehavior[T] => val possiblyDeferredResult = msg match { case signal: Signal => ext.receiveSignal(ctx, signal) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 68b7ce0711..b96dd3c02a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -44,6 +44,7 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce override def start(ctx: TypedActorContext[_]): Behavior[I] = { Behavior.start[I](nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]]) } + override def toString: String = s"PreStartTarget($nestedBehavior)" } private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] { @@ -52,11 +53,14 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce override def signalRestart(ctx: TypedActorContext[_]): Unit = Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], PreRestart) + + override def toString: String = s"ReceiveTarget($nestedBehavior)" } private val signalTarget = new SignalTarget[I] { override def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[I] = Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], signal) + override def toString: String = s"SignalTarget($nestedBehavior)" } // invoked pre-start to start/de-duplicate the initial behavior stack diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala index 511bbe9af1..737e24b024 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala @@ -12,6 +12,9 @@ import akka.annotation.InternalApi /** * INTERNAL API + * + * Note that this is a `Signal` poison pill, not a universal poison pill like the untyped actor one. + * This requires special handling on the receiving side where it is used (for example with the interceptor below). */ @InternalApi private[akka] sealed abstract class PoisonPill extends Signal diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index e38e4c6f72..fecc8a21b3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -9,6 +9,8 @@ package adapter import java.lang.reflect.InvocationTargetException import akka.actor.ActorInitializationException +import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.Behavior.StoppedBehavior import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException import scala.annotation.tailrec import scala.util.Failure @@ -18,7 +20,6 @@ import scala.util.control.Exception.Catcher import akka.{ actor => untyped } import akka.annotation.InternalApi -import akka.util.OptionVal /** * INTERNAL API @@ -110,22 +111,12 @@ import akka.util.OptionVal if (Behavior.isUnhandled(b)) unhandled(msg) else { b match { - case s: StoppedBehavior[T] => - // use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior - // until Terminate is received, i.e until postStop is invoked, and there PostStop - // will be signaled to the previous/postStop behavior - s.postStop match { - case OptionVal.None => - // use previous as the postStop behavior - behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior)) - case OptionVal.Some(postStop) => - // use the given postStop behavior, but canonicalize it - behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx))) - } - context.stop(self) case f: FailedBehavior => // For the parent untyped supervisor to pick up the exception throw TypedActorFailedException(f.cause) + case stopped: StoppedBehavior[T] => + behavior = new ComposedStoppingBehavior[T](behavior, stopped) + context.stop(self) case _ => behavior = Behavior.canonicalize(b, behavior, ctx) } @@ -225,11 +216,6 @@ import akka.util.OptionVal case null => // skip PostStop case _: DeferredBehavior[_] => // Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. - case s: StoppedBehavior[_] => - s.postStop match { - case OptionVal.Some(postStop) => Behavior.interpretSignal(postStop, ctx, PostStop) - case OptionVal.None => // no postStop behavior defined - } case b => Behavior.interpretSignal(b, ctx, PostStop) } @@ -287,3 +273,27 @@ private[typed] class GuardianActorAdapter[T](_initialBehavior: Behavior[T]) exte case object Start } + +/** + * INTERNAL API + */ +@InternalApi private[typed] final class ComposedStoppingBehavior[T](lastBehavior: Behavior[T], + stopBehavior: StoppedBehavior[T]) + extends ExtensibleBehavior[T] { + override def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] = + throw new IllegalStateException("Stopping, should never receieve a message") + override def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] = { + if (msg != PostStop) + throw new IllegalArgumentException( + s"The ComposedStoppingBehavior should only ever receive a PostStop signal, but received $msg") + // first pass the signal to the previous behavior, so that it and potential interceptors + // will get the PostStop signal, unless it is deferred, we don't start a behavior while stopping + lastBehavior match { + case _: DeferredBehavior[_] => // no starting of behaviors on actor stop + case nonDeferred => Behavior.interpretSignal(nonDeferred, ctx, PostStop) + } + // and then to the potential stop hook, which can have a call back or not + stopBehavior.onPostStop(ctx) + Behavior.empty + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 7246500a24..20bf915754 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -10,7 +10,7 @@ import java.util.function.{ Function => JFunction } import akka.actor.typed._ import akka.actor.typed.internal.{ BehaviorImpl, Supervisor, TimerSchedulerImpl, WithMdcBehaviorInterceptor } import akka.annotation.ApiMayChange -import akka.japi.function.{ Function2 => JapiFunction2 } +import akka.japi.function.{ Effect, Function2 => JapiFunction2 } import akka.japi.pf.PFBuilder import akka.util.unused @@ -63,7 +63,7 @@ object Behaviors { * shall terminate voluntarily. If this actor has created child actors then * these will be stopped as part of the shutdown procedure. * - * The PostStop signal that results from stopping this actor will be passed to the + * The `PostStop` signal that results from stopping this actor will be passed to the * current behavior. All other messages and signals will effectively be * ignored. */ @@ -74,11 +74,11 @@ object Behaviors { * shall terminate voluntarily. If this actor has created child actors then * these will be stopped as part of the shutdown procedure. * - * The PostStop signal that results from stopping this actor will be passed to the - * given `postStop` behavior. All other messages and signals will effectively be - * ignored. + * The `PostStop` signal that results from stopping this actor will first be passed to the + * current behavior and then the provided `postStop` callback will be invoked. + * All other messages and signals will effectively be ignored. */ - def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop) + def stopped[T](postStop: Effect): Behavior[T] = Behavior.stopped(postStop.apply _) /** * A behavior that treats every incoming message as unhandled. @@ -282,6 +282,7 @@ object Behaviors { * Support for scheduled `self` messages in an actor. * It takes care of the lifecycle of the timers such as cancelling them when the actor * is restarted or stopped. + * * @see [[TimerScheduler]] */ def withTimers[T](factory: akka.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index c19b19a8d7..33c82851d0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -51,7 +51,7 @@ object Behaviors { * shall terminate voluntarily. If this actor has created child actors then * these will be stopped as part of the shutdown procedure. * - * The PostStop signal that results from stopping this actor will be passed to the + * The `PostStop` signal that results from stopping this actor will be passed to the * current behavior. All other messages and signals will effectively be * ignored. */ @@ -62,11 +62,11 @@ object Behaviors { * shall terminate voluntarily. If this actor has created child actors then * these will be stopped as part of the shutdown procedure. * - * The PostStop signal that results from stopping this actor will be passed to the - * given `postStop` behavior. All other messages and signals will effectively be - * ignored. + * The `PostStop` signal that results from stopping this actor will first be passed to the + * current behavior and then the provided `postStop` callback will be invoked. + * All other messages and signals will effectively be ignored. */ - def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop) + def stopped[T](postStop: () => Unit): Behavior[T] = Behavior.stopped(postStop) /** * A behavior that treats every incoming message as unhandled. @@ -215,6 +215,7 @@ object Behaviors { * Support for scheduled `self` messages in an actor. * It takes care of the lifecycle of the timers such as cancelling them when the actor * is restarted or stopped. + * * @see [[TimerScheduler]] */ def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] = diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index e2616c199f..cddf92f536 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -125,18 +125,19 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( try { eventSourcedSetup.onSignal(signal) } catch { - case NonFatal(ex) ⇒ + case NonFatal(ex) => ctx.asScala.log.error(ex, s"Error while processing signal [{}]", signal) } nextBehavior } + override def toString: String = "onStopInterceptor" } val widened = RequestingRecoveryPermit(eventSourcedSetup).widen[Any] { - case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) - case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) - case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted - case internal: InternalProtocol ⇒ internal // such as RecoveryTickEvent - case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) + case res: JournalProtocol.Response => InternalProtocol.JournalResponse(res) + case res: SnapshotProtocol.Response => InternalProtocol.SnapshotterResponse(res) + case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted + case internal: InternalProtocol => internal // such as RecoveryTickEvent + case cmd: Command @unchecked => InternalProtocol.IncomingCommand(cmd) } Behaviors.intercept(onStopInterceptor)(widened).narrow[Command] }