From 659b28e4eb5ce38918f5a95b20dba0bc5af65f8a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 15 Aug 2017 15:31:52 +0200 Subject: [PATCH] Missing become after CurrentClusterState in CrossDcHeartbeatSender, #23371 * and a few other small things * one can see in the failed test log that there is no ACTIVE log line on the failing node --- .../cluster/CrossDcClusterHeartbeat.scala | 33 ++++++++++--------- .../MultiDcHeartbeatTakingOverSpec.scala | 15 ++++----- .../cluster/MultiDcSunnyWeatherSpec.scala | 11 +++---- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala index 3e8f3f318d..2d6b1e56f1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala @@ -58,8 +58,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg selfDataCenter, crossDcFailureDetector, crossDcSettings.NrOfMonitoringActors, - SortedSet.empty - ) + SortedSet.empty) // start periodic heartbeat to other nodes in cluster val heartbeatTask = scheduler.schedule( @@ -125,10 +124,11 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg val nodes = snapshot.members val nrOfMonitoredNodes = crossDcSettings.NrOfMonitoringActors dataCentersState = CrossDcHeartbeatingState.init(selfDataCenter, crossDcFailureDetector, nrOfMonitoredNodes, nodes) + becomeActiveIfResponsibleForHeartbeat() } def addMember(m: Member): Unit = - if (m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) { + if (CrossDcHeartbeatingState.atLeastInUpState(m)) { // since we only monitor nodes in Up or later states, due to the n-th oldest requirement dataCentersState = dataCentersState.addMember(m) if (verboseHeartbeat && m.dataCenter != selfDataCenter) @@ -194,7 +194,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg @InternalApi private[akka] object CrossDcHeartbeatSender { - // -- messages intended only for local messaging during testing -- + // -- messages intended only for local messaging during testing -- sealed trait InspectionCommand extends NoSerializationVerificationNeeded final case class ReportStatus() @@ -202,7 +202,7 @@ private[akka] object CrossDcHeartbeatSender { sealed trait MonitoringStateReport extends StatusReport final case class MonitoringActive(state: CrossDcHeartbeatingState) extends MonitoringStateReport final case class MonitoringDormant() extends MonitoringStateReport - // -- end of messages intended only for local messaging during testing -- + // -- end of messages intended only for local messaging during testing -- } /** INTERNAL API */ @@ -219,9 +219,7 @@ private[cluster] final case class CrossDcHeartbeatingState( * Only the `nrOfMonitoredNodesPerDc`-oldest nodes in each DC fulfil this role. */ def shouldActivelyMonitorNodes(selfDc: ClusterSettings.DataCenter, selfAddress: UniqueAddress): Boolean = { - /** Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes */ - def atLeastInUpState(m: Member): Boolean = - m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining + // Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes val selfDcNeighbours: SortedSet[Member] = state.getOrElse(selfDc, emptyMembersSortedSet) val selfDcOldOnes = selfDcNeighbours.filter(atLeastInUpState).take(nrOfMonitoredNodesPerDc) @@ -244,12 +242,12 @@ private[cluster] final case class CrossDcHeartbeatingState( val updatedState = this.copy(state = state.updated(dc, updatedMembers)) // guarding against the case of two members having the same upNumber, in which case the activeReceivers - // which are based on the ageOrdering could actually have changed by adding a node. In practice this - // should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes - // in the same DC. If it happens though, we need to remove the previously monitored node from the failure + // which are based on the ageOrdering could actually have changed by adding a node. In practice this + // should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes + // in the same DC. If it happens though, we need to remove the previously monitored node from the failure // detector, to prevent both a resource leak and that node actually appearing as unreachable in the gossip (!) val stoppedMonitoringReceivers = updatedState.activeReceiversIn(dc) diff this.activeReceiversIn(dc) - stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference + stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference updatedState } @@ -263,7 +261,7 @@ private[cluster] final case class CrossDcHeartbeatingState( failureDetector.remove(m.address) copy(state = state.updated(dc, updatedMembers)) case None ⇒ - this // no change needed, was certainly not present (not even its DC was) + this // no change needed, was certainly not present (not even its DC was) } } @@ -274,8 +272,7 @@ private[cluster] final case class CrossDcHeartbeatingState( allOtherNodes.flatMap( _.take(nrOfMonitoredNodesPerDc) - .map(_.uniqueAddress)(breakOut) - ).toSet + .map(_.uniqueAddress)(breakOut)).toSet } /** Lists addresses in diven DataCenter that this node should send heartbeats to */ @@ -310,6 +307,10 @@ private[cluster] object CrossDcHeartbeatingState { /** Sorted by age */ private def emptyMembersSortedSet: SortedSet[Member] = SortedSet.empty[Member](Member.ageOrdering) + // Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes + def atLeastInUpState(m: Member): Boolean = + m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining + def init( selfDataCenter: DataCenter, crossDcFailureDetector: FailureDetectorRegistry[Address], @@ -321,7 +322,7 @@ private[cluster] object CrossDcHeartbeatingState { nrOfMonitoredNodesPerDc, state = { // TODO unduplicate this with the logic in MembershipState.ageSortedTopOldestMembersPerDc - val groupedByDc = members.groupBy(_.dataCenter) + val groupedByDc = members.filter(atLeastInUpState).groupBy(_.dataCenter) if (members.ordering == Member.ageOrdering) { // we already have the right ordering diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala index 60241fb62d..5a3c312da4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcHeartbeatTakingOverSpec.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration._ object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig { val first = role("first") // alpha - val second = role("second") // alpha + val second = role("second") // alpha val third = role("third") // alpha val fourth = role("fourth") // beta @@ -40,15 +40,15 @@ object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig { """ akka { actor.provider = cluster - + loggers = ["akka.testkit.TestEventListener"] loglevel = INFO - + remote.log-remote-lifecycle-events = off - + cluster { debug.verbose-heartbeat-logging = off - + multi-data-center { cross-data-center-connections = 2 } @@ -136,7 +136,7 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart "other node must become oldest when current DC-oldest Leaves" taggedAs LongRunningTest in { val observer = TestProbe("alpha-observer-prime") - // we leave one of the current oldest nodes of the `alpha` DC, + // we leave one of the current oldest nodes of the `alpha` DC, // since it has 3 members the "not yet oldest" one becomes oldest and should start monitoring across datacenter val preLeaveOldestAlphaRole = expectedAlphaHeartbeaterRoles.head val preLeaveOldestAlphaAddress = expectedAlphaHeartbeaterNodes.find(_.address.port.get == preLeaveOldestAlphaRole.port.get).get.address @@ -182,13 +182,12 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart */ private def membersByAge(): immutable.SortedSet[Member] = SortedSet.empty(Member.ageOrdering) - .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp)) + .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)) /** INTERNAL API */ @InternalApi private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] = membersByAge() - .filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) .filter(memberFilter) .take(n) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala index fec90f9720..e13f2bc691 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcSunnyWeatherSpec.scala @@ -38,15 +38,15 @@ object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig { """ akka { actor.provider = cluster - + loggers = ["akka.testkit.TestEventListener"] loglevel = INFO - + remote.log-remote-lifecycle-events = off - + cluster { debug.verbose-heartbeat-logging = off - + multi-data-center { cross-data-center-connections = 2 } @@ -149,13 +149,12 @@ abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeather */ private def membersByAge(): immutable.SortedSet[Member] = SortedSet.empty(Member.ageOrdering) - .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp)) + .union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)) /** INTERNAL API */ @InternalApi private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] = membersByAge() - .filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) .filter(memberFilter) .take(n)