=typ #18699 pass Unhandled message to unhandled()
This commit is contained in:
parent
2382aacdee
commit
d71ca8597c
2 changed files with 55 additions and 9 deletions
|
|
@ -8,6 +8,7 @@ import scala.concurrent.duration.Duration
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.ExecutionContextExecutor
|
||||
import akka.event.LoggingReceive
|
||||
import akka.actor.DeathPactException
|
||||
|
||||
/**
|
||||
* INTERNAL API. Mapping the execution of a [[Behavior]] onto a good old untyped
|
||||
|
|
@ -20,24 +21,35 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten
|
|||
val ctx = new ActorContextAdapter[T](context)
|
||||
|
||||
def receive = LoggingReceive {
|
||||
case akka.actor.Terminated(ref) ⇒ next(behavior.management(ctx, Terminated(ActorRef(ref))))
|
||||
case akka.actor.ReceiveTimeout ⇒ next(behavior.management(ctx, ReceiveTimeout))
|
||||
case msg ⇒ next(behavior.message(ctx, msg.asInstanceOf[T]))
|
||||
case akka.actor.Terminated(ref) ⇒
|
||||
val msg = Terminated(ActorRef(ref))
|
||||
next(behavior.management(ctx, msg), msg)
|
||||
case akka.actor.ReceiveTimeout ⇒
|
||||
next(behavior.management(ctx, ReceiveTimeout), ReceiveTimeout)
|
||||
case msg ⇒
|
||||
val m = msg.asInstanceOf[T]
|
||||
next(behavior.message(ctx, m), m)
|
||||
}
|
||||
|
||||
private def next(b: Behavior[T]): Unit = {
|
||||
private def next(b: Behavior[T], msg: Any): Unit = {
|
||||
if (isUnhandled(b)) unhandled(msg)
|
||||
behavior = canonicalize(ctx, b, behavior)
|
||||
if (!isAlive(behavior)) {
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
override def unhandled(msg: Any): Unit = msg match {
|
||||
case Terminated(ref) ⇒ throw new DeathPactException(ref.untypedRef)
|
||||
case other ⇒ super.unhandled(other)
|
||||
}
|
||||
|
||||
override val supervisorStrategy = a.OneForOneStrategy() {
|
||||
case ex ⇒
|
||||
import Failed._
|
||||
import akka.actor.{ SupervisorStrategy ⇒ s }
|
||||
val f = Failed(ex, ActorRef(sender()))
|
||||
next(behavior.management(ctx, f))
|
||||
next(behavior.management(ctx, f), f)
|
||||
f.getDecision match {
|
||||
case Resume ⇒ s.Resume
|
||||
case Restart ⇒ s.Restart
|
||||
|
|
@ -47,13 +59,13 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten
|
|||
}
|
||||
|
||||
override def preStart(): Unit =
|
||||
next(behavior.management(ctx, PreStart))
|
||||
next(behavior.management(ctx, PreStart), PreStart)
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit =
|
||||
next(behavior.management(ctx, PreRestart(reason)))
|
||||
next(behavior.management(ctx, PreRestart(reason)), PreRestart(reason))
|
||||
override def postRestart(reason: Throwable): Unit =
|
||||
next(behavior.management(ctx, PostRestart(reason)))
|
||||
next(behavior.management(ctx, PostRestart(reason)), PostRestart(reason))
|
||||
override def postStop(): Unit =
|
||||
next(behavior.management(ctx, PostStop))
|
||||
next(behavior.management(ctx, PostStop), PostStop)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -63,6 +63,9 @@ object ActorContextSpec {
|
|||
final case class BecomeInert(replyTo: ActorRef[BecameInert.type]) extends Command
|
||||
case object BecameInert extends Event
|
||||
|
||||
final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command
|
||||
case object BecameCareless extends Event
|
||||
|
||||
def subject(monitor: ActorRef[GotSignal]): Behavior[Command] =
|
||||
FullTotal {
|
||||
case Sig(ctx, signal) ⇒
|
||||
|
|
@ -131,6 +134,14 @@ object ActorContextSpec {
|
|||
throw ex
|
||||
case _ ⇒ Same
|
||||
}
|
||||
case BecomeCareless(replyTo) ⇒
|
||||
replyTo ! BecameCareless
|
||||
Full {
|
||||
case Sig(_, Terminated(_)) ⇒ Unhandled
|
||||
case Sig(_, sig) ⇒
|
||||
monitor ! GotSignal(sig)
|
||||
Same
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -426,6 +437,29 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
|
|||
}
|
||||
})
|
||||
|
||||
def `13 must terminate upon not handling Terminated`(): Unit = sync(setup("ctx13") { (ctx, startWith) ⇒
|
||||
val self = ctx.self
|
||||
startWith.mkChild(None, ctx.spawnAdapter(ChildEvent), self).keep {
|
||||
case (subj, child) ⇒
|
||||
subj ! Watch(child, self)
|
||||
}.expectMessageKeep(500.millis) {
|
||||
case (msg, (subj, child)) ⇒
|
||||
msg should ===(Watched)
|
||||
subj ! BecomeCareless(self)
|
||||
}.expectMessageKeep(500.millis) {
|
||||
case (msg, (subj, child)) ⇒
|
||||
msg should ===(BecameCareless)
|
||||
child ! Stop
|
||||
}.expectFailureKeep(500.millis) {
|
||||
case (f, (subj, child)) ⇒
|
||||
f.child should ===(subj)
|
||||
Failed.Stop
|
||||
}.expectMessage(500.millis) {
|
||||
case (msg, (subj, child)) ⇒
|
||||
msg should ===(GotSignal(PostStop))
|
||||
}
|
||||
})
|
||||
|
||||
def `20 must return the right context info`(): Unit = sync(setup("ctx20") { (ctx, startWith) ⇒
|
||||
startWith.keep(_ ! GetInfo(ctx.self))
|
||||
.expectMessage(500.millis) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue