diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 181ee76774..18ad6853c6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,7 +5,7 @@ package akka.cluster import language.postfixOps import scala.collection.immutable -import scala.collection.immutable.VectorBuilder +import scala.collection.immutable.{ SortedSet, VectorBuilder } import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterEvent._ @@ -16,6 +16,7 @@ import akka.actor.DeadLetterSuppression import akka.annotation.InternalApi import scala.collection.breakOut +import scala.runtime.AbstractFunction5 /** * Domain events published to the event bus. @@ -55,17 +56,52 @@ object ClusterEvent { */ sealed trait ClusterDomainEvent extends DeadLetterSuppression + // for binary compatibility (used to be a case class) + object CurrentClusterState extends AbstractFunction5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]], CurrentClusterState] { + + def apply( + members: immutable.SortedSet[Member] = immutable.SortedSet.empty, + unreachable: Set[Member] = Set.empty, + seenBy: Set[Address] = Set.empty, + leader: Option[Address] = None, + roleLeaderMap: Map[String, Option[Address]] = Map.empty): CurrentClusterState = + new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap) + + def unapply(cs: CurrentClusterState): Option[(immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]])] = + Some(( + cs.members, + cs.unreachable, + cs.seenBy, + cs.leader, + cs.roleLeaderMap + )) + + } + /** * Current snapshot state of the cluster. Sent to new subscriber. * * @param leader leader of the data center of this node */ - final case class CurrentClusterState( - members: immutable.SortedSet[Member] = immutable.SortedSet.empty, - unreachable: Set[Member] = Set.empty, - seenBy: Set[Address] = Set.empty, - leader: Option[Address] = None, - roleLeaderMap: Map[String, Option[Address]] = Map.empty) { + @SerialVersionUID(2) + final class CurrentClusterState( + val members: immutable.SortedSet[Member], + val unreachable: Set[Member], + val seenBy: Set[Address], + val leader: Option[Address], + val roleLeaderMap: Map[String, Option[Address]], + val unreachableDataCenters: Set[DataCenter]) + extends Product5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]]] + with Serializable { + + // for binary compatibility + def this( + members: immutable.SortedSet[Member] = immutable.SortedSet.empty, + unreachable: Set[Member] = Set.empty, + seenBy: Set[Address] = Set.empty, + leader: Option[Address] = None, + roleLeaderMap: Map[String, Option[Address]] = Map.empty) = + this(members, unreachable, seenBy, leader, roleLeaderMap, Set.empty) /** * Java API: get current member list. @@ -125,6 +161,47 @@ object ClusterEvent { def getAllDataCenters: java.util.Set[String] = scala.collection.JavaConverters.setAsJavaSetConverter(allDataCenters).asJava + /** + * Replace the set of unreachable datacenters with the given set + */ + def withUnreachableDataCenters(unreachableDataCenters: Set[DataCenter]): CurrentClusterState = + new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap, unreachableDataCenters) + + // for binary compatibility (used to be a case class) + def copy( + members: immutable.SortedSet[Member] = this.members, + unreachable: Set[Member] = this.unreachable, + seenBy: Set[Address] = this.seenBy, + leader: Option[Address] = this.leader, + roleLeaderMap: Map[String, Option[Address]] = this.roleLeaderMap) = + new CurrentClusterState(members, unreachable, seenBy, leader, roleLeaderMap, unreachableDataCenters) + + override def equals(other: Any): Boolean = other match { + case that: CurrentClusterState ⇒ + (this eq that) || ( + members == that.members && + unreachable == that.unreachable && + seenBy == that.seenBy && + leader == that.leader && + roleLeaderMap == that.roleLeaderMap) + case _ ⇒ false + } + + override def hashCode(): Int = { + val state = Seq(members, unreachable, seenBy, leader, roleLeaderMap) + state.map(_.hashCode()).foldLeft(0)((a, b) ⇒ 31 * a + b) + } + + // Product5 + override def productPrefix = "CurrentClusterState" + def _1: SortedSet[Member] = members + def _2: Set[Member] = unreachable + def _3: Set[Address] = seenBy + def _4: Option[Address] = leader + def _5: Map[String, Option[Address]] = roleLeaderMap + def canEqual(that: Any): Boolean = that.isInstanceOf[CurrentClusterState] + + override def toString = s"CurrentClusterState($members, $unreachable, $seenBy, $leader, $roleLeaderMap)" } /** @@ -305,7 +382,10 @@ object ClusterEvent { }(collection.breakOut) } - private def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = { + /** + * Internal API + */ + private[cluster] def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = { val unrelatedDcNodes = state.latestGossip.members.collect { case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc ⇒ m.uniqueAddress } @@ -464,13 +544,20 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto membershipState.dcReachabilityNoOutsideNodes.allUnreachableOrTerminated.collect { case node if node != selfUniqueAddress ⇒ membershipState.latestGossip.member(node) } - val state = CurrentClusterState( + + val unreachableDataCenters: Set[DataCenter] = + if (!membershipState.latestGossip.isMultiDc) Set.empty + else membershipState.latestGossip.allDataCenters.filterNot(isReachable(membershipState, Set.empty)) + + val state = new CurrentClusterState( members = membershipState.latestGossip.members, unreachable = unreachable, seenBy = membershipState.latestGossip.seenBy.map(_.address), leader = membershipState.leader.map(_.address), roleLeaderMap = membershipState.latestGossip.allRoles.map(r ⇒ - r → membershipState.roleLeader(r).map(_.address))(collection.breakOut)) + r → membershipState.roleLeader(r).map(_.address))(collection.breakOut), + unreachableDataCenters + ) receiver ! state } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 20799cfb11..40b32777de 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -70,6 +70,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role → leader)) case stats: CurrentInternalStats ⇒ _latestStats = stats case ClusterShuttingDown ⇒ + + case r: ReachableDataCenter ⇒ + _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters - r.dataCenter) + case r: UnreachableDataCenter ⇒ + _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters + r.dataCenter) + } case s: CurrentClusterState ⇒ _state = s }