diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 0383f294c3..42f61ac0e5 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -23,6 +23,7 @@ import akka.cluster.Member import akka.cluster.MemberStatus import akka.AkkaException import akka.actor.NoSerializationVerificationNeeded +import akka.cluster.UniqueAddress object ClusterSingletonManagerSettings { @@ -184,11 +185,11 @@ object ClusterSingletonManager { case object End extends State case object Uninitialized extends Data - final case class YoungerData(oldestOption: Option[Address]) extends Data - final case class BecomingOldestData(previousOldestOption: Option[Address]) extends Data + final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data + final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data final case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false) extends Data final case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, - newOldestOption: Option[Address]) extends Data + newOldestOption: Option[UniqueAddress]) extends Data final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data case object EndData extends Data final case class DelayedMemberRemoved(member: Member) @@ -205,9 +206,9 @@ object ClusterSingletonManager { /** * The first event, corresponding to CurrentClusterState. */ - final case class InitialOldestState(oldest: Option[Address], safeToBeOldest: Boolean) + final case class InitialOldestState(oldest: Option[UniqueAddress], safeToBeOldest: Boolean) - final case class OldestChanged(oldest: Option[Address]) + final case class OldestChanged(oldest: Option[UniqueAddress]) } /** @@ -245,14 +246,14 @@ object ClusterSingletonManager { block() val after = membersByAge.headOption if (before != after) - changes :+= OldestChanged(after.map(_.address)) + changes :+= OldestChanged(after.map(_.uniqueAddress)) } def handleInitial(state: CurrentClusterState): Unit = { membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m ⇒ (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m)) val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) } - val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest) + val initial = InitialOldestState(membersByAge.headOption.map(_.uniqueAddress), safeToBeOldest) changes :+= initial } @@ -376,7 +377,7 @@ class ClusterSingletonManager( import FSM.`→` val cluster = Cluster(context.system) - val selfAddressOption = Some(cluster.selfAddress) + val selfUniqueAddressOption = Some(cluster.selfUniqueAddress) import cluster.settings.LogInfo require( @@ -406,13 +407,13 @@ class ClusterSingletonManager( var selfExited = false // keep track of previously removed members - var removed = Map.empty[Address, Deadline] + var removed = Map.empty[UniqueAddress, Deadline] - def addRemoved(address: Address): Unit = - removed += address → (Deadline.now + 15.minutes) + def addRemoved(node: UniqueAddress): Unit = + removed += node → (Deadline.now + 15.minutes) def cleanupOverdueNotMemberAnyMore(): Unit = { - removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } + removed = removed filter { case (_, deadline) ⇒ deadline.hasTimeLeft } } def logInfo(message: String): Unit = @@ -463,10 +464,10 @@ class ClusterSingletonManager( case Event(InitialOldestState(oldestOption, safeToBeOldest), _) ⇒ oldestChangedReceived = true - if (oldestOption == selfAddressOption && safeToBeOldest) + if (oldestOption == selfUniqueAddressOption && safeToBeOldest) // oldest immediately gotoOldest() - else if (oldestOption == selfAddressOption) + else if (oldestOption == selfUniqueAddressOption) goto(BecomingOldest) using BecomingOldestData(None) else goto(Younger) using YoungerData(oldestOption) @@ -475,22 +476,22 @@ class ClusterSingletonManager( when(Younger) { case Event(OldestChanged(oldestOption), YoungerData(previousOldestOption)) ⇒ oldestChangedReceived = true - if (oldestOption == selfAddressOption) { - logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption) + if (oldestOption == selfUniqueAddressOption) { + logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address)) previousOldestOption match { case None ⇒ gotoOldest() case Some(prev) if removed.contains(prev) ⇒ gotoOldest() case Some(prev) ⇒ - peer(prev) ! HandOverToMe + peer(prev.address) ! HandOverToMe goto(BecomingOldest) using BecomingOldestData(previousOldestOption) } } else { - logInfo("Younger observed OldestChanged: [{} -> {}]", previousOldestOption, oldestOption) + logInfo("Younger observed OldestChanged: [{} -> {}]", previousOldestOption.map(_.address), oldestOption.map(_.address)) getNextOldestChanged() stay using YoungerData(oldestOption) } - case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒ + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() @@ -498,11 +499,17 @@ class ClusterSingletonManager( scheduleDelayedMemberRemoved(m) stay - case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.address == previousOldest ⇒ + case Event(DelayedMemberRemoved(m), YoungerData(Some(previousOldest))) if m.uniqueAddress == previousOldest ⇒ logInfo("Previous oldest removed [{}]", m.address) - addRemoved(m.address) + addRemoved(m.uniqueAddress) // transition when OldestChanged stay using YoungerData(None) + + case Event(HandOverToMe, _) ⇒ + // this node was probably quickly restarted with same hostname:port, + // confirm that the old singleton instance has been stopped + sender() ! HandOverDone + stay } when(BecomingOldest) { @@ -514,16 +521,16 @@ class ClusterSingletonManager( stay case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) ⇒ - if (sender().path.address == previousOldest) + if (sender().path.address == previousOldest.address) gotoOldest() else { logInfo( "Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]", - sender().path.address, previousOldest) + sender().path.address, previousOldest.address) stay } - case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒ + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() @@ -531,26 +538,39 @@ class ClusterSingletonManager( scheduleDelayedMemberRemoved(m) stay - case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.address == previousOldest ⇒ - logInfo("Previous oldest [{}] removed", previousOldest) - addRemoved(m.address) + case Event(DelayedMemberRemoved(m), BecomingOldestData(Some(previousOldest))) if m.uniqueAddress == previousOldest ⇒ + logInfo("Previous oldest [{}] removed", previousOldest.address) + addRemoved(m.uniqueAddress) gotoOldest() - case Event(TakeOverFromMe, BecomingOldestData(None)) ⇒ - sender() ! HandOverToMe - stay using BecomingOldestData(Some(sender().path.address)) - - case Event(TakeOverFromMe, BecomingOldestData(Some(previousOldest))) ⇒ - if (previousOldest == sender().path.address) sender() ! HandOverToMe - else logInfo( - "Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]", - sender().path.address, previousOldest) - stay + case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) ⇒ + val senderAddress = sender().path.address + // it would have been better to include the UniqueAddress in the TakeOverFromMe message, + // but can't change due to backwards compatibility + cluster.state.members.collectFirst { case m if m.address == senderAddress ⇒ m.uniqueAddress } match { + case None ⇒ + // from unknown node, ignore + logInfo( + "Ignoring TakeOver request from unknown node in BecomingOldest from [{}].", senderAddress) + stay + case Some(senderUniqueAddress) ⇒ + previousOldestOption match { + case Some(previousOldest) ⇒ + if (previousOldest == senderUniqueAddress) sender() ! HandOverToMe + else logInfo( + "Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]", + sender().path.address, previousOldest.address) + stay + case None ⇒ + sender() ! HandOverToMe + stay using BecomingOldestData(Some(senderUniqueAddress)) + } + } case Event(HandOverRetry(count), BecomingOldestData(previousOldestOption)) ⇒ if (count <= maxHandOverRetries) { - logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption) - previousOldestOption foreach { peer(_) ! HandOverToMe } + logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldestOption.map(_.address)) + previousOldestOption.foreach(node ⇒ peer(node.address) ! HandOverToMe) setTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval, repeat = false) stay() } else if (previousOldestOption forall removed.contains) { @@ -582,16 +602,16 @@ class ClusterSingletonManager( when(Oldest) { case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated)) ⇒ oldestChangedReceived = true - logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption) + logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption.map(_.address)) oldestOption match { - case Some(a) if a == cluster.selfAddress ⇒ + case Some(a) if a == cluster.selfUniqueAddress ⇒ // already oldest stay case Some(a) if !selfExited && removed.contains(a) ⇒ gotoHandingOver(singleton, singletonTerminated, None) case Some(a) ⇒ // send TakeOver request in case the new oldest doesn't know previous oldest - peer(a) ! TakeOverFromMe + peer(a.address) ! TakeOverFromMe setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false) goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a)) case None ⇒ @@ -610,8 +630,8 @@ class ClusterSingletonManager( when(WasOldest) { case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption)) ⇒ if (count <= maxTakeOverRetries) { - logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption) - newOldestOption foreach { peer(_) ! TakeOverFromMe } + logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) + newOldestOption.foreach(node ⇒ peer(node.address) ! TakeOverFromMe) setTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval, repeat = false) stay } else if (cluster.isTerminated) @@ -622,12 +642,12 @@ class ClusterSingletonManager( case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, Some(sender())) - case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒ + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() - case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.address == newOldest ⇒ - addRemoved(m.address) + case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.uniqueAddress == newOldest ⇒ + addRemoved(m.uniqueAddress) gotoHandingOver(singleton, singletonTerminated, None) case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton ⇒ @@ -660,17 +680,15 @@ class ClusterSingletonManager( val newOldest = handOverTo.map(_.path.address) logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) handOverTo foreach { _ ! HandOverDone } - if (removed.contains(cluster.selfAddress)) { + if (removed.contains(cluster.selfUniqueAddress)) { logInfo("Self removed, stopping ClusterSingletonManager") stop() - } else if (selfExited) + } else goto(End) using EndData - else - goto(Younger) using YoungerData(newOldest) } when(End) { - case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress ⇒ + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() } @@ -678,21 +696,21 @@ class ClusterSingletonManager( whenUnhandled { case Event(_: CurrentClusterState, _) ⇒ stay case Event(MemberExited(m), _) ⇒ - if (m.address == cluster.selfAddress) { + if (m.uniqueAddress == cluster.selfUniqueAddress) { selfExited = true logInfo("Exited [{}]", m.address) } stay - case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒ + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() case Event(MemberRemoved(m, _), _) ⇒ if (!selfExited) logInfo("Member removed [{}]", m.address) - addRemoved(m.address) + addRemoved(m.uniqueAddress) stay case Event(DelayedMemberRemoved(m), _) ⇒ if (!selfExited) logInfo("Member removed [{}]", m.address) - addRemoved(m.address) + addRemoved(m.uniqueAddress) stay case Event(TakeOverFromMe, _) ⇒ logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender().path.address) @@ -720,7 +738,7 @@ class ClusterSingletonManager( } onTransition { - case _ → (Younger | End) if removed.contains(cluster.selfAddress) ⇒ + case _ → (Younger | End) if removed.contains(cluster.selfUniqueAddress) ⇒ logInfo("Self removed, stopping ClusterSingletonManager") // note that FSM.stop() can't be used in onTransition context.stop(self) diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala new file mode 100644 index 0000000000..e8c2f6d8b1 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.cluster.singleton + +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.actor.PoisonPill +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.AkkaSpec +import akka.testkit.TestActors +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +class ClusterSingletonRestartSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.actor.provider = akka.cluster.ClusterActorRefProvider + akka.remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + """) { + + val sys1 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, system.settings.config) + var sys3: ActorSystem = null + + def join(from: ActorSystem, to: ActorSystem): Unit = { + from.actorOf( + ClusterSingletonManager.props( + singletonProps = TestActors.echoActorProps, + terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(from)), + name = "echo") + + within(10.seconds) { + awaitAssert { + Cluster(from) join Cluster(to).selfAddress + Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress) + Cluster(from).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + } + + "Restarting cluster node with same hostname and port" must { + "hand-over to next oldest" in { + join(sys1, sys1) + join(sys2, sys1) + + val proxy2 = sys2.actorOf(ClusterSingletonProxy.props("user/echo", ClusterSingletonProxySettings(sys2)), "proxy2") + + within(5.seconds) { + awaitAssert { + val probe = TestProbe()(sys2) + proxy2.tell("hello", probe.ref) + probe.expectMsg(1.second, "hello") + } + } + + shutdown(sys1) + // it will be downed by the join attempts of the new incarnation + + sys3 = ActorSystem( + system.name, + ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${Cluster(sys1).selfAddress.port.get}").withFallback( + system.settings.config)) + join(sys3, sys2) + + within(5.seconds) { + awaitAssert { + val probe = TestProbe()(sys2) + proxy2.tell("hello2", probe.ref) + probe.expectMsg(1.second, "hello2") + } + } + + Cluster(sys2).leave(Cluster(sys2).selfAddress) + + within(10.seconds) { + awaitAssert { + Cluster(sys3).state.members.map(_.uniqueAddress) should ===(Set(Cluster(sys3).selfUniqueAddress)) + } + } + + val proxy3 = sys3.actorOf(ClusterSingletonProxy.props("user/echo", ClusterSingletonProxySettings(sys3)), "proxy3") + + within(5.seconds) { + awaitAssert { + val probe = TestProbe()(sys3) + proxy3.tell("hello3", probe.ref) + probe.expectMsg(1.second, "hello3") + } + } + + } + } + + override def afterTermination(): Unit = { + shutdown(sys1) + shutdown(sys2) + if (sys3 != null) + shutdown(sys3) + } +} + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 0db89311c9..af69ad1262 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -508,8 +508,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // new node will retry join logInfo("New incarnation of existing member [{}] is trying to join. " + "Existing will be removed from the cluster and then new member will be allowed to join.", m) - if (m.status != Down) + if (m.status != Down) { + // we can confirm it as terminated/unreachable immediately + val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress) + val newOverview = latestGossip.overview.copy(reachability = newReachability) + val newGossip = latestGossip.copy(overview = newOverview) + updateLatestGossip(newGossip) + downing(m.address) + } case None ⇒ // remove the node from the failure detector failureDetector.remove(node.address) @@ -609,7 +616,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with publish(latestGossip) case Some(_) ⇒ // already down case None ⇒ - logInfo("Ignoring down of unknown node [{}] as [{}]", address) + logInfo("Ignoring down of unknown node [{}]", address) } } @@ -1259,10 +1266,10 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: import ClusterEvent._ private val cluster = Cluster(context.system) private val to = status match { - case Up ⇒ - classOf[MemberUp] - case Removed ⇒ - classOf[MemberRemoved] + case Up ⇒ classOf[MemberUp] + case Removed ⇒ classOf[MemberRemoved] + case other ⇒ throw new IllegalArgumentException( + s"Expected Up or Removed in OnMemberStatusChangedListener, got [$other]") } override def preStart(): Unit = @@ -1305,10 +1312,10 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: @SerialVersionUID(1L) private[cluster] final case class GossipStats( receivedGossipCount: Long = 0L, - mergeCount: Long = 0L, - sameCount: Long = 0L, - newerCount: Long = 0L, - olderCount: Long = 0L) { + mergeCount: Long = 0L, + sameCount: Long = 0L, + newerCount: Long = 0L, + olderCount: Long = 0L) { def incrementMergeCount(): GossipStats = copy(mergeCount = mergeCount + 1, receivedGossipCount = receivedGossipCount + 1) @@ -1348,5 +1355,5 @@ private[cluster] final case class GossipStats( @SerialVersionUID(1L) private[cluster] final case class VectorClockStats( versionSize: Int = 0, - seenLatest: Int = 0) + seenLatest: Int = 0) diff --git a/project/MiMa.scala b/project/MiMa.scala index 6866c38af3..e55d3c89e7 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -939,7 +939,11 @@ object MiMa extends AutoPlugin { // #19872 double wildcard for actor deployment config ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.Deployer.lookup"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.find") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.util.WildcardTree.find"), + + // #20942 ClusterSingleton + ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.singleton.ClusterSingletonManager.addRemoved"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.selfAddressOption") ) ) }