diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 48645022ae..b880b917a3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -111,6 +111,8 @@ private[cluster] object InternalClusterAction { case object GossipTick extends Tick + case object GossipSpeedupTick extends Tick + case object HeartbeatTick extends Tick case object ReapUnreachableTick extends Tick @@ -334,7 +336,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipStatus ⇒ receiveGossipStatus(msg) - case GossipTick ⇒ gossip() + case GossipTick ⇒ gossipTick() + case GossipSpeedupTick ⇒ gossipSpeedupTick() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStatsTick ⇒ publishInternalStats() @@ -680,7 +683,19 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } - def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis + def gossipTick(): Unit = { + gossip() + if (isGossipSpeedupNeeded) { + scheduler.scheduleOnce(GossipInterval / 3, self, GossipSpeedupTick) + scheduler.scheduleOnce(GossipInterval * 2 / 3, self, GossipSpeedupTick) + } + } + + def gossipSpeedupTick(): Unit = + if (isGossipSpeedupNeeded) gossip() + + def isGossipSpeedupNeeded: Boolean = + (latestGossip.overview.seen.size < latestGossip.members.size / 2) /** * Initiates a new round of gossip.