Data center reachability in cluster state (#23359)

* Manual case-declassing of CurrentClusterState #23347

* Unreachable data centers set in CurrentClusterState #23347
This commit is contained in:
Johan Andrén 2017-08-22 13:04:39 +02:00 committed by Patrik Nordwall
parent b86b10c477
commit cff43a16f7
2 changed files with 103 additions and 10 deletions

View file

@ -5,7 +5,7 @@ package akka.cluster
import language.postfixOps
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.collection.immutable.{ SortedSet, VectorBuilder }
import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.ClusterEvent._
@ -16,6 +16,7 @@ import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
import scala.collection.breakOut
import scala.runtime.AbstractFunction5
/**
* Domain events published to the event bus.
@ -55,17 +56,52 @@ object ClusterEvent {
*/
sealed trait ClusterDomainEvent extends DeadLetterSuppression
// for binary compatibility (used to be a case class)
object CurrentClusterState extends AbstractFunction5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]], CurrentClusterState] {
def apply(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty): CurrentClusterState =
new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap)
def unapply(cs: CurrentClusterState): Option[(immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]])] =
Some((
cs.members,
cs.unreachable,
cs.seenBy,
cs.leader,
cs.roleLeaderMap
))
}
/**
* Current snapshot state of the cluster. Sent to new subscriber.
*
* @param leader leader of the data center of this node
*/
final case class CurrentClusterState(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty) {
@SerialVersionUID(2)
final class CurrentClusterState(
val members: immutable.SortedSet[Member],
val unreachable: Set[Member],
val seenBy: Set[Address],
val leader: Option[Address],
val roleLeaderMap: Map[String, Option[Address]],
val unreachableDataCenters: Set[DataCenter])
extends Product5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]]]
with Serializable {
// for binary compatibility
def this(
members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty) =
this(members, unreachable, seenBy, leader, roleLeaderMap, Set.empty)
/**
* Java API: get current member list.
@ -125,6 +161,47 @@ object ClusterEvent {
def getAllDataCenters: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(allDataCenters).asJava
/**
* Replace the set of unreachable datacenters with the given set
*/
def withUnreachableDataCenters(unreachableDataCenters: Set[DataCenter]): CurrentClusterState =
new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap, unreachableDataCenters)
// for binary compatibility (used to be a case class)
def copy(
members: immutable.SortedSet[Member] = this.members,
unreachable: Set[Member] = this.unreachable,
seenBy: Set[Address] = this.seenBy,
leader: Option[Address] = this.leader,
roleLeaderMap: Map[String, Option[Address]] = this.roleLeaderMap) =
new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap, unreachableDataCenters)
override def equals(other: Any): Boolean = other match {
case that: CurrentClusterState
(this eq that) || (
members == that.members &&
unreachable == that.unreachable &&
seenBy == that.seenBy &&
leader == that.leader &&
roleLeaderMap == that.roleLeaderMap)
case _ false
}
override def hashCode(): Int = {
val state = Seq(members, unreachable, seenBy, leader, roleLeaderMap)
state.map(_.hashCode()).foldLeft(0)((a, b) 31 * a + b)
}
// Product5
override def productPrefix = "CurrentClusterState"
def _1: SortedSet[Member] = members
def _2: Set[Member] = unreachable
def _3: Set[Address] = seenBy
def _4: Option[Address] = leader
def _5: Map[String, Option[Address]] = roleLeaderMap
def canEqual(that: Any): Boolean = that.isInstanceOf[CurrentClusterState]
override def toString = s"CurrentClusterState($members, $unreachable, $seenBy, $leader, $roleLeaderMap)"
}
/**
@ -305,7 +382,10 @@ object ClusterEvent {
}(collection.breakOut)
}
private def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = {
/**
* Internal API
*/
private[cluster] def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = {
val unrelatedDcNodes = state.latestGossip.members.collect {
case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc m.uniqueAddress
}
@ -464,13 +544,20 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
membershipState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect {
case node if node != selfUniqueAddress membershipState.latestGossip.member(node)
}
val state = CurrentClusterState(
val unreachableDataCenters: Set[DataCenter] =
if (!membershipState.latestGossip.isMultiDc) Set.empty
else membershipState.latestGossip.allDataCenters.filterNot(isReachable(membershipState, Set.empty))
val state = new CurrentClusterState(
members = membershipState.latestGossip.members,
unreachable = unreachable,
seenBy = membershipState.latestGossip.seenBy.map(_.address),
leader = membershipState.leader.map(_.address),
roleLeaderMap = membershipState.latestGossip.allRoles.map(r
r membershipState.roleLeader(r).map(_.address))(collection.breakOut))
r membershipState.roleLeader(r).map(_.address))(collection.breakOut),
unreachableDataCenters
)
receiver ! state
}