diff --git a/.gitignore b/.gitignore
index f646a4c173..18fb5d762a 100755
--- a/.gitignore
+++ b/.gitignore
@@ -67,3 +67,4 @@ redis/
 beanstalk/
 .scalastyle
 bin/
+.worksheet
diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 4347f6c0b0..a1215f4563 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -78,6 +78,10 @@ akka {
       # how often should the node send out heartbeats?
       heartbeat-interval = 1s
 
+      # Number of member nodes that each member will send heartbeat messages to,
+      # i.e. each node will be monitored by this number of other nodes.
+      monitored-by-nr-of-members = 5
+
       # defines the failure detector threshold
       # A low threshold is prone to generate many wrong suspicions but ensures
       # a quick detection in the event of a real crash. Conversely, a high
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 25b1cd684b..6863b1224e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -62,7 +62,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
   val settings = new ClusterSettings(system.settings.config, system.name)
   import settings._
 
-  val selfAddress = system.provider match {
+  val selfAddress: Address = system.provider match {
     case c: ClusterActorRefProvider ⇒ c.transport.address
     case other ⇒ throw new ConfigurationException(
       "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
@@ -74,7 +74,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
 
   log.info("Cluster Node [{}] - is starting up...", selfAddress)
 
-  val failureDetector = {
+  val failureDetector: FailureDetector = {
     import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
     system.dynamicAccess.createInstanceFor[FailureDetector](
       fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 10bcd9ee6a..1b4398feeb 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -287,8 +287,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       // wipe the failure detector since we are starting fresh and shouldn't care about the past
       failureDetector.reset()
 
-      heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
       publish(localGossip)
+      heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout)
 
      context.become(initialized)
      if (address == selfAddress)
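The reference.conf addition above bounds the failure-detection fan-out: each node sends heartbeats to (and is therefore monitored by) a fixed number of peers instead of all members, so heartbeat traffic stays constant as the cluster grows. A minimal sketch of reading the default back out, assuming akka-cluster is on the classpath so its reference.conf gets merged (the object name is invented):

    import com.typesafe.config.ConfigFactory

    object MonitoredByCheck extends App {
      // ConfigFactory.load() merges every reference.conf found on the classpath
      val config = ConfigFactory.load()
      val n = config.getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
      println("each node is monitored by " + n + " other nodes") // 5 by default
    }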
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
index 734c923df5..4e80223dbf 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -5,18 +5,32 @@ package akka.cluster
 
 import language.postfixOps
 import scala.collection.immutable.SortedSet
-import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props }
-import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import scala.annotation.tailrec
 import scala.concurrent.util.duration._
 import scala.concurrent.util.Deadline
 import scala.concurrent.util.FiniteDuration
-import akka.cluster.ClusterEvent._
 import java.net.URLEncoder
+import akka.actor.{ ActorLogging, ActorRef, Address, Actor, RootActorPath, PoisonPill, Props }
+import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
+import akka.cluster.ClusterEvent._
+import akka.routing.ConsistentHash
 
 /**
- * Sent at regular intervals for failure detection.
+ * INTERNAL API
  */
-case class Heartbeat(from: Address) extends ClusterMessage
+private[akka] object ClusterHeartbeatReceiver {
+  /**
+   * Sent at regular intervals for failure detection.
+   */
+  case class Heartbeat(from: Address) extends ClusterMessage
+
+  /**
+   * Tell the failure detector at the receiving side that it should
+   * remove the monitoring, because heartbeats from this node
+   * will end.
+   */
+  case class EndHeartbeat(from: Address) extends ClusterMessage
+}
 
 /**
  * INTERNAL API.
@@ -26,11 +40,13 @@ case class Heartbeat(from: Address) extends ClusterMessage
  * to Cluster message after message, but concurrent with other types of messages.
  */
 private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging {
+  import ClusterHeartbeatReceiver._
 
   val failureDetector = Cluster(context.system).failureDetector
 
   def receive = {
-    case Heartbeat(from) ⇒ failureDetector heartbeat from
+    case Heartbeat(from)    ⇒ failureDetector heartbeat from
+    case EndHeartbeat(from) ⇒ failureDetector remove from
   }
 
 }
@@ -39,16 +55,11 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
  * INTERNAL API
  */
 private[cluster] object ClusterHeartbeatSender {
-  /**
-   * Command to [akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]]
-   * to the other node.
-   * Local only, no need to serialize.
-   */
-  case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
-
   /**
    * Tell [akka.cluster.ClusterHeartbeatSender]] that this node has started joining of
-   * another node and heartbeats should be sent until it becomes member or deadline is overdue.
+   * another node and heartbeats should be sent unconditionally until it becomes a
+   * member or the deadline is overdue. This makes it possible to detect immediate
+   * death of the joining node.
    * Local only, no need to serialize.
    */
   case class JoinInProgress(address: Address, deadline: Deadline)
 }
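Heartbeat and EndHeartbeat are the entire receiver-side protocol: a Heartbeat refreshes monitoring of the sender, and an EndHeartbeat withdraws it via failureDetector.remove, so the receiver does not start suspecting a node that deliberately stopped sending. A toy stand-in for that bookkeeping (hypothetical class, not the real FailureDetector implementation):

    import akka.actor.Address

    // In-memory sketch of what heartbeat/remove are expected to do
    class MonitoringTable {
      private var lastBeat = Map.empty[Address, Long]
      // Heartbeat(from): refresh the liveness timestamp of the sender
      def heartbeat(from: Address): Unit = lastBeat += (from -> System.nanoTime)
      // EndHeartbeat(from): stop monitoring, no further beats are expected
      def remove(from: Address): Unit = lastBeat -= from
      def isMonitored(from: Address): Boolean = lastBeat contains from
    }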
@@ -58,14 +69,17 @@
  * INTERNAL API
  *
  * This actor is responsible for sending the heartbeat messages to
- * other nodes. Netty blocks when sending to broken connections. This actor
- * isolates sending to different nodes by using child workers for each target
+ * a few other nodes that will monitor this node.
+ *
+ * Netty blocks when sending to broken connections. This actor
+ * isolates sending to different nodes by using child actors for each target
  * address and thereby reduces the risk of irregular heartbeats to healthy
  * nodes due to broken connections to other nodes.
  */
 private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
   import ClusterHeartbeatSender._
-  import Member.addressOrdering
+  import ClusterHeartbeatSenderConnection._
+  import ClusterHeartbeatReceiver._
   import InternalClusterAction.HeartbeatTick
 
   val cluster = Cluster(context.system)
@@ -74,9 +88,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   import context.dispatcher
 
   val selfHeartbeat = Heartbeat(selfAddress)
+  val selfEndHeartbeat = EndHeartbeat(selfAddress)
+  val selfAddressStr = selfAddress.toString
 
-  var nodes: SortedSet[Address] = SortedSet.empty
-  var joinInProgress: Map[Address, Deadline] = Map.empty
+  var all = Set.empty[Address]
+  var current = Set.empty[Address]
+  var ending = Map.empty[Address, Int]
+  var joinInProgress = Map.empty[Address, Deadline]
+  var consistentHash = ConsistentHash(Seq.empty[Address], HeartbeatConsistentHashingVirtualNodesFactor)
 
   // start periodic heartbeat to other nodes in cluster
   val heartbeatTask =
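The consistentHash field is kept in step with membership through the :+ and :- operators, so that peer selection (selectPeers in the next hunk) moves as few heartbeat connections as possible when nodes come and go. A small demo of those akka.routing.ConsistentHash operations, with made-up addresses:

    import akka.actor.Address
    import akka.routing.ConsistentHash

    object ConsistentHashDemo extends App {
      val a = Address("akka", "Sys", "host1", 2551)
      val b = Address("akka", "Sys", "host2", 2551)
      var ch = ConsistentHash(Seq(a, b), virtualNodesFactor = 10)
      println(ch.nodeFor("node-key-0")) // deterministic pick among a and b
      ch = ch :- a                      // member removed ...
      println(ch.nodeFor("node-key-0")) // ... only keys that mapped to `a` move
      ch = ch :+ a                      // member added back
    }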
@@ -99,63 +118,146 @@
   def clusterHeartbeatConnectionFor(address: Address): ActorRef =
     context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver")
 
-  /**
-   * Child name URL encoded target address.
-   */
-  def encodeChildName(name: String): String = URLEncoder.encode(name, "UTF-8")
-
   def receive = {
+    case HeartbeatTick              ⇒ heartbeat()
     case state: CurrentClusterState ⇒ init(state)
     case MemberUnreachable(m)       ⇒ removeMember(m)
     case MemberRemoved(m)           ⇒ removeMember(m)
     case e: MemberEvent             ⇒ addMember(e.member)
-    case JoinInProgress(a, d)       ⇒ joinInProgress += (a -> d)
-    case HeartbeatTick              ⇒ heartbeat()
+    case JoinInProgress(a, d)       ⇒ addJoinInProgress(a, d)
   }
 
   def init(state: CurrentClusterState): Unit = {
-    nodes = state.members.map(_.address)
-    joinInProgress --= nodes
+    all = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
+    joinInProgress --= all
+    consistentHash = ConsistentHash(all, HeartbeatConsistentHashingVirtualNodesFactor)
   }
 
-  def addMember(m: Member): Unit = {
-    nodes += m.address
-    joinInProgress -= m.address
+  def addMember(m: Member): Unit = if (m.address != selfAddress) {
+    all += m.address
+    consistentHash = consistentHash :+ m.address
+    removeJoinInProgress(m.address)
+    update()
   }
 
-  def removeMember(m: Member): Unit = {
-    nodes -= m.address
-    joinInProgress -= m.address
+  def removeMember(m: Member): Unit = if (m.address != selfAddress) {
+    all -= m.address
+    consistentHash = consistentHash :- m.address
+    removeJoinInProgress(m.address)
+    update()
+  }
+
+  def removeJoinInProgress(address: Address): Unit = if (joinInProgress contains address) {
+    joinInProgress -= address
+    ending += (address -> 0)
+  }
+
+  def addJoinInProgress(address: Address, deadline: Deadline): Unit = {
+    if (address != selfAddress && !all.contains(address))
+      joinInProgress += (address -> deadline)
   }
 
   def heartbeat(): Unit = {
     removeOverdueJoinInProgress()
-    val beatTo = nodes ++ joinInProgress.keys
 
-    val deadline = Deadline.now + HeartbeatInterval
-    for (to ← beatTo; if to != selfAddress) {
-      val workerName = encodeChildName(to.toString)
-      val worker = context.actorFor(workerName) match {
+    def connection(to: Address): ActorRef = {
+      // URL encoded target address as child actor name
+      val connectionName = URLEncoder.encode(to.toString, "UTF-8")
+      context.actorFor(connectionName) match {
         case notFound if notFound.isTerminated ⇒
-          context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
+          context.actorOf(Props(new ClusterHeartbeatSenderConnection(clusterHeartbeatConnectionFor(to))), connectionName)
         case child ⇒ child
       }
-      worker ! SendHeartbeat(selfHeartbeat, to, deadline)
+    }
+
+    val deadline = Deadline.now + HeartbeatInterval
+    (current ++ joinInProgress.keys) foreach { to ⇒ connection(to) ! SendHeartbeat(selfHeartbeat, to, deadline) }
+
+    // When sending heartbeats to a node is stopped, a few `EndHeartbeat` messages are
+    // sent to notify it that no more heartbeats will follow.
+    for ((to, count) ← ending) {
+      val c = connection(to)
+      c ! SendEndHeartbeat(selfEndHeartbeat, to)
+      if (count == NumberOfEndHeartbeats) {
+        ending -= to
+        c ! PoisonPill
+      } else {
+        ending += (to -> (count + 1))
+      }
     }
   }
 
   /**
-   * Removes overdue joinInProgress from State.
+   * Update the current peers to send heartbeats to, and keep
+   * track of which nodes to stop sending heartbeats to.
+   */
+  def update(): Unit = {
+    val previous = current
+    current = selectPeers
+    // start ending process for nodes not selected any more
+    ending ++= (previous -- current).map(_ -> 0)
+    // abort ending process for nodes that have been selected again
+    ending --= current
+  }
+
+  /**
+   * Select a few peers that heartbeats will be sent to, i.e. that will
+   * monitor this node. Try to send heartbeats to the same nodes as much
+   * as possible, but re-balance with the consistent hashing algorithm
+   * when new members are added or removed.
+   */
+  def selectPeers: Set[Address] = {
+    val allSize = all.size
+    val nrOfPeers = math.min(allSize, MonitoredByNrOfMembers)
+    // try more if consistentHash results in same node as already selected
+    val attemptLimit = nrOfPeers * 2
+    @tailrec def select(acc: Set[Address], n: Int): Set[Address] = {
+      if (acc.size == nrOfPeers || n == attemptLimit) acc
+      else select(acc + consistentHash.nodeFor(selfAddressStr + n), n + 1)
+    }
+    if (nrOfPeers >= allSize) all
+    else select(Set.empty[Address], 0)
+  }
+
+  /**
+   * Clean up overdue joinInProgress, in case a joining node never
+   * became a member for some reason.
    */
   def removeOverdueJoinInProgress(): Unit = {
-    joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue ⇒ address }
+    val overdue = joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address }
+    if (overdue.nonEmpty) {
+      log.info("Overdue join in progress [{}]", overdue.mkString(", "))
+      ending ++= overdue.map(_ -> 0)
+      joinInProgress --= overdue
+    }
   }
 }
 
 /**
- * Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
+ * INTERNAL API
+ */
+private[cluster] object ClusterHeartbeatSenderConnection {
+  import ClusterHeartbeatReceiver._
+
+  /**
+   * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+   * [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]] to the other node.
+   * Local only, no need to serialize.
+   */
+  case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
+
+  /**
+   * Command to [[akka.cluster.ClusterHeartbeatSenderConnection]], which will send
+   * [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to the other node.
+   * Local only, no need to serialize.
+   */
+  case class SendEndHeartbeat(endHeartbeatMsg: EndHeartbeat, to: Address)
+}
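selectPeers probes the hash ring with synthetic keys (selfAddressStr plus a counter) until it has gathered nrOfPeers distinct peers or the attempt limit is reached, so a node can briefly be monitored by fewer than MonitoredByNrOfMembers peers when probes collide. The same loop as a standalone sketch over plain strings (all names invented):

    import scala.annotation.tailrec

    object SelectPeersSketch extends App {
      val all = Set("a", "b", "c", "d", "e", "f")
      val monitoredByNrOfMembers = 3
      // stand-in for consistentHash.nodeFor
      def nodeFor(key: String): String = all.toVector(math.abs(key.hashCode) % all.size)

      val nrOfPeers = math.min(all.size, monitoredByNrOfMembers)
      val attemptLimit = nrOfPeers * 2
      @tailrec def select(acc: Set[String], n: Int): Set[String] =
        if (acc.size == nrOfPeers || n == attemptLimit) acc
        else select(acc + nodeFor("self" + n), n + 1)

      // may yield fewer than nrOfPeers if too many probes hit the same node
      println(select(Set.empty[String], 0))
    }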
+
+/**
+ * Responsible for sending [[akka.cluster.ClusterHeartbeatReceiver.Heartbeat]]
+ * and [[akka.cluster.ClusterHeartbeatReceiver.EndHeartbeat]] to one specific address.
  *
  * Netty blocks when sending to broken connections, and this actor uses
  * a configurable circuit breaker to reduce connect attempts to broken
@@ -163,10 +265,10 @@
  *
  * @see ClusterHeartbeatSender
  */
-private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
+private[cluster] final class ClusterHeartbeatSenderConnection(toRef: ActorRef)
   extends Actor with ActorLogging {
 
-  import ClusterHeartbeatSender._
+  import ClusterHeartbeatSenderConnection._
 
   val breaker = {
     val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
@@ -177,21 +279,19 @@ private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
       onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
   }
 
-  // make sure it will cleanup when not used any more
-  context.setReceiveTimeout(30 seconds)
-
   def receive = {
     case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
       if (!deadline.isOverdue) {
+        log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
         // the CircuitBreaker will measure elapsed time and open if too many long calls
         try breaker.withSyncCircuitBreaker {
-          log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
           toRef ! heartbeatMsg
-          if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
         } catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
       }
-
-    case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
-
+      if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
+    case SendEndHeartbeat(endHeartbeatMsg, _) ⇒
+      log.debug("Cluster Node [{}] - EndHeartbeat to [{}]", endHeartbeatMsg.from, toRef)
+      toRef ! endHeartbeatMsg
   }
+
 }
\ No newline at end of file
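Every send in this connection actor goes through the same synchronous circuit-breaker pattern: after repeated failing or slow calls the breaker opens, withSyncCircuitBreaker throws CircuitBreakerOpenException immediately, and the heartbeat is skipped rather than queued against a dead connection. A self-contained sketch of that pattern, with invented limits rather than the cluster's SendCircuitBreakerSettings (constructor details may differ slightly across Akka versions):

    import akka.actor.ActorSystem
    import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException }
    import scala.concurrent.util.duration._

    object BreakerSketch extends App {
      val system = ActorSystem("sketch")
      import system.dispatcher // ExecutionContext for the breaker's reset scheduling

      val breaker = new CircuitBreaker(system.scheduler,
        maxFailures = 3, callTimeout = 1.second, resetTimeout = 5.seconds)

      try breaker.withSyncCircuitBreaker {
        // a possibly slow or failing call goes here, e.g. a send to a remote ref
      } catch {
        case _: CircuitBreakerOpenException ⇒ // breaker open: skip, as the heartbeat sender does
      }
      system.shutdown()
    }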
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 6110df034a..38a3f4554c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -24,6 +24,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
   final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
     Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
   final val HeartbeatInterval: FiniteDuration =
     Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
+  final val HeartbeatConsistentHashingVirtualNodesFactor = 10 // no need for configuration
+  final val NumberOfEndHeartbeats: Int = (FailureDetectorAcceptableHeartbeatPause / HeartbeatInterval + 1).toInt
+  final val MonitoredByNrOfMembers: Int = getInt("akka.cluster.failure-detector.monitored-by-nr-of-members")
   final val SeedNodes: IndexedSeq[Address] =
     getStringList("akka.cluster.seed-nodes").asScala.map { case AddressFromURIString(addr) ⇒ addr
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
index e5c72e642b..5f1edfc6db 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -42,7 +42,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
       gossip-interval = 500 ms
       auto-join = off
       auto-down = on
-      failure-detector.acceptable-heartbeat-pause = 10s
+      failure-detector.acceptable-heartbeat-pause = 5s
       publish-stats-interval = 0 s # always, when it happens
     }
     akka.event-handlers = ["akka.testkit.TestEventListener"]
@@ -57,7 +57,9 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
     akka.scheduler.tick-duration = 33 ms
     akka.remote.log-remote-lifecycle-events = off
     akka.remote.netty.execution-pool-size = 4
-    #akka.remote.netty.reconnection-time-window = 1s
+    #akka.remote.netty.reconnection-time-window = 10s
+    akka.remote.netty.read-timeout = 5s
+    akka.remote.netty.write-timeout = 5s
     akka.remote.netty.backoff-timeout = 500ms
     akka.remote.netty.connection-timeout = 500ms
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index be5ae74e4d..d83753fb00 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -29,6 +29,8 @@ class ClusterConfigSpec extends AkkaSpec {
       PeriodicTasksInitialDelay must be(1 seconds)
       GossipInterval must be(1 second)
       HeartbeatInterval must be(1 second)
+      NumberOfEndHeartbeats must be(4)
+      MonitoredByNrOfMembers must be(5)
       LeaderActionsInterval must be(1 second)
       UnreachableNodesReaperInterval must be(1 second)
       PublishStatsInterval must be(10 second)
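The expectation NumberOfEndHeartbeats must be(4) follows directly from the formula added in ClusterSettings, given the defaults the surrounding assertions imply (acceptable-heartbeat-pause = 3 s, heartbeat-interval = 1 s): enough EndHeartbeat messages are sent to span the acceptable pause, plus one. A standalone check of that arithmetic (object name invented; same duration imports as the sources above):

    import scala.concurrent.util.duration._

    object EndHeartbeatsCheck extends App {
      val acceptablePause = 3.seconds // acceptable-heartbeat-pause
      val interval = 1.second         // heartbeat-interval
      // same derivation as ClusterSettings.NumberOfEndHeartbeats
      val numberOfEndHeartbeats = (acceptablePause / interval + 1).toInt
      println(numberOfEndHeartbeats) // 4
    }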