Reduce amount of gossip data transferred in idle cluster, see #3279
* When the peer has already seen the same gossip version, the gossip chat is initiated with a GossipStatus message containing only the vclock * Remove the conversation flag in GossipEnvelope * Use an ordinary tell instead of actorSelection when replying
This commit is contained in:
parent
671ebf8909
commit
6635ac4032
6 changed files with 123 additions and 54 deletions
|
|
@ -99,7 +99,15 @@ message GossipEnvelope {
|
|||
required UniqueAddress from = 1;
|
||||
required UniqueAddress to = 2;
|
||||
required Gossip gossip = 3;
|
||||
required bool conversation = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gossip Status
|
||||
*/
|
||||
message GossipStatus {
|
||||
required UniqueAddress from = 1;
|
||||
repeated string allHashes = 2;
|
||||
required VectorClock version = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -299,7 +299,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
case ClusterUserAction.JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case _: Tick ⇒ // ignore periodic tasks until initialized
|
||||
}
|
||||
|
||||
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = {
|
||||
|
|
@ -322,6 +321,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
def initialized: Actor.Receive = {
|
||||
case msg: GossipEnvelope ⇒ receiveGossip(msg)
|
||||
case msg: GossipStatus ⇒ receiveGossipStatus(msg)
|
||||
case GossipTick ⇒ gossip()
|
||||
case ReapUnreachableTick ⇒ reapUnreachableMembers()
|
||||
case LeaderActionsTick ⇒ leaderActions()
|
||||
|
|
@ -341,11 +341,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
def removed: Actor.Receive = {
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case _: Tick ⇒ // ignore periodic tasks
|
||||
}
|
||||
|
||||
def receive = uninitialized
|
||||
|
||||
override def unhandled(message: Any): Unit = message match {
|
||||
case _: Tick ⇒
|
||||
case _: GossipEnvelope ⇒
|
||||
case _: GossipStatus ⇒
|
||||
case other ⇒ super.unhandled(other)
|
||||
}
|
||||
|
||||
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
|
||||
|
||||
def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
|
||||
|
|
@ -448,7 +454,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", "))
|
||||
if (node != selfUniqueAddress) {
|
||||
clusterCore(node.address) ! Welcome(selfUniqueAddress, latestGossip)
|
||||
sender ! Welcome(selfUniqueAddress, latestGossip)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
|
|
@ -468,7 +474,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
latestGossip = gossip seen selfUniqueAddress
|
||||
publish(latestGossip)
|
||||
if (from != selfUniqueAddress)
|
||||
oneWayGossipTo(from)
|
||||
gossipTo(from, sender)
|
||||
context.become(initialized)
|
||||
}
|
||||
}
|
||||
|
|
@ -564,6 +570,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
publish(latestGossip)
|
||||
}
|
||||
|
||||
def receiveGossipStatus(status: GossipStatus): Unit = {
|
||||
val from = status.from
|
||||
if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from))
|
||||
log.info("Ignoring received gossip status from unreachable [{}] ", from)
|
||||
else if (latestGossip.members.forall(_.uniqueAddress != from))
|
||||
log.info("Ignoring received gossip status from unknown [{}]", from)
|
||||
else {
|
||||
(status.version tryCompareTo latestGossip.version) match {
|
||||
case Some(0) ⇒ // same version
|
||||
case Some(x) if x > 0 ⇒ gossipStatusTo(from, sender) // remote is newer
|
||||
case _ ⇒ gossipTo(from, sender) // conflicting or local is newer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive new gossip.
|
||||
*/
|
||||
|
|
@ -585,7 +606,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
else {
|
||||
|
||||
val comparison = remoteGossip.version tryCompareTo localGossip.version
|
||||
val conflict = comparison.isEmpty
|
||||
|
||||
val (winningGossip, talkback, newStats) = comparison match {
|
||||
case None ⇒
|
||||
|
|
@ -612,7 +632,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
|
||||
|
||||
if (conflict) {
|
||||
if (comparison.isEmpty) {
|
||||
log.debug(
|
||||
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
|
||||
remoteGossip, localGossip, winningGossip)
|
||||
|
|
@ -621,10 +641,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
stats = stats.incrementReceivedGossipCount
|
||||
publish(latestGossip)
|
||||
|
||||
if (envelope.conversation && talkback) {
|
||||
if (talkback) {
|
||||
// send back gossip to sender when sender had different view, i.e. merge, or sender had
|
||||
// older or sender had newer
|
||||
gossipTo(from)
|
||||
gossipTo(from, sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -640,7 +660,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
if (!isSingletonCluster && isAvailable) {
|
||||
val localGossip = latestGossip
|
||||
|
||||
val preferredGossipTargets =
|
||||
val preferredGossipTargets: Vector[UniqueAddress] =
|
||||
if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
|
||||
// gossip to a random alive member with preference to a member with older or newer gossip version
|
||||
val localMemberAddressesSet = localGossip.members map { _.uniqueAddress }
|
||||
|
|
@ -650,13 +670,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
if version != localGossip.version
|
||||
} yield node
|
||||
|
||||
nodesWithDifferentView.toIndexedSeq
|
||||
nodesWithDifferentView.toVector
|
||||
} else Vector.empty[UniqueAddress]
|
||||
|
||||
gossipToRandomNodeOf(
|
||||
if (preferredGossipTargets.nonEmpty) preferredGossipTargets
|
||||
else localGossip.members.toIndexedSeq.map(_.uniqueAddress) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
|
||||
)
|
||||
if (preferredGossipTargets.nonEmpty) {
|
||||
val peer = selectRandomNode(preferredGossipTargets filterNot (_ == selfUniqueAddress))
|
||||
// send full gossip because it has different view
|
||||
peer foreach gossipTo
|
||||
} else {
|
||||
// Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved
|
||||
val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect {
|
||||
case m if m.uniqueAddress != selfUniqueAddress ⇒ m.uniqueAddress
|
||||
})
|
||||
peer foreach { node ⇒
|
||||
if (localGossip.seenByNode(node)) gossipStatusTo(node)
|
||||
else gossipTo(node)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -882,18 +912,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
|
||||
def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress)
|
||||
|
||||
/**
|
||||
* Gossips latest gossip to a random member in the set of members passed in as argument.
|
||||
*
|
||||
* @return the used [[UniqueAddress]] if any
|
||||
*/
|
||||
private def gossipToRandomNodeOf(nodes: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = {
|
||||
// filter out myself
|
||||
val peer = selectRandomNode(nodes filterNot (_ == selfUniqueAddress))
|
||||
peer foreach gossipTo
|
||||
peer
|
||||
}
|
||||
|
||||
// needed for tests
|
||||
def sendGossipTo(address: Address): Unit = {
|
||||
latestGossip.members.foreach(m ⇒
|
||||
|
|
@ -905,14 +923,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
* Gossips latest gossip to a node.
|
||||
*/
|
||||
def gossipTo(node: UniqueAddress): Unit =
|
||||
gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = true))
|
||||
if (validNodeForGossip(node))
|
||||
clusterCore(node.address) ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
|
||||
|
||||
def oneWayGossipTo(node: UniqueAddress): Unit =
|
||||
gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = false))
|
||||
def gossipTo(node: UniqueAddress, destination: ActorRef): Unit =
|
||||
if (validNodeForGossip(node))
|
||||
destination ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
|
||||
|
||||
def gossipTo(node: UniqueAddress, gossipMsg: GossipEnvelope): Unit =
|
||||
if (node != selfUniqueAddress && gossipMsg.gossip.members.exists(_.uniqueAddress == node))
|
||||
clusterCore(node.address) ! gossipMsg
|
||||
def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit =
|
||||
if (validNodeForGossip(node))
|
||||
destination ! GossipStatus(selfUniqueAddress, latestGossip.version)
|
||||
|
||||
def gossipStatusTo(node: UniqueAddress): Unit =
|
||||
if (validNodeForGossip(node))
|
||||
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
|
||||
|
||||
def validNodeForGossip(node: UniqueAddress): Boolean =
|
||||
(node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node))
|
||||
|
||||
def publish(newGossip: Gossip): Unit = {
|
||||
publisher ! PublishChanges(newGossip)
|
||||
|
|
|
|||
|
|
@ -239,4 +239,14 @@ private[cluster] case class GossipOverview(
|
|||
* different in that case.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
|
||||
private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* When there are no known changes to the node ring a `GossipStatus`
|
||||
* initiates a gossip chat between two members. If the receiver has a newer
|
||||
* version it replies with a `GossipEnvelope`. If the receiver has an older version
|
||||
* it replies with its `GossipStatus`. Identical versions end the chat immediately.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[cluster] case class GossipStatus(from: UniqueAddress, version: VectorClock) extends ClusterMessage
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))),
|
||||
classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))),
|
||||
classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))),
|
||||
classOf[GossipStatus] -> gossipStatusFromBinary,
|
||||
classOf[GossipEnvelope] -> gossipEnvelopeFromBinary,
|
||||
classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
|
||||
|
||||
|
|
@ -49,6 +50,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
addressToProto(from)
|
||||
case m: GossipEnvelope ⇒
|
||||
gossipEnvelopeToProto(m)
|
||||
case m: GossipStatus ⇒
|
||||
gossipStatusToProto(m)
|
||||
case m: MetricsGossipEnvelope ⇒
|
||||
metricsGossipEnvelopeToProto(m)
|
||||
case InternalClusterAction.Join(node, roles) ⇒
|
||||
|
|
@ -140,20 +143,14 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
|
||||
def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
|
||||
def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role")
|
||||
def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash")
|
||||
|
||||
def memberToProto(member: Member) = {
|
||||
msg.Member(mapUniqueAddress(member.uniqueAddress), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector])
|
||||
}
|
||||
|
||||
def vectorClockToProto(version: VectorClock) = {
|
||||
msg.VectorClock(version.timestamp.time,
|
||||
version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector])
|
||||
}
|
||||
|
||||
def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match {
|
||||
case (address: UniqueAddress, version: VectorClock) ⇒
|
||||
msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version))
|
||||
msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping))
|
||||
}
|
||||
|
||||
val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector]
|
||||
|
|
@ -163,18 +160,34 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
val overview = msg.GossipOverview(seen, unreachable)
|
||||
|
||||
msg.Gossip(allAddresses.map(uniqueAddressToProto),
|
||||
allRoles, allHashes, members, overview, vectorClockToProto(gossip.version))
|
||||
allRoles, allHashes, members, overview, vectorClockToProto(gossip.version, hashMapping))
|
||||
}
|
||||
|
||||
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = {
|
||||
def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash")
|
||||
msg.VectorClock(version.timestamp.time,
|
||||
version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector])
|
||||
}
|
||||
|
||||
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = {
|
||||
msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to),
|
||||
gossipToProto(envelope.gossip), envelope.conversation)
|
||||
gossipToProto(envelope.gossip))
|
||||
}
|
||||
|
||||
private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = {
|
||||
val allHashes: Vector[String] = status.version.versions.keys.map(_.hash)(collection.breakOut)
|
||||
val hashMapping = allHashes.zipWithIndex.toMap
|
||||
msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping))
|
||||
}
|
||||
|
||||
private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = {
|
||||
gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes))
|
||||
}
|
||||
|
||||
private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = {
|
||||
gossipStatusFromProto(msg.GossipStatus.defaultInstance.mergeFrom(bytes))
|
||||
}
|
||||
|
||||
private def gossipFromProto(gossip: msg.Gossip): Gossip = {
|
||||
val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto)
|
||||
val roleMapping = gossip.allRoles
|
||||
|
|
@ -185,7 +198,18 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
member.rolesIndexes.map(roleMapping).to[Set])
|
||||
}
|
||||
|
||||
def vectorClockFromProto(version: msg.VectorClock) = {
|
||||
def seenFromProto(seen: msg.GossipOverview.Seen) =
|
||||
(addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping))
|
||||
|
||||
val members = gossip.members.map(memberFromProto).to[immutable.SortedSet]
|
||||
val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet
|
||||
val seen = gossip.overview.seen.map(seenFromProto).toMap
|
||||
val overview = GossipOverview(seen, unreachable)
|
||||
|
||||
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))
|
||||
}
|
||||
|
||||
private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = {
|
||||
VectorClock(VectorClock.Timestamp(version.timestamp),
|
||||
version.versions.map {
|
||||
case msg.VectorClock.Version(h, t) ⇒
|
||||
|
|
@ -193,22 +217,16 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
|
|||
}.toMap)
|
||||
}
|
||||
|
||||
def seenFromProto(seen: msg.GossipOverview.Seen) =
|
||||
(addressMapping(seen.addressIndex), vectorClockFromProto(seen.version))
|
||||
|
||||
val members = gossip.members.map(memberFromProto).to[immutable.SortedSet]
|
||||
val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet
|
||||
val seen = gossip.overview.seen.map(seenFromProto).toMap
|
||||
val overview = GossipOverview(seen, unreachable)
|
||||
|
||||
Gossip(members, overview, vectorClockFromProto(gossip.version))
|
||||
}
|
||||
|
||||
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = {
|
||||
GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to),
|
||||
gossipFromProto(envelope.gossip))
|
||||
}
|
||||
|
||||
private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus = {
|
||||
val hashMapping = status.allHashes
|
||||
GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, hashMapping))
|
||||
}
|
||||
|
||||
private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = {
|
||||
val mgossip = envelope.gossip
|
||||
val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address).to[Vector]
|
||||
|
|
|
|||
|
|
@ -108,6 +108,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
|
|||
classOf[ClusterHeartbeatReceiver.Heartbeat],
|
||||
classOf[ClusterHeartbeatReceiver.EndHeartbeat],
|
||||
classOf[GossipEnvelope],
|
||||
classOf[GossipStatus],
|
||||
classOf[MetricsGossipEnvelope],
|
||||
classOf[ClusterEvent.ClusterMetricsChanged],
|
||||
classOf[InternalClusterAction.Tick],
|
||||
|
|
|
|||
|
|
@ -54,9 +54,14 @@ class ClusterMessageSerializerSpec extends AkkaSpec {
|
|||
val node4 = VectorClock.Node("node4")
|
||||
val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1 :+ node2).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
|
||||
val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress)
|
||||
val g3 = g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1)))
|
||||
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1))
|
||||
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2))
|
||||
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1)))))
|
||||
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3))
|
||||
|
||||
checkSerialization(GossipStatus(a1.uniqueAddress, g1.version))
|
||||
checkSerialization(GossipStatus(a1.uniqueAddress, g2.version))
|
||||
checkSerialization(GossipStatus(a1.uniqueAddress, g3.version))
|
||||
|
||||
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue