Move state of ClusterHeartbeatSender to separate immutable class, see #2284

This commit is contained in:
Patrik Nordwall 2012-10-10 15:23:18 +02:00
parent 668d5a5013
commit 66c81e915e
3 changed files with 221 additions and 75 deletions

View file

@ -18,7 +18,7 @@ import java.util.Arrays
* hash, i.e. make sure it is different for different nodes.
*
*/
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) {
class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) {
import ConsistentHash._

View file

@ -89,13 +89,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
val selfHeartbeat = Heartbeat(selfAddress)
val selfEndHeartbeat = EndHeartbeat(selfAddress)
val selfAddressStr = selfAddress.toString
var all = Set.empty[Address]
var current = Set.empty[Address]
var ending = Map.empty[Address, Int]
var joinInProgress = Map.empty[Address, Deadline]
var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor),
selfAddress.toString, MonitoredByNrOfMembers)
// start periodic heartbeat to other nodes in cluster
val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
@ -116,46 +112,30 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
def receive = {
case HeartbeatTick heartbeat()
case state: CurrentClusterState init(state)
case s: CurrentClusterState reset(s)
case MemberUnreachable(m) removeMember(m)
case MemberRemoved(m) removeMember(m)
case e: MemberEvent addMember(e.member)
case JoinInProgress(a, d) addJoinInProgress(a, d)
}
def init(state: CurrentClusterState): Unit = {
all = state.members.collect { case m if m.address != selfAddress m.address }
joinInProgress --= all
consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
update()
}
def reset(snapshot: CurrentClusterState): Unit =
state = state.reset(snapshot.members.collect { case m if m.address != selfAddress m.address })
def addMember(m: Member): Unit = if (m.address != selfAddress) {
all += m.address
consistentHash = consistentHash :+ m.address
removeJoinInProgress(m.address)
update()
}
def addMember(m: Member): Unit = if (m.address != selfAddress)
state = state addMember m.address
def removeMember(m: Member): Unit = if (m.address != selfAddress) {
all -= m.address
consistentHash = consistentHash :- m.address
removeJoinInProgress(m.address)
update()
}
def removeMember(m: Member): Unit = if (m.address != selfAddress)
state = state removeMember m.address
def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) {
joinInProgress -= address
ending += (address -> 0)
}
def removeJoinInProgress(address: Address): Unit = if (address != selfAddress)
state = state.removeJoinInProgress(address)
def addJoinInProgress(address: Address, deadline: Deadline): Unit = {
if (address != selfAddress && !all.contains(address))
joinInProgress += (address -> deadline)
}
def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
state = state.addJoinInProgress(address, deadline)
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
state = state.removeOverdueJoinInProgress()
def connection(to: Address): ActorRef = {
// URL encoded target address as child actor name
@ -168,34 +148,42 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
}
val deadline = Deadline.now + HeartbeatInterval
(current ++ joinInProgress.keys) foreach { to connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
state.active foreach { to connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
// When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is
// sent to notify it that no more heartbeats will be sent.
for ((to, count) ending) {
for ((to, count) state.ending) {
val c = connection(to)
c ! SendEndHeartbeat(selfEndHeartbeat, to)
if (count == NumberOfEndHeartbeats) {
ending -= to
state = state.removeEnding(to)
c ! PoisonPill
} else {
ending += (to -> (count + 1))
}
} else
state = state.increaseEndingCount(to)
}
}
}
/**
* INTERNAL API
*/
private[cluster] object ClusterHeartbeatSenderState {
/**
* Update current peers to send heartbeats to, and
* Initial, empty state
*/
def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String,
monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers)
/**
* Create a new state based on previous state, and
* keep track of which nodes to stop sending heartbeats to.
*/
def update(): Unit = {
val previous = current
current = selectPeers
// start ending process for nodes not selected any more
ending ++= (previous -- current).map(_ -> 0)
// abort ending process for nodes that have been selected again
ending --= current
}
private def apply(
old: ClusterHeartbeatSenderState,
consistentHash: ConsistentHash[Address],
all: Set[Address]): ClusterHeartbeatSenderState = {
/**
* Select a few peers that heartbeats will be sent to, i.e. that will
@ -205,30 +193,100 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
*/
def selectPeers: Set[Address] = {
val allSize = all.size
val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers)
// try more if consistentHash results in same node as already selected
val attemptLimit = nrOfPeers * 2
@tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
if (acc.size == nrOfPeers || n == attemptLimit) acc
else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1)
else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1)
}
if (nrOfPeers >= allSize) all
else select(Set.empty[Address], 0)
}
val curr = selectPeers
// start ending process for nodes not selected any more
// abort ending process for nodes that have been selected again
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end,
joinInProgress = old.joinInProgress -- all)
}
}
/**
* INTERNAL API
*
* State used by [akka.cluster.ClusterHeartbeatSender].
* The initial state is created with `empty` in the of
* the companion object, thereafter the state is modified
* with the methods, such as `addMember`. It is immutable,
* i.e. the methods return new instances.
*/
private[cluster] case class ClusterHeartbeatSenderState private (
consistentHash: ConsistentHash[Address],
selfAddressStr: String,
monitoredByNrOfMembers: Int,
all: Set[Address] = Set.empty,
current: Set[Address] = Set.empty,
ending: Map[Address, Int] = Map.empty,
joinInProgress: Map[Address, Deadline] = Map.empty) {
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val currentAndEnding = current.intersect(ending.keySet)
require(currentAndEnding.isEmpty,
"Same nodes in current and ending not allowed, got [%s]" format currentAndEnding)
val joinInProgressAndAll = joinInProgress.keySet.intersect(all)
require(joinInProgressAndAll.isEmpty,
"Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll)
val currentNotInAll = current -- all
require(currentNotInAll.isEmpty,
"Nodes in current but not in all not allowed, got [%s]" format currentNotInAll)
require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]"
format all)
}
val active: Set[Address] = current ++ joinInProgress.keySet
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
all = nodes)
def addMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a)
def removeMember(a: Address): ClusterHeartbeatSenderState =
ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a)
def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
if (joinInProgress contains address)
copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
else this
}
def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
if (all contains address) this
else copy(joinInProgress = joinInProgress + (address -> deadline))
}
/**
* Cleanup overdue joinInProgress, in case a joining node never
* became member, for some reason.
*/
def removeOverdueJoinInProgress(): Unit = {
def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = {
val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue address }
if (overdue.nonEmpty) {
log.info("Overdue join in progress [{}]", overdue.mkString(", "))
ending ++= overdue.map(_ -> 0)
joinInProgress --= overdue
}
if (overdue.isEmpty) this
else
copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue)
}
def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a)
def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1)))
}
/**

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Address
import akka.routing.ConsistentHash
import scala.concurrent.util.Deadline
import scala.concurrent.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
val selfAddress = Address("akka", "sys", "myself", 2552)
val aa = Address("akka", "sys", "aa", 2552)
val bb = Address("akka", "sys", "bb", 2552)
val cc = Address("akka", "sys", "cc", 2552)
val dd = Address("akka", "sys", "dd", 2552)
val ee = Address("akka", "sys", "ee", 2552)
val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10),
selfAddress.toString, 3)
"A ClusterHeartbeatSenderState" must {
"return empty active set when no nodes" in {
emptyState.active.isEmpty must be(true)
}
"include joinInProgress in active set" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds)
s.joinInProgress.keySet must be(Set(aa))
s.active must be(Set(aa))
}
"remove joinInProgress from active set after removeOverdueJoinInProgress" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
s.joinInProgress must be(Map.empty)
s.active must be(Set.empty)
}
"remove joinInProgress after reset" in {
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb))
s.joinInProgress must be(Map.empty)
}
"include nodes from reset in active set" in {
val nodes = Set(aa, bb, cc)
val s = emptyState.reset(nodes)
s.all must be(nodes)
s.current must be(nodes)
s.ending must be(Map.empty)
s.active must be(nodes)
}
"limit current nodes to monitoredByNrOfMembers when adding members" in {
val nodes = Set(aa, bb, cc, dd)
val s = nodes.foldLeft(emptyState) { _ addMember _ }
s.all must be(nodes)
s.current.size must be(3)
s.addMember(ee).current.size must be(3)
}
"move meber to ending set when removing member" in {
val nodes = Set(aa, bb, cc, dd, ee)
val s = emptyState.reset(nodes)
s.ending must be(Map.empty)
val included = s.current.head
val s2 = s.removeMember(included)
s2.ending must be(Map(included -> 0))
s2.current must not contain (included)
val s3 = s2.addMember(included)
s3.current must contain(included)
s3.ending.keySet must not contain (included)
}
"increase ending count correctly" in {
val s = emptyState.reset(Set(aa)).removeMember(aa)
s.ending must be(Map(aa -> 0))
val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa)
s2.ending must be(Map(aa -> 2))
}
}
}