diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index a296ac8c2c..181ee76774 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -244,6 +244,22 @@ object ClusterEvent { */ final case class ReachableMember(member: Member) extends ReachabilityEvent + /** + * Marker interface to facilitate subscription of + * both [[UnreachableDataCenter]] and [[ReachableDataCenter]]. + */ + sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent + + /** + * A data center is considered as unreachable when any members from the data center are unreachable + */ + final case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent + + /** + * A data center is considered reachable when all members from the data center are reachable + */ + final case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent + /** * INTERNAL API * The nodes that have seen current version of the Gossip. @@ -289,6 +305,41 @@ object ClusterEvent { }(collection.breakOut) } + private def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = { + val unrelatedDcNodes = state.latestGossip.members.collect { + case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc ⇒ m.uniqueAddress + } + + val reachabilityForOtherDc = state.dcReachabilityWithoutObservationsWithin.remove(unrelatedDcNodes) + reachabilityForOtherDc.allUnreachable.filterNot(oldUnreachableNodes).isEmpty + } + + /** + * INTERNAL API + */ + private[cluster] def diffUnreachableDataCenter(oldState: MembershipState, newState: MembershipState): immutable.Seq[UnreachableDataCenter] = { + if (newState eq oldState) Nil + else { + val otherDcs = (oldState.latestGossip.allDataCenters union newState.latestGossip.allDataCenters) - newState.selfDc + otherDcs.filterNot(isReachable(newState, oldState.dcReachability.allUnreachableOrTerminated)).map(UnreachableDataCenter)(collection.breakOut) + } + } + + /** + * INTERNAL API + */ + private[cluster] def diffReachableDataCenter(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachableDataCenter] = { + if (newState eq oldState) Nil + else { + val otherDcs = (oldState.latestGossip.allDataCenters union newState.latestGossip.allDataCenters) - newState.selfDc + + val oldUnreachableDcs = otherDcs.filterNot(isReachable(oldState, Set())) + val currentUnreachableDcs = otherDcs.filterNot(isReachable(newState, Set())) + + (oldUnreachableDcs diff currentUnreachableDcs).map(ReachableDataCenter)(collection.breakOut) + } + } + /** * INTERNAL API. */ @@ -457,6 +508,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto diffMemberEvents(oldState, newState) foreach pub diffUnreachable(oldState, newState) foreach pub diffReachable(oldState, newState) foreach pub + diffUnreachableDataCenter(oldState, newState) foreach pub + diffReachableDataCenter(oldState, newState) foreach pub diffLeader(oldState, newState) foreach pub diffRolesLeader(oldState, newState) foreach pub // publish internal SeenState for testing purposes diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index 0c893f4e55..1cf99419c4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -79,8 +79,14 @@ import scala.util.Random * nodes outside of the data center */ lazy val dcReachability: Reachability = - overview.reachability.removeObservers( - members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) + overview.reachability.removeObservers(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress }) + + /** + * @return Reachability excluding observations from nodes outside of the data center and observations within self data center, + * but including observed unreachable nodes outside of the data center + */ + lazy val dcReachabilityWithoutObservationsWithin: Reachability = + dcReachability.filterRecords { r ⇒ latestGossip.member(r.subject).dataCenter != selfDc } /** * @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 3afe0bc8ab..2bc50cd44b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -209,6 +209,9 @@ private[cluster] class Reachability private ( Reachability(newRecords, newVersions) } + def filterRecords(f: Record ⇒ Boolean) = + Reachability(records.filter(f), versions) + def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus = observerRows(observer) match { case None ⇒ Reachable diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index e86fe2a1b5..ce21fa37be 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -38,6 +38,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish var publisher: ActorRef = _ + final val OtherDataCenter = "dc2" + val aUp = TestMember(Address(protocol, "sys", "a", 2552), Up) val aLeaving = aUp.copy(status = Leaving) val aExiting = aLeaving.copy(status = Exiting) @@ -49,6 +51,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val cRemoved = cUp.copy(status = Removed) val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up) val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP")) + val eUp = TestMember(Address(protocol, "sys", "e", 2552), Up, Set("GRP"), OtherDataCenter) private def state(gossip: Gossip, self: UniqueAddress, dc: DataCenter) = MembershipState(gossip, self, DefaultDataCenter, crossDcConnections = 5) @@ -74,6 +77,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability = Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress) val state8 = state(g8, aUp.uniqueAddress, DefaultDataCenter) + val g9 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp, eUp), overview = GossipOverview(reachability = + Reachability.empty.unreachable(aUp.uniqueAddress, eUp.uniqueAddress))) + val state9 = state(g9, aUp.uniqueAddress, DefaultDataCenter) + val g10 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp, eUp), overview = GossipOverview(reachability = + Reachability.empty)) + val state10 = state(g10, aUp.uniqueAddress, DefaultDataCenter) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -160,7 +169,6 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish subscriber.expectMsgType[CurrentClusterState] // but only to the new subscriber memberSubscriber.expectNoMsg(500 millis) - } "send events corresponding to current state when subscribe" in { @@ -172,6 +180,17 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish subscriber.expectNoMsg(500 millis) } + "send datacenter reachability events" in { + val subscriber = TestProbe() + publisher ! PublishChanges(state9) + publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[DataCenterReachabilityEvent])) + subscriber.expectMsg(UnreachableDataCenter(OtherDataCenter)) + subscriber.expectNoMsg(500 millis) + publisher ! PublishChanges(state10) + subscriber.expectMsg(ReachableDataCenter(OtherDataCenter)) + subscriber.expectNoMsg(500 millis) + } + "support unsubscribe" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent])) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 0ad9e55da9..505d52e7a3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -89,6 +89,65 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffSeen(state(g1), state(g2)) should ===(Seq.empty) } + "be produced for reachability observations between data centers" in { + val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2") + val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2") + val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty, "dc2") + + val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty, "dc3") + val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty, "dc3") + + val reachability1 = Reachability.empty + val g1 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability1)) + + val reachability2 = reachability1 + .unreachable(aUp.uniqueAddress, dc2AMemberDown.uniqueAddress) + .unreachable(dc2BMemberUp.uniqueAddress, dc2AMemberDown.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberDown, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability2)) + + Set(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp).foreach { member ⇒ + val otherDc = + if (member.dataCenter == ClusterSettings.DefaultDataCenter) Seq("dc2") + else Seq() + + diffUnreachableDataCenter( + MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5), + MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(otherDc.map(UnreachableDataCenter)) + + diffReachableDataCenter( + MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5), + MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(otherDc.map(ReachableDataCenter)) + } + } + + "not be produced for same reachability observations between data centers" in { + val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2") + val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2") + + val reachability1 = Reachability.empty + val g1 = Gossip(members = SortedSet(aUp, dc2AMemberUp), overview = GossipOverview(reachability = reachability1)) + + val reachability2 = reachability1 + .unreachable(aUp.uniqueAddress, dc2AMemberDown.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, dc2AMemberDown), overview = GossipOverview(reachability = reachability2)) + + diffUnreachableDataCenter( + MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5), + MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + diffUnreachableDataCenter( + MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5), + MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + diffReachableDataCenter( + MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5), + MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq()) + + diffReachableDataCenter( + MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5), + MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq()) + } + "be produced for members becoming reachable after unreachable" in { val reachability1 = Reachability.empty. unreachable(aUp.uniqueAddress, cUp.uniqueAddress).reachable(aUp.uniqueAddress, cUp.uniqueAddress). diff --git a/akka-docs/.history b/akka-docs/.history deleted file mode 100644 index a3abe50906..0000000000 --- a/akka-docs/.history +++ /dev/null @@ -1 +0,0 @@ -exit