Unwatch before watch in typed Supervision (#30172)
This commit is contained in:
parent
ba06b19835
commit
d40ab055f5
2 changed files with 50 additions and 0 deletions
|
|
@ -40,6 +40,7 @@ object SupervisionSpec {
|
||||||
case object GetState extends Command
|
case object GetState extends Command
|
||||||
final case class CreateChild[T](behavior: Behavior[T], name: String) extends Command
|
final case class CreateChild[T](behavior: Behavior[T], name: String) extends Command
|
||||||
final case class Watch(ref: ActorRef[_]) extends Command
|
final case class Watch(ref: ActorRef[_]) extends Command
|
||||||
|
final case class WatchWith(ref: ActorRef[_], cmd: Command) extends Command
|
||||||
|
|
||||||
sealed trait Event
|
sealed trait Event
|
||||||
final case class Pong(n: Int) extends Event
|
final case class Pong(n: Int) extends Event
|
||||||
|
|
@ -73,6 +74,9 @@ object SupervisionSpec {
|
||||||
case Watch(ref) =>
|
case Watch(ref) =>
|
||||||
context.watch(ref)
|
context.watch(ref)
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
|
case WatchWith(ref, cmd) =>
|
||||||
|
context.watchWith(ref, cmd)
|
||||||
|
Behaviors.same
|
||||||
case Throw(e) =>
|
case Throw(e) =>
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
@ -548,6 +552,50 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
|
||||||
parentProbe.expectMessage(ReceivedSignal(Terminated(anotherProbe.ref)))
|
parentProbe.expectMessage(ReceivedSignal(Terminated(anotherProbe.ref)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"successfully restart after stopping watchWith'd children" in {
|
||||||
|
val parentProbe = TestProbe[Event]("evt")
|
||||||
|
val behv = Behaviors.supervise(targetBehavior(parentProbe.ref)).onFailure[Exc1](SupervisorStrategy.restart)
|
||||||
|
val ref = spawn(behv)
|
||||||
|
|
||||||
|
val anotherProbe = TestProbe[String]("another")
|
||||||
|
ref ! WatchWith(anotherProbe.ref, Ping(0))
|
||||||
|
|
||||||
|
val childProbe = TestProbe[Event]("childEvt")
|
||||||
|
val slowStop = new CountDownLatch(1)
|
||||||
|
val childName = nextName()
|
||||||
|
ref ! CreateChild(targetBehavior(childProbe.ref, slowStop = Some(slowStop)), childName)
|
||||||
|
ref ! GetState
|
||||||
|
val childRef =
|
||||||
|
parentProbe.receiveMessage() match {
|
||||||
|
case State(0, children) =>
|
||||||
|
children.keySet should ===(Set(childName))
|
||||||
|
children(childName)
|
||||||
|
case _ =>
|
||||||
|
fail("expected to receive a State(0, _)")
|
||||||
|
}
|
||||||
|
|
||||||
|
ref ! WatchWith(childRef, Ping(1))
|
||||||
|
|
||||||
|
LoggingTestKit.error[Exc1].expect {
|
||||||
|
ref ! Throw(new Exc1)
|
||||||
|
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||||
|
ref ! GetState
|
||||||
|
anotherProbe.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// waiting for children to stop, GetState stashed
|
||||||
|
parentProbe.expectNoMessage()
|
||||||
|
slowStop.countDown()
|
||||||
|
|
||||||
|
childProbe.expectMessage(ReceivedSignal(PostStop))
|
||||||
|
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
|
||||||
|
// we get the Ping(0) message from stopping anotherProbe
|
||||||
|
parentProbe.expectMessage(Pong(0))
|
||||||
|
// but we didn't get the Ping(1) message from stopping the child, because the
|
||||||
|
// restart strategy revokes the watchWith in favor of watch
|
||||||
|
parentProbe.expectNoMessage()
|
||||||
|
}
|
||||||
|
|
||||||
"optionally NOT stop children when restarting" in {
|
"optionally NOT stop children when restarting" in {
|
||||||
testNotStopChildren(strategy = SupervisorStrategy.restart.withStopChildren(enabled = false))
|
testNotStopChildren(strategy = SupervisorStrategy.restart.withStopChildren(enabled = false))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -386,6 +386,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
||||||
|
|
||||||
private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = {
|
private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = {
|
||||||
children.foreach { child =>
|
children.foreach { child =>
|
||||||
|
// Unwatch in case the actor being restarted used watchWith to watch the child.
|
||||||
|
ctx.asScala.unwatch(child)
|
||||||
ctx.asScala.watch(child)
|
ctx.asScala.watch(child)
|
||||||
ctx.asScala.stop(child)
|
ctx.asScala.stop(child)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue