send terminationMessage to singleton when leaving last, #21592 (#21654)

* send terminationMessage to singleton when leaving last, #21592

* When leaving the last node, i.e. when there is no newOldestOption, the
  manager was previously just stopped. The change is to send the
  terminationMessage also in this case and to wait until the singleton
  actor has terminated before stopping the manager (see the sketch after
  this list).
* Also changed so that the singleton is stopped immediately when the
  cluster has been terminated while the last node is leaving, i.e. when
  there is no newOldestOption. Previously it retried until
  maxTakeOverRetries before stopping.
* More comprehensive test of this scenario in ClusterSingletonManagerLeaveSpec

* increase test timeout
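
The essence of the change, as a minimal sketch rather than the actual
ClusterSingletonManager code (the GracefulStopper name and the "leave"
trigger are illustrative assumptions, not part of the commit): the manager
sends the configured terminationMessage to the singleton, watches it, and
stops itself only after receiving Terminated.

  import akka.actor.{ Actor, ActorRef, Props, Terminated }

  class GracefulStopper(singletonProps: Props, terminationMessage: Any) extends Actor {
    // start and watch the singleton child so Terminated is delivered to this actor
    val singleton: ActorRef = context.watch(context.actorOf(singletonProps, "singleton"))

    def receive = {
      case "leave" ⇒
        // equivalent of gotoStopping below: ask the singleton to finish its work
        singleton ! terminationMessage
      case Terminated(`singleton`) ⇒
        // only stop the manager once the singleton has actually terminated
        context.stop(self)
    }
  }

In the real manager this handshake is the new Stopping state in the diff
below: gotoStopping sends the terminationMessage and Stopping waits for
Terminated of the singleton before calling stop().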
Patrik Nordwall 2016-10-28 14:23:18 +02:00 committed by Konrad Malawski
parent 021c2cbfdd
commit 9be7df1527
2 changed files with 72 additions and 12 deletions

ClusterSingletonManager.scala

@@ -182,6 +182,7 @@ object ClusterSingletonManager {
case object WasOldest extends State
case object HandingOver extends State
case object TakeOver extends State
case object Stopping extends State
case object End extends State
case object Uninitialized extends Data
@@ -191,6 +192,7 @@ object ClusterSingletonManager {
final case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean,
newOldestOption: Option[UniqueAddress]) extends Data
final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
final case class StoppingData(singleton: ActorRef) extends Data
case object EndData extends Data
final case class DelayedMemberRemoved(member: Member)
@@ -631,15 +633,16 @@ class ClusterSingletonManager(
}
when(WasOldest) {
case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption)) ⇒
if (count <= maxTakeOverRetries) {
case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) ⇒
if (cluster.isTerminated && (newOldestOption.isEmpty || count > maxTakeOverRetries)) {
if (singletonTerminated) stop()
else gotoStopping(singleton)
} else if (count <= maxTakeOverRetries) {
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe)
setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false)
stay
} else if (cluster.isTerminated)
stop()
else
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured")
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒
@@ -692,6 +695,16 @@ class ClusterSingletonManager(
goto(End) using EndData
}
def gotoStopping(singleton: ActorRef): State = {
singleton ! terminationMessage
goto(Stopping) using StoppingData(singleton)
}
when(Stopping) {
case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton ⇒
stop()
}
when(End) {
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
logInfo("Self removed, stopping ClusterSingletonManager")

ClusterSingletonManagerLeaveSpec.scala

@@ -45,11 +45,17 @@ object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig {
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
override def preStart(): Unit = {
testActor ! "preStart"
}
override def postStop(): Unit = {
testActor ! "stopped"
testActor ! "postStop"
}
def receive = {
case "stop"
testActor ! "stop"
context.stop(self)
case _ ⇒
sender() ! self
}
@@ -78,7 +84,7 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
terminationMessage = PoisonPill,
terminationMessage = "stop",
settings = ClusterSingletonManagerSettings(system)),
name = "echo")
}
@@ -97,12 +103,22 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
join(first, first)
runOn(first) {
echoProxy ! "hello"
expectMsgType[ActorRef](5.seconds)
within(5.seconds) {
expectMsg("preStart")
echoProxy ! "hello"
expectMsgType[ActorRef]
}
}
enterBarrier("first-active")
join(second, first)
runOn(first, second) {
within(10.seconds) {
awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(2))
}
}
enterBarrier("second-up")
join(third, first)
within(10.seconds) {
awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(3))
@@ -114,14 +130,20 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
}
runOn(first) {
expectMsg(10.seconds, "stopped")
expectMsg(10.seconds, "stop")
expectMsg("postStop")
}
enterBarrier("first-stopped")
runOn(second) {
expectMsg("preStart")
}
enterBarrier("second-started")
runOn(second, third) {
val p = TestProbe()
val firstAddress = node(first).address
p.within(10.seconds) {
p.within(15.seconds) {
p.awaitAssert {
echoProxy.tell("hello2", p.ref)
p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress)
@@ -129,8 +151,33 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
}
}
}
enterBarrier("second-working")
runOn(third) {
cluster.leave(node(second).address)
}
runOn(second) {
expectMsg(15.seconds, "stop")
expectMsg("postStop")
}
enterBarrier("second-stopped")
runOn(third) {
expectMsg("preStart")
}
enterBarrier("third-started")
runOn(third) {
cluster.leave(node(third).address)
}
runOn(third) {
expectMsg(5.seconds, "stop")
expectMsg("postStop")
}
enterBarrier("third-stopped")
enterBarrier("hand-over-done")
}
}