diff --git a/akka-typed/src/main/scala/akka/typed/Impl.scala b/akka-typed/src/main/scala/akka/typed/Impl.scala index 48d677812f..f35dadfa38 100644 --- a/akka-typed/src/main/scala/akka/typed/Impl.scala +++ b/akka-typed/src/main/scala/akka/typed/Impl.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor import akka.event.LoggingReceive +import akka.actor.DeathPactException /** * INTERNAL API. Mapping the execution of a [[Behavior]] onto a good old untyped @@ -20,24 +21,35 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten val ctx = new ActorContextAdapter[T](context) def receive = LoggingReceive { - case akka.actor.Terminated(ref) ⇒ next(behavior.management(ctx, Terminated(ActorRef(ref)))) - case akka.actor.ReceiveTimeout ⇒ next(behavior.management(ctx, ReceiveTimeout)) - case msg ⇒ next(behavior.message(ctx, msg.asInstanceOf[T])) + case akka.actor.Terminated(ref) ⇒ + val msg = Terminated(ActorRef(ref)) + next(behavior.management(ctx, msg), msg) + case akka.actor.ReceiveTimeout ⇒ + next(behavior.management(ctx, ReceiveTimeout), ReceiveTimeout) + case msg ⇒ + val m = msg.asInstanceOf[T] + next(behavior.message(ctx, m), m) } - private def next(b: Behavior[T]): Unit = { + private def next(b: Behavior[T], msg: Any): Unit = { + if (isUnhandled(b)) unhandled(msg) behavior = canonicalize(ctx, b, behavior) if (!isAlive(behavior)) { context.stop(self) } } + override def unhandled(msg: Any): Unit = msg match { + case Terminated(ref) ⇒ throw new DeathPactException(ref.untypedRef) + case other ⇒ super.unhandled(other) + } + override val supervisorStrategy = a.OneForOneStrategy() { case ex ⇒ import Failed._ import akka.actor.{ SupervisorStrategy ⇒ s } val f = Failed(ex, ActorRef(sender())) - next(behavior.management(ctx, f)) + next(behavior.management(ctx, f), f) f.getDecision match { case Resume ⇒ s.Resume case Restart ⇒ s.Restart @@ -47,13 +59,13 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten } override def preStart(): Unit = - next(behavior.management(ctx, PreStart)) + next(behavior.management(ctx, PreStart), PreStart) override def preRestart(reason: Throwable, message: Option[Any]): Unit = - next(behavior.management(ctx, PreRestart(reason))) + next(behavior.management(ctx, PreRestart(reason)), PreRestart(reason)) override def postRestart(reason: Throwable): Unit = - next(behavior.management(ctx, PostRestart(reason))) + next(behavior.management(ctx, PostRestart(reason)), PostRestart(reason)) override def postStop(): Unit = - next(behavior.management(ctx, PostStop)) + next(behavior.management(ctx, PostStop), PostStop) } /** diff --git a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala index 8dea15af88..756a6dc2b1 100644 --- a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala @@ -63,6 +63,9 @@ object ActorContextSpec { final case class BecomeInert(replyTo: ActorRef[BecameInert.type]) extends Command case object BecameInert extends Event + final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command + case object BecameCareless extends Event + def subject(monitor: ActorRef[GotSignal]): Behavior[Command] = FullTotal { case Sig(ctx, signal) ⇒ @@ -131,6 +134,14 @@ object ActorContextSpec { throw ex case _ ⇒ Same } + case BecomeCareless(replyTo) ⇒ + replyTo ! BecameCareless + Full { + case Sig(_, Terminated(_)) ⇒ Unhandled + case Sig(_, sig) ⇒ + monitor ! GotSignal(sig) + Same + } } } } @@ -426,6 +437,29 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) + def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13") { (ctx, startWith) ⇒ + val self = ctx.self + startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep { + case (subj, child) ⇒ + subj ! Watch(child, self) + }.expectMessageKeep(500.millis) { + case (msg, (subj, child)) ⇒ + msg should ===(Watched) + subj ! BecomeCareless(self) + }.expectMessageKeep(500.millis) { + case (msg, (subj, child)) ⇒ + msg should ===(BecameCareless) + child ! Stop + }.expectFailureKeep(500.millis) { + case (f, (subj, child)) ⇒ + f.child should ===(subj) + Failed.Stop + }.expectMessage(500.millis) { + case (msg, (subj, child)) ⇒ + msg should ===(GotSignal(PostStop)) + } + }) + def `20 must return the right context info`(): Unit = sync(setup("ctx20") { (ctx, startWith) ⇒ startWith.keep(_ ! GetInfo(ctx.self)) .expectMessage(500.millis) {