diff --git a/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java b/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java index e14f1950db..d81a5e7d42 100644 --- a/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java +++ b/akka-typed-tests/src/test/java/akka/typed/javadsl/ActorCompile.java @@ -42,6 +42,7 @@ public class ActorCompile { return monitor(self, ignore()); }); Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); + Behavior actor10 = immutable((ctx, msg) -> stopped(actor4), (ctx, signal) -> same()); ActorSystem system = ActorSystem.create("Sys", actor1); diff --git a/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala index e2072bec83..b5b2667f4f 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala @@ -72,7 +72,7 @@ object ActorContextSpec { final case class GetAdapter(replyTo: ActorRef[Adapter], name: String = "") extends Command final case class Adapter(a: ActorRef[Command]) extends Event - def subject(monitor: ActorRef[Monitor]): Behavior[Command] = + def subject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] = Actor.immutable[Command] { (ctx, message) ⇒ message match { @@ -87,13 +87,13 @@ object ActorContextSpec { Actor.unhandled case Renew(replyTo) ⇒ replyTo ! Renewed - subject(monitor) + subject(monitor, ignorePostStop) case Throw(ex) ⇒ throw ex case MkChild(name, mon, replyTo) ⇒ val child = name match { - case None ⇒ ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon))) - case Some(n) ⇒ ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon)), n) + case None ⇒ ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop))) + case Some(n) ⇒ ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)), n) } replyTo ! Created(child) Actor.same @@ -108,7 +108,8 @@ object ActorContextSpec { replyTo ! Scheduled ctx.schedule(delay, target, msg) Actor.same - case Stop ⇒ Actor.stopped + case Stop ⇒ + Actor.stopped case Kill(ref, replyTo) ⇒ if (ctx.stop(ref)) replyTo ! Killed else replyTo ! NotKilled @@ -145,7 +146,8 @@ object ActorContextSpec { Actor.immutable[Command] { case (_, _) ⇒ Actor.unhandled } onSignal { - case (_, Terminated(_)) ⇒ Actor.unhandled + case (_, PostStop) if ignorePostStop ⇒ Actor.same // ignore PostStop here + case (_, Terminated(_)) ⇒ Actor.unhandled case (_, sig) ⇒ monitor ! GotSignal(sig) Actor.same @@ -155,10 +157,11 @@ object ActorContextSpec { Actor.same } } onSignal { - case (ctx, signal) ⇒ monitor ! GotSignal(signal); Actor.same + case (_, PostStop) if ignorePostStop ⇒ Actor.same // ignore PostStop here + case (ctx, signal) ⇒ monitor ! GotSignal(signal); Actor.same } - def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = { + def oldSubject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] = { Actor.immutable[Command] { case (ctx, message) ⇒ message match { case ReceiveTimeout ⇒ @@ -172,13 +175,13 @@ object ActorContextSpec { Actor.unhandled case Renew(replyTo) ⇒ replyTo ! Renewed - subject(monitor) + subject(monitor, ignorePostStop) case Throw(ex) ⇒ throw ex case MkChild(name, mon, replyTo) ⇒ val child = name match { - case None ⇒ ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon))) - case Some(n) ⇒ ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon)), n) + case None ⇒ ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop))) + case Some(n) ⇒ ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)), n) } replyTo ! Created(child) Actor.same @@ -193,7 +196,8 @@ object ActorContextSpec { replyTo ! Scheduled ctx.schedule(delay, target, msg) Actor.same - case Stop ⇒ Actor.stopped + case Stop ⇒ + Actor.stopped case Kill(ref, replyTo) ⇒ if (ctx.stop(ref)) replyTo ! Killed else replyTo ! NotKilled @@ -230,7 +234,8 @@ object ActorContextSpec { Actor.immutable[Command] { case _ ⇒ Actor.unhandled } onSignal { - case (_, Terminated(_)) ⇒ Actor.unhandled + case (_, PostStop) if ignorePostStop ⇒ Actor.same // ignore PostStop here + case (_, Terminated(_)) ⇒ Actor.unhandled case (_, sig) ⇒ monitor ! GotSignal(sig) Actor.same @@ -240,6 +245,7 @@ object ActorContextSpec { Actor.same } } onSignal { + case (_, PostStop) if ignorePostStop ⇒ Actor.same // ignore PostStop here case (_, signal) ⇒ monitor ! GotSignal(signal) Actor.same @@ -270,7 +276,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( /** * The behavior against which to run all the tests. */ - def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] + def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] implicit def system: ActorSystem[TypedSpec.Command] @@ -278,10 +284,10 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( if (system eq nativeSystem) suite + "Native" else suite + "Adapted" - def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None)( + def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None, ignorePostStop: Boolean = true)( proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ - val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx)) + val props = wrapper.map(_.wrap(behavior(ctx, ignorePostStop))).getOrElse(behavior(ctx, ignorePostStop)) val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject")) proc(ctx, steps) @@ -344,29 +350,30 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) - def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Actor.restarter[Throwable]())) { (ctx, startWith) ⇒ - val self = ctx.self - val ex = new Exception("KABOOM1") - startWith { subj ⇒ - val log = muteExpectedException[Exception]("KABOOM1", occurrences = 1) - subj ! Throw(ex) - (subj, log) - }.expectMessage(expectTimeout) { - case (msg, (subj, log)) ⇒ - msg should ===(GotSignal(PreRestart)) - log.assertDone(expectTimeout) - ctx.stop(subj) - }.expectMessage(expectTimeout) { (msg, _) ⇒ - msg should ===(GotSignal(PostStop)) - } - }) + def `01 must correctly wire the lifecycle hooks`(): Unit = + sync(setup("ctx01", Some(Actor.restarter[Throwable]()), ignorePostStop = false) { (ctx, startWith) ⇒ + val self = ctx.self + val ex = new Exception("KABOOM1") + startWith { subj ⇒ + val log = muteExpectedException[Exception]("KABOOM1", occurrences = 1) + subj ! Throw(ex) + (subj, log) + }.expectMessage(expectTimeout) { + case (msg, (subj, log)) ⇒ + msg should ===(GotSignal(PreRestart)) + log.assertDone(expectTimeout) + ctx.stop(subj) + }.expectMessage(expectTimeout) { (msg, _) ⇒ + msg should ===(GotSignal(PostStop)) + } + }) - def `02 must not signal PostStop after voluntary termination`(): Unit = sync(setup("ctx02") { (ctx, startWith) ⇒ + def `02 must signal PostStop after voluntary termination`(): Unit = sync(setup("ctx02", ignorePostStop = false) { (ctx, startWith) ⇒ startWith.keep { subj ⇒ - ctx.watch(subj) stop(subj) - }.expectTermination(expectTimeout) { (t, subj) ⇒ - t.ref should ===(subj) + }.expectMessage(expectTimeout) { + case (msg, _) ⇒ + msg should ===(GotSignal(PostStop)) } }) @@ -440,7 +447,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( }.stimulate(_ ! Ping(self), _ ⇒ Pong2) }) - def `07 must stop upon Stop`(): Unit = sync(setup("ctx07") { (ctx, startWith) ⇒ + def `07 must stop upon Stop`(): Unit = sync(setup("ctx07", ignorePostStop = false) { (ctx, startWith) ⇒ val self = ctx.self val ex = new Exception("KABOOM07") startWith @@ -457,7 +464,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( val self = ctx.self startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) { case (subj, child) ⇒ - val other = ctx.spawn(behavior(ctx), "A") + val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A") subj ! Kill(other, ctx.self) child }.expectMessageKeep(expectTimeout) { (msg, _) ⇒ @@ -515,7 +522,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) - def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13") { (ctx, startWith) ⇒ + def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith) ⇒ val self = ctx.self startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { case (subj, child) ⇒ @@ -531,6 +538,9 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( child ! Stop }.expectMessage(expectTimeout) { case (msg, (subj, child)) ⇒ + msg should ===(ChildEvent(GotSignal(PostStop))) + }.expectMessage(expectTimeout) { + case (msg, _) ⇒ msg should ===(GotSignal(PostStop)) } }) @@ -579,7 +589,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) - def `40 must create a working adapter`(): Unit = sync(setup("ctx40") { (ctx, startWith) ⇒ + def `40 must create a working adapter`(): Unit = sync(setup("ctx40", ignorePostStop = false) { (ctx, startWith) ⇒ startWith.keep { subj ⇒ subj ! GetAdapter(ctx.self) }.expectMessage(expectTimeout) { (msg, subj) ⇒ @@ -609,8 +619,8 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( trait Normal extends Tests { override def suite = "normal" - override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = - subject(ctx.self) + override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = + subject(ctx.self, ignorePostStop) } object `An ActorContext (native)` extends Normal with NativeSystem object `An ActorContext (adapted)` extends Normal with AdaptedSystem @@ -618,32 +628,32 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( trait Widened extends Tests { import Actor._ override def suite = "widened" - override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = - subject(ctx.self).widen { case x ⇒ x } + override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = + subject(ctx.self, ignorePostStop).widen { case x ⇒ x } } object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem trait Deferred extends Tests { override def suite = "deferred" - override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = - Actor.deferred(_ ⇒ subject(ctx.self)) + override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = + Actor.deferred(_ ⇒ subject(ctx.self, ignorePostStop)) } object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem trait NestedDeferred extends Tests { override def suite = "deferred" - override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = - Actor.deferred(_ ⇒ Actor.deferred(_ ⇒ subject(ctx.self))) + override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = + Actor.deferred(_ ⇒ Actor.deferred(_ ⇒ subject(ctx.self, ignorePostStop))) } object `An ActorContext with nested deferred Behavior (native)` extends NestedDeferred with NativeSystem object `An ActorContext with nested deferred Behavior (adapted)` extends NestedDeferred with AdaptedSystem trait Tap extends Tests { override def suite = "tap" - override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = - Actor.tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self)) + override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = + Actor.tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self, ignorePostStop)) } object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem diff --git a/akka-typed-tests/src/test/scala/akka/typed/StepWise.scala b/akka-typed-tests/src/test/scala/akka/typed/StepWise.scala index 30321069eb..b4d4d61511 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/StepWise.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/StepWise.scala @@ -141,6 +141,9 @@ object StepWise { ctx.cancelReceiveTimeout() run(ctx, tail, f(msg, value)) } onSignal { + case (_, PostStop) ⇒ + // ignore PostStop here + run(ctx, ops, value) case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for a message") } case MultiMessage(t, c, f, trace) :: tail ⇒ @@ -157,6 +160,9 @@ object StepWise { run(ctx, tail, f((msg :: acc).reverse, value)) } else behavior(nextCount, msg :: acc) } onSignal { + case (_, PostStop) ⇒ + // ignore PostStop here + run(ctx, ops, value) case (_, other) ⇒ throwIllegalState(trace, s"unexpected $other while waiting for $c messages (got $count valid ones)") } } @@ -175,6 +181,9 @@ object StepWise { run(ctx, tail, f((Right(msg) :: acc).reverse, value)) } else behavior(nextCount, Right(msg) :: acc) } onSignal { + case (_, PostStop) ⇒ + // ignore PostStop here + run(ctx, ops, value) case (_, other) ⇒ val nextCount = count + 1 if (nextCount == c) { @@ -190,12 +199,16 @@ object StepWise { case (_, ReceiveTimeout) ⇒ throwTimeout(trace, s"timeout of $t expired while waiting for termination") case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination") } onSignal { + case (_, PostStop) ⇒ + // ignore PostStop here + run(ctx, ops, value) case (_, t: Terminated) ⇒ ctx.cancelReceiveTimeout() run(ctx, tail, f(t, value)) case other ⇒ throwIllegalState(trace, s"unexpected $other while waiting for termination") } - case Nil ⇒ stopped + case Nil ⇒ + stopped } } diff --git a/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala index 09707eebae..7bbb3b2f5d 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/TimerSpec.scala @@ -6,7 +6,6 @@ package akka.typed import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -16,7 +15,6 @@ import akka.typed.scaladsl.AskPattern._ import akka.typed.scaladsl.TimerScheduler import akka.typed.testkit.TestKitSettings import akka.typed.testkit.scaladsl._ -import org.scalatest.concurrent.Eventually.eventually @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TimerSpec extends TypedSpec(""" @@ -96,6 +94,7 @@ class TimerSpec extends TypedSpec(""" probe.expectNoMsg(100.millis) ref ! End + probe.expectMsg(GotPostStop(false)) } def `02 must schedule repeated ticks`(): Unit = { @@ -113,6 +112,7 @@ class TimerSpec extends TypedSpec(""" } ref ! End + probe.expectMsg(GotPostStop(false)) } def `03 must replace timer`(): Unit = { @@ -132,6 +132,7 @@ class TimerSpec extends TypedSpec(""" probe.expectMsg(Tock(2)) ref ! End + probe.expectMsg(GotPostStop(false)) } def `04 must cancel timer`(): Unit = { @@ -147,6 +148,7 @@ class TimerSpec extends TypedSpec(""" probe.expectNoMsg(dilatedInterval + 100.millis) ref ! End + probe.expectMsg(GotPostStop(false)) } def `05 must discard timers from old incarnation after restart, alt 1`(): Unit = { @@ -170,6 +172,7 @@ class TimerSpec extends TypedSpec(""" probe.expectMsg(Tock(2)) ref ! End + probe.expectMsg(GotPostStop(false)) } def `06 must discard timers from old incarnation after restart, alt 2`(): Unit = { @@ -195,6 +198,7 @@ class TimerSpec extends TypedSpec(""" probe.expectMsg(Tock(1)) ref ! End + probe.expectMsg(GotPostStop(false)) } def `07 must cancel timers when stopped from exception`(): Unit = { @@ -210,18 +214,13 @@ class TimerSpec extends TypedSpec(""" def `08 must cancel timers when stopped voluntarily`(): Unit = { val probe = TestProbe[Event]("evt") - val timerRef = new AtomicReference[TimerScheduler[Command]] val behv = Actor.withTimers[Command] { timer ⇒ - timerRef.set(timer) timer.startPeriodicTimer("T", Tick(1), interval) target(probe.ref, timer, 1) } val ref = start(behv) ref ! End - // PostStop is not signalled when stopped voluntarily - eventually { - timerRef.get().isTimerActive("T") should ===(false) - } + probe.expectMsg(GotPostStop(false)) } } diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index bd1576964c..777248fa59 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -95,15 +95,30 @@ object Behavior { * behaviors that delegate (partial) handling to other behaviors. */ def unhandled[T]: Behavior[T] = UnhandledBehavior.asInstanceOf[Behavior[T]] + /** * Return this behavior from message processing to signal that this actor * shall terminate voluntarily. If this actor has created child actors then - * these will be stopped as part of the shutdown procedure. The PostStop - * signal that results from stopping this actor will NOT be passed to the - * current behavior, it will be effectively ignored. + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * current behavior. All other messages and signals will effectively be + * ignored. */ def stopped[T]: Behavior[T] = StoppedBehavior.asInstanceOf[Behavior[T]] + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * given `postStop` behavior. All other messages and signals will effectively be + * ignored. + */ + def stopped[T](postStop: Behavior[T]): Behavior[T] = + new StoppedBehavior(OptionVal.Some(postStop)) + /** * A behavior that treats every incoming message as unhandled. */ @@ -169,7 +184,13 @@ object Behavior { /** * INTERNAL API. */ - private[akka] object StoppedBehavior extends Behavior[Nothing] { + private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None) + + /** + * INTERNAL API: When the cell is stopping this behavior is used, so + * that PostStop can be sent to previous behavior from `finishTerminate`. + */ + private[akka] class StoppedBehavior[T](val postStop: OptionVal[Behavior[T]]) extends Behavior[T] { override def toString = "Stopped" } @@ -211,7 +232,10 @@ object Behavior { /** * Returns true if the given behavior is not stopped. */ - def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne StoppedBehavior + def isAlive[T](behavior: Behavior[T]): Boolean = behavior match { + case _: StoppedBehavior[_] ⇒ false + case _ ⇒ true + } /** * Returns true if the given behavior is the special `unhandled` marker. @@ -243,7 +267,7 @@ object Behavior { case SameBehavior | UnhandledBehavior ⇒ throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") case d: DeferredBehavior[_] ⇒ throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") case IgnoreBehavior ⇒ SameBehavior.asInstanceOf[Behavior[T]] - case StoppedBehavior ⇒ StoppedBehavior.asInstanceOf[Behavior[T]] + case s: StoppedBehavior[T] ⇒ s case EmptyBehavior ⇒ UnhandledBehavior.asInstanceOf[Behavior[T]] case ext: ExtensibleBehavior[T] ⇒ val possiblyDeferredResult = msg match { diff --git a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala index c5a82a8c9e..421b5a676d 100644 --- a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala +++ b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala @@ -42,10 +42,6 @@ final case object PreRestart extends PreRestart { * Lifecycle signal that is fired after this actor and all its child actors * (transitively) have terminated. The [[Terminated]] signal is only sent to * registered watchers after this signal has been processed. - * - * IMPORTANT NOTE: if the actor terminated by switching to the - * `Stopped` behavior then this signal will be ignored (i.e. the - * Stopped behavior will do nothing in reaction to it). */ sealed abstract class PostStop extends Signal final case object PostStop extends PostStop { diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index f3a6af6bd8..8f1d2dacb4 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -18,6 +18,8 @@ import scala.util.control.NonFatal import scala.util.control.Exception.Catcher import akka.event.Logging.Error import akka.event.Logging +import akka.typed.Behavior.StoppedBehavior +import akka.util.OptionVal /** * INTERNAL API @@ -324,8 +326,25 @@ private[typed] class ActorCell[T]( protected def next(b: Behavior[T], msg: Any): Unit = { if (Behavior.isUnhandled(b)) unhandled(msg) - behavior = Behavior.canonicalize(b, behavior, ctx) - if (!Behavior.isAlive(behavior)) self.sendSystem(Terminate()) + else { + b match { + case s: StoppedBehavior[T] ⇒ + // use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior + // until Terminate is received, i.e until finishTerminate is invoked, and there PostStop + // will be signaled to the previous/postStop behavior + s.postStop match { + case OptionVal.None ⇒ + // use previous as the postStop behavior + behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior)) + case OptionVal.Some(postStop) ⇒ + // use the given postStop behavior, but canonicalize it + behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx))) + } + self.sendSystem(Terminate()) + case _ ⇒ + behavior = Behavior.canonicalize(b, behavior, ctx) + } + } } private def unhandled(msg: Any): Unit = msg match { diff --git a/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala index eceff42b7a..2e0d9d4812 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala @@ -139,7 +139,7 @@ import scala.annotation.tailrec private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { if (isUnhandled(b)) unhandled else if ((b eq SameBehavior) || (b eq this)) same - else if (!Behavior.isAlive(b)) Behavior.stopped + else if (!Behavior.isAlive(b)) b else { b match { case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx) diff --git a/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala b/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala index b2634e8e0e..f233ffbc5d 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/Restarter.scala @@ -87,7 +87,7 @@ import akka.typed.scaladsl.Actor protected final def canonical(b: Behavior[T], ctx: ActorContext[T], afterException: Boolean): Behavior[T] = if (Behavior.isUnhandled(b)) Behavior.unhandled else if ((b eq Behavior.SameBehavior) || (b eq behavior)) Behavior.same - else if (!Behavior.isAlive(b)) Behavior.stopped + else if (!Behavior.isAlive(b)) b else { b match { case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx, afterException) diff --git a/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala b/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala index c8f33b3e6f..fb9d72df37 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala @@ -7,6 +7,8 @@ package internal import scala.util.control.NonFatal import akka.event.Logging import akka.typed.Behavior.{ DeferredBehavior, undefer, validateAsInitial } +import akka.typed.Behavior.StoppedBehavior +import akka.util.OptionVal /** * INTERNAL API @@ -88,10 +90,18 @@ private[typed] trait SupervisionMechanics[T] { /* * The following order is crucial for things to work properly. Only change this if you're very confident and lucky. * - * Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. + * */ - try if ((a ne null) && !a.isInstanceOf[DeferredBehavior[_]]) Behavior.interpretSignal(a, ctx, PostStop) - catch { case NonFatal(ex) ⇒ publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) } + try a match { + case null ⇒ // skip PostStop + case _: DeferredBehavior[_] ⇒ + // Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. + case s: StoppedBehavior[_] ⇒ s.postStop match { + case OptionVal.Some(postStop) ⇒ Behavior.interpretSignal(postStop, ctx, PostStop) + case OptionVal.None ⇒ // no postStop behavior defined + } + case _ ⇒ Behavior.interpretSignal(a, ctx, PostStop) + } catch { case NonFatal(ex) ⇒ publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) } finally try tellWatchersWeDied() finally try parent.sendSystem(DeathWatchNotification(self, failed)) finally { diff --git a/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala index 8b3bf4cf7c..a123f5acfd 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/TimerSchedulerImpl.scala @@ -139,13 +139,8 @@ import scala.reflect.ClassTag } true }, - afterMessage = (ctx, msg, b) ⇒ { - // PostStop is not signaled when voluntarily stopped - if (!Behavior.isAlive(b)) - cancelAll() - b - }, - afterSignal = (ctx, sig, b) ⇒ b, // TODO optimize by using more ConstantFun + afterMessage = (ctx, msg, b) ⇒ b, // TODO optimize by using more ConstantFun + afterSignal = (ctx, sig, b) ⇒ b, behavior)(ClassTag(classOf[TimerSchedulerImpl.TimerMsg])) } diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala index 272900b979..81e6170473 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorAdapter.scala @@ -7,6 +7,7 @@ package adapter import akka.{ actor ⇒ a } import akka.annotation.InternalApi +import akka.util.OptionVal /** * INTERNAL API @@ -39,9 +40,26 @@ import akka.annotation.InternalApi } private def next(b: Behavior[T], msg: Any): Unit = { - if (isUnhandled(b)) unhandled(msg) - behavior = canonicalize(b, behavior, ctx) - if (!isAlive(behavior)) context.stop(self) + if (Behavior.isUnhandled(b)) unhandled(msg) + else { + b match { + case s: StoppedBehavior[T] ⇒ + // use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior + // until Terminate is received, i.e until postStop is invoked, and there PostStop + // will be signaled to the previous/postStop behavior + s.postStop match { + case OptionVal.None ⇒ + // use previous as the postStop behavior + behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior)) + case OptionVal.Some(postStop) ⇒ + // use the given postStop behavior, but canonicalize it + behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx))) + } + context.stop(self) + case _ ⇒ + behavior = Behavior.canonicalize(b, behavior, ctx) + } + } } override def unhandled(msg: Any): Unit = msg match { @@ -59,11 +77,26 @@ import akka.annotation.InternalApi override def preStart(): Unit = behavior = validateAsInitial(undefer(behavior, ctx)) - override def preRestart(reason: Throwable, message: Option[Any]): Unit = - next(Behavior.interpretSignal(behavior, ctx, PreRestart), PreRestart) + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + Behavior.interpretSignal(behavior, ctx, PreRestart) + behavior = Behavior.stopped + } + override def postRestart(reason: Throwable): Unit = behavior = validateAsInitial(undefer(behavior, ctx)) + override def postStop(): Unit = { - next(Behavior.interpretSignal(behavior, ctx, PostStop), PostStop) + behavior match { + case null ⇒ // skip PostStop + case _: DeferredBehavior[_] ⇒ + // Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination. + case s: StoppedBehavior[_] ⇒ s.postStop match { + case OptionVal.Some(postStop) ⇒ Behavior.interpretSignal(postStop, ctx, PostStop) + case OptionVal.None ⇒ // no postStop behavior defined + } + case b ⇒ Behavior.interpretSignal(b, ctx, PostStop) + } + behavior = Behavior.stopped } } diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala index f9f9a9cfb5..ba921ed9d6 100644 --- a/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala @@ -110,12 +110,25 @@ object Actor { /** * Return this behavior from message processing to signal that this actor * shall terminate voluntarily. If this actor has created child actors then - * these will be stopped as part of the shutdown procedure. The PostStop - * signal that results from stopping this actor will NOT be passed to the - * current behavior, it will be effectively ignored. + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * current behavior. All other messages and signals will effectively be + * ignored. */ def stopped[T]: Behavior[T] = Behavior.stopped + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * given `postStop` behavior. All other messages and signals will effectively be + * ignored. + */ + def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop) + /** * A behavior that treats every incoming message as unhandled. */ diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala index ef81ec29fb..d9c3481677 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala @@ -137,12 +137,25 @@ object Actor { /** * Return this behavior from message processing to signal that this actor * shall terminate voluntarily. If this actor has created child actors then - * these will be stopped as part of the shutdown procedure. The PostStop - * signal that results from stopping this actor will NOT be passed to the - * current behavior, it will be effectively ignored. + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * current behavior. All other messages and signals will effectively be + * ignored. */ def stopped[T]: Behavior[T] = Behavior.stopped + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. + * + * The PostStop signal that results from stopping this actor will be passed to the + * given `postStop` behavior. All other messages and signals will effectively be + * ignored. + */ + def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop) + /** * A behavior that treats every incoming message as unhandled. */