Merge pull request #936 from akka/wip-2306-min-nodes-startup-patriknw

min-nr-of-members and registerOnMemberUp, see #2306
This commit is contained in:
Patrik Nordwall 2012-12-14 01:36:41 -08:00
commit c325877c6a
13 changed files with 241 additions and 105 deletions

View file

@ -28,6 +28,12 @@ akka {
# formed in case of network partition. # formed in case of network partition.
auto-down = off auto-down = off
# Minimum required number of members before the leader changes member status
# of 'Joining' members to 'Up'. Typically used together with
# 'Cluster.registerOnMemberUp' to defer some action, such as starting actors,
# until the cluster has reached a certain size.
min-nr-of-members = 1
# Enable or disable JMX MBeans for management of the cluster # Enable or disable JMX MBeans for management of the cluster
jmx.enabled = on jmx.enabled = on

View file

@ -232,6 +232,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
def down(address: Address): Unit = def down(address: Address): Unit =
clusterCore ! ClusterUserAction.Down(address) clusterCore ! ClusterUserAction.Down(address)
/**
* The supplied thunk will be run, once, when current cluster member is `Up`.
 * Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp[T](code: T): Unit =
registerOnMemberUp(new Runnable { def run = code })
/**
* The supplied callback will be run, once, when current cluster member is `Up`.
 * Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
* JAVA API
*/
def registerOnMemberUp(callback: Runnable): Unit = clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
// ======================================================== // ========================================================
// ===================== INTERNAL API ===================== // ===================== INTERNAL API =====================
// ======================================================== // ========================================================
@ -268,4 +286,3 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
} }
} }

View file

@ -3,9 +3,12 @@
*/ */
package akka.cluster package akka.cluster
import language.existentials
import language.postfixOps
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.Status.Failure import akka.actor.Status.Failure
import akka.event.EventStream import akka.event.EventStream
@ -13,8 +16,6 @@ import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
import akka.cluster.MemberStatus._ import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import language.existentials
import language.postfixOps
/** /**
* Base trait for all cluster messages. All ClusterMessage's are serializable. * Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -104,6 +105,12 @@ private[cluster] object InternalClusterAction {
case object GetClusterCoreRef case object GetClusterCoreRef
/**
 * Command to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberUpListener]].
*/
case class AddOnMemberUpListener(callback: Runnable)
sealed trait SubscriptionMessage sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
@ -160,6 +167,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
def receive = { def receive = {
case InternalClusterAction.GetClusterCoreRef sender ! core case InternalClusterAction.GetClusterCoreRef sender ! core
case InternalClusterAction.AddOnMemberUpListener(code)
context.actorOf(Props(new OnMemberUpListener(code)))
} }
} }
@ -602,12 +611,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
if (localGossip.convergence) { if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes // we have convergence - so we can't have unreachable nodes
val numberOfMembers = localMembers.size
def isJoiningToUp(m: Member): Boolean = m.status == Joining && numberOfMembers >= MinNrOfMembers
// transform the node member ring // transform the node member ring
val newMembers = localMembers collect { val newMembers = localMembers collect {
// 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
case member if member.status == Joining member copy (status = Up) // and minimum number of nodes have joined the cluster
// 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) case member if isJoiningToUp(member) member copy (status = Up)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully member copy (status = Exiting) // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING
// *and* if we have a successful partition handoff)
case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully
member copy (status = Exiting)
// 3. Everyone else that is not Exiting stays as they are // 3. Everyone else that is not Exiting stays as they are
case member if member.status != Exiting member case member if member.status != Exiting member
// 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table // 4. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
@ -621,10 +636,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification // to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update // 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending // 2. store away removed and exiting members so we can separate the pure state changes
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_))
val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully)
@ -877,6 +892,42 @@ private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
} }
} }
/**
* INTERNAL API
*
* The supplied callback will be run, once, when current cluster member is `Up`.
*/
private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with ActorLogging {
import ClusterEvent._
val cluster = Cluster(context.system)
// subscribe to MemberUp, re-subscribe when restart
override def preStart(): Unit =
cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit =
cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState
if (state.members.exists(isSelfUp(_)))
done()
case MemberUp(m)
if (isSelfUp(m))
done()
}
def done(): Unit = {
try callback.run() catch {
case NonFatal(e) log.error(e, "OnMemberUp callback failed with [{}]", e.getMessage)
} finally {
context stop self
}
}
def isSelfUp(m: Member): Boolean =
m.address == cluster.selfAddress && m.status == MemberStatus.Up
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -56,6 +56,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS)
final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
final val MinNrOfMembers: Int = {
val n = getInt("akka.cluster.min-nr-of-members")
require(n > 0, "min-nr-of-members must be > 0"); n
}
final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled") final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled")
final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match { final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match {

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import scala.collection.immutable.SortedSet
import scala.concurrent.duration._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import java.util.concurrent.atomic.AtomicReference
import akka.actor.Props
import akka.actor.Actor
import akka.cluster.MemberStatus._
object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
 # require this many members before the leader moves 'Joining' members to 'Up'
akka.cluster.min-nr-of-members = 3""")).
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec
class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec
class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec
abstract class MinMembersBeforeUpSpec
extends MultiNodeSpec(MinMembersBeforeUpMultiJvmSpec)
with MultiNodeClusterSpec {
import MinMembersBeforeUpMultiJvmSpec._
import ClusterEvent._
"Cluster leader" must {
"wait with moving members to UP until minimum number of members have joined" taggedAs LongRunningTest in {
val onUpLatch = TestLatch(1)
cluster.registerOnMemberUp(onUpLatch.countDown())
runOn(first) {
startClusterNode()
awaitCond(clusterView.status == Joining)
}
enterBarrier("first-started")
onUpLatch.isOpen must be(false)
runOn(second) {
cluster.join(first)
}
runOn(first, second) {
val expectedAddresses = Set(first, second) map address
awaitCond(clusterView.members.map(_.address) == expectedAddresses)
clusterView.members.map(_.status) must be(Set(Joining))
// and it should not change
1 to 5 foreach { _
Thread.sleep(1000)
clusterView.members.map(_.address) must be(expectedAddresses)
clusterView.members.map(_.status) must be(Set(Joining))
}
}
enterBarrier("second-joined")
runOn(third) {
cluster.join(first)
}
awaitClusterUp(first, second, third)
onUpLatch.await
enterBarrier("after-1")
}
}
}

View file

@ -36,6 +36,7 @@ class ClusterConfigSpec extends AkkaSpec {
JoinTimeout must be(60 seconds) JoinTimeout must be(60 seconds)
AutoJoin must be(true) AutoJoin must be(true)
AutoDown must be(false) AutoDown must be(false)
MinNrOfMembers must be(1)
JmxEnabled must be(true) JmxEnabled must be(true)
UseDispatcher must be(Dispatchers.DefaultDispatcherId) UseDispatcher must be(Dispatchers.DefaultDispatcherId)
GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)

View file

@ -239,6 +239,25 @@ frontend nodes and 3 backend nodes::
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_. .. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
A common use case is to start actors after the cluster has been initialized,
members have joined, and the cluster has reached a certain size.
With a configuration option you can define required number of members
before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed to 'Up', i.e. the cluster
has at least the defined number of members.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java#registerOnUp
This callback can be used for other things than starting actors.
Failure Detector Failure Detector
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^

View file

@ -212,6 +212,25 @@ frontend nodes and 3 backend nodes::
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_. .. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
A common use case is to start actors after the cluster has been initialized,
members have joined, and the cluster has reached a certain size.
With a configuration option you can define required number of members
before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed to 'Up', i.e. the cluster
has at least the defined number of members.
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#registerOnUp
This callback can be used for other things than starting actors.
Failure Detector Failure Detector
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^

View file

@ -1,5 +1,6 @@
package sample.cluster.factorial.japi; package sample.cluster.factorial.japi;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
@ -11,7 +12,7 @@ public class FactorialBackendMain {
if (args.length > 0) if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]); System.setProperty("akka.remote.netty.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem"); ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial"));
system.actorOf(new Props(FactorialBackend.class), "factorialBackend"); system.actorOf(new Props(FactorialBackend.class), "factorialBackend");

View file

@ -1,27 +1,32 @@
package sample.cluster.factorial.japi; package sample.cluster.factorial.japi;
import akka.actor.ActorRef; import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
import akka.cluster.Cluster;
public class FactorialFrontendMain { public class FactorialFrontendMain {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0])); final int upToN = (args.length == 0 ? 200 : Integer.valueOf(args[0]));
ActorSystem system = ActorSystem.create("ClusterSystem"); final ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial"));
system.log().info("Factorials will start when 3 members in the cluster.");
// start the calculations when there is at least 2 other members //#registerOnUp
system.actorOf(new Props(new UntypedActorFactory() { Cluster.get(system).registerOnMemberUp(new Runnable() {
@Override @Override
public UntypedActor create() { public void run() {
return new StartupFrontend(upToN); system.actorOf(new Props(new UntypedActorFactory() {
} @Override
}), "startup"); public UntypedActor create() {
return new FactorialFrontend(upToN, true);
}
}), "factorialFrontend");
}
});
//#registerOnUp
} }
} }

View file

@ -1,56 +0,0 @@
package sample.cluster.factorial.japi;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class StartupFrontend extends UntypedActor {
final int upToN;
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
int memberCount = 0;
public StartupFrontend(int upToN) {
this.upToN = upToN;
}
//subscribe to MemberUp cluster events
@Override
public void preStart() {
log.info("Factorials will start when 3 members in the cluster.");
Cluster.get(getContext().system()).subscribe(getSelf(), MemberUp.class);
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
memberCount = state.members().size();
runWhenReady();
} else if (message instanceof MemberUp) {
memberCount++;
runWhenReady();
} else {
unhandled(message);
}
}
void runWhenReady() {
if (memberCount >= 3) {
getContext().system().actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new FactorialFrontend(upToN, true);
}
}), "factorialFrontend");
getContext().stop(getSelf());
}
}
}

View file

@ -0,0 +1,5 @@
include "application"
# //#min-nr-of-members
akka.cluster.min-nr-of-members = 3
# //#min-nr-of-members

View file

@ -3,6 +3,7 @@ package sample.cluster.factorial
//#imports //#imports
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Future import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.ActorRef import akka.actor.ActorRef
@ -21,32 +22,14 @@ object FactorialFrontend {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val upToN = if (args.isEmpty) 200 else args(0).toInt val upToN = if (args.isEmpty) 200 else args(0).toInt
val system = ActorSystem("ClusterSystem") val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
system.log.info("Factorials will start when 3 members in the cluster.")
// start the calculations when there is at least 2 other members //#registerOnUp
system.actorOf(Props(new Actor with ActorLogging { Cluster(system) registerOnMemberUp {
var memberCount = 0 system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
name = "factorialFrontend")
log.info("Factorials will start when 3 members in the cluster.") }
Cluster(context.system).subscribe(self, classOf[MemberUp]) //#registerOnUp
def receive = {
case state: CurrentClusterState
memberCount = state.members.size
runWhenReady()
case MemberUp(member)
memberCount += 1
runWhenReady()
}
def runWhenReady(): Unit = if (memberCount >= 3) {
context.system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
name = "factorialFrontend")
context stop self
}
}), name = "startup")
} }
} }
@ -79,7 +62,7 @@ object FactorialBackend {
// when specified as program argument // when specified as program argument
if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
val system = ActorSystem("ClusterSystem") val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial"))
system.actorOf(Props[FactorialBackend], name = "factorialBackend") system.actorOf(Props[FactorialBackend], name = "factorialBackend")
system.actorOf(Props[MetricsListener], name = "metricsListener") system.actorOf(Props[MetricsListener], name = "metricsListener")