From 9be7df152768437a7a45900ffb87c888780758c9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 28 Oct 2016 14:23:18 +0200 Subject: [PATCH] send terminationMessage to singleton when leaving last, #21592 (#21654) * send terminationMessage to singleton when leaving last, #21592 * When leaving last node, i.e. no newOldestOption, the manager was just stopped. The change is to send the terminationMessage also in this case and wait until the singleton actor is terminated before stopping the manager. * Also changed so that the singleton is stopped immediately when cluster has been terminated when last node is leaving, i.e. no newOldestOption. Previously it retried until maxTakeOverRetries before stopping. * More comprehensive test of this scenario in ClusterSingletonManagerLeaveSpec * increase test timeout --- .../singleton/ClusterSingletonManager.scala | 23 +++++-- .../ClusterSingletonManagerLeaveSpec.scala | 61 ++++++++++++++++--- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 848a91bc92..5b65df3317 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -182,6 +182,7 @@ object ClusterSingletonManager { case object WasOldest extends State case object HandingOver extends State case object TakeOver extends State + case object Stopping extends State case object End extends State case object Uninitialized extends Data @@ -191,6 +192,7 @@ object ClusterSingletonManager { final case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, newOldestOption: Option[UniqueAddress]) extends Data final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data + final case class StoppingData(singleton: ActorRef) extends Data case object EndData extends Data final case class DelayedMemberRemoved(member: Member) @@ -631,15 +633,16 @@ class ClusterSingletonManager( } when(WasOldest) { - case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption)) ⇒ - if (count <= maxTakeOverRetries) { + case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) ⇒ + if (cluster.isTerminated && (newOldestOption.isEmpty || count > maxTakeOverRetries)) { + if (singletonTerminated) stop() + else gotoStopping(singleton) + } else if (count <= maxTakeOverRetries) { logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe) setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false) stay - } else if (cluster.isTerminated) - stop() - else + } else throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured") case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒ @@ -692,6 +695,16 @@ class ClusterSingletonManager( goto(End) using EndData } + def gotoStopping(singleton: ActorRef): State = { + singleton ! terminationMessage + goto(Stopping) using StoppingData(singleton) + } + + when(Stopping) { + case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton ⇒ + stop() + } + when(End) { case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala index b66fe09871..5bbad79e82 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala @@ -45,11 +45,17 @@ object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig { * The singleton actor */ class Echo(testActor: ActorRef) extends Actor { + override def preStart(): Unit = { + testActor ! "preStart" + } override def postStop(): Unit = { - testActor ! "stopped" + testActor ! "postStop" } def receive = { + case "stop" ⇒ + testActor ! "stop" + context.stop(self) case _ ⇒ sender() ! self } @@ -78,7 +84,7 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan system.actorOf( ClusterSingletonManager.props( singletonProps = Props(classOf[Echo], testActor), - terminationMessage = PoisonPill, + terminationMessage = "stop", settings = ClusterSingletonManagerSettings(system)), name = "echo") } @@ -97,12 +103,22 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan join(first, first) runOn(first) { - echoProxy ! "hello" - expectMsgType[ActorRef](5.seconds) + within(5.seconds) { + expectMsg("preStart") + echoProxy ! "hello" + expectMsgType[ActorRef] + } } enterBarrier("first-active") join(second, first) + runOn(first, second) { + within(10.seconds) { + awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(2)) + } + } + enterBarrier("second-up") + join(third, first) within(10.seconds) { awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(3)) @@ -114,14 +130,20 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } runOn(first) { - expectMsg(10.seconds, "stopped") + expectMsg(10.seconds, "stop") + expectMsg("postStop") } enterBarrier("first-stopped") + runOn(second) { + expectMsg("preStart") + } + enterBarrier("second-started") + runOn(second, third) { val p = TestProbe() val firstAddress = node(first).address - p.within(10.seconds) { + p.within(15.seconds) { p.awaitAssert { echoProxy.tell("hello2", p.ref) p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress) @@ -129,8 +151,33 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } } } + enterBarrier("second-working") + + runOn(third) { + cluster.leave(node(second).address) + } + + runOn(second) { + expectMsg(15.seconds, "stop") + expectMsg("postStop") + } + enterBarrier("second-stopped") + + runOn(third) { + expectMsg("preStart") + } + enterBarrier("third-started") + + runOn(third) { + cluster.leave(node(third).address) + } + + runOn(third) { + expectMsg(5.seconds, "stop") + expectMsg("postStop") + } + enterBarrier("third-stopped") - enterBarrier("hand-over-done") } }