diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 068068b0d3..2e7b6e7151 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -99,7 +99,15 @@ message GossipEnvelope { required UniqueAddress from = 1; required UniqueAddress to = 2; required Gossip gossip = 3; - required bool conversation = 4; +} + +/** + * Gossip Status + */ +message GossipStatus { + required UniqueAddress from = 1; + repeated string allHashes = 2; + required VectorClock version = 3; } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 9c74c50e36..8f95269b9d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -299,7 +299,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case ClusterUserAction.JoinTo(address) ⇒ join(address) case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg - case _: Tick ⇒ // ignore periodic tasks until initialized } def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = { @@ -322,6 +321,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) + case msg: GossipStatus ⇒ receiveGossipStatus(msg) case GossipTick ⇒ gossip() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() @@ -341,11 +341,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def removed: Actor.Receive = { case msg: SubscriptionMessage ⇒ publisher forward msg - case _: Tick ⇒ // ignore periodic tasks } def receive = uninitialized + override def unhandled(message: Any): Unit = message match { + case _: Tick ⇒ + case _: GossipEnvelope ⇒ + case _: GossipStatus ⇒ + case other ⇒ super.unhandled(other) + } + def initJoin(): Unit = sender ! InitJoinAck(selfAddress) def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = { @@ -448,7 +454,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", ")) if (node != selfUniqueAddress) { - clusterCore(node.address) ! Welcome(selfUniqueAddress, latestGossip) + sender ! Welcome(selfUniqueAddress, latestGossip) } publish(latestGossip) @@ -468,7 +474,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto latestGossip = gossip seen selfUniqueAddress publish(latestGossip) if (from != selfUniqueAddress) - oneWayGossipTo(from) + gossipTo(from, sender) context.become(initialized) } } @@ -564,6 +570,21 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto publish(latestGossip) } + def receiveGossipStatus(status: GossipStatus): Unit = { + val from = status.from + if (latestGossip.overview.unreachable.exists(_.uniqueAddress == from)) + log.info("Ignoring received gossip status from unreachable [{}] ", from) + else if (latestGossip.members.forall(_.uniqueAddress != from)) + log.info("Ignoring received gossip status from unknown [{}]", from) + else { + (status.version tryCompareTo latestGossip.version) match { + case Some(0) ⇒ // same version + case Some(x) if x > 0 ⇒ gossipStatusTo(from, sender) // remote is newer + case _ ⇒ gossipTo(from, sender) // conflicting or local is newer + } + } + } + /** * Receive new gossip. */ @@ -585,7 +606,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else { val comparison = remoteGossip.version tryCompareTo localGossip.version - val conflict = comparison.isEmpty val (winningGossip, talkback, newStats) = comparison match { case None ⇒ @@ -612,7 +632,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - if (conflict) { + if (comparison.isEmpty) { log.debug( """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", remoteGossip, localGossip, winningGossip) @@ -621,10 +641,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto stats = stats.incrementReceivedGossipCount publish(latestGossip) - if (envelope.conversation && talkback) { + if (talkback) { // send back gossip to sender when sender had different view, i.e. merge, or sender had // older or sender had newer - gossipTo(from) + gossipTo(from, sender) } } } @@ -640,7 +660,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (!isSingletonCluster && isAvailable) { val localGossip = latestGossip - val preferredGossipTargets = + val preferredGossipTargets: Vector[UniqueAddress] = if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older or newer gossip version val localMemberAddressesSet = localGossip.members map { _.uniqueAddress } @@ -650,13 +670,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if version != localGossip.version } yield node - nodesWithDifferentView.toIndexedSeq + nodesWithDifferentView.toVector } else Vector.empty[UniqueAddress] - gossipToRandomNodeOf( - if (preferredGossipTargets.nonEmpty) preferredGossipTargets - else localGossip.members.toIndexedSeq.map(_.uniqueAddress) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) - ) + if (preferredGossipTargets.nonEmpty) { + val peer = selectRandomNode(preferredGossipTargets filterNot (_ == selfUniqueAddress)) + // send full gossip because it has different view + peer foreach gossipTo + } else { + // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) + val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect { + case m if m.uniqueAddress != selfUniqueAddress ⇒ m.uniqueAddress + }) + peer foreach { node ⇒ + if (localGossip.seenByNode(node)) gossipStatusTo(node) + else gossipTo(node) + } + } } } @@ -882,18 +912,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress) - /** - * Gossips latest gossip to a random member in the set of members passed in as argument. - * - * @return the used [[UniqueAddress]] if any - */ - private def gossipToRandomNodeOf(nodes: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = { - // filter out myself - val peer = selectRandomNode(nodes filterNot (_ == selfUniqueAddress)) - peer foreach gossipTo - peer - } - // needed for tests def sendGossipTo(address: Address): Unit = { latestGossip.members.foreach(m ⇒ @@ -905,14 +923,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * Gossips latest gossip to a node. */ def gossipTo(node: UniqueAddress): Unit = - gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = true)) + if (validNodeForGossip(node)) + clusterCore(node.address) ! GossipEnvelope(selfUniqueAddress, node, latestGossip) - def oneWayGossipTo(node: UniqueAddress): Unit = - gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = false)) + def gossipTo(node: UniqueAddress, destination: ActorRef): Unit = + if (validNodeForGossip(node)) + destination ! GossipEnvelope(selfUniqueAddress, node, latestGossip) - def gossipTo(node: UniqueAddress, gossipMsg: GossipEnvelope): Unit = - if (node != selfUniqueAddress && gossipMsg.gossip.members.exists(_.uniqueAddress == node)) - clusterCore(node.address) ! gossipMsg + def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit = + if (validNodeForGossip(node)) + destination ! GossipStatus(selfUniqueAddress, latestGossip.version) + + def gossipStatusTo(node: UniqueAddress): Unit = + if (validNodeForGossip(node)) + clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version) + + def validNodeForGossip(node: UniqueAddress): Boolean = + (node != selfUniqueAddress && latestGossip.members.exists(_.uniqueAddress == node)) def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 980f145919..c9ccf23922 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -239,4 +239,14 @@ private[cluster] case class GossipOverview( * different in that case. */ @SerialVersionUID(1L) -private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage +private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip) extends ClusterMessage + +/** + * INTERNAL API + * When there are no known changes to the node ring a `GossipStatus` + * initiates a gossip chat between two members. If the receiver has a newer + * version it replies with a `GossipEnvelope`. If receiver has older version + * it replies with its `GossipStatus`. Same versions ends the chat immediately. + */ +@SerialVersionUID(1L) +private[cluster] case class GossipStatus(from: UniqueAddress, version: VectorClock) extends ClusterMessage diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 4b3e908f08..357e20db6d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -37,6 +37,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), + classOf[GossipStatus] -> gossipStatusFromBinary, classOf[GossipEnvelope] -> gossipEnvelopeFromBinary, classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary) @@ -49,6 +50,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ addressToProto(from) case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m) + case m: GossipStatus ⇒ + gossipStatusToProto(m) case m: MetricsGossipEnvelope ⇒ metricsGossipEnvelopeToProto(m) case InternalClusterAction.Join(node, roles) ⇒ @@ -140,20 +143,14 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address") def mapRole(role: String) = mapWithErrorMessage(roleMapping, role, "role") - def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") def memberToProto(member: Member) = { msg.Member(mapUniqueAddress(member.uniqueAddress), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) } - def vectorClockToProto(version: VectorClock) = { - msg.VectorClock(version.timestamp.time, - version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) - } - def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match { case (address: UniqueAddress, version: VectorClock) ⇒ - msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version)) + msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping)) } val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector] @@ -163,18 +160,34 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val overview = msg.GossipOverview(seen, unreachable) msg.Gossip(allAddresses.map(uniqueAddressToProto), - allRoles, allHashes, members, overview, vectorClockToProto(gossip.version)) + allRoles, allHashes, members, overview, vectorClockToProto(gossip.version, hashMapping)) + } + + private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = { + def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") + msg.VectorClock(version.timestamp.time, + version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) } private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), - gossipToProto(envelope.gossip), envelope.conversation) + gossipToProto(envelope.gossip)) + } + + private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = { + val allHashes: Vector[String] = status.version.versions.keys.map(_.hash)(collection.breakOut) + val hashMapping = allHashes.zipWithIndex.toMap + msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping)) } private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = { gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes)) } + private def gossipStatusFromBinary(bytes: Array[Byte]): GossipStatus = { + gossipStatusFromProto(msg.GossipStatus.defaultInstance.mergeFrom(bytes)) + } + private def gossipFromProto(gossip: msg.Gossip): Gossip = { val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto) val roleMapping = gossip.allRoles @@ -185,23 +198,23 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ member.rolesIndexes.map(roleMapping).to[Set]) } - def vectorClockFromProto(version: msg.VectorClock) = { - VectorClock(VectorClock.Timestamp(version.timestamp), - version.versions.map { - case msg.VectorClock.Version(h, t) ⇒ - (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) - }.toMap) - } - def seenFromProto(seen: msg.GossipOverview.Seen) = - (addressMapping(seen.addressIndex), vectorClockFromProto(seen.version)) + (addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping)) val members = gossip.members.map(memberFromProto).to[immutable.SortedSet] val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet val seen = gossip.overview.seen.map(seenFromProto).toMap val overview = GossipOverview(seen, unreachable) - Gossip(members, overview, vectorClockFromProto(gossip.version)) + Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) + } + + private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = { + VectorClock(VectorClock.Timestamp(version.timestamp), + version.versions.map { + case msg.VectorClock.Version(h, t) ⇒ + (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) + }.toMap) } private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { @@ -209,6 +222,11 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ gossipFromProto(envelope.gossip)) } + private def gossipStatusFromProto(status: msg.GossipStatus): GossipStatus = { + val hashMapping = status.allHashes + GossipStatus(uniqueAddressFromProto(status.from), vectorClockFromProto(status.version, hashMapping)) + } + private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = { val mgossip = envelope.gossip val allAddresses = mgossip.nodes.foldLeft(Set.empty[Address])((s, n) ⇒ s + n.address).to[Vector] diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 102682ed5f..70cc70c90b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -108,6 +108,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro classOf[ClusterHeartbeatReceiver.Heartbeat], classOf[ClusterHeartbeatReceiver.EndHeartbeat], classOf[GossipEnvelope], + classOf[GossipStatus], classOf[MetricsGossipEnvelope], classOf[ClusterEvent.ClusterMetricsChanged], classOf[InternalClusterAction.Tick], diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index de467cece4..2787a7f3b7 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -54,9 +54,14 @@ class ClusterMessageSerializerSpec extends AkkaSpec { val node4 = VectorClock.Node("node4") val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1 :+ node2).seen(a1.uniqueAddress).seen(b1.uniqueAddress) val g2 = (g1 :+ node3 :+ node4).seen(a1.uniqueAddress).seen(c1.uniqueAddress) + val g3 = g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))) checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g1)) checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2)) - checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))))) + checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3)) + + checkSerialization(GossipStatus(a1.uniqueAddress, g1.version)) + checkSerialization(GossipStatus(a1.uniqueAddress, g2.version)) + checkSerialization(GossipStatus(a1.uniqueAddress, g3.version)) checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))