From 4ee299c72983a30692577d7a927de81ac81d9811 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Feb 2013 14:25:29 +0100 Subject: [PATCH] Hardening of another corner case in cluster singleton, see #3017 * It was an unlikely situation that was not covered: the new leader didn't know the previous leader, because it transitioned from Start -> BecomeLeader, and the old leader was removed and got LeaderChanged(None), so neither of them could request the other for hand-over or take-over. * Taken care of with the retry timeouts, also when the leader receives LeaderChanged(None) * The old leader should have received a proper LeaderChanged earlier, which is a flaw in the way we publish leader events. That part will be fixed in a separate commit. --- .../pattern/ClusterSingletonManager.scala | 29 +++++++++---------- .../pattern/ClusterSingletonManagerSpec.scala | 8 ++--- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index c641524644..71ed4d4d17 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -65,7 +65,7 @@ object ClusterSingletonManager { case object TakeOverFromMe case class HandOverRetry(count: Int) - case class TakeOverRetry(leaderPeer: ActorRef, count: Int) + case class TakeOverRetry(count: Int) case object Cleanup case object StartLeaderChangedBuffer @@ -83,7 +83,7 @@ object ClusterSingletonManager { case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false, handOverData: Option[Any] = None) extends Data case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], - newLeader: Address) extends Data + newLeaderOption: Option[Address]) extends Data case class HandingOverData(singleton: ActorRef, handOverTo: 
Option[ActorRef], handOverData: Option[Any]) extends Data val HandOverRetryTimer = "hand-over-retry" @@ -475,13 +475,13 @@ class ClusterSingletonManager( gotoHandingOver(singleton, singletonTerminated, handOverData, None) case Some(a) ⇒ // send TakeOver request in case the new leader doesn't know previous leader - val leaderPeer = peer(a) - leaderPeer ! TakeOverFromMe - setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false) - goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a) - case _ ⇒ + peer(a) ! TakeOverFromMe + setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) + goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = Some(a)) + case None ⇒ // new leader will initiate the hand-over - stay + setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false) + goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeaderOption = None) } case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒ @@ -495,20 +495,19 @@ class ClusterSingletonManager( } when(WasLeader) { - case Event(TakeOverRetry(leaderPeer, count), _) ⇒ - val newLeader = leaderPeer.path.address + case Event(TakeOverRetry(count), WasLeaderData(_, _, _, newLeaderOption)) ⇒ if (count <= maxTakeOverRetries) { - logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader) - leaderPeer ! TakeOverFromMe - setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false) + logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeaderOption) + newLeaderOption foreach { peer(_) ! 
TakeOverFromMe } + setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), retryInterval, repeat = false) stay } else - throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeader}] never occured") + throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeaderOption}] never occured") case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒ + case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader ⇒ addDowned(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 672ca7037d..0ceea4cddf 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -155,12 +155,12 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { def receive = { case state: CurrentClusterState ⇒ leaderAddress = state.leader - case LeaderChanged(leader) ⇒ leaderAddress = leader - case other => consumer foreach { _ forward other } + case LeaderChanged(leader) ⇒ leaderAddress = leader + case other ⇒ consumer foreach { _ forward other } } def consumer: Option[ActorRef] = - leaderAddress map (a => context.actorFor(RootActorPath(a) / + leaderAddress map (a ⇒ context.actorFor(RootActorPath(a) / "user" / "singleton" / "consumer")) } //#singleton-proxy @@ -300,7 +300,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS verify(sortedClusterRoles(0), msg = 4, 
expectedCurrent = 3) } - "hand over when leader leaves in 6 nodes cluster " in within(20 seconds) { + "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) { //#test-leave val leaveRole = sortedClusterRoles(0) val newLeaderRole = sortedClusterRoles(1)