diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 6012c48f45..10bcd9ee6a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -155,8 +155,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac withDispatcher(context.props.dispatcher), name = "publisher") val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)). withDispatcher(context.props.dispatcher), name = "core") - context.actorOf(Props[ClusterHeartbeatDaemon]. - withDispatcher(context.props.dispatcher), name = "heartbeat") + context.actorOf(Props[ClusterHeartbeatReceiver]. + withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)). withDispatcher(context.props.dispatcher), name = "metrics") @@ -172,26 +172,24 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ - import ClusterHeartbeatSender._ + import ClusterHeartbeatSender.JoinInProgress val cluster = Cluster(context.system) import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) - val selfHeartbeat = Heartbeat(selfAddress) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet var latestGossip: Gossip = Gossip() - var joinInProgress: Map[Address, Deadline] = Map.empty var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. - withDispatcher(UseDispatcher), name = "heartbeatSender") val coreSender = context.actorOf(Props[ClusterCoreSender]. 
withDispatcher(UseDispatcher), name = "coreSender") + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. + withDispatcher(UseDispatcher), name = "heartbeatSender") import context.dispatcher @@ -201,12 +199,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto self ! GossipTick } - // start periodic heartbeat to all nodes in cluster - val heartbeatTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { - self ! HeartbeatTick - } - // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { @@ -232,7 +224,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto override def postStop(): Unit = { gossipTask.cancel() - heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() publishStatsTask foreach { _.cancel() } @@ -250,7 +241,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() - case HeartbeatTick ⇒ heartbeat() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStatsTick ⇒ publishInternalStats() @@ -293,11 +283,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localGossip = latestGossip // wipe our state since a node that joins a cluster must be empty latestGossip = Gossip() - joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() + heartbeatSender ! 
JoinInProgress(address, Deadline.now + JoinTimeout) publish(localGossip) context.become(initialized) @@ -517,12 +507,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer else remoteGossip // remote gossip is newer - val newJoinInProgress = - if (joinInProgress.isEmpty) joinInProgress - else joinInProgress -- winningGossip.members.map(_.address) -- winningGossip.overview.unreachable.map(_.address) - latestGossip = winningGossip seen selfAddress - joinInProgress = newJoinInProgress // for all new joining nodes we remove them from the failure detector (latestGossip.members -- localGossip.members).foreach { @@ -744,27 +729,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } } - def heartbeat(): Unit = { - removeOverdueJoinInProgress() - - val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys - - val deadline = Deadline.now + HeartbeatInterval - beatTo.foreach { address ⇒ if (address != selfAddress) heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline) } - } - - /** - * Removes overdue joinInProgress from State. - */ - def removeOverdueJoinInProgress(): Unit = { - joinInProgress --= joinInProgress collect { case (address, deadline) if deadline.isOverdue ⇒ address } - } - /** * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. 
*/ def reapUnreachableMembers(): Unit = { - if (!isSingletonCluster && isAvailable) { // only scrutinize if we are a non-singleton cluster and available diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index b48c9f066b..b28542bf24 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -4,12 +4,14 @@ package akka.cluster import language.postfixOps - +import scala.collection.immutable.SortedSet import akka.actor.{ ReceiveTimeout, ActorLogging, ActorRef, Address, Actor, RootActorPath, Props } import java.security.MessageDigest import akka.pattern.{ CircuitBreaker, CircuitBreakerOpenException } import scala.concurrent.util.duration._ import scala.concurrent.util.Deadline +import scala.concurrent.util.FiniteDuration +import akka.cluster.ClusterEvent._ /** * Sent at regular intervals for failure detection. @@ -19,11 +21,11 @@ case class Heartbeat(from: Address) extends ClusterMessage /** * INTERNAL API. * - * Receives Heartbeat messages and delegates to Cluster. + * Receives Heartbeat messages and updates failure detector. * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. 
*/ -private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLogging { val failureDetector = Cluster(context.system).failureDetector @@ -38,12 +40,18 @@ private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogg */ private[cluster] object ClusterHeartbeatSender { /** - * - * Command to [akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]] + * Command to [[akka.cluster.ClusterHeartbeatSenderWorker]], which will send [[akka.cluster.Heartbeat]] * to the other node. * Local only, no need to serialize. */ case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline) + + /** + * Tell [[akka.cluster.ClusterHeartbeatSender]] that this node has started joining + * another node, and heartbeats should be sent to it until it becomes a member or the deadline is overdue. + * Local only, no need to serialize. + */ + case class JoinInProgress(address: Address, deadline: Deadline) } /* @@ -57,12 +65,39 @@ private[cluster] object ClusterHeartbeatSender { */ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ + import Member.addressOrdering + import InternalClusterAction.HeartbeatTick + + val cluster = Cluster(context.system) + import cluster.{ selfAddress, scheduler } + import cluster.settings._ + import context.dispatcher + + val selfHeartbeat = Heartbeat(selfAddress) + + var nodes: SortedSet[Address] = SortedSet.empty + var joinInProgress: Map[Address, Deadline] = Map.empty + + // start periodic heartbeat to other nodes in cluster + val heartbeatTask = + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { + self ! 
HeartbeatTick + } + + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + heartbeatTask.cancel() + cluster.unsubscribe(self) + } /** * Looks up and returns the remote cluster heartbeat connection for the specific address. */ def clusterHeartbeatConnectionFor(address: Address): ActorRef = - context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat") + context.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeatReceiver") val digester = MessageDigest.getInstance("MD5") @@ -76,14 +111,51 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg } def receive = { - case msg @ SendHeartbeat(from, to, deadline) ⇒ + case state: CurrentClusterState ⇒ init(state) + case MemberUnreachable(m) ⇒ removeMember(m) + case MemberRemoved(m) ⇒ removeMember(m) + case e: MemberEvent ⇒ addMember(e.member) + case JoinInProgress(a, d) ⇒ joinInProgress += (a -> d) + case HeartbeatTick ⇒ heartbeat() + } + + def init(state: CurrentClusterState): Unit = { + nodes = state.members.map(_.address) + joinInProgress --= nodes + } + + def addMember(m: Member): Unit = { + nodes += m.address + joinInProgress -= m.address + } + + def removeMember(m: Member): Unit = { + nodes -= m.address + joinInProgress -= m.address + } + + def heartbeat(): Unit = { + removeOverdueJoinInProgress() + + val beatTo = nodes ++ joinInProgress.keys + + val deadline = Deadline.now + HeartbeatInterval + for (to ← beatTo; if to != selfAddress) { val workerName = encodeChildName(to.toString) val worker = context.actorFor(workerName) match { case notFound if notFound.isTerminated ⇒ context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) case child ⇒ child } - worker ! msg + worker ! SendHeartbeat(selfHeartbeat, to, deadline) + } + } + + /** + * Removes joinInProgress entries that are overdue or whose address is already in nodes. 
+ */ + def removeOverdueJoinInProgress(): Unit = { + joinInProgress --= joinInProgress collect { case (address, deadline) if (nodes contains address) || deadline.isOverdue ⇒ address } } }