DC reachability events #23245
This commit is contained in:
parent
9c7e8d027a
commit
73d3c5db5d
6 changed files with 143 additions and 4 deletions
|
|
@ -244,6 +244,22 @@ object ClusterEvent {
|
||||||
*/
|
*/
|
||||||
final case class ReachableMember(member: Member) extends ReachabilityEvent
|
final case class ReachableMember(member: Member) extends ReachabilityEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marker interface to facilitate subscription of
|
||||||
|
* both [[UnreachableDataCenter]] and [[ReachableDataCenter]].
|
||||||
|
*/
|
||||||
|
sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A data center is considered as unreachable when any members from the data center are unreachable
|
||||||
|
*/
|
||||||
|
final case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A data center is considered reachable when all members from the data center are reachable
|
||||||
|
*/
|
||||||
|
final case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
* The nodes that have seen current version of the Gossip.
|
* The nodes that have seen current version of the Gossip.
|
||||||
|
|
@ -289,6 +305,41 @@ object ClusterEvent {
|
||||||
}(collection.breakOut)
|
}(collection.breakOut)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def isReachable(state: MembershipState, oldUnreachableNodes: Set[UniqueAddress])(otherDc: DataCenter): Boolean = {
|
||||||
|
val unrelatedDcNodes = state.latestGossip.members.collect {
|
||||||
|
case m if m.dataCenter != otherDc && m.dataCenter != state.selfDc ⇒ m.uniqueAddress
|
||||||
|
}
|
||||||
|
|
||||||
|
val reachabilityForOtherDc = state.dcReachabilityWithoutObservationsWithin.remove(unrelatedDcNodes)
|
||||||
|
reachabilityForOtherDc.allUnreachable.filterNot(oldUnreachableNodes).isEmpty
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[cluster] def diffUnreachableDataCenter(oldState: MembershipState, newState: MembershipState): immutable.Seq[UnreachableDataCenter] = {
|
||||||
|
if (newState eq oldState) Nil
|
||||||
|
else {
|
||||||
|
val otherDcs = (oldState.latestGossip.allDataCenters union newState.latestGossip.allDataCenters) - newState.selfDc
|
||||||
|
otherDcs.filterNot(isReachable(newState, oldState.dcReachability.allUnreachableOrTerminated)).map(UnreachableDataCenter)(collection.breakOut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[cluster] def diffReachableDataCenter(oldState: MembershipState, newState: MembershipState): immutable.Seq[ReachableDataCenter] = {
|
||||||
|
if (newState eq oldState) Nil
|
||||||
|
else {
|
||||||
|
val otherDcs = (oldState.latestGossip.allDataCenters union newState.latestGossip.allDataCenters) - newState.selfDc
|
||||||
|
|
||||||
|
val oldUnreachableDcs = otherDcs.filterNot(isReachable(oldState, Set()))
|
||||||
|
val currentUnreachableDcs = otherDcs.filterNot(isReachable(newState, Set()))
|
||||||
|
|
||||||
|
(oldUnreachableDcs diff currentUnreachableDcs).map(ReachableDataCenter)(collection.breakOut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API.
|
* INTERNAL API.
|
||||||
*/
|
*/
|
||||||
|
|
@ -457,6 +508,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
||||||
diffMemberEvents(oldState, newState) foreach pub
|
diffMemberEvents(oldState, newState) foreach pub
|
||||||
diffUnreachable(oldState, newState) foreach pub
|
diffUnreachable(oldState, newState) foreach pub
|
||||||
diffReachable(oldState, newState) foreach pub
|
diffReachable(oldState, newState) foreach pub
|
||||||
|
diffUnreachableDataCenter(oldState, newState) foreach pub
|
||||||
|
diffReachableDataCenter(oldState, newState) foreach pub
|
||||||
diffLeader(oldState, newState) foreach pub
|
diffLeader(oldState, newState) foreach pub
|
||||||
diffRolesLeader(oldState, newState) foreach pub
|
diffRolesLeader(oldState, newState) foreach pub
|
||||||
// publish internal SeenState for testing purposes
|
// publish internal SeenState for testing purposes
|
||||||
|
|
|
||||||
|
|
@ -79,8 +79,14 @@ import scala.util.Random
|
||||||
* nodes outside of the data center
|
* nodes outside of the data center
|
||||||
*/
|
*/
|
||||||
lazy val dcReachability: Reachability =
|
lazy val dcReachability: Reachability =
|
||||||
overview.reachability.removeObservers(
|
overview.reachability.removeObservers(members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress })
|
||||||
members.collect { case m if m.dataCenter != selfDc ⇒ m.uniqueAddress })
|
|
||||||
|
/**
|
||||||
|
* @return Reachability excluding observations from nodes outside of the data center and observations within self data center,
|
||||||
|
* but including observed unreachable nodes outside of the data center
|
||||||
|
*/
|
||||||
|
lazy val dcReachabilityWithoutObservationsWithin: Reachability =
|
||||||
|
dcReachability.filterRecords { r ⇒ latestGossip.member(r.subject).dataCenter != selfDc }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out
|
* @return reachability for data center nodes, with observations from outside the data center or from downed nodes filtered out
|
||||||
|
|
|
||||||
|
|
@ -209,6 +209,9 @@ private[cluster] class Reachability private (
|
||||||
Reachability(newRecords, newVersions)
|
Reachability(newRecords, newVersions)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def filterRecords(f: Record ⇒ Boolean) =
|
||||||
|
Reachability(records.filter(f), versions)
|
||||||
|
|
||||||
def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus =
|
def status(observer: UniqueAddress, subject: UniqueAddress): ReachabilityStatus =
|
||||||
observerRows(observer) match {
|
observerRows(observer) match {
|
||||||
case None ⇒ Reachable
|
case None ⇒ Reachable
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
|
|
||||||
var publisher: ActorRef = _
|
var publisher: ActorRef = _
|
||||||
|
|
||||||
|
final val OtherDataCenter = "dc2"
|
||||||
|
|
||||||
val aUp = TestMember(Address(protocol, "sys", "a", 2552), Up)
|
val aUp = TestMember(Address(protocol, "sys", "a", 2552), Up)
|
||||||
val aLeaving = aUp.copy(status = Leaving)
|
val aLeaving = aUp.copy(status = Leaving)
|
||||||
val aExiting = aLeaving.copy(status = Exiting)
|
val aExiting = aLeaving.copy(status = Exiting)
|
||||||
|
|
@ -49,6 +51,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
val cRemoved = cUp.copy(status = Removed)
|
val cRemoved = cUp.copy(status = Removed)
|
||||||
val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up)
|
val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up)
|
||||||
val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP"))
|
val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP"))
|
||||||
|
val eUp = TestMember(Address(protocol, "sys", "e", 2552), Up, Set("GRP"), OtherDataCenter)
|
||||||
|
|
||||||
private def state(gossip: Gossip, self: UniqueAddress, dc: DataCenter) =
|
private def state(gossip: Gossip, self: UniqueAddress, dc: DataCenter) =
|
||||||
MembershipState(gossip, self, DefaultDataCenter, crossDcConnections = 5)
|
MembershipState(gossip, self, DefaultDataCenter, crossDcConnections = 5)
|
||||||
|
|
@ -74,6 +77,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability =
|
val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability =
|
||||||
Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress)
|
Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress)
|
||||||
val state8 = state(g8, aUp.uniqueAddress, DefaultDataCenter)
|
val state8 = state(g8, aUp.uniqueAddress, DefaultDataCenter)
|
||||||
|
val g9 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp, eUp), overview = GossipOverview(reachability =
|
||||||
|
Reachability.empty.unreachable(aUp.uniqueAddress, eUp.uniqueAddress)))
|
||||||
|
val state9 = state(g9, aUp.uniqueAddress, DefaultDataCenter)
|
||||||
|
val g10 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp, eUp), overview = GossipOverview(reachability =
|
||||||
|
Reachability.empty))
|
||||||
|
val state10 = state(g10, aUp.uniqueAddress, DefaultDataCenter)
|
||||||
|
|
||||||
// created in beforeEach
|
// created in beforeEach
|
||||||
var memberSubscriber: TestProbe = _
|
var memberSubscriber: TestProbe = _
|
||||||
|
|
@ -160,7 +169,6 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
subscriber.expectMsgType[CurrentClusterState]
|
subscriber.expectMsgType[CurrentClusterState]
|
||||||
// but only to the new subscriber
|
// but only to the new subscriber
|
||||||
memberSubscriber.expectNoMsg(500 millis)
|
memberSubscriber.expectNoMsg(500 millis)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"send events corresponding to current state when subscribe" in {
|
"send events corresponding to current state when subscribe" in {
|
||||||
|
|
@ -172,6 +180,17 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
|
||||||
subscriber.expectNoMsg(500 millis)
|
subscriber.expectNoMsg(500 millis)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"send datacenter reachability events" in {
|
||||||
|
val subscriber = TestProbe()
|
||||||
|
publisher ! PublishChanges(state9)
|
||||||
|
publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[DataCenterReachabilityEvent]))
|
||||||
|
subscriber.expectMsg(UnreachableDataCenter(OtherDataCenter))
|
||||||
|
subscriber.expectNoMsg(500 millis)
|
||||||
|
publisher ! PublishChanges(state10)
|
||||||
|
subscriber.expectMsg(ReachableDataCenter(OtherDataCenter))
|
||||||
|
subscriber.expectNoMsg(500 millis)
|
||||||
|
}
|
||||||
|
|
||||||
"support unsubscribe" in {
|
"support unsubscribe" in {
|
||||||
val subscriber = TestProbe()
|
val subscriber = TestProbe()
|
||||||
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent]))
|
publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent]))
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,65 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
|
||||||
diffSeen(state(g1), state(g2)) should ===(Seq.empty)
|
diffSeen(state(g1), state(g2)) should ===(Seq.empty)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"be produced for reachability observations between data centers" in {
|
||||||
|
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
|
||||||
|
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
|
||||||
|
val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty, "dc2")
|
||||||
|
|
||||||
|
val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty, "dc3")
|
||||||
|
val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty, "dc3")
|
||||||
|
|
||||||
|
val reachability1 = Reachability.empty
|
||||||
|
val g1 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability1))
|
||||||
|
|
||||||
|
val reachability2 = reachability1
|
||||||
|
.unreachable(aUp.uniqueAddress, dc2AMemberDown.uniqueAddress)
|
||||||
|
.unreachable(dc2BMemberUp.uniqueAddress, dc2AMemberDown.uniqueAddress)
|
||||||
|
val g2 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberDown, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability2))
|
||||||
|
|
||||||
|
Set(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp).foreach { member ⇒
|
||||||
|
val otherDc =
|
||||||
|
if (member.dataCenter == ClusterSettings.DefaultDataCenter) Seq("dc2")
|
||||||
|
else Seq()
|
||||||
|
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(otherDc.map(UnreachableDataCenter))
|
||||||
|
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g2, member.uniqueAddress, member.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g1, member.uniqueAddress, member.dataCenter, crossDcConnections = 5)) should ===(otherDc.map(ReachableDataCenter))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"not be produced for same reachability observations between data centers" in {
|
||||||
|
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
|
||||||
|
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
|
||||||
|
|
||||||
|
val reachability1 = Reachability.empty
|
||||||
|
val g1 = Gossip(members = SortedSet(aUp, dc2AMemberUp), overview = GossipOverview(reachability = reachability1))
|
||||||
|
|
||||||
|
val reachability2 = reachability1
|
||||||
|
.unreachable(aUp.uniqueAddress, dc2AMemberDown.uniqueAddress)
|
||||||
|
val g2 = Gossip(members = SortedSet(aUp, dc2AMemberDown), overview = GossipOverview(reachability = reachability2))
|
||||||
|
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
diffUnreachableDataCenter(
|
||||||
|
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g1, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
|
||||||
|
diffReachableDataCenter(
|
||||||
|
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5),
|
||||||
|
MembershipState(g2, aUp.uniqueAddress, aUp.dataCenter, crossDcConnections = 5)) should ===(Seq())
|
||||||
|
}
|
||||||
|
|
||||||
"be produced for members becoming reachable after unreachable" in {
|
"be produced for members becoming reachable after unreachable" in {
|
||||||
val reachability1 = Reachability.empty.
|
val reachability1 = Reachability.empty.
|
||||||
unreachable(aUp.uniqueAddress, cUp.uniqueAddress).reachable(aUp.uniqueAddress, cUp.uniqueAddress).
|
unreachable(aUp.uniqueAddress, cUp.uniqueAddress).reachable(aUp.uniqueAddress, cUp.uniqueAddress).
|
||||||
|
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
exit
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue