parent
b43bdc49d8
commit
12196d674e
3 changed files with 34 additions and 21 deletions
|
|
@ -53,15 +53,25 @@ class Member private[cluster] (
|
||||||
* Is this member older, has been part of cluster longer, than another
|
* Is this member older, has been part of cluster longer, than another
|
||||||
* member. It is only correct when comparing two existing members in a
|
* member. It is only correct when comparing two existing members in a
|
||||||
* cluster. A member that joined after removal of another member may be
|
* cluster. A member that joined after removal of another member may be
|
||||||
* considered older than the removed member. Note that is only makes
|
* considered older than the removed member.
|
||||||
* sense to compare with other members inside of one data center (upNumber has
|
*
|
||||||
* a higher risk of being reused across data centers). // TODO should we enforce this to compare only within DCs?
|
* Note that it only makes sense to compare with other members of
|
||||||
|
* same data center (upNumber has a higher risk of being reused across data centers).
|
||||||
|
* To avoid mistakes of comparing members of different data centers this
|
||||||
|
* method will throw `IllegalArgumentException` if the members belong
|
||||||
|
* to different data centers.
|
||||||
*/
|
*/
|
||||||
def isOlderThan(other: Member): Boolean =
|
@throws[IllegalArgumentException]("if members from different data centers")
|
||||||
|
def isOlderThan(other: Member): Boolean = {
|
||||||
|
if (dataCenter != other.dataCenter)
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Comparing members of different data centers with isOlderThan is not allowed. " +
|
||||||
|
s"[$this] vs. [$other]")
|
||||||
if (upNumber == other.upNumber)
|
if (upNumber == other.upNumber)
|
||||||
Member.addressOrdering.compare(address, other.address) < 0
|
Member.addressOrdering.compare(address, other.address) < 0
|
||||||
else
|
else
|
||||||
upNumber < other.upNumber
|
upNumber < other.upNumber
|
||||||
|
}
|
||||||
|
|
||||||
def copy(status: MemberStatus): Member = {
|
def copy(status: MemberStatus): Member = {
|
||||||
val oldStatus = this.status
|
val oldStatus = this.status
|
||||||
|
|
@ -141,6 +151,11 @@ object Member {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sort members by age, i.e. using [[Member#isOlderThan]].
|
* Sort members by age, i.e. using [[Member#isOlderThan]].
|
||||||
|
*
|
||||||
|
* Note that it only makes sense to compare with other members of
|
||||||
|
* same data center. To avoid mistakes of comparing members of different
|
||||||
|
* data centers it will throw `IllegalArgumentException` if the
|
||||||
|
* members belong to different data centers.
|
||||||
*/
|
*/
|
||||||
val ageOrdering: Ordering[Member] = Ordering.fromLessThan[Member] {
|
val ageOrdering: Ordering[Member] = Ordering.fromLessThan[Member] {
|
||||||
(a, b) ⇒ a.isOlderThan(b)
|
(a, b) ⇒ a.isOlderThan(b)
|
||||||
|
|
|
||||||
|
|
@ -85,10 +85,10 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
|
||||||
// end of these will be filled in during the initial phase of the test -----------
|
// end of these will be filled in during the initial phase of the test -----------
|
||||||
|
|
||||||
def refreshOldestMemberHeartbeatStatuses() = {
|
def refreshOldestMemberHeartbeatStatuses() = {
|
||||||
expectedAlphaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "alpha", 2)
|
expectedAlphaHeartbeaterNodes = takeNOldestMembers(dataCenter = "alpha", 2)
|
||||||
expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
|
expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
|
||||||
|
|
||||||
expectedBetaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "beta", 2)
|
expectedBetaHeartbeaterNodes = takeNOldestMembers(dataCenter = "beta", 2)
|
||||||
expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
|
expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
|
||||||
|
|
||||||
expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
|
expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
|
||||||
|
|
@ -154,7 +154,7 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
|
||||||
enterBarrier("after-alpha-monitoring-node-left")
|
enterBarrier("after-alpha-monitoring-node-left")
|
||||||
|
|
||||||
implicit val sender = observer.ref
|
implicit val sender = observer.ref
|
||||||
val expectedAlphaMonitoringNodesAfterLeaving = (takeNOldestMembers(_.dataCenter == "alpha", 3).filterNot(_.status == MemberStatus.Exiting))
|
val expectedAlphaMonitoringNodesAfterLeaving = (takeNOldestMembers(dataCenter = "alpha", 3).filterNot(_.status == MemberStatus.Exiting))
|
||||||
runOn(membersAsRoles(expectedAlphaMonitoringNodesAfterLeaving).toList: _*) {
|
runOn(membersAsRoles(expectedAlphaMonitoringNodesAfterLeaving).toList: _*) {
|
||||||
awaitAssert({
|
awaitAssert({
|
||||||
|
|
||||||
|
|
@ -180,16 +180,15 @@ abstract class MultiDcHeartbeatTakingOverSpec extends MultiNodeSpec(MultiDcHeart
|
||||||
* strongly guaratnee the order of "oldest" members, as they're linearized by the order in which they become Up
|
* strongly guaratnee the order of "oldest" members, as they're linearized by the order in which they become Up
|
||||||
* (since marking that transition is a Leader action).
|
* (since marking that transition is a Leader action).
|
||||||
*/
|
*/
|
||||||
private def membersByAge(): immutable.SortedSet[Member] =
|
private def membersByAge(dataCenter: ClusterSettings.DataCenter): immutable.SortedSet[Member] =
|
||||||
SortedSet.empty(Member.ageOrdering)
|
SortedSet.empty(Member.ageOrdering)
|
||||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
.union(cluster.state.members.filter(m ⇒ m.dataCenter == dataCenter &&
|
||||||
|
m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
|
private[cluster] def takeNOldestMembers(dataCenter: ClusterSettings.DataCenter, n: Int): immutable.SortedSet[Member] =
|
||||||
membersByAge()
|
membersByAge(dataCenter).take(n)
|
||||||
.filter(memberFilter)
|
|
||||||
.take(n)
|
|
||||||
|
|
||||||
private def membersAsRoles(ms: SortedSet[Member]): SortedSet[RoleName] = {
|
private def membersAsRoles(ms: SortedSet[Member]): SortedSet[RoleName] = {
|
||||||
val res = ms.flatMap(m ⇒ roleName(m.address))
|
val res = ms.flatMap(m ⇒ roleName(m.address))
|
||||||
|
|
|
||||||
|
|
@ -76,10 +76,10 @@ abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeather
|
||||||
val crossDcHeartbeatSenderPath = "/system/cluster/core/daemon/crossDcHeartbeatSender"
|
val crossDcHeartbeatSenderPath = "/system/cluster/core/daemon/crossDcHeartbeatSender"
|
||||||
val selectCrossDcHeartbeatSender = system.actorSelection(crossDcHeartbeatSenderPath)
|
val selectCrossDcHeartbeatSender = system.actorSelection(crossDcHeartbeatSenderPath)
|
||||||
|
|
||||||
val expectedAlphaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "alpha", 2)
|
val expectedAlphaHeartbeaterNodes = takeNOldestMembers(dataCenter = "alpha", 2)
|
||||||
val expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
|
val expectedAlphaHeartbeaterRoles = membersAsRoles(expectedAlphaHeartbeaterNodes)
|
||||||
|
|
||||||
val expectedBetaHeartbeaterNodes = takeNOldestMembers(_.dataCenter == "beta", 2)
|
val expectedBetaHeartbeaterNodes = takeNOldestMembers(dataCenter = "beta", 2)
|
||||||
val expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
|
val expectedBetaHeartbeaterRoles = membersAsRoles(expectedBetaHeartbeaterNodes)
|
||||||
|
|
||||||
val expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
|
val expectedNoActiveHeartbeatSenderRoles = roles.toSet -- (expectedAlphaHeartbeaterRoles union expectedBetaHeartbeaterRoles)
|
||||||
|
|
@ -147,16 +147,15 @@ abstract class MultiDcSunnyWeatherSpec extends MultiNodeSpec(MultiDcSunnyWeather
|
||||||
* strongly guaratnee the order of "oldest" members, as they're linearized by the order in which they become Up
|
* strongly guaratnee the order of "oldest" members, as they're linearized by the order in which they become Up
|
||||||
* (since marking that transition is a Leader action).
|
* (since marking that transition is a Leader action).
|
||||||
*/
|
*/
|
||||||
private def membersByAge(): immutable.SortedSet[Member] =
|
private def membersByAge(dataCenter: ClusterSettings.DataCenter): immutable.SortedSet[Member] =
|
||||||
SortedSet.empty(Member.ageOrdering)
|
SortedSet.empty(Member.ageOrdering)
|
||||||
.union(cluster.state.members.filter(m ⇒ m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
.union(cluster.state.members.filter(m ⇒ m.dataCenter == dataCenter &&
|
||||||
|
m.status != MemberStatus.Joining && m.status != MemberStatus.WeaklyUp))
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[cluster] def takeNOldestMembers(memberFilter: Member ⇒ Boolean, n: Int): immutable.SortedSet[Member] =
|
private[cluster] def takeNOldestMembers(dataCenter: ClusterSettings.DataCenter, n: Int): immutable.SortedSet[Member] =
|
||||||
membersByAge()
|
membersByAge(dataCenter).take(n)
|
||||||
.filter(memberFilter)
|
|
||||||
.take(n)
|
|
||||||
|
|
||||||
private def membersAsRoles(ms: immutable.Set[Member]): immutable.Set[RoleName] = {
|
private def membersAsRoles(ms: immutable.Set[Member]): immutable.Set[RoleName] = {
|
||||||
val res = ms.flatMap(m ⇒ roleName(m.address))
|
val res = ms.flatMap(m ⇒ roleName(m.address))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue