diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 53b14e2842..f8d18a516b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -28,6 +28,12 @@ akka { # formed in case of network partition. auto-down = off + # Minimum required number of members before the leader changes member status + # of 'Joining' members to 'Up'. Typically used together with + # 'Cluster.registerOnMemberUp' to defer some action, such as starting actors, + # until the cluster has reached a certain size. + min-nr-of-members = 1 + # Enable or disable JMX MBeans for management of the cluster jmx.enabled = on diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e362b4ac34..513d8cbc74 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -232,6 +232,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { def down(address: Address): Unit = clusterCore ! ClusterUserAction.Down(address) + /** + * The supplied thunk will be run, once, when current cluster member is `Up`. + * Typically used together with configuration option `akka.cluster.min-nr-of-members` + * to defer some action, such as starting actors, until the cluster has reached + * a certain size. + */ + def registerOnMemberUp[T](code: ⇒ T): Unit = + registerOnMemberUp(new Runnable { def run = code }) + + /** + * The supplied callback will be run, once, when current cluster member is `Up`. + * Typically used together with configuration option `akka.cluster.min-nr-of-members` + * to defer some action, such as starting actors, until the cluster has reached + * a certain size. + * JAVA API + */ + def registerOnMemberUp(callback: Runnable): Unit = clusterDaemons ! 
InternalClusterAction.AddOnMemberUpListener(callback) + // ======================================================== // ===================== INTERNAL API ===================== // ======================================================== @@ -268,4 +286,3 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { } } - diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 99fa7e2821..2d6dac2568 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -3,9 +3,12 @@ */ package akka.cluster +import language.existentials +import language.postfixOps import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.util.control.NonFatal import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.Status.Failure import akka.event.EventStream @@ -13,8 +16,6 @@ import akka.pattern.ask import akka.util.Timeout import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ -import language.existentials -import language.postfixOps /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -104,6 +105,12 @@ private[cluster] object InternalClusterAction { case object GetClusterCoreRef + /** + * Command to [[akka.cluster.ClusterDaemon]] to create a + * [[akka.cluster.OnMemberUpListener]]. 
+ */ + case class AddOnMemberUpListener(callback: Runnable) + sealed trait SubscriptionMessage case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage @@ -160,6 +167,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac def receive = { case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core + case InternalClusterAction.AddOnMemberUpListener(code) ⇒ + context.actorOf(Props(new OnMemberUpListener(code))) } } @@ -602,12 +611,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto if (localGossip.convergence) { // we have convergence - so we can't have unreachable nodes + val numberOfMembers = localMembers.size + def isJoiningToUp(m: Member): Boolean = m.status == Joining && numberOfMembers >= MinNrOfMembers + // transform the node member ring val newMembers = localMembers collect { - // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence) - case member if member.status == Joining ⇒ member copy (status = Up) - // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff) - case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ member copy (status = Exiting) + // 1. Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence) + // and minimum number of nodes have joined the cluster + case member if isJoiningToUp(member) ⇒ member copy (status = Up) + // 2. Move LEAVING => EXITING (once we have a convergence on LEAVING + // *and* if we have a successful partition handoff) + case member if member.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒ + member copy (status = Exiting) // 3. Everyone else that is not Exiting stays as they are case member if member.status != Exiting ⇒ member // 4. 
Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table @@ -621,10 +636,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED // to check for state-changes and to store away removed and exiting members for later notification // 1. check for state-changes to update - // 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending + // 2. store away removed and exiting members so we can separate the pure state changes val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting) - val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining) + val (upMembers, newMembers2) = newMembers1 partition (isJoiningToUp(_)) val exitingMembers = newMembers2 filter (_.status == Leaving && hasPartionHandoffCompletedSuccessfully) @@ -877,6 +892,42 @@ private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { } } +/** + * INTERNAL API + * + * The supplied callback will be run, once, when current cluster member is `Up`. 
+ */ +private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with ActorLogging { + import ClusterEvent._ + val cluster = Cluster(context.system) + // subscribe to MemberUp, re-subscribe when restart + override def preStart(): Unit = + cluster.subscribe(self, classOf[MemberUp]) + override def postStop(): Unit = + cluster.unsubscribe(self) + + def receive = { + case state: CurrentClusterState ⇒ + if (state.members.exists(isSelfUp(_))) + done() + case MemberUp(m) ⇒ + if (isSelfUp(m)) + done() + } + + def done(): Unit = { + try callback.run() catch { + case NonFatal(e) ⇒ log.error(e, "OnMemberUp callback failed with [{}]", e.getMessage) + } finally { + context stop self + } + } + + def isSelfUp(m: Member): Boolean = + m.address == cluster.selfAddress && m.status == MemberStatus.Up + +} + /** * INTERNAL API */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 6861459168..5b5c26ae33 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -56,6 +56,10 @@ class ClusterSettings(val config: Config, val systemName: String) { final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") + final val MinNrOfMembers: Int = { + val n = getInt("akka.cluster.min-nr-of-members") + require(n > 0, "min-nr-of-members must be > 0"); n + } final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled") final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala new file mode 100644 index 0000000000..46891bbc49 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import scala.collection.immutable.SortedSet +import scala.concurrent.duration._ +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import java.util.concurrent.atomic.AtomicReference +import akka.actor.Props +import akka.actor.Actor +import akka.cluster.MemberStatus._ + +object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + # turn off unreachable reaper + akka.cluster.min-nr-of-members = 3""")). 
+ withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} + +class MinMembersBeforeUpMultiJvmNode1 extends MinMembersBeforeUpSpec +class MinMembersBeforeUpMultiJvmNode2 extends MinMembersBeforeUpSpec +class MinMembersBeforeUpMultiJvmNode3 extends MinMembersBeforeUpSpec + +abstract class MinMembersBeforeUpSpec + extends MultiNodeSpec(MinMembersBeforeUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import MinMembersBeforeUpMultiJvmSpec._ + import ClusterEvent._ + + "Cluster leader" must { + "wait with moving members to UP until minimum number of members have joined" taggedAs LongRunningTest in { + + val onUpLatch = TestLatch(1) + cluster.registerOnMemberUp(onUpLatch.countDown()) + + runOn(first) { + startClusterNode() + awaitCond(clusterView.status == Joining) + } + enterBarrier("first-started") + + onUpLatch.isOpen must be(false) + + runOn(second) { + cluster.join(first) + } + runOn(first, second) { + val expectedAddresses = Set(first, second) map address + awaitCond(clusterView.members.map(_.address) == expectedAddresses) + clusterView.members.map(_.status) must be(Set(Joining)) + // and it should not change + 1 to 5 foreach { _ ⇒ + Thread.sleep(1000) + clusterView.members.map(_.address) must be(expectedAddresses) + clusterView.members.map(_.status) must be(Set(Joining)) + } + } + enterBarrier("second-joined") + + runOn(third) { + cluster.join(first) + } + awaitClusterUp(first, second, third) + + onUpLatch.await + + enterBarrier("after-1") + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index a857a3363c..ce7f7a4a70 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -36,6 +36,7 @@ class ClusterConfigSpec extends AkkaSpec { JoinTimeout must be(60 seconds) AutoJoin must be(true) AutoDown must be(false) + MinNrOfMembers must be(1) 
JmxEnabled must be(true) UseDispatcher must be(Dispatchers.DefaultDispatcherId) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index a799dae457..0b418350b2 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -239,6 +239,25 @@ frontend nodes and 3 backend nodes:: .. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters `_. +How To Startup when Cluster Size Reached +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A common use case is to start actors after the cluster has been initialized, +members have joined, and the cluster has reached a certain size. + +With a configuration option you can define required number of members +before the leader changes member status of 'Joining' members to 'Up'. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members + +You can start the actors in a ``registerOnMemberUp`` callback, which will +be invoked when the current member status is changed to 'Up', i.e. the cluster +has at least the defined number of members. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java#registerOnUp + +This callback can be used for other things than starting actors. + Failure Detector ^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 49d1c3b547..717d084d96 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -212,6 +212,25 @@ frontend nodes and 3 backend nodes:: .. 
note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters `_. +How To Startup when Cluster Size Reached +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A common use case is to start actors after the cluster has been initialized, +members have joined, and the cluster has reached a certain size. + +With a configuration option you can define required number of members +before the leader changes member status of 'Joining' members to 'Up'. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/factorial.conf#min-nr-of-members + +You can start the actors in a ``registerOnMemberUp`` callback, which will +be invoked when the current member status is changed to 'Up', i.e. the cluster +has at least the defined number of members. + +.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala#registerOnUp + +This callback can be used for other things than starting actors. 
+ Failure Detector ^^^^^^^^^^^^^^^^ diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java index 4bf907748d..8acb6aad26 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialBackendMain.java @@ -1,5 +1,6 @@ package sample.cluster.factorial.japi; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.Props; @@ -11,7 +12,7 @@ public class FactorialBackendMain { if (args.length > 0) System.setProperty("akka.remote.netty.port", args[0]); - ActorSystem system = ActorSystem.create("ClusterSystem"); + ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); system.actorOf(new Props(FactorialBackend.class), "factorialBackend"); diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java index 8d52bdf54a..e22ad6c2cb 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/FactorialFrontendMain.java @@ -1,27 +1,32 @@ package sample.cluster.factorial.japi; -import akka.actor.ActorRef; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; - +import akka.cluster.Cluster; public class FactorialFrontendMain { public static void main(String[] args) throws Exception { final int upToN = (args.length == 0 ? 
200 : Integer.valueOf(args[0])); - ActorSystem system = ActorSystem.create("ClusterSystem"); - - // start the calculations when there is at least 2 other members - system.actorOf(new Props(new UntypedActorFactory() { - @Override - public UntypedActor create() { - return new StartupFrontend(upToN); - } - }), "startup"); - + final ActorSystem system = ActorSystem.create("ClusterSystem", ConfigFactory.load("factorial")); + system.log().info("Factorials will start when 3 members in the cluster."); + //#registerOnUp + Cluster.get(system).registerOnMemberUp(new Runnable() { + @Override + public void run() { + system.actorOf(new Props(new UntypedActorFactory() { + @Override + public UntypedActor create() { + return new FactorialFrontend(upToN, true); + } + }), "factorialFrontend"); + } + }); + //#registerOnUp } } diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java deleted file mode 100644 index 54ca680988..0000000000 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/factorial/japi/StartupFrontend.java +++ /dev/null @@ -1,56 +0,0 @@ -package sample.cluster.factorial.japi; - -import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.cluster.Cluster; -import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.MemberUp; -import akka.event.Logging; -import akka.event.LoggingAdapter; - -public class StartupFrontend extends UntypedActor { - final int upToN; - LoggingAdapter log = Logging.getLogger(getContext().system(), this); - int memberCount = 0; - - public StartupFrontend(int upToN) { - this.upToN = upToN; - } - - //subscribe to ClusterMetricsChanged - @Override - public void preStart() { - log.info("Factorials will start when 3 members in the cluster."); - Cluster.get(getContext().system()).subscribe(getSelf(), 
MemberUp.class); - } - - @Override - public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - memberCount = state.members().size(); - runWhenReady(); - - } else if (message instanceof MemberUp) { - memberCount++; - runWhenReady(); - - } else { - unhandled(message); - } - - } - - void runWhenReady() { - if (memberCount >= 3) { - getContext().system().actorOf(new Props(new UntypedActorFactory() { - @Override - public UntypedActor create() { - return new FactorialFrontend(upToN, true); - } - }), "factorialFrontend"); - getContext().stop(getSelf()); - } - } -} diff --git a/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf b/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf new file mode 100644 index 0000000000..17e82db15d --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/resources/factorial.conf @@ -0,0 +1,5 @@ +include "application" + +# //#min-nr-of-members +akka.cluster.min-nr-of-members = 3 +# //#min-nr-of-members \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala index 9e219a933a..d75dcb96e1 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala @@ -3,6 +3,7 @@ package sample.cluster.factorial //#imports import scala.annotation.tailrec import scala.concurrent.Future +import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef @@ -21,32 +22,14 @@ object FactorialFrontend { def main(args: Array[String]): Unit = { val upToN = if (args.isEmpty) 200 else args(0).toInt - val system = ActorSystem("ClusterSystem") - - // start the calculations 
when there is at least 2 other members - system.actorOf(Props(new Actor with ActorLogging { - var memberCount = 0 - - log.info("Factorials will start when 3 members in the cluster.") - Cluster(context.system).subscribe(self, classOf[MemberUp]) - - def receive = { - case state: CurrentClusterState ⇒ - memberCount = state.members.size - runWhenReady() - case MemberUp(member) ⇒ - memberCount += 1 - runWhenReady() - } - - def runWhenReady(): Unit = if (memberCount >= 3) { - context.system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)), - name = "factorialFrontend") - context stop self - } - - }), name = "startup") - + val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial")) + system.log.info("Factorials will start when 3 members in the cluster.") + //#registerOnUp + Cluster(system) registerOnMemberUp { + system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)), + name = "factorialFrontend") + } + //#registerOnUp } } @@ -79,7 +62,7 @@ object FactorialBackend { // when specified as program argument if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) - val system = ActorSystem("ClusterSystem") + val system = ActorSystem("ClusterSystem", ConfigFactory.load("factorial")) system.actorOf(Props[FactorialBackend], name = "factorialBackend") system.actorOf(Props[MetricsListener], name = "metricsListener")