Merge pull request #22854 from akka/wip-22840-PostStop-patriknw

signal PostStop also when voluntarily stopped, #22840
This commit is contained in:
Patrik Nordwall 2017-05-05 14:45:56 +02:00 committed by GitHub
commit 144fbb50ef
14 changed files with 221 additions and 95 deletions

View file

@ -42,6 +42,7 @@ public class ActorCompile {
return monitor(self, ignore());
});
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
Behavior<MyMsg> actor10 = immutable((ctx, msg) -> stopped(actor4), (ctx, signal) -> same());
ActorSystem<MyMsg> system = ActorSystem.create("Sys", actor1);

View file

@ -72,7 +72,7 @@ object ActorContextSpec {
final case class GetAdapter(replyTo: ActorRef[Adapter], name: String = "") extends Command
final case class Adapter(a: ActorRef[Command]) extends Event
def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
def subject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] =
Actor.immutable[Command] {
(ctx, message)
message match {
@ -87,13 +87,13 @@ object ActorContextSpec {
Actor.unhandled
case Renew(replyTo)
replyTo ! Renewed
subject(monitor)
subject(monitor, ignorePostStop)
case Throw(ex)
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon)), n)
case None ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)))
case Some(n) ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)), n)
}
replyTo ! Created(child)
Actor.same
@ -108,7 +108,8 @@ object ActorContextSpec {
replyTo ! Scheduled
ctx.schedule(delay, target, msg)
Actor.same
case Stop Actor.stopped
case Stop
Actor.stopped
case Kill(ref, replyTo)
if (ctx.stop(ref)) replyTo ! Killed
else replyTo ! NotKilled
@ -145,7 +146,8 @@ object ActorContextSpec {
Actor.immutable[Command] {
case (_, _) Actor.unhandled
} onSignal {
case (_, Terminated(_)) Actor.unhandled
case (_, PostStop) if ignorePostStop Actor.same // ignore PostStop here
case (_, Terminated(_)) Actor.unhandled
case (_, sig)
monitor ! GotSignal(sig)
Actor.same
@ -155,10 +157,11 @@ object ActorContextSpec {
Actor.same
}
} onSignal {
case (ctx, signal) monitor ! GotSignal(signal); Actor.same
case (_, PostStop) if ignorePostStop Actor.same // ignore PostStop here
case (ctx, signal) monitor ! GotSignal(signal); Actor.same
}
def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = {
def oldSubject(monitor: ActorRef[Monitor], ignorePostStop: Boolean): Behavior[Command] = {
Actor.immutable[Command] {
case (ctx, message) message match {
case ReceiveTimeout
@ -172,13 +175,13 @@ object ActorContextSpec {
Actor.unhandled
case Renew(replyTo)
replyTo ! Renewed
subject(monitor)
subject(monitor, ignorePostStop)
case Throw(ex)
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon)), n)
case None ctx.spawnAnonymous(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)))
case Some(n) ctx.spawn(Actor.restarter[Throwable]().wrap(subject(mon, ignorePostStop)), n)
}
replyTo ! Created(child)
Actor.same
@ -193,7 +196,8 @@ object ActorContextSpec {
replyTo ! Scheduled
ctx.schedule(delay, target, msg)
Actor.same
case Stop Actor.stopped
case Stop
Actor.stopped
case Kill(ref, replyTo)
if (ctx.stop(ref)) replyTo ! Killed
else replyTo ! NotKilled
@ -230,7 +234,8 @@ object ActorContextSpec {
Actor.immutable[Command] {
case _ Actor.unhandled
} onSignal {
case (_, Terminated(_)) Actor.unhandled
case (_, PostStop) if ignorePostStop Actor.same // ignore PostStop here
case (_, Terminated(_)) Actor.unhandled
case (_, sig)
monitor ! GotSignal(sig)
Actor.same
@ -240,6 +245,7 @@ object ActorContextSpec {
Actor.same
}
} onSignal {
case (_, PostStop) if ignorePostStop Actor.same // ignore PostStop here
case (_, signal)
monitor ! GotSignal(signal)
Actor.same
@ -270,7 +276,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
/**
* The behavior against which to run all the tests.
*/
def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command]
def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command]
implicit def system: ActorSystem[TypedSpec.Command]
@ -278,10 +284,10 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
if (system eq nativeSystem) suite + "Native"
else suite + "Adapted"
def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None)(
def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None, ignorePostStop: Boolean = true)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx))
val props = wrapper.map(_.wrap(behavior(ctx, ignorePostStop))).getOrElse(behavior(ctx, ignorePostStop))
val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject"))
proc(ctx, steps)
@ -344,29 +350,30 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Actor.restarter[Throwable]())) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM1")
startWith { subj
val log = muteExpectedException[Exception]("KABOOM1", occurrences = 1)
subj ! Throw(ex)
(subj, log)
}.expectMessage(expectTimeout) {
case (msg, (subj, log))
msg should ===(GotSignal(PreRestart))
log.assertDone(expectTimeout)
ctx.stop(subj)
}.expectMessage(expectTimeout) { (msg, _)
msg should ===(GotSignal(PostStop))
}
})
def `01 must correctly wire the lifecycle hooks`(): Unit =
sync(setup("ctx01", Some(Actor.restarter[Throwable]()), ignorePostStop = false) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM1")
startWith { subj
val log = muteExpectedException[Exception]("KABOOM1", occurrences = 1)
subj ! Throw(ex)
(subj, log)
}.expectMessage(expectTimeout) {
case (msg, (subj, log))
msg should ===(GotSignal(PreRestart))
log.assertDone(expectTimeout)
ctx.stop(subj)
}.expectMessage(expectTimeout) { (msg, _)
msg should ===(GotSignal(PostStop))
}
})
def `02 must not signal PostStop after voluntary termination`(): Unit = sync(setup("ctx02") { (ctx, startWith)
def `02 must signal PostStop after voluntary termination`(): Unit = sync(setup("ctx02", ignorePostStop = false) { (ctx, startWith)
startWith.keep { subj
ctx.watch(subj)
stop(subj)
}.expectTermination(expectTimeout) { (t, subj)
t.ref should ===(subj)
}.expectMessage(expectTimeout) {
case (msg, _)
msg should ===(GotSignal(PostStop))
}
})
@ -440,7 +447,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}.stimulate(_ ! Ping(self), _ Pong2)
})
def `07 must stop upon Stop`(): Unit = sync(setup("ctx07") { (ctx, startWith)
def `07 must stop upon Stop`(): Unit = sync(setup("ctx07", ignorePostStop = false) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM07")
startWith
@ -457,7 +464,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) {
case (subj, child)
val other = ctx.spawn(behavior(ctx), "A")
val other = ctx.spawn(behavior(ctx, ignorePostStop = true), "A")
subj ! Kill(other, ctx.self)
child
}.expectMessageKeep(expectTimeout) { (msg, _)
@ -515,7 +522,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13") { (ctx, startWith)
def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13", ignorePostStop = false) { (ctx, startWith)
val self = ctx.self
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
case (subj, child)
@ -531,6 +538,9 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
child ! Stop
}.expectMessage(expectTimeout) {
case (msg, (subj, child))
msg should ===(ChildEvent(GotSignal(PostStop)))
}.expectMessage(expectTimeout) {
case (msg, _)
msg should ===(GotSignal(PostStop))
}
})
@ -579,7 +589,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `40 must create a working adapter`(): Unit = sync(setup("ctx40") { (ctx, startWith)
def `40 must create a working adapter`(): Unit = sync(setup("ctx40", ignorePostStop = false) { (ctx, startWith)
startWith.keep { subj
subj ! GetAdapter(ctx.self)
}.expectMessage(expectTimeout) { (msg, subj)
@ -609,8 +619,8 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
trait Normal extends Tests {
override def suite = "normal"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self)
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop)
}
object `An ActorContext (native)` extends Normal with NativeSystem
object `An ActorContext (adapted)` extends Normal with AdaptedSystem
@ -618,32 +628,32 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
trait Widened extends Tests {
import Actor._
override def suite = "widened"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self).widen { case x x }
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop).widen { case x x }
}
object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem
object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem
trait Deferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.deferred(_ subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ subject(ctx.self, ignorePostStop))
}
object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem
object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem
trait NestedDeferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.deferred(_ Actor.deferred(_ subject(ctx.self)))
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ Actor.deferred(_ subject(ctx.self, ignorePostStop)))
}
object `An ActorContext with nested deferred Behavior (native)` extends NestedDeferred with NativeSystem
object `An ActorContext with nested deferred Behavior (adapted)` extends NestedDeferred with AdaptedSystem
trait Tap extends Tests {
override def suite = "tap"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.tap((_, _) (), (_, _) (), subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.tap((_, _) (), (_, _) (), subject(ctx.self, ignorePostStop))
}
object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem
object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem

View file

@ -141,6 +141,9 @@ object StepWise {
ctx.cancelReceiveTimeout()
run(ctx, tail, f(msg, value))
} onSignal {
case (_, PostStop)
// ignore PostStop here
run(ctx, ops, value)
case (_, other) throwIllegalState(trace, s"unexpected $other while waiting for a message")
}
case MultiMessage(t, c, f, trace) :: tail
@ -157,6 +160,9 @@ object StepWise {
run(ctx, tail, f((msg :: acc).reverse, value))
} else behavior(nextCount, msg :: acc)
} onSignal {
case (_, PostStop)
// ignore PostStop here
run(ctx, ops, value)
case (_, other) throwIllegalState(trace, s"unexpected $other while waiting for $c messages (got $count valid ones)")
}
}
@ -175,6 +181,9 @@ object StepWise {
run(ctx, tail, f((Right(msg) :: acc).reverse, value))
} else behavior(nextCount, Right(msg) :: acc)
} onSignal {
case (_, PostStop)
// ignore PostStop here
run(ctx, ops, value)
case (_, other)
val nextCount = count + 1
if (nextCount == c) {
@ -190,12 +199,16 @@ object StepWise {
case (_, ReceiveTimeout) throwTimeout(trace, s"timeout of $t expired while waiting for termination")
case other throwIllegalState(trace, s"unexpected $other while waiting for termination")
} onSignal {
case (_, PostStop)
// ignore PostStop here
run(ctx, ops, value)
case (_, t: Terminated)
ctx.cancelReceiveTimeout()
run(ctx, tail, f(t, value))
case other throwIllegalState(trace, s"unexpected $other while waiting for termination")
}
case Nil stopped
case Nil
stopped
}
}

View file

@ -6,7 +6,6 @@ package akka.typed
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
@ -16,7 +15,6 @@ import akka.typed.scaladsl.AskPattern._
import akka.typed.scaladsl.TimerScheduler
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl._
import org.scalatest.concurrent.Eventually.eventually
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TimerSpec extends TypedSpec("""
@ -96,6 +94,7 @@ class TimerSpec extends TypedSpec("""
probe.expectNoMsg(100.millis)
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `02 must schedule repeated ticks`(): Unit = {
@ -113,6 +112,7 @@ class TimerSpec extends TypedSpec("""
}
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `03 must replace timer`(): Unit = {
@ -132,6 +132,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(Tock(2))
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `04 must cancel timer`(): Unit = {
@ -147,6 +148,7 @@ class TimerSpec extends TypedSpec("""
probe.expectNoMsg(dilatedInterval + 100.millis)
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `05 must discard timers from old incarnation after restart, alt 1`(): Unit = {
@ -170,6 +172,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(Tock(2))
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `06 must discard timers from old incarnation after restart, alt 2`(): Unit = {
@ -195,6 +198,7 @@ class TimerSpec extends TypedSpec("""
probe.expectMsg(Tock(1))
ref ! End
probe.expectMsg(GotPostStop(false))
}
def `07 must cancel timers when stopped from exception`(): Unit = {
@ -210,18 +214,13 @@ class TimerSpec extends TypedSpec("""
def `08 must cancel timers when stopped voluntarily`(): Unit = {
val probe = TestProbe[Event]("evt")
val timerRef = new AtomicReference[TimerScheduler[Command]]
val behv = Actor.withTimers[Command] { timer
timerRef.set(timer)
timer.startPeriodicTimer("T", Tick(1), interval)
target(probe.ref, timer, 1)
}
val ref = start(behv)
ref ! End
// PostStop is not signalled when stopped voluntarily
eventually {
timerRef.get().isTimerActive("T") should ===(false)
}
probe.expectMsg(GotPostStop(false))
}
}

View file

@ -95,15 +95,30 @@ object Behavior {
* behaviors that delegate (partial) handling to other behaviors.
*/
def unhandled[T]: Behavior[T] = UnhandledBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* current behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T]: Behavior[T] = StoppedBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* given `postStop` behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T](postStop: Behavior[T]): Behavior[T] =
new StoppedBehavior(OptionVal.Some(postStop))
/**
* A behavior that treats every incoming message as unhandled.
*/
@ -169,7 +184,13 @@ object Behavior {
/**
* INTERNAL API.
*/
private[akka] object StoppedBehavior extends Behavior[Nothing] {
private[akka] object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)
/**
* INTERNAL API: When the cell is stopping this behavior is used, so
* that PostStop can be sent to previous behavior from `finishTerminate`.
*/
private[akka] class StoppedBehavior[T](val postStop: OptionVal[Behavior[T]]) extends Behavior[T] {
override def toString = "Stopped"
}
@ -211,7 +232,10 @@ object Behavior {
/**
* Returns true if the given behavior is not stopped.
*/
def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne StoppedBehavior
def isAlive[T](behavior: Behavior[T]): Boolean = behavior match {
case _: StoppedBehavior[_] false
case _ true
}
/**
* Returns true if the given behavior is the special `unhandled` marker.
@ -243,7 +267,7 @@ object Behavior {
case SameBehavior | UnhandledBehavior throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case d: DeferredBehavior[_] throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
case IgnoreBehavior SameBehavior.asInstanceOf[Behavior[T]]
case StoppedBehavior StoppedBehavior.asInstanceOf[Behavior[T]]
case s: StoppedBehavior[T] s
case EmptyBehavior UnhandledBehavior.asInstanceOf[Behavior[T]]
case ext: ExtensibleBehavior[T]
val possiblyDeferredResult = msg match {

View file

@ -42,10 +42,6 @@ final case object PreRestart extends PreRestart {
* Lifecycle signal that is fired after this actor and all its child actors
* (transitively) have terminated. The [[Terminated]] signal is only sent to
* registered watchers after this signal has been processed.
*
* <b>IMPORTANT NOTE:</b> if the actor terminated by switching to the
* `Stopped` behavior then this signal will be ignored (i.e. the
* Stopped behavior will do nothing in reaction to it).
*/
sealed abstract class PostStop extends Signal
final case object PostStop extends PostStop {

View file

@ -18,6 +18,8 @@ import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher
import akka.event.Logging.Error
import akka.event.Logging
import akka.typed.Behavior.StoppedBehavior
import akka.util.OptionVal
/**
* INTERNAL API
@ -324,8 +326,25 @@ private[typed] class ActorCell[T](
protected def next(b: Behavior[T], msg: Any): Unit = {
if (Behavior.isUnhandled(b)) unhandled(msg)
behavior = Behavior.canonicalize(b, behavior, ctx)
if (!Behavior.isAlive(behavior)) self.sendSystem(Terminate())
else {
b match {
case s: StoppedBehavior[T]
// use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior
// until Terminate is received, i.e until finishTerminate is invoked, and there PostStop
// will be signaled to the previous/postStop behavior
s.postStop match {
case OptionVal.None
// use previous as the postStop behavior
behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior))
case OptionVal.Some(postStop)
// use the given postStop behavior, but canonicalize it
behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx)))
}
self.sendSystem(Terminate())
case _
behavior = Behavior.canonicalize(b, behavior, ctx)
}
}
}
private def unhandled(msg: Any): Unit = msg match {

View file

@ -139,7 +139,7 @@ import scala.annotation.tailrec
private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
if (isUnhandled(b)) unhandled
else if ((b eq SameBehavior) || (b eq this)) same
else if (!Behavior.isAlive(b)) Behavior.stopped
else if (!Behavior.isAlive(b)) b
else {
b match {
case d: DeferredBehavior[T] canonical(Behavior.undefer(d, ctx), ctx)

View file

@ -87,7 +87,7 @@ import akka.typed.scaladsl.Actor
protected final def canonical(b: Behavior[T], ctx: ActorContext[T], afterException: Boolean): Behavior[T] =
if (Behavior.isUnhandled(b)) Behavior.unhandled
else if ((b eq Behavior.SameBehavior) || (b eq behavior)) Behavior.same
else if (!Behavior.isAlive(b)) Behavior.stopped
else if (!Behavior.isAlive(b)) b
else {
b match {
case d: DeferredBehavior[T] canonical(Behavior.undefer(d, ctx), ctx, afterException)

View file

@ -7,6 +7,8 @@ package internal
import scala.util.control.NonFatal
import akka.event.Logging
import akka.typed.Behavior.{ DeferredBehavior, undefer, validateAsInitial }
import akka.typed.Behavior.StoppedBehavior
import akka.util.OptionVal
/**
* INTERNAL API
@ -88,10 +90,18 @@ private[typed] trait SupervisionMechanics[T] {
/*
* The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
*
* Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
*
*/
try if ((a ne null) && !a.isInstanceOf[DeferredBehavior[_]]) Behavior.interpretSignal(a, ctx, PostStop)
catch { case NonFatal(ex) publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) }
try a match {
case null // skip PostStop
case _: DeferredBehavior[_]
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
case s: StoppedBehavior[_] s.postStop match {
case OptionVal.Some(postStop) Behavior.interpretSignal(postStop, ctx, PostStop)
case OptionVal.None // no postStop behavior defined
}
case _ Behavior.interpretSignal(a, ctx, PostStop)
} catch { case NonFatal(ex) publish(Logging.Error(ex, self.path.toString, clazz(a), "failure during PostStop")) }
finally try tellWatchersWeDied()
finally try parent.sendSystem(DeathWatchNotification(self, failed))
finally {

View file

@ -139,13 +139,8 @@ import scala.reflect.ClassTag
}
true
},
afterMessage = (ctx, msg, b) {
// PostStop is not signaled when voluntarily stopped
if (!Behavior.isAlive(b))
cancelAll()
b
},
afterSignal = (ctx, sig, b) b, // TODO optimize by using more ConstantFun
afterMessage = (ctx, msg, b) b, // TODO optimize by using more ConstantFun
afterSignal = (ctx, sig, b) b,
behavior)(ClassTag(classOf[TimerSchedulerImpl.TimerMsg]))
}

View file

@ -7,6 +7,7 @@ package adapter
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.util.OptionVal
/**
* INTERNAL API
@ -39,9 +40,26 @@ import akka.annotation.InternalApi
}
private def next(b: Behavior[T], msg: Any): Unit = {
if (isUnhandled(b)) unhandled(msg)
behavior = canonicalize(b, behavior, ctx)
if (!isAlive(behavior)) context.stop(self)
if (Behavior.isUnhandled(b)) unhandled(msg)
else {
b match {
case s: StoppedBehavior[T]
// use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior
// until Terminate is received, i.e until postStop is invoked, and there PostStop
// will be signaled to the previous/postStop behavior
s.postStop match {
case OptionVal.None
// use previous as the postStop behavior
behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior))
case OptionVal.Some(postStop)
// use the given postStop behavior, but canonicalize it
behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx)))
}
context.stop(self)
case _
behavior = Behavior.canonicalize(b, behavior, ctx)
}
}
}
override def unhandled(msg: Any): Unit = msg match {
@ -59,11 +77,26 @@ import akka.annotation.InternalApi
override def preStart(): Unit =
behavior = validateAsInitial(undefer(behavior, ctx))
override def preRestart(reason: Throwable, message: Option[Any]): Unit =
next(Behavior.interpretSignal(behavior, ctx, PreRestart), PreRestart)
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
Behavior.interpretSignal(behavior, ctx, PreRestart)
behavior = Behavior.stopped
}
override def postRestart(reason: Throwable): Unit =
behavior = validateAsInitial(undefer(behavior, ctx))
override def postStop(): Unit = {
next(Behavior.interpretSignal(behavior, ctx, PostStop), PostStop)
behavior match {
case null // skip PostStop
case _: DeferredBehavior[_]
// Do not undefer a DeferredBehavior as that may cause creation side-effects, which we do not want on termination.
case s: StoppedBehavior[_] s.postStop match {
case OptionVal.Some(postStop) Behavior.interpretSignal(postStop, ctx, PostStop)
case OptionVal.None // no postStop behavior defined
}
case b Behavior.interpretSignal(b, ctx, PostStop)
}
behavior = Behavior.stopped
}
}

View file

@ -110,12 +110,25 @@ object Actor {
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* current behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T]: Behavior[T] = Behavior.stopped
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* given `postStop` behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop)
/**
* A behavior that treats every incoming message as unhandled.
*/

View file

@ -137,12 +137,25 @@ object Actor {
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* current behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T]: Behavior[T] = Behavior.stopped
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure.
*
* The PostStop signal that results from stopping this actor will be passed to the
* given `postStop` behavior. All other messages and signals will effectively be
* ignored.
*/
def stopped[T](postStop: Behavior[T]): Behavior[T] = Behavior.stopped(postStop)
/**
* A behavior that treats every incoming message as unhandled.
*/