Merge pull request #1764 from akka/wip-3660-simple-speedup-of-gossip-in-early-phase-ban

=clu #3660 Simple speedup of gossip in early phase
This commit is contained in:
Björn Antonsson 2013-10-15 01:49:00 -07:00
commit e8bc604280

View file

@ -111,6 +111,8 @@ private[cluster] object InternalClusterAction {
case object GossipTick extends Tick case object GossipTick extends Tick
case object GossipSpeedupTick extends Tick
case object HeartbeatTick extends Tick case object HeartbeatTick extends Tick
case object ReapUnreachableTick extends Tick case object ReapUnreachableTick extends Tick
@ -334,7 +336,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def initialized: Actor.Receive = { def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg) case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipStatus receiveGossipStatus(msg) case msg: GossipStatus receiveGossipStatus(msg)
case GossipTick gossip() case GossipTick gossipTick()
case GossipSpeedupTick gossipSpeedupTick()
case ReapUnreachableTick reapUnreachableMembers() case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions() case LeaderActionsTick leaderActions()
case PublishStatsTick publishInternalStats() case PublishStatsTick publishInternalStats()
@ -680,7 +683,19 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} }
} }
def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis def gossipTick(): Unit = {
gossip()
if (isGossipSpeedupNeeded) {
scheduler.scheduleOnce(GossipInterval / 3, self, GossipSpeedupTick)
scheduler.scheduleOnce(GossipInterval * 2 / 3, self, GossipSpeedupTick)
}
}
def gossipSpeedupTick(): Unit =
if (isGossipSpeedupNeeded) gossip()
def isGossipSpeedupNeeded: Boolean =
(latestGossip.overview.seen.size < latestGossip.members.size / 2)
/** /**
* Initiates a new round of gossip. * Initiates a new round of gossip.