diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 79c31cda33..d3bef92e6c 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -18,7 +18,7 @@ import java.util.Arrays * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) { +class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], val virtualNodesFactor: Int) { import ConsistentHash._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 561f3c85ba..325e0aae25 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -89,13 +89,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val selfHeartbeat = Heartbeat(selfAddress) val selfEndHeartbeat = EndHeartbeat(selfAddress) - val selfAddressStr = selfAddress.toString - var all = Set.empty[Address] - var current = Set.empty[Address] - var ending = Map.empty[Address, Int] - var joinInProgress = Map.empty[Address, Deadline] - var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor) + var state = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor), + selfAddress.toString, MonitoredByNrOfMembers) // start periodic heartbeat to other nodes in cluster val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], @@ -115,47 +111,31 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") def receive = { - case HeartbeatTick ⇒ heartbeat() - case state: CurrentClusterState ⇒ init(state) - case MemberUnreachable(m) ⇒ removeMember(m) - case MemberRemoved(m) ⇒ removeMember(m) - case e: MemberEvent ⇒ addMember(e.member) - case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) + case HeartbeatTick ⇒ heartbeat() + case s: CurrentClusterState ⇒ reset(s) + case MemberUnreachable(m) ⇒ removeMember(m) + case MemberRemoved(m) ⇒ removeMember(m) + case e: MemberEvent ⇒ addMember(e.member) + case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) } - def init(state: CurrentClusterState): Unit = { - all = state.members.collect { case m if m.address != selfAddress ⇒ m.address } - joinInProgress --= all - consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor) - update() - } + def reset(snapshot: CurrentClusterState): Unit = + state = state.reset(snapshot.members.collect { case m if m.address != selfAddress ⇒ m.address }) - def addMember(m: Member): Unit = if (m.address != selfAddress) { - all += m.address - consistentHash = consistentHash :+ m.address - removeJoinInProgress(m.address) - update() - } + def addMember(m: Member): Unit = if (m.address != selfAddress) + state = state addMember m.address - def removeMember(m: Member): Unit = if (m.address != selfAddress) { - all -= m.address - consistentHash = consistentHash :- m.address - removeJoinInProgress(m.address) - update() - } + def removeMember(m: Member): Unit = if (m.address != selfAddress) + state = state removeMember m.address - def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) { - joinInProgress -= address - ending += (address -> 0) - } + def removeJoinInProgress(address: Address): Unit = if (address != selfAddress) + state = state.removeJoinInProgress(address) - def addJoinInProgress(address: Address, deadline: Deadline): Unit = { - if (address != selfAddress && !all.contains(address)) - joinInProgress += (address -> deadline) - } + def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress) + state = state.addJoinInProgress(address, deadline) def heartbeat(): Unit = { - removeOverdueJoinInProgress() + state = state.removeOverdueJoinInProgress() def connection(to: Address): ActorRef = { // URL encoded target address as child actor name @@ -168,67 +148,145 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } val deadline = Deadline.now + HeartbeatInterval - (current ++ joinInProgress.keys) foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } + state.active foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) } // When sending heartbeats to a node is stopped a few `EndHeartbeat` messages is // sent to notify it that no more heartbeats will be sent. - for ((to, count) ← ending) { + for ((to, count) ← state.ending) { val c = connection(to) c ! SendEndHeartbeat(selfEndHeartbeat, to) if (count == NumberOfEndHeartbeats) { - ending -= to + state = state.removeEnding(to) c ! PoisonPill - } else { - ending += (to -> (count + 1)) - } + } else + state = state.increaseEndingCount(to) } } +} + +/** + * INTERNAL API + */ +private[cluster] object ClusterHeartbeatSenderState { /** - * Update current peers to send heartbeats to, and + * Initial, empty state + */ + def empty(consistentHash: ConsistentHash[Address], selfAddressStr: String, + monitoredByNrOfMembers: Int): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(consistentHash, selfAddressStr, monitoredByNrOfMembers) + + /** + * Create a new state based on previous state, and * keep track of which nodes to stop sending heartbeats to. */ - def update(): Unit = { - val previous = current - current = selectPeers + private def apply( + old: ClusterHeartbeatSenderState, + consistentHash: ConsistentHash[Address], + all: Set[Address]): ClusterHeartbeatSenderState = { + + /** + * Select a few peers that heartbeats will be sent to, i.e. that will + * monitor this node. Try to send heartbeats to same nodes as much + * as possible, but re-balance with consistent hashing algorithm when + * new members are added or removed. + */ + def selectPeers: Set[Address] = { + val allSize = all.size + val nrOfPeers = math.min(allSize, old.monitoredByNrOfMembers) + // try more if consistentHash results in same node as already selected + val attemptLimit = nrOfPeers * 2 + @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { + if (acc.size == nrOfPeers || n == attemptLimit) acc + else select(acc + consistentHash.nodeFor(old.selfAddressStr + n), n + 1) + } + if (nrOfPeers >= allSize) all + else select(Set.empty[Address], 0) + } + + val curr = selectPeers // start ending process for nodes not selected any more - ending ++= (previous -- current).map(_ -> 0) // abort ending process for nodes that have been selected again - ending --= current + val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr + old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end, + joinInProgress = old.joinInProgress -- all) } - /** - * Select a few peers that heartbeats will be sent to, i.e. that will - * monitor this node. Try to send heartbeats to same nodes as much - * as possible, but re-balance with consistent hashing algorithm when - * new members are added or removed. - */ - def selectPeers: Set[Address] = { - val allSize = all.size - val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers) - // try more if consistentHash results in same node as already selected - val attemptLimit = nrOfPeers * 2 - @tailrec def select(acc: Set[Address], n: Int): Set[Address] = { - if (acc.size == nrOfPeers || n == attemptLimit) acc - else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1) - } - if (nrOfPeers >= allSize) all - else select(Set.empty[Address], 0) +} + +/** + * INTERNAL API + * + * State used by [akka.cluster.ClusterHeartbeatSender]. + * The initial state is created with `empty` in the of + * the companion object, thereafter the state is modified + * with the methods, such as `addMember`. It is immutable, + * i.e. the methods return new instances. + */ +private[cluster] case class ClusterHeartbeatSenderState private ( + consistentHash: ConsistentHash[Address], + selfAddressStr: String, + monitoredByNrOfMembers: Int, + all: Set[Address] = Set.empty, + current: Set[Address] = Set.empty, + ending: Map[Address, Int] = Map.empty, + joinInProgress: Map[Address, Deadline] = Map.empty) { + + // FIXME can be disabled as optimization + assertInvariants + + private def assertInvariants: Unit = { + val currentAndEnding = current.intersect(ending.keySet) + require(currentAndEnding.isEmpty, + "Same nodes in current and ending not allowed, got [%s]" format currentAndEnding) + val joinInProgressAndAll = joinInProgress.keySet.intersect(all) + require(joinInProgressAndAll.isEmpty, + "Same nodes in joinInProgress and all not allowed, got [%s]" format joinInProgressAndAll) + val currentNotInAll = current -- all + require(currentNotInAll.isEmpty, + "Nodes in current but not in all not allowed, got [%s]" format currentNotInAll) + require(all.isEmpty == consistentHash.isEmpty, "ConsistentHash doesn't correspond to all nodes [%s]" + format all) + } + + val active: Set[Address] = current ++ joinInProgress.keySet + + def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), + all = nodes) + + def addMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a) + + def removeMember(a: Address): ClusterHeartbeatSenderState = + ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a) + + def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { + if (joinInProgress contains address) + copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) + else this + } + + def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { + if (all contains address) this + else copy(joinInProgress = joinInProgress + (address -> deadline)) } /** * Cleanup overdue joinInProgress, in case a joining node never * became member, for some reason. */ - def removeOverdueJoinInProgress(): Unit = { + def removeOverdueJoinInProgress(): ClusterHeartbeatSenderState = { val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } - if (overdue.nonEmpty) { - log.info("Overdue join in progress [{}]", overdue.mkString(", ")) - ending ++= overdue.map(_ -> 0) - joinInProgress --= overdue - } + if (overdue.isEmpty) this + else + copy(ending = ending ++ overdue.map(_ -> 0), joinInProgress = joinInProgress -- overdue) } + def removeEnding(a: Address): ClusterHeartbeatSenderState = copy(ending = ending - a) + + def increaseEndingCount(a: Address): ClusterHeartbeatSenderState = copy(ending = ending + (a -> (ending(a) + 1))) + } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala new file mode 100644 index 0000000000..bd378ed0fe --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import akka.routing.ConsistentHash +import scala.concurrent.util.Deadline +import scala.concurrent.util.duration._ + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { + + val selfAddress = Address("akka", "sys", "myself", 2552) + val aa = Address("akka", "sys", "aa", 2552) + val bb = Address("akka", "sys", "bb", 2552) + val cc = Address("akka", "sys", "cc", 2552) + val dd = Address("akka", "sys", "dd", 2552) + val ee = Address("akka", "sys", "ee", 2552) + + val emptyState = ClusterHeartbeatSenderState.empty(ConsistentHash(Seq.empty[Address], 10), + selfAddress.toString, 3) + + "A ClusterHeartbeatSenderState" must { + + "return empty active set when no nodes" in { + emptyState.active.isEmpty must be(true) + } + + "include joinInProgress in active set" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds) + s.joinInProgress.keySet must be(Set(aa)) + s.active must be(Set(aa)) + } + + "remove joinInProgress from active set after removeOverdueJoinInProgress" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() + s.joinInProgress must be(Map.empty) + s.active must be(Set.empty) + } + + "remove joinInProgress after reset" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb)) + s.joinInProgress must be(Map.empty) + } + + "include nodes from reset in active set" in { + val nodes = Set(aa, bb, cc) + val s = emptyState.reset(nodes) + s.all must be(nodes) + s.current must be(nodes) + s.ending must be(Map.empty) + s.active must be(nodes) + } + + "limit current nodes to monitoredByNrOfMembers when adding members" in { + val nodes = Set(aa, bb, cc, dd) + val s = nodes.foldLeft(emptyState) { _ addMember _ } + s.all must be(nodes) + s.current.size must be(3) + s.addMember(ee).current.size must be(3) + } + + "move meber to ending set when removing member" in { + val nodes = Set(aa, bb, cc, dd, ee) + val s = emptyState.reset(nodes) + s.ending must be(Map.empty) + val included = s.current.head + val s2 = s.removeMember(included) + s2.ending must be(Map(included -> 0)) + s2.current must not contain (included) + val s3 = s2.addMember(included) + s3.current must contain(included) + s3.ending.keySet must not contain (included) + } + + "increase ending count correctly" in { + val s = emptyState.reset(Set(aa)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.increaseEndingCount(aa).increaseEndingCount(aa) + s2.ending must be(Map(aa -> 2)) + } + + } +}