min-nr-of-members and registerOnMemberUp, see #2306

* Leader moves joining members to up when min-nr-of-members reached
* Tested by MinMembersBeforeUpSpec
* Used in factorial sample
* Docs
This commit is contained in:
Patrik Nordwall 2012-12-10 08:46:25 +01:00
parent d642fab666
commit 44ab9f116f
13 changed files with 241 additions and 105 deletions

View file

@ -3,9 +3,12 @@
*/
package akka.cluster
import language.existentials
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.Status.Failure
import akka.event.EventStream
@ -13,8 +16,6 @@ import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
import language.postfixOps
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -104,6 +105,12 @@ private[cluster] object InternalClusterAction {
case object GetClusterCoreRef
/**
* Comand to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberUpListener]].
*/
case class AddOnMemberUpListener(callback: Runnable)
sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
@ -160,6 +167,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
def receive = {
case InternalClusterAction.GetClusterCoreRef sender ! core
case InternalClusterAction.AddOnMemberUpListener(code)
context.actorOf(Props(new OnMemberUpListener(code)))
}
}
@ -602,12 +611,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
val numberOfMembers = localMembers.size
def isJoiningToUp(m: Member): Boolean = m.status == Joining && numberOfMembers >= MinNrOfMembers
// transform the node member ring
val newMembers = localMembers collect {
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
case member if member.status == Joining member copy (status = Up)
// 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully member copy (status = Exiting)
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
case member if isJoiningToUp(member) member copy (status = Up)
// 2. Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully
member copy (status = Exiting)
// 3. Everyone else that is not Exiting stays as they are
case member if member.status != Exiting member
// 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
@ -621,10 +636,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
// 2. store away removed and exiting members so we can separate the pure state changes
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
@ -877,6 +892,42 @@ private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
}
}
/**
* INTERNAL API
*
* The supplied callback will be run, once, when current cluster member is `Up`.
*/
private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with ActorLogging {
import ClusterEvent._
val cluster = Cluster(context.system)
// subscribe to MemberUp, re-subscribe when restart
override def preStart(): Unit =
cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit =
cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState
if (state.members.exists(isSelfUp(_)))
done()
case MemberUp(m)
if (isSelfUp(m))
done()
}
def done(): Unit = {
try callback.run() catch {
case NonFatal(e) log.error(e, "OnMemberUp callback failed with [{}]", e.getMessage)
} finally {
context stop self
}
}
def isSelfUp(m: Member): Boolean =
m.address == cluster.selfAddress && m.status == MemberStatus.Up
}
/**
* INTERNAL API
*/