diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 848a91bc92..5b65df3317 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -182,6 +182,7 @@ object ClusterSingletonManager { case object WasOldest extends State case object HandingOver extends State case object TakeOver extends State + case object Stopping extends State case object End extends State case object Uninitialized extends Data @@ -191,6 +192,7 @@ object ClusterSingletonManager { final case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, newOldestOption: Option[UniqueAddress]) extends Data final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data + final case class StoppingData(singleton: ActorRef) extends Data case object EndData extends Data final case class DelayedMemberRemoved(member: Member) @@ -631,15 +633,16 @@ class ClusterSingletonManager( } when(WasOldest) { - case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption)) ⇒ - if (count <= maxTakeOverRetries) { + case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) ⇒ + if (cluster.isTerminated && (newOldestOption.isEmpty || count > maxTakeOverRetries)) { + if (singletonTerminated) stop() + else gotoStopping(singleton) + } else if (count <= maxTakeOverRetries) { logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe) setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false) stay - } else if (cluster.isTerminated) - stop() - else + } else throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured") case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒ @@ -692,6 +695,16 @@ class ClusterSingletonManager( goto(End) using EndData } + def gotoStopping(singleton: ActorRef): State = { + singleton ! terminationMessage + goto(Stopping) using StoppingData(singleton) + } + + when(Stopping) { + case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton ⇒ + stop() + } + when(End) { case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala index b66fe09871..5bbad79e82 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala @@ -45,11 +45,17 @@ object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig { * The singleton actor */ class Echo(testActor: ActorRef) extends Actor { + override def preStart(): Unit = { + testActor ! "preStart" + } override def postStop(): Unit = { - testActor ! "stopped" + testActor ! "postStop" } def receive = { + case "stop" ⇒ + testActor ! "stop" + context.stop(self) case _ ⇒ sender() ! self } @@ -78,7 +84,7 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan system.actorOf( ClusterSingletonManager.props( singletonProps = Props(classOf[Echo], testActor), - terminationMessage = PoisonPill, + terminationMessage = "stop", settings = ClusterSingletonManagerSettings(system)), name = "echo") } @@ -97,12 +103,22 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan join(first, first) runOn(first) { - echoProxy ! "hello" - expectMsgType[ActorRef](5.seconds) + within(5.seconds) { + expectMsg("preStart") + echoProxy ! "hello" + expectMsgType[ActorRef] + } } enterBarrier("first-active") join(second, first) + runOn(first, second) { + within(10.seconds) { + awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(2)) + } + } + enterBarrier("second-up") + join(third, first) within(10.seconds) { awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(3)) @@ -114,14 +130,20 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } runOn(first) { - expectMsg(10.seconds, "stopped") + expectMsg(10.seconds, "stop") + expectMsg("postStop") } enterBarrier("first-stopped") + runOn(second) { + expectMsg("preStart") + } + enterBarrier("second-started") + runOn(second, third) { val p = TestProbe() val firstAddress = node(first).address - p.within(10.seconds) { + p.within(15.seconds) { p.awaitAssert { echoProxy.tell("hello2", p.ref) p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress) @@ -129,8 +151,33 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } } } + enterBarrier("second-working") + + runOn(third) { + cluster.leave(node(second).address) + } + + runOn(second) { + expectMsg(15.seconds, "stop") + expectMsg("postStop") + } + enterBarrier("second-stopped") + + runOn(third) { + expectMsg("preStart") + } + enterBarrier("third-started") + + runOn(third) { + cluster.leave(node(third).address) + } + + runOn(third) { + expectMsg(5.seconds, "stop") + expectMsg("postStop") + } + enterBarrier("third-stopped") - enterBarrier("hand-over-done") } }