Merge pull request #23529 from akka/wip-23371-CrossDcHeartbeatSender-patriknw
Missing become after CurrentClusterState in CrossDcHeartbeatSender, #23371
This commit is contained in:
commit
eefe6474c3
3 changed files with 29 additions and 30 deletions
|
|
@ -58,8 +58,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
|||
selfDataCenter,
|
||||
crossDcFailureDetector,
|
||||
crossDcSettings.NrOfMonitoringActors,
|
||||
SortedSet.empty
|
||||
)
|
||||
SortedSet.empty)
|
||||
|
||||
// start periodic heartbeat to other nodes in cluster
|
||||
val heartbeatTask = scheduler.schedule(
|
||||
|
|
@ -125,10 +124,11 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
|||
val nodes = snapshot.members
|
||||
val nrOfMonitoredNodes = crossDcSettings.NrOfMonitoringActors
|
||||
dataCentersState = CrossDcHeartbeatingState.init(selfDataCenter, crossDcFailureDetector, nrOfMonitoredNodes, nodes)
|
||||
becomeActiveIfResponsibleForHeartbeat()
|
||||
}
|
||||
|
||||
def addMember(m: Member): Unit =
|
||||
if (m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp) {
|
||||
if (CrossDcHeartbeatingState.atLeastInUpState(m)) {
|
||||
// since we only monitor nodes in Up or later states, due to the n-th oldest requirement
|
||||
dataCentersState = dataCentersState.addMember(m)
|
||||
if (verboseHeartbeat && m.dataCenter != selfDataCenter)
|
||||
|
|
@ -194,7 +194,7 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
|||
@InternalApi
|
||||
private[akka] object CrossDcHeartbeatSender {
|
||||
|
||||
// -- messages intended only for local messaging during testing --
|
||||
// -- messages intended only for local messaging during testing --
|
||||
sealed trait InspectionCommand extends NoSerializationVerificationNeeded
|
||||
final case class ReportStatus()
|
||||
|
||||
|
|
@ -202,7 +202,7 @@ private[akka] object CrossDcHeartbeatSender {
|
|||
sealed trait MonitoringStateReport extends StatusReport
|
||||
final case class MonitoringActive(state: CrossDcHeartbeatingState) extends MonitoringStateReport
|
||||
final case class MonitoringDormant() extends MonitoringStateReport
|
||||
// -- end of messages intended only for local messaging during testing --
|
||||
// -- end of messages intended only for local messaging during testing --
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
|
|
@ -219,9 +219,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
|
|||
* Only the `nrOfMonitoredNodesPerDc`-oldest nodes in each DC fulfil this role.
|
||||
*/
|
||||
def shouldActivelyMonitorNodes(selfDc: ClusterSettings.DataCenter, selfAddress: UniqueAddress): Boolean = {
|
||||
/** Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes */
|
||||
def atLeastInUpState(m: Member): Boolean =
|
||||
m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining
|
||||
// Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes
|
||||
|
||||
val selfDcNeighbours: SortedSet[Member] = state.getOrElse(selfDc, emptyMembersSortedSet)
|
||||
val selfDcOldOnes = selfDcNeighbours.filter(atLeastInUpState).take(nrOfMonitoredNodesPerDc)
|
||||
|
|
@ -244,12 +242,12 @@ private[cluster] final case class CrossDcHeartbeatingState(
|
|||
val updatedState = this.copy(state = state.updated(dc, updatedMembers))
|
||||
|
||||
// guarding against the case of two members having the same upNumber, in which case the activeReceivers
|
||||
// which are based on the ageOrdering could actually have changed by adding a node. In practice this
|
||||
// should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
|
||||
// in the same DC. If it happens though, we need to remove the previously monitored node from the failure
|
||||
// which are based on the ageOrdering could actually have changed by adding a node. In practice this
|
||||
// should happen rarely, since upNumbers are assigned sequentially, and we only ever compare nodes
|
||||
// in the same DC. If it happens though, we need to remove the previously monitored node from the failure
|
||||
// detector, to prevent both a resource leak and that node actually appearing as unreachable in the gossip (!)
|
||||
val stoppedMonitoringReceivers = updatedState.activeReceiversIn(dc) diff this.activeReceiversIn(dc)
|
||||
stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference
|
||||
stoppedMonitoringReceivers.foreach(m ⇒ failureDetector.remove(m.address)) // at most one element difference
|
||||
|
||||
updatedState
|
||||
}
|
||||
|
|
@ -263,7 +261,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
|
|||
failureDetector.remove(m.address)
|
||||
copy(state = state.updated(dc, updatedMembers))
|
||||
case None ⇒
|
||||
this // no change needed, was certainly not present (not even its DC was)
|
||||
this // no change needed, was certainly not present (not even its DC was)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -274,8 +272,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
|
|||
|
||||
allOtherNodes.flatMap(
|
||||
_.take(nrOfMonitoredNodesPerDc)
|
||||
.map(_.uniqueAddress)(breakOut)
|
||||
).toSet
|
||||
.map(_.uniqueAddress)(breakOut)).toSet
|
||||
}
|
||||
|
||||
/** Lists addresses in the given DataCenter that this node should send heartbeats to */
|
||||
|
|
@ -310,6 +307,10 @@ private[cluster] object CrossDcHeartbeatingState {
|
|||
/** Sorted by age */
|
||||
private def emptyMembersSortedSet: SortedSet[Member] = SortedSet.empty[Member](Member.ageOrdering)
|
||||
|
||||
// Since we need ordering of oldests guaranteed, we must only look at Up (or Leaving, Exiting...) nodes
|
||||
def atLeastInUpState(m: Member): Boolean =
|
||||
m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.Joining
|
||||
|
||||
def init(
|
||||
selfDataCenter: DataCenter,
|
||||
crossDcFailureDetector: FailureDetectorRegistry[Address],
|
||||
|
|
@ -321,7 +322,7 @@ private[cluster] object CrossDcHeartbeatingState {
|
|||
nrOfMonitoredNodesPerDc,
|
||||
state = {
|
||||
// TODO unduplicate this with the logic in MembershipState.ageSortedTopOldestMembersPerDc
|
||||
val groupedByDc = members.groupBy(_.dataCenter)
|
||||
val groupedByDc = members.filter(atLeastInUpState).groupBy(_.dataCenter)
|
||||
|
||||
if (members.ordering == Member.ageOrdering) {
|
||||
// we already have the right ordering
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import scala.concurrent.duration._
|
|||
|
||||
object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first") // alpha
|
||||
val second = role("second") // alpha
|
||||
val second = role("second") // alpha
|
||||
val third = role("third") // alpha
|
||||
|
||||
val fourth = role("fourth") // beta
|
||||
|
|
@ -40,15 +40,15 @@ object MultiDcHeartbeatTakingOverSpecMultiJvmSpec extends MultiNodeConfig {
|
|||
"""
|
||||
akka {
|
||||
actor.provider = cluster
|
||||
|
||||
|
||||
loggers = ["akka.testkit.TestEventListener"]
|
||||
loglevel = INFO
|
||||
|
||||
|
||||
remote.log-remote-lifecycle-events = off
|
||||
|
||||
|
||||
cluster {
|
||||
debug.verbose-heartbeat-logging = off
|
||||
|
||||
|
||||
multi-data-center {
|
||||
cross-data-center-connections = 2
|
||||
}
|
||||
|
|
@ -136,7 +136,7 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
|
|||
"other node must become oldest when current DC-oldest Leaves" taggedAs LongRunningTest in {
|
||||
val observer = TestProbe("alpha-observer-prime")
|
||||
|
||||
// we leave one of the current oldest nodes of the `alpha` DC,
|
||||
// we leave one of the current oldest nodes of the `alpha` DC,
|
||||
// since it has 3 members the "not yet oldest" one becomes oldest and should start monitoring across datacenter
|
||||
val preLeaveOldestAlphaRole = expectedAlphaHeartbeaterRoles.head
|
||||
val preLeaveOldestAlphaAddress = expectedAlphaHeartbeaterNodes.find(_.address.port.get == preLeaveOldestAlphaRole.port.get).get.address
|
||||
|
|
@ -182,13 +182,12 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
|
|||
*/
|
||||
private def membersByAge(): immutable.SortedSet[Member] =
|
||||
SortedSet.empty(Member.ageOrdering)
|
||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp))
|
||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
||||
|
||||
/** INTERNAL API */
|
||||
@InternalApi
|
||||
private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
|
||||
membersByAge()
|
||||
.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
|
||||
.filter(memberFilter)
|
||||
.take(n)
|
||||
|
||||
|
|
|
|||
|
|
@ -38,15 +38,15 @@ object MultiDcSunnyWeatherMultiJvmSpec extends MultiNodeConfig {
|
|||
"""
|
||||
akka {
|
||||
actor.provider = cluster
|
||||
|
||||
|
||||
loggers = ["akka.testkit.TestEventListener"]
|
||||
loglevel = INFO
|
||||
|
||||
|
||||
remote.log-remote-lifecycle-events = off
|
||||
|
||||
|
||||
cluster {
|
||||
debug.verbose-heartbeat-logging = off
|
||||
|
||||
|
||||
multi-data-center {
|
||||
cross-data-center-connections = 2
|
||||
}
|
||||
|
|
@ -149,13 +149,12 @@ abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeather
|
|||
*/
|
||||
private def membersByAge(): immutable.SortedSet[Member] =
|
||||
SortedSet.empty(Member.ageOrdering)
|
||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.WeaklyUp && m.status != MemberStatus.WeaklyUp))
|
||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
||||
|
||||
/** INTERNAL API */
|
||||
@InternalApi
|
||||
private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
|
||||
membersByAge()
|
||||
.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp)
|
||||
.filter(memberFilter)
|
||||
.take(n)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue