From adeb4fc8b868b00483dccd701e6d3c13a4739b52 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 13 Sep 2012 10:54:00 +0200
Subject: [PATCH 01/16] Handle empty availableNodes, see #2103

---
 .../scala/akka/cluster/routing/ClusterRouterConfig.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
index 89622867af..e2d8b8b204 100644
--- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
@@ -186,11 +186,12 @@ private[akka] class ClusterRouteeProvider(
 
   private def selectDeploymentTarget: Option[Address] = {
     val currentRoutees = routees
-    if (currentRoutees.size >= settings.totalInstances) {
+    val currentNodes = availbleNodes
+    if (currentRoutees.size >= settings.totalInstances || currentNodes.isEmpty) {
       None
     } else {
       val numberOfRouteesPerNode: Map[Address, Int] =
-        Map.empty[Address, Int] ++ availbleNodes.toSeq.map(_ -> 0) ++
+        Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++
           currentRoutees.groupBy(fullAddress).map {
             case (address, refs) ⇒ address -> refs.size
           }

From 0524ba0a65d422f560c34530e5c6fccc11d228ab Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 13 Sep 2012 10:54:14 +0200
Subject: [PATCH 02/16] Cluster documentation

* Two sample applications with main programs to play with,
  multi-node tests to illustrate testing, and code snippets
  for inclusion in the rst docs
* TransformationSample illustrates subscription to cluster events
* StatsSample illustrates usage of cluster aware routers,
  both lookup and deploy
---
 .../src/main/resources/reference.conf         |  20 +-
 akka-docs/cluster/cluster-usage.rst           | 220 ++++++++++++++-
 .../src/main/resources/application.conf       |   5 +-
 .../SimpleClusterApp.scala}                   |   4 +-
 .../sample/cluster/stats/StatsSample.scala    | 254 ++++++++++++++++++
 .../transformation/TransformationSample.scala | 108 ++++++++
 .../stats/StatsSampleSingleMasterSpec.scala   | 101 +++++++
 .../cluster/stats/StatsSampleSpec.scala       |  92 +++++++
 .../TransformationSampleSpec.scala            | 115 ++++++++
 project/AkkaBuild.scala                       |  16 +-
 10 files changed, 913 insertions(+), 22 deletions(-)
 rename akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/{ClusterApp.scala => simple/SimpleClusterApp.scala} (94%)
 create mode 100644 akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
 create mode 100644 akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala
 create mode 100644 akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
 create mode 100644 akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
 create mode 100644 akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala

diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index 60e69c1984..141cbcbaec 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -9,7 +9,8 @@ akka {
 
   cluster {
     # Initial contact points of the cluster. Nodes to join at startup if auto-join = on.
- # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port" + # Comma separated full URIs defined by a string on the form of + # "akka://system@hostname:port" # Leave as empty if the node should be a singleton cluster. seed-nodes = [] @@ -20,9 +21,10 @@ akka { # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on - # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? - # Using auto-down implies that two separate clusters will automatically be formed in case of - # network partition. + # Should the 'leader' in the cluster be allowed to automatically mark unreachable + # nodes as DOWN? + # Using auto-down implies that two separate clusters will automatically be formed + # in case of network partition. auto-down = off # how long should the node wait before starting the periodic tasks maintenance tasks? @@ -37,18 +39,20 @@ akka { # how often should the leader perform maintenance tasks? leader-actions-interval = 1s - # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? + # how often should the node move nodes, marked as unreachable by the failure detector, + # out of the membership ring? unreachable-nodes-reaper-interval = 1s # How often the current internal stats should be published. # A value of 0 s can be used to always publish the stats, when it happens. publish-stats-interval = 10s - # A joining node stops sending heartbeats to the node to join if it hasn't become member - # of the cluster within this deadline. + # A joining node stops sending heartbeats to the node to join if it hasn't + # become member of the cluster within this deadline. join-timeout = 60s - # The id of the dispatcher to use for cluster actors. If not specified default dispatcher is used. + # The id of the dispatcher to use for cluster actors. If not specified + # default dispatcher is used. # If specified you need to define the settings of the actual dispatcher. use-dispatcher = "" diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst index 9d922aea97..9e079796a7 100644 --- a/akka-docs/cluster/cluster-usage.rst +++ b/akka-docs/cluster/cluster-usage.rst @@ -18,7 +18,8 @@ The Akka cluster is a separate jar file. Make sure that you have the following d "com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT" -If you are using the latest nightly build you should pick a timestamped Akka version from ``_. +If you are using the latest nightly build you should pick a timestamped Akka version from ``_. Don't use ``SNAPSHOT``. Note that the Scala version |scalaVersion| +is part of the artifactId. A Simple Cluster Example ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -35,7 +36,9 @@ Try it out: :language: none To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala` -settings and the ``akka.cluster.seed-nodes`` to your ``application.conf`` file. +settings, but with ``ClusterActorRefProvider``. +The ``akka.cluster.seed-nodes`` and cluster extension should normally also be added to your +``application.conf`` file. The seed nodes are configured contact points for initial, automatic, join of the cluster. @@ -44,20 +47,20 @@ ip-addresses or host names of the machines in ``application.conf`` instead of `` 2. Add the following main program to your project, place it in ``src/main/scala``: -.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala +.. 
literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
    :language: scala
 
 3. Start the first seed node. Open a sbt session in one terminal window and run::
 
-      run-main sample.cluster.ClusterApp 2551
+      run-main sample.cluster.simple.SimpleClusterApp 2551
 
 2551 corresponds to the port of the first seed-nodes element in the configuration.
 In the log output you see that the cluster node has been started and changed status to 'Up'.
 
 4. Start the second seed node. Open a sbt session in another terminal window and run::
 
-      run-main sample.cluster.ClusterApp 2552
+      run-main sample.cluster.simple.SimpleClusterApp 2552
 
 2552 corresponds to the port of the second seed-nodes element in the configuration.
 
@@ -68,7 +71,7 @@ Switch over to the first terminal window and see in the log output that the memb
 
 5. Start another node. Open a sbt session in yet another terminal window and run::
 
-      run-main sample.cluster.ClusterApp
+      run-main sample.cluster.simple.SimpleClusterApp
 
 Now you don't need to specify the port number, and it will use a random available port.
 It joins one of the configured seed nodes. Look at the log output in the different terminal
@@ -82,7 +85,7 @@ output in the other terminals.
 
 Look at the source code of the program again. What it does is to create an actor
 and register it as subscriber of certain cluster events. It gets notified with
-an snapshot event, 'CurrentClusterState' that holds full state information of
+a snapshot event, ``CurrentClusterState`` that holds full state information of
 the cluster. After that it receives events for changes that happen in the cluster.
 
 Automatic vs. Manual Joining
@@ -137,6 +140,206 @@ Be aware of that using auto-down implies that two separate clusters will
 automatically be formed in case of network partition. That might be desired by
 some applications but not by others.
 
+Subscribe to Cluster Events
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can subscribe to change notifications of the cluster membership by using
+``Cluster(system).subscribe``. A snapshot of the full state,
+``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
+as the first event, followed by events for incremental updates.
+
+There are several types of change events, consult the API documentation
+of classes that extend ``akka.cluster.ClusterEvent.ClusterDomainEvent``
+for details about the events.
+
+Worker Dial-in Example
+----------------------
+
+Let's take a look at an example that illustrates how workers, here named *backend*,
+can detect and register with new master nodes, here named *frontend*.
+
+The example application provides a service to transform text. When some text
+is sent to one of the frontend services, it will be delegated to one of the
+backend workers, which performs the transformation job, and sends the result back to
+the original client. New backend nodes, as well as new frontend nodes, can be
+added to or removed from the cluster dynamically.
+
+In this example the following imports are used:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#imports
+
+Messages:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#messages
+
+The backend worker that performs the transformation job:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#backend
+
+Note that the ``TransformationBackend`` actor subscribes to cluster events to detect new,
+potential, frontend nodes, and sends them a registration message so that they know
+that they can use the backend worker.
+
+The frontend that receives user jobs and delegates to one of the registered backend workers:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#frontend
+
+Note that the ``TransformationFrontend`` actor watches the registered backend
+to be able to remove it from its list of available backend workers.
+Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
+network failures and JVM crashes, in addition to graceful termination of watched
+actors.
+
+This example is included in ``akka-samples/akka-sample-cluster``
+and you can try it by starting nodes in different terminal windows. For example, starting 2
+frontend nodes and 3 backend nodes::
+
+      sbt
+
+      project akka-sample-cluster-experimental
+
+      run-main sample.cluster.transformation.TransformationFrontend 2551
+
+      run-main sample.cluster.transformation.TransformationBackend 2552
+
+      run-main sample.cluster.transformation.TransformationBackend
+
+      run-main sample.cluster.transformation.TransformationBackend
+
+      run-main sample.cluster.transformation.TransformationFrontend
+
+
+.. note:: The above example should probably be designed as two separate frontend/backend clusters, once there is a `cluster client for decoupling clusters `_.
+
+Cluster Aware Routers
+^^^^^^^^^^^^^^^^^^^^^
+
+All :ref:`routers ` can be made aware of member nodes in the cluster, i.e.
+deploying new routees or looking up routees on nodes in the cluster.
+When a node becomes unavailable or leaves the cluster the routees of that node are
+automatically unregistered from the router. When new nodes join the cluster additional
+routees are added to the router, according to the configuration.
+
+When using a router with routees looked up on the cluster member nodes, i.e. the routees
+are already running, the configuration for a router looks like this:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
+
+It's the relative actor path defined in ``routees-path`` that identifies which actor to look up.
+
+``nr-of-instances`` defines the total number of routees in the cluster, but there will not be
+more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees
+being added to the router when nodes join the cluster.
+
+The same type of router could also have been defined in code:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-lookup-in-code
+
+When using a router with routees created and deployed on the cluster member nodes
+the configuration for a router looks like this:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
+
+
+``nr-of-instances`` defines the total number of routees in the cluster, but the number of routees
+per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances``
+to a high value will result in creating and deploying additional routees when new nodes join
+the cluster.
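+
+As a worked example of how these two settings interact: with ``nr-of-instances = 100``
+and ``max-nr-of-instances-per-node = 3``, a cluster of 4 nodes will get at most
+``4 * 3 = 12`` deployed routees, and more will be deployed as new nodes join,
+up to the limit of 100 in total.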
+
+The same type of router could also have been defined in code:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-deploy-in-code
+
+See the :ref:`cluster_configuration` section for further descriptions of the settings.
+
+
+Router Example
+--------------
+
+Let's take a look at how to use cluster aware routers.
+
+The example application provides a service to calculate statistics for a text.
+When some text is sent to the service it splits it into words, and delegates the task
+to count the number of characters in each word to a separate worker, a routee of a router.
+The character count for each word is sent back to an aggregator that calculates
+the average number of characters per word when all results have been collected.
+
+In this example we use the following imports:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#imports
+
+Messages:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#messages
+
+The worker that counts the number of characters in each word:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#worker
+
+The service that receives text from users and splits it up into words, delegates to workers and aggregates:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#service
+
+
+Note that there is nothing cluster specific so far, just plain actors.
+
+We can use these actors with two different types of router setup: either with lookup of
+routees, or with creation and deployment of routees. Remember, routees are the workers in this case.
+
+We start with the router setup with lookup of routees. All nodes start ``StatsService`` and
+``StatsWorker`` actors and the router is configured with ``routees-path``:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup
+
+This means that user requests can be sent to ``StatsService`` on any node and it will use
+``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily
+fan out to local children if more parallelism is needed.
+
+This example is included in ``akka-samples/akka-sample-cluster``
+and you can try it by starting nodes in different terminal windows. For example, starting 3
+service nodes and 1 client::
+
+      run-main sample.cluster.stats.StatsSample 2551
+
+      run-main sample.cluster.stats.StatsSample 2552
+
+      run-main sample.cluster.stats.StatsSampleClient
+
+      run-main sample.cluster.stats.StatsSample
+
+The above setup is nice for this example, but we will also take a look at how to use
+a single master node that creates and deploys workers. To keep track of a single
+master we need one additional actor:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade
+
+The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single
+master. It listens to cluster events to create or look up the ``StatsService`` depending on whether
+it is on the same node or on another node. We run the master on the same node as the leader of
+the cluster members, which is nothing more than the address currently sorted first in the member ring,
+i.e. it can change when new nodes join or when the current leader leaves.
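+
+As a minimal sketch of how a client could use the facade (assuming the
+``/user/statsFacade`` actor path used by this sample), sending it a job
+looks like this::
+
+  // looks up the local facade actor and sends it a job;
+  // the reply is a StatsResult or a JobFailed message
+  val facade = system.actorFor("/user/statsFacade")
+  facade ! StatsJob("this is the text that will be analyzed")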
+
+All nodes start ``StatsFacade`` and the router is now configured like this:
+
+.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy
+
+
+This example is included in ``akka-samples/akka-sample-cluster``
+and you can try it by starting nodes in different terminal windows. For example, starting 3
+service nodes and 1 client::
+
+      run-main sample.cluster.stats.StatsSampleOneMaster 2551
+
+      run-main sample.cluster.stats.StatsSampleOneMaster 2552
+
+      run-main sample.cluster.stats.StatsSampleOneMasterClient
+
+      run-main sample.cluster.stats.StatsSampleOneMaster
+
+.. note:: The above example, especially the last part, will be simplified once the cluster handles automatic actor partitioning.
+
+.. _cluster_configuration:
+
 Configuration
 ^^^^^^^^^^^^^
 
@@ -147,6 +350,9 @@ reference file for more information:
 .. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf
    :language: none
 
+Cluster Scheduler
+-----------------
+
 It is recommended that you change the ``tick-duration`` to 33 ms or less
 of the default scheduler when using cluster, if you don't need to have it
 configured to a longer duration for other reasons. If you don't do this
diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf
index ee403ff23d..dc0f09445c 100644
--- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf
+++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf
@@ -1,9 +1,10 @@
 akka {
   actor {
-    provider = "akka.remote.RemoteActorRefProvider"
+    provider = "akka.cluster.ClusterActorRefProvider"
   }
   remote {
     transport = "akka.remote.netty.NettyRemoteTransport"
+    log-remote-lifecycle-events = off
     netty {
       hostname = "127.0.0.1"
       port = 0
@@ -16,5 +17,7 @@ akka {
     seed-nodes = [
       "akka://ClusterSystem@127.0.0.1:2551",
       "akka://ClusterSystem@127.0.0.1:2552"]
+
+    auto-down = on
   }
 }
\ No newline at end of file
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
similarity index 94%
rename from akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala
rename to akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
index 0fd396784d..4f69700835 100644
--- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
@@ -1,10 +1,10 @@
-package sample.cluster
+package sample.cluster.simple
 
 import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
 
-object ClusterApp {
+object SimpleClusterApp {
 
   def main(args: Array[String]): Unit = {
 
diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
new file mode 100644
index 0000000000..1f13d7fa7a
--- /dev/null
+++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala
@@ -0,0 +1,254 @@
+package sample.cluster.stats
+
+//#imports
+import language.postfixOps
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.concurrent.util.duration._
+import com.typesafe.config.ConfigFactory
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import
akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.RelativeActorPath +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.LeaderChanged +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.MemberStatus +import akka.routing.FromConfig +//#imports + +//#messages +case class StatsJob(text: String) +case class StatsResult(meanWordLength: Double) +case class JobFailed(reason: String) +//#messages + +//#service +class StatsService extends Actor { + val workerRouter = context.actorOf(Props[StatsWorker].withRouter(FromConfig), + name = "workerRouter") + + def receive = { + case StatsJob(text) if text != "" ⇒ + val words = text.split(" ") + val replyTo = sender // important to not close over sender + val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo))) + words foreach { word ⇒ workerRouter.tell(word, aggregator) } + } +} + +class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { + var results = IndexedSeq.empty[Int] + context.setReceiveTimeout(10 seconds) + + def receive = { + case wordCount: Int ⇒ + results = results :+ wordCount + if (results.size == expectedResults) { + val meanWordLength = results.sum.toDouble / results.size + replyTo ! StatsResult(meanWordLength) + context.stop(self) + } + case ReceiveTimeout ⇒ + replyTo ! JobFailed("Service unavailable, try again later") + context.stop(self) + } +} +//#service + +//#worker +class StatsWorker extends Actor { + // FIXME add a cache here to illustrate consistent hashing + def receive = { + case word: String ⇒ sender ! word.length + } +} +//#worker + +//#facade +class StatsFacade extends Actor with ActorLogging { + val cluster = Cluster(context.system) + + var currentMaster: Option[ActorRef] = None + var currentMasterCreatedByMe = false + + // subscribe to cluster changes, LeaderChanged + // re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case job: StatsJob if currentMaster.isEmpty ⇒ + sender ! 
JobFailed("Service unavailable, try again later") + case job: StatsJob ⇒ + currentMaster foreach { _ forward job } + case state: CurrentClusterState ⇒ + if (state.convergence) + state.leader foreach updateCurrentMaster + case LeaderChanged(Some(leaderAddress), true) ⇒ + updateCurrentMaster(leaderAddress) + case other: LeaderChanged ⇒ // ignore, not convergence + } + + def updateCurrentMaster(leaderAddress: Address): Unit = { + if (leaderAddress == cluster.selfAddress) { + if (!currentMasterCreatedByMe) { + log.info("Creating new statsService master at [{}]", leaderAddress) + currentMaster = Some(context.actorOf(Props[StatsService], name = "statsService")) + currentMasterCreatedByMe = true + } + } else { + if (currentMasterCreatedByMe) + currentMaster foreach { context.stop(_) } + log.info("Using statsService master at [{}]", leaderAddress) + currentMaster = Some(context.actorFor( + context.self.path.toStringWithAddress(leaderAddress) + "/statsService")) + currentMasterCreatedByMe = false + } + } + +} +//#facade + +object StatsSample { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + //#start-router-lookup + val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" + akka.actor.deployment { + /statsService/workerRouter { + # FIXME use consistent hashing instead + router = round-robin + nr-of-instances = 100 + cluster { + enabled = on + routees-path = "/user/statsWorker" + allow-local-routees = on + } + } + } + """).withFallback(ConfigFactory.load())) + + system.actorOf(Props[StatsWorker], name = "statsWorker") + system.actorOf(Props[StatsService], name = "statsService") + //#start-router-lookup + + } +} + +object StatsSampleOneMaster { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + //#start-router-deploy + val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(""" + akka.actor.deployment { + /statsFacade/statsService/workerRouter { + # FIXME use consistent hashing instead + router = round-robin + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 3 + allow-local-routees = off + } + } + } + """).withFallback(ConfigFactory.load())) + //#start-router-deploy + + system.actorOf(Props[StatsFacade], name = "statsFacade") + } +} + +object StatsSampleClient { + def main(args: Array[String]): Unit = { + val system = ActorSystem("ClusterSystem") + system.actorOf(Props(new StatsSampleClient("/user/statsService")), "client") + } +} + +object StatsSampleOneMasterClient { + def main(args: Array[String]): Unit = { + val system = ActorSystem("ClusterSystem") + system.actorOf(Props(new StatsSampleClient("/user/statsFacade")), "client") + } +} + +class StatsSampleClient(servicePath: String) extends Actor { + val cluster = Cluster(context.system) + val servicePathElements = servicePath match { + case RelativeActorPath(elements) ⇒ elements + case _ ⇒ throw new IllegalArgumentException( + "servicePath [%s] is not a valid relative actor path" format servicePath) + } + import context.dispatcher + val tickTask = context.system.scheduler.schedule(2 seconds, 2 seconds, self, "tick") + + var nodes = Set.empty[Address] + + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = { + 
cluster.unsubscribe(self) + tickTask.cancel() + } + + def receive = { + case "tick" if nodes.nonEmpty ⇒ + // just pick any one + val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size)) + context.actorFor(RootActorPath(address) / servicePathElements) ! + StatsJob("this is the text that will be analyzed") + case result: StatsResult ⇒ + println(result) + case failed: JobFailed ⇒ + println(failed) + case state: CurrentClusterState ⇒ + nodes = state.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } + case MemberUp(m) ⇒ nodes += m.address + case other: MemberEvent ⇒ nodes -= other.member.address + } + +} + +// not used, only for documentation +abstract class StatsService2 extends Actor { + //#router-lookup-in-code + import akka.cluster.routing.ClusterRouterConfig + import akka.cluster.routing.ClusterRouterSettings + import akka.routing.RoundRobinRouter + + val workerRouter = context.actorOf(Props[StatsWorker].withRouter( + ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( + totalInstances = 100, routeesPath = "/user/statsWorker", + allowLocalRoutees = true))), + name = "workerRouter2") + //#router-lookup-in-code +} + +// not used, only for documentation +abstract class StatsService3 extends Actor { + //#router-deploy-in-code + import akka.cluster.routing.ClusterRouterConfig + import akka.cluster.routing.ClusterRouterSettings + // FIXME use ConsistentHashingRouter instead + import akka.routing.RoundRobinRouter + + val workerRouter = context.actorOf(Props[StatsWorker].withRouter( + ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings( + totalInstances = 100, maxInstancesPerNode = 3, + allowLocalRoutees = false))), + name = "workerRouter3") + //#router-deploy-in-code +} diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala new file mode 100644 index 0000000000..e26bcd245d --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala @@ -0,0 +1,108 @@ +package sample.cluster.transformation + +//#imports +import language.postfixOps +import scala.concurrent.util.duration._ + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.pattern.ask +import akka.util.Timeout +//#imports + +//#messages +case class TransformationJob(text: String) +case class TransformationResult(text: String) +case class JobFailed(reason: String, job: TransformationJob) +case object BackendRegistration +//#messages + +object TransformationFrontend { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + val system = ActorSystem("ClusterSystem") + val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") + + import system.dispatcher + implicit val timeout = Timeout(5 seconds) + for (n ← 1 to 120) { + (frontend ? 
TransformationJob("hello-" + n)) onSuccess { + case result ⇒ println(result) + } + Thread.sleep(2000) + } + system.shutdown() + } +} + +//#frontend +class TransformationFrontend extends Actor { + + var backends = IndexedSeq.empty[ActorRef] + var jobCounter = 0 + + def receive = { + case job: TransformationJob if backends.isEmpty ⇒ + sender ! JobFailed("Service unavailable, try again later", job) + + case job: TransformationJob ⇒ + jobCounter += 1 + backends(jobCounter % backends.size) forward job + + case BackendRegistration if !backends.contains(sender) ⇒ + context watch sender + backends = backends :+ sender + + case Terminated(a) ⇒ backends.filterNot(_ == a) + } +} +//#frontend + +object TransformationBackend { + def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + val system = ActorSystem("ClusterSystem") + system.actorOf(Props[TransformationBackend], name = "backend") + } +} + +//#backend +class TransformationBackend extends Actor { + + val cluster = Cluster(context.system) + + // subscribe to cluster changes, MemberEvent + // re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case TransformationJob(text) ⇒ sender ! TransformationResult(text.toUpperCase) + case state: CurrentClusterState ⇒ + state.members.filter(_.status == MemberStatus.Up) foreach register + case MemberUp(m) ⇒ register(m) + } + + // try to register to all nodes, even though there + // might not be any frontend on all nodes + def register(member: Member): Unit = + context.actorFor(RootActorPath(member.address) / "user" / "frontend") ! 
+      BackendRegistration
+}
+//#backend
\ No newline at end of file
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
new file mode 100644
index 0000000000..9c7315c902
--- /dev/null
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
@@ -0,0 +1,101 @@
+package sample.cluster.stats
+
+import language.postfixOps
+import scala.concurrent.util.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import StatsSampleSpec.first
+import StatsSampleSpec.third
+import akka.actor.Props
+import akka.actor.RootActorPath
+import akka.cluster.Cluster
+import akka.cluster.Member
+import akka.cluster.MemberStatus
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent.MemberUp
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit.ImplicitSender
+
+object StatsSampleSingleMasterSpec extends MultiNodeConfig {
+  // register the named roles (nodes) of the test
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  // this configuration will be used for all nodes
+  // note that no fixed host names and ports are used
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = INFO
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.auto-join = off
+    #//#router-deploy-config
+    akka.actor.deployment {
+      /statsFacade/statsService/workerRouter {
+        # FIXME use consistent hashing instead
+        router = round-robin
+        nr-of-instances = 100
+        cluster {
+          enabled = on
+          max-nr-of-instances-per-node = 3
+          allow-local-routees = off
+        }
+      }
+    }
+    #//#router-deploy-config
+    """))
+
+}
+
+// need one concrete test class per node
+class StatsSampleSingleMasterMultiJvmNode1 extends StatsSampleSingleMasterSpec
+class StatsSampleSingleMasterMultiJvmNode2 extends StatsSampleSingleMasterSpec
+class StatsSampleSingleMasterMultiJvmNode3 extends StatsSampleSingleMasterSpec
+
+abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpec)
+  with ImplicitSender {
+
+  import StatsSampleSingleMasterSpec._
+
+  override def initialParticipants = roles.size
+
+  "The stats sample with single master" must {
+    "illustrate how to startup cluster" in within(10 seconds) {
+      Cluster(system).subscribe(testActor, classOf[MemberUp])
+      expectMsgClass(classOf[CurrentClusterState])
+
+      Cluster(system) join node(first).address
+      system.actorOf(Props[StatsFacade], "statsFacade")
+
+      expectMsgAllOf(
+        MemberUp(Member(node(first).address, MemberStatus.Up)),
+        MemberUp(Member(node(second).address, MemberStatus.Up)),
+        MemberUp(Member(node(third).address, MemberStatus.Up)))
+
+      Cluster(system).unsubscribe(testActor)
+
+      testConductor.enter("all-up")
+    }
+
+    "show usage of the statsFacade" in within(5 seconds) {
+      val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade")
+
+      // eventually the service should be ok,
+      // worker nodes might not be up yet
+      awaitCond {
+        facade ! StatsJob("this is the text that will be analyzed")
+        expectMsgPF() {
+          case unavailable: JobFailed ⇒ false
+          case StatsResult(meanWordLength) ⇒
+            meanWordLength must be(3.875 plusOrMinus 0.001)
+            true
+        }
+      }
+
+      testConductor.enter("done")
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
new file mode 100644
index 0000000000..1b0a9e08b8
--- /dev/null
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
@@ -0,0 +1,92 @@
+package sample.cluster.stats
+
+import language.postfixOps
+import scala.concurrent.util.duration._
+import com.typesafe.config.ConfigFactory
+
+import akka.actor.Props
+import akka.actor.RootActorPath
+import akka.cluster.Cluster
+import akka.cluster.Member
+import akka.cluster.MemberStatus
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent.MemberUp
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit.ImplicitSender
+
+object StatsSampleSpec extends MultiNodeConfig {
+  // register the named roles (nodes) of the test
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  // this configuration will be used for all nodes
+  // note that no fixed host names and ports are used
+  commonConfig(ConfigFactory.parseString("""
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.auto-join = off
+    #//#router-lookup-config
+    akka.actor.deployment {
+      /statsService/workerRouter {
+        # FIXME use consistent hashing instead
+        router = round-robin
+        nr-of-instances = 100
+        cluster {
+          enabled = on
+          routees-path = "/user/statsWorker"
+          allow-local-routees = on
+        }
+      }
+    }
+    #//#router-lookup-config
+    """))
+
+}
+
+// need one concrete test class per node
+class StatsSampleMultiJvmNode1 extends StatsSampleSpec
+class StatsSampleMultiJvmNode2 extends StatsSampleSpec
+class StatsSampleMultiJvmNode3 extends StatsSampleSpec
+
+abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpec)
+  with ImplicitSender {
+
+  import StatsSampleSpec._
+
+  override def initialParticipants = roles.size
+
+  "The stats sample" must {
+    "illustrate how to startup cluster" in within(10 seconds) {
+      Cluster(system).subscribe(testActor, classOf[MemberUp])
+      expectMsgClass(classOf[CurrentClusterState])
+
+      Cluster(system) join node(first).address
+      system.actorOf(Props[StatsWorker], "statsWorker")
+      system.actorOf(Props[StatsService], "statsService")
+
+      expectMsgAllOf(
+        MemberUp(Member(node(first).address, MemberStatus.Up)),
+        MemberUp(Member(node(second).address, MemberStatus.Up)),
+        MemberUp(Member(node(third).address, MemberStatus.Up)))
+
+      Cluster(system).unsubscribe(testActor)
+
+      testConductor.enter("all-up")
+    }
+
+    "show usage of the statsService" in within(5 seconds) {
+
+      val service = system.actorFor(RootActorPath(node(third).address) / "user" / "statsService")
+      service !
StatsJob("this is the text that will be analyzed") + val meanWordLength = expectMsgPF() { + case StatsResult(meanWordLength) ⇒ meanWordLength + } + meanWordLength must be(3.875 plusOrMinus 0.001) + + testConductor.enter("done") + } + } + +} \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala new file mode 100644 index 0000000000..a18a4a4896 --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala @@ -0,0 +1,115 @@ +package sample.cluster.transformation + +import language.postfixOps +import scala.concurrent.util.duration._ + +import com.typesafe.config.ConfigFactory + +import akka.actor.Props +import akka.cluster.Cluster +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit.ImplicitSender + +object TransformationSampleSpec extends MultiNodeConfig { + // register the named roles (nodes) of the test + val frontend1 = role("frontend1") + val frontend2 = role("frontend2") + val backend1 = role("backend1") + val backend2 = role("backend2") + val backend3 = role("backend3") + + // this configuration will be used for all nodes + // note that no fixed host names and ports are used + commonConfig(ConfigFactory.parseString(""" + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + """)) + +} + +// need one concrete test class per node +class TransformationSampleMultiJvmNode1 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode2 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode3 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode4 extends TransformationSampleSpec +class TransformationSampleMultiJvmNode5 extends TransformationSampleSpec + +abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSampleSpec) + with ImplicitSender { + + import TransformationSampleSpec._ + + override def initialParticipants = roles.size + + "The transformation sample" must { + "illustrate how to start first frontend" in { + runOn(frontend1) { + // this will only run on the 'first' node + Cluster(system) join node(frontend1).address + val transformationFrontend = system.actorOf(Props[TransformationFrontend], name = "frontend") + transformationFrontend ! 
TransformationJob("hello") + expectMsgPF() { + // no backends yet, service unavailble + case JobFailed(_, TransformationJob("hello")) ⇒ + } + } + + // this will run on all nodes + // use barrier to coordinate test steps + testConductor.enter("frontend1-started") + } + + "illustrate how a backend automatically registers" in within(15 seconds) { + runOn(backend1) { + Cluster(system) join node(frontend1).address + system.actorOf(Props[TransformationBackend], name = "backend") + } + testConductor.enter("backend1-started") + + runOn(frontend1) { + assertServiceOk + } + + testConductor.enter("frontend1-backend1-ok") + } + + "illustrate how more nodes registers" in within(15 seconds) { + runOn(frontend2) { + Cluster(system) join node(frontend1).address + system.actorOf(Props[TransformationFrontend], name = "frontend") + } + runOn(backend2, backend3) { + Cluster(system) join node(backend1).address + system.actorOf(Props[TransformationBackend], name = "backend") + } + + testConductor.enter("all-started") + + runOn(frontend1, frontend2) { + assertServiceOk + } + + testConductor.enter("all-ok") + + } + + } + + def assertServiceOk: Unit = { + val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend") + // eventually the service should be ok, + // backends might not have registered initially + awaitCond { + transformationFrontend ! TransformationJob("hello") + expectMsgPF() { + case unavailble: JobFailed ⇒ false + case TransformationResult(result) ⇒ + result must be("HELLO") + true + } + } + } + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 8c102ec39f..7e5f6fb067 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -326,11 +326,19 @@ object AkkaBuild extends Build { ) lazy val clusterSample = Project( - id = "akka-sample-cluster", + id = "akka-sample-cluster-experimental", base = file("akka-samples/akka-sample-cluster"), - dependencies = Seq(cluster), - settings = defaultSettings - ) + dependencies = Seq(cluster, remoteTests % "compile;test->test;multi-jvm->multi-jvm", testkit % "test->test"), + settings = defaultSettings ++ multiJvmSettings ++ Seq( + // disable parallel tests + parallelExecution in Test := false, + extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => + (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq + }, + scalatestOptions in MultiJvm := defaultMultiJvmScalatestOptions, + jvmOptions in MultiJvm := defaultMultiJvmOptions + ) + ) configs (MultiJvm) lazy val docs = Project( id = "akka-docs", From da3f08cf41dc280267717892ef0ad94bc3f8174d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 08:42:47 +0200 Subject: [PATCH 03/16] Simplify selectDeploymentTarget * Break the loop immediately when no more targets --- .../cluster/routing/ClusterRouterConfig.scala | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index e2d8b8b204..c99ae68704 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -32,6 +32,7 @@ import akka.routing.RemoteRouterConfig import akka.actor.RootActorPath import akka.actor.ActorCell import akka.actor.RelativeActorPath +import scala.annotation.tailrec /** * [[akka.routing.RouterConfig]] 
implementation for deployment on cluster nodes. @@ -168,18 +169,26 @@ private[akka] class ClusterRouteeProvider( * to use for cluster routers. */ override def createRoutees(nrOfInstances: Int): Unit = { - for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { - val ref = - if (settings.isRouteesPathDefined) { - context.actorFor(RootActorPath(target) / settings.routeesPathElements) - } else { - val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) - context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) - } - // must register each one, since registered routees are used in selectDeploymentTarget - registerRoutees(Some(ref)) + @tailrec + def doCreateRoutees(): Unit = selectDeploymentTarget match { + case None ⇒ // done + case Some(target) ⇒ + val ref = + if (settings.isRouteesPathDefined) { + context.actorFor(RootActorPath(target) / settings.routeesPathElements) + } else { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) + context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) + } + // must register each one, since registered routees are used in selectDeploymentTarget + registerRoutees(Some(ref)) + + // recursion until all created + doCreateRoutees() } + + doCreateRoutees() } private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances) @@ -187,14 +196,18 @@ private[akka] class ClusterRouteeProvider( private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees val currentNodes = availbleNodes - if (currentRoutees.size >= settings.totalInstances || currentNodes.isEmpty) { + if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { None } else { + // find the node with least routees val numberOfRouteesPerNode: Map[Address, Int] = - Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++ - currentRoutees.groupBy(fullAddress).map { - case (address, refs) ⇒ address -> refs.size + currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap) { (acc, x) ⇒ + val address = fullAddress(x) + acc.get(address) match { + case Some(count) ⇒ acc + (address -> (count + 1)) + case None ⇒ acc + (address -> 1) } + } val (address, count) = numberOfRouteesPerNode.minBy(_._2) if (count < settings.maxInstancesPerNode) Some(address) else None From 175dd4c547471a47858eb1e11105c2fd7c280659 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Sep 2012 10:49:41 +0200 Subject: [PATCH 04/16] Minor cleanup --- .../src/main/scala/sample/cluster/stats/StatsSample.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 1f13d7fa7a..0ba5a19b3e 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -208,8 +208,8 @@ class StatsSampleClient(servicePath: String) extends Actor { case "tick" if nodes.nonEmpty ⇒ // just pick any one val address = nodes.toIndexedSeq(ThreadLocalRandom.current.nextInt(nodes.size)) - context.actorFor(RootActorPath(address) / servicePathElements) ! 
- StatsJob("this is the text that will be analyzed") + val service = context.actorFor(RootActorPath(address) / servicePathElements) + service ! StatsJob("this is the text that will be analyzed") case result: StatsResult ⇒ println(result) case failed: JobFailed ⇒ From c0c6cc3931fe4b40176f4a9c04e52649914eef84 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Sep 2012 14:19:38 +0200 Subject: [PATCH 05/16] Publish cluster LeaderChanged only when convergence, see #2518 --- .../scala/akka/cluster/ClusterEvent.scala | 28 ++++-- .../scala/akka/cluster/ClusterReadView.scala | 10 +- .../ClusterDomainEventPublisherSpec.scala | 96 +++++++++++++++++++ .../akka/cluster/ClusterDomainEventSpec.scala | 9 +- .../sample/cluster/stats/StatsSample.scala | 6 +- 5 files changed, 127 insertions(+), 22 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 4bb0105413..fd7bfa0de9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -94,9 +94,9 @@ object ClusterEvent { case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent /** - * Leader of the cluster members changed, and/or convergence status. + * Leader of the cluster members changed. Only published after convergence. */ - case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent + case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent /** * INTERNAL API @@ -150,7 +150,7 @@ object ClusterEvent { val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty val leaderEvents = - if (convergenceChanged || newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader, newConvergence)) + if (newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader)) else Seq.empty val newSeenBy = newGossip.seenBy @@ -159,7 +159,7 @@ object ClusterEvent { else Seq.empty memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++ - convergenceEvents ++ leaderEvents ++ seenEvents + leaderEvents ++ convergenceEvents ++ seenEvents } } @@ -173,6 +173,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto import InternalClusterAction._ var latestGossip: Gossip = Gossip() + var stashedLeaderChanged: Option[LeaderChanged] = None def receive = { case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) @@ -201,11 +202,22 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto // keep the latestGossip to be sent to new subscribers latestGossip = newGossip diff(oldGossip, newGossip) foreach { event ⇒ - eventStream publish event - // notify DeathWatch about unreachable node event match { - case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) - case _ ⇒ + case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ + stashedLeaderChanged = None + eventStream publish event + case x: LeaderChanged ⇒ + // publish later, when convergence + stashedLeaderChanged = Some(x) + case ConvergenceChanged(true) ⇒ + stashedLeaderChanged foreach { eventStream publish _ } + stashedLeaderChanged = None + eventStream publish event + case MemberUnreachable(m) ⇒ + eventStream publish event + // notify 
DeathWatch about unreachable node + eventStream publish AddressTerminated(m.address) + case _ ⇒ eventStream publish event } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index ff827574d7..0aa9e6997e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -52,11 +52,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member) - case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) - case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala new file mode 100644 index 0000000000..d7c76270f3 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ + +package akka.cluster + +import language.postfixOps +import scala.collection.immutable.SortedSet +import scala.concurrent.util.duration._ +import akka.actor.Address +import akka.actor.Props +import akka.cluster.MemberStatus._ +import akka.cluster.InternalClusterAction._ +import akka.cluster.ClusterEvent._ +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object ClusterDomainEventPublisherSpec { + val config = """ + akka.cluster.auto-join = off + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.remote.netty.port = 0 + """ + + case class GossipTo(address: Address) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with ImplicitSender { + import ClusterDomainEventPublisherSpec._ + + val publisher = system.actorOf(Props[ClusterDomainEventPublisher], name = "test-publisher") + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val c1 = Member(Address("akka", "sys", "c", 2552), Joining) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "a", 2551), Up) + + val g0 = Gossip(members = SortedSet(a1)).seen(a1.address) + val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.address).seen(b1.address).seen(c1.address) + val g2 = Gossip(members = SortedSet(a1, b1, c2)).seen(a1.address) + val g3 = g2.seen(b1.address).seen(c2.address) + val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) + val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) + + "ClusterDomainEventPublisher" must { + + "send snapshot when starting subscription" in { + publisher ! PublishChanges(g0, g1) + publisher ! Subscribe(testActor, classOf[ClusterDomainEvent]) + val state = expectMsgType[CurrentClusterState] + state.members must be(g1.members) + state.convergence must be(true) + } + + "publish MemberUp when member status changed to Up" in { + publisher ! PublishChanges(g1, g2) + expectMsg(MemberUp(c2)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + } + + "publish convergence true when all seen it" in { + publisher ! PublishChanges(g2, g3) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + } + + "publish leader changed when new leader and after convergence" in { + publisher ! PublishChanges(g3, g4) + expectMsg(MemberUp(d1)) + expectMsg(ConvergenceChanged(false)) + expectMsgType[SeenChanged] + + publisher ! PublishChanges(g4, g5) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsg(ConvergenceChanged(true)) + expectMsgType[SeenChanged] + + // convergence both before and after + publisher ! PublishChanges(g3, g5) + expectMsg(MemberUp(d1)) + expectMsg(LeaderChanged(Some(d1.address))) + expectMsgType[SeenChanged] + expectNoMsg(1 second) + + // not convergence + publisher ! 
PublishChanges(g2, g4) + expectMsg(MemberUp(d1)) + expectNoMsg(1 second) + } + + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 1bbffca3c2..3a4e3ee3a4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -69,10 +69,9 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) - // LeaderChanged is also published when convergence changed - diff(g1, g2) must be(Seq(ConvergenceChanged(false), LeaderChanged(Some(a1.address), convergence = false), + diff(g1, g2) must be(Seq(ConvergenceChanged(false), SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) - diff(g2, g1) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(a1.address), convergence = true), + diff(g2, g1) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) } @@ -81,8 +80,8 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address) - diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address), convergence = false))) - diff(g2, g3) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(b1.address), convergence = true), + diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address)))) + diff(g2, g3) must be(Seq(ConvergenceChanged(true), SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) } } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 0ba5a19b3e..351b5d0a48 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -89,11 +89,9 @@ class StatsFacade extends Actor with ActorLogging { case job: StatsJob ⇒ currentMaster foreach { _ forward job } case state: CurrentClusterState ⇒ - if (state.convergence) - state.leader foreach updateCurrentMaster - case LeaderChanged(Some(leaderAddress), true) ⇒ + state.leader foreach updateCurrentMaster + case LeaderChanged(Some(leaderAddress)) ⇒ updateCurrentMaster(leaderAddress) - case other: LeaderChanged ⇒ // ignore, not convergence } def updateCurrentMaster(leaderAddress: Address): Unit = { From 718686e2f2d266dff3ae595ad230684867868934 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 19 Sep 2012 10:18:55 +0200 Subject: [PATCH 06/16] Add another test case for publish of LeaderChanged, see #2518 * It didn't handle convergence changes with same leader correctly --- .../scala/akka/cluster/ClusterEvent.scala | 19 +++++-- .../ClusterDomainEventPublisherSpec.scala | 51 ++++++++++++++----- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index fd7bfa0de9..17988fd7ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala 
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -174,6 +174,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
 
   var latestGossip: Gossip = Gossip()
   var stashedLeaderChanged: Option[LeaderChanged] = None
+  var publishedLeaderChanged: Option[LeaderChanged] = None
 
   def receive = {
     case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip)
@@ -203,16 +204,26 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
     latestGossip = newGossip
     diff(oldGossip, newGossip) foreach { event ⇒
       event match {
-        case LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
+        case x @ LeaderChanged(_) if Some(x) == publishedLeaderChanged ⇒
+          // skip, this leader has already been published
+
+        case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
           stashedLeaderChanged = None
-          eventStream publish event
+          publishedLeaderChanged = Some(x)
+          eventStream publish x
+
         case x: LeaderChanged ⇒
           // publish later, when convergence
           stashedLeaderChanged = Some(x)
+
         case ConvergenceChanged(true) ⇒
-          stashedLeaderChanged foreach { eventStream publish _ }
-          stashedLeaderChanged = None
+          stashedLeaderChanged foreach { x ⇒
+            publishedLeaderChanged = Some(x)
+            stashedLeaderChanged = None
+            eventStream publish x
+          }
           eventStream publish event
+
         case MemberUnreachable(m) ⇒
           eventStream publish event
           // notify DeathWatch about unreachable node
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
index d7c76270f3..c29f237be7 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
@@ -7,6 +7,7 @@ package akka.cluster
 import language.postfixOps
 import scala.collection.immutable.SortedSet
 import scala.concurrent.util.duration._
+import org.scalatest.BeforeAndAfterEach
 import akka.actor.Address
 import akka.actor.Props
 import akka.cluster.MemberStatus._
@@ -14,6 +15,7 @@ import akka.cluster.InternalClusterAction._
 import akka.cluster.ClusterEvent._
 import akka.testkit.AkkaSpec
 import akka.testkit.ImplicitSender
+import akka.actor.ActorRef
 
 object ClusterDomainEventPublisherSpec {
   val config = """
@@ -27,10 +29,11 @@ object ClusterDomainEventPublisherSpec {
 }
 
 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config) with ImplicitSender {
+class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config)
+  with BeforeAndAfterEach with ImplicitSender {
   import ClusterDomainEventPublisherSpec._
 
-  val publisher = system.actorOf(Props[ClusterDomainEventPublisher], name = "test-publisher")
+  var publisher: ActorRef = _
   val a1 = Member(Address("akka", "sys", "a", 2552), Up)
   val b1 = Member(Address("akka", "sys", "b", 2552), Up)
   val c1 = Member(Address("akka", "sys", "c", 2552), Joining)
@@ -44,15 +47,18 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
   val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
   val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
 
-  "ClusterDomainEventPublisher" must {
+  override def beforeEach(): Unit = {
+    publisher = system.actorOf(Props[ClusterDomainEventPublisher])
+    publisher ! Subscribe(testActor, classOf[ClusterDomainEvent])
+    expectMsgType[CurrentClusterState]
+  }
 
-    "send snapshot when starting subscription" in {
-      publisher ! PublishChanges(g0, g1)
-      publisher ! Subscribe(testActor, classOf[ClusterDomainEvent])
-      val state = expectMsgType[CurrentClusterState]
-      state.members must be(g1.members)
-      state.convergence must be(true)
-    }
+  override def afterEach(): Unit = {
+    publisher ! Unsubscribe(testActor)
+    system.stop(publisher)
+  }
+
+  "ClusterDomainEventPublisher" must {
 
     "publish MemberUp when member status changed to Up" in {
       publisher ! PublishChanges(g1, g2)
@@ -67,30 +73,49 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
       expectMsgType[SeenChanged]
     }
 
-    "publish leader changed when new leader and after convergence" in {
+    "publish leader changed when new leader after convergence" in {
      publisher ! PublishChanges(g3, g4)
      expectMsg(MemberUp(d1))
      expectMsg(ConvergenceChanged(false))
      expectMsgType[SeenChanged]
+      expectNoMsg(1 second)
 
      publisher ! PublishChanges(g4, g5)
      expectMsg(LeaderChanged(Some(d1.address)))
      expectMsg(ConvergenceChanged(true))
      expectMsgType[SeenChanged]
+    }
 
+    "publish leader changed when new leader and convergence both before and after" in {
      // convergence both before and after
      publisher ! PublishChanges(g3, g5)
      expectMsg(MemberUp(d1))
      expectMsg(LeaderChanged(Some(d1.address)))
      expectMsgType[SeenChanged]
-      expectNoMsg(1 second)
+    }
 
-      // not convergence
+    "not publish leader changed when no convergence" in {
      publisher ! PublishChanges(g2, g4)
      expectMsg(MemberUp(d1))
      expectNoMsg(1 second)
    }
 
+    "not publish leader changed when convergence changed but leader is still the same" in {
+      publisher ! PublishChanges(g2, g5)
+      expectMsg(MemberUp(d1))
+      expectMsg(LeaderChanged(Some(d1.address)))
+      expectMsg(ConvergenceChanged(true))
+      expectMsgType[SeenChanged]
+
+      publisher ! PublishChanges(g5, g4)
+      expectMsg(ConvergenceChanged(false))
+      expectMsgType[SeenChanged]
+
+      publisher ! PublishChanges(g4, g5)
+      expectMsg(ConvergenceChanged(true))
+      expectMsgType[SeenChanged]
+    }
+
   }
 }
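The net effect of this patch, seen from a subscriber: a LeaderChanged event is
only delivered on convergence, and never twice for the same leader. A minimal
subscriber sketch (illustrative only, not part of the patch; it assumes just the
public Cluster(system).subscribe API and the event types shown above):

  import akka.actor.{ Actor, ActorLogging, Address }
  import akka.cluster.Cluster
  import akka.cluster.ClusterEvent.{ CurrentClusterState, LeaderChanged }

  class LeaderAware extends Actor with ActorLogging {
    var leader: Option[Address] = None

    // subscribing triggers an initial CurrentClusterState snapshot
    override def preStart(): Unit =
      Cluster(context.system).subscribe(self, classOf[LeaderChanged])
    override def postStop(): Unit =
      Cluster(context.system).unsubscribe(self)

    def receive = {
      case state: CurrentClusterState ⇒ leader = state.leader
      case LeaderChanged(newLeader) ⇒
        // delivered on convergence, and only when the leader actually changed
        leader = newLeader
        log.info("Leader changed to [{}]", newLeader)
    }
  }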
From 8ee48e7a051933a669daa1858ebbeb55d80acf45 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 19 Sep 2012 15:50:33 +0200
Subject: [PATCH 07/16] Remove duplicated multi-node-log-replace, it's in
 scripts/

---
 project/scripts/multi-node-log-replace | 11 -----------
 1 file changed, 11 deletions(-)
 delete mode 100755 project/scripts/multi-node-log-replace

diff --git a/project/scripts/multi-node-log-replace b/project/scripts/multi-node-log-replace
deleted file mode 100755
index 83f1b8a136..0000000000
--- a/project/scripts/multi-node-log-replace
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env bash
-#
-# Utility to make log files from multi-node tests easier to analyze.
-# Replaces jvm names and host:port with corresponding logical role name.
-#
-
-
-# check for an sbt command
-type -P sbt &> /dev/null || fail "sbt command not found"
-
-sbt "project akka-remote-tests" "test:run-main akka.remote.testkit.LogRoleReplace $1 $2"
\ No newline at end of file

From 7b06ee57057f6e2a2c6bfe3eba6e92019c0684f1 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 19 Sep 2012 16:46:39 +0200
Subject: [PATCH 08/16] Cluster doc of JMX and admin script, see #2014

---
 akka-docs/cluster/cluster-usage.rst        | 64 +++++++++++++++++++++-
 akka-kernel/src/main/dist/bin/akka-cluster | 32 +++++------
 2 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst
index 9e079796a7..6087e1b637 100644
--- a/akka-docs/cluster/cluster-usage.rst
+++ b/akka-docs/cluster/cluster-usage.rst
@@ -113,7 +113,7 @@ You can disable automatic joining with configuration:
 
     akka.cluster.auto-join = off
 
-Then you need to join manually, using JMX or the provided script.
+Then you need to join manually, using :ref:`cluster_jmx` or :ref:`cluster_command_line`.
 You can join to any node in the cluster. It doesn't have to be configured as
 seed node. If you are not using auto-join there is no need to configure seed
 nodes at all.
@@ -128,7 +128,8 @@ When a member is considered by the failure detector to be unreachable the
 leader is not allowed to perform its duties, such as changing status of
 new joining members to 'Up'. The status of the unreachable member must be
 changed to 'Down'. This can be performed automatically or manually. By
-default it must be done manually, using using JMX or the provided script.
+default it must be done manually, using :ref:`cluster_jmx` or
+:ref:`cluster_command_line`.
 
 It can also be performed programmatically with ``Cluster(system).down``.
 
@@ -338,6 +339,65 @@ service nodes and 1 client::
 
 .. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
 
+.. _cluster_jmx:
+
+JMX
+^^^
+
+Information and management of the cluster is available as JMX MBeans with the
+root name ``akka.Cluster``. The JMX information can be displayed with an
+ordinary JMX console such as JConsole or JVisualVM.
+
+From JMX you can:
+
+* see which members are part of the cluster
+* see the status of this node
+* join this node to another node in the cluster
+* mark any node in the cluster as down
+* tell any node in the cluster to leave
+
+Member nodes are identified with their address, in the format
+`akka://actor-system-name@hostname:port`.
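+The cluster MBean can also be accessed programmatically. The following is an
+illustrative sketch rather than a supported API: the only assumption is the
+MBean object name ``akka:type=Cluster``, which is what the ``bin/akka-cluster``
+script talks to. It reads the platform MBean server of the local JVM; a remote
+node is reached the same way through an ordinary ``JMXConnector``::
+
+  import java.lang.management.ManagementFactory
+  import javax.management.ObjectName
+
+  val mbeanServer = ManagementFactory.getPlatformMBeanServer
+  val clusterMBean = new ObjectName("akka:type=Cluster")
+  // read an attribute exposed by the cluster MBean
+  val available = mbeanServer.getAttribute(clusterMBean, "Available")
+  // invoke an operation, e.g. ask this node to join another node
+  mbeanServer.invoke(clusterMBean, "join",
+    Array[AnyRef]("akka://MySystem@darkstar:2552"),
+    Array("java.lang.String"))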
+
+.. _cluster_command_line:
+
+Command Line Management
+^^^^^^^^^^^^^^^^^^^^^^^
+
+The cluster can be managed with the script `bin/akka-cluster` provided in the
+Akka distribution.
+
+Run it without parameters to see instructions about how to use the script::
+
+  Usage: bin/akka-cluster <node-host:jmx-port> <command> ...
+
+  Supported commands are:
+             join <node-url> - Sends a request to JOIN the node with the specified URL
+            leave <node-url> - Sends a request for the node with the URL to LEAVE the cluster
+             down <node-url> - Sends a request for marking the node with the URL as DOWN
+               member-status - Asks the member node for its current status
+              cluster-status - Asks the cluster for its current status (member ring,
+                               unavailable nodes, meta data etc.)
+                      leader - Asks the cluster who the current leader is
+                is-singleton - Checks if the cluster is a singleton cluster (single
+                               node cluster)
+                is-available - Checks if the member node is available
+                  is-running - Checks if the member node is running
+             has-convergence - Checks if there is a cluster convergence
+  Where the <node-url> should be in the format of
+  'akka://actor-system-name@hostname:port'
+
+  Examples: bin/akka-cluster localhost:9999 is-available
+            bin/akka-cluster localhost:9999 join akka://MySystem@darkstar:2552
+            bin/akka-cluster localhost:9999 cluster-status
+
+To be able to use the script you must enable remote monitoring and management
+when starting the JVMs of the cluster nodes, as described in
+`Monitoring and Management Using JMX Technology `_
+
+Example of system properties to enable remote monitoring and management::
+
+  java -Dcom.sun.management.jmxremote.port=9999 \
+       -Dcom.sun.management.jmxremote.authenticate=false \
+       -Dcom.sun.management.jmxremote.ssl=false
+
 .. _cluster_configuration:
 
 Configuration
diff --git a/akka-kernel/src/main/dist/bin/akka-cluster b/akka-kernel/src/main/dist/bin/akka-cluster
index fe3af38449..0cbff520dd 100755
--- a/akka-kernel/src/main/dist/bin/akka-cluster
+++ b/akka-kernel/src/main/dist/bin/akka-cluster
@@ -27,7 +27,7 @@ HOST=$1  # cluster node:port to talk to through JMX
 function ensureNodeIsRunningAndAvailable {
   REPLY=$($JMX_CLIENT $HOST akka:type=Cluster Available 2>&1 >/dev/null) # redirects STDERR to STDOUT before capturing it
   if [[ "$REPLY" != *true ]]; then
-    echo "Akka cluster node is not available on $HOST"
+    echo "Akka cluster node is not available on $HOST, due to $REPLY"
     exit 1
   fi
 }
@@ -37,7 +37,7 @@ case "$2" in
 
   join)
     if [ $# -ne 3 ]; then
-      echo "Usage: $SELF join "
+      echo "Usage: $SELF join <node-url>"
       exit 1
     fi
 
@@ -51,7 +51,7 @@ case "$2" in
 
   leave)
     if [ $# -ne 3 ]; then
-      echo "Usage: $SELF leave "
+      echo "Usage: $SELF leave <node-url>"
       exit 1
     fi
 
@@ -65,7 +65,7 @@ case "$2" in
 
   down)
     if [ $# -ne 3 ]; then
-      echo "Usage: $SELF down "
+      echo "Usage: $SELF down <node-url>"
       exit 1
     fi
 
@@ -164,7 +164,7 @@ case "$2" in
     ensureNodeIsRunningAndAvailable
 
     shift
-    echo "Checking if member node on $HOST is AVAILABLE"
+    echo "Checking if member node on $HOST is RUNNING"
     $JMX_CLIENT $HOST akka:type=Cluster Running
     ;;
 
@@ -172,17 +172,17 @@ case "$2" in
     printf "Usage: bin/$SELF <node-host:jmx-port> <command> ...\n"
     printf "\n"
     printf "Supported commands are:\n"
-    printf "%26s - %s\n" "join " "Sends request a JOIN node with the specified URL"
-    printf "%26s - %s\n" "leave " "Sends a request for node with URL to LEAVE the cluster"
-    printf "%26s - %s\n" "down " "Sends a request for marking node with URL as DOWN"
-    printf "%26s - %s\n" member-status "Asks the member node for its current status"
-    printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)"
-    printf "%26s - %s\n" leader "Asks the cluster who the current leader is"
-    printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)"
-    printf "%26s - %s\n" is-available "Checks if the member node is available"
-    printf "%26s - %s\n" is-running "Checks if the member node is running"
-    printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence"
-    printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n"
+    printf "%26s - %s\n" "join <node-url>" "Sends a request to JOIN the node with the specified URL"
+    printf "%26s - %s\n" "leave <node-url>" "Sends a request for the node with the URL to LEAVE the cluster"
+    printf "%26s - %s\n" "down <node-url>" "Sends a request for marking the node with the URL as DOWN"
"Sends a request for marking node with URL as DOWN" + printf "%26s - %s\n" member-status "Asks the member node for its current status" + printf "%26s - %s\n" cluster-status "Asks the cluster for its current status (member ring, unavailable nodes, meta data etc.)" + printf "%26s - %s\n" leader "Asks the cluster who the current leader is" + printf "%26s - %s\n" is-singleton "Checks if the cluster is a singleton cluster (single node cluster)" + printf "%26s - %s\n" is-available "Checks if the member node is available" + printf "%26s - %s\n" is-running "Checks if the member node is running" + printf "%26s - %s\n" has-convergence "Checks if there is a cluster convergence" + printf "Where the should be on the format of 'akka://actor-system-name@hostname:port'\n" printf "\n" printf "Examples: bin/$SELF localhost:9999 is-available\n" printf " bin/$SELF localhost:9999 join akka://MySystem@darkstar:2552\n" From 068335789cd6762bd4f148f9dae6f40a7d362af2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 20 Sep 2012 08:09:01 +0200 Subject: [PATCH 09/16] Cluster config setting to disable jmx, see #2531 --- akka-cluster/src/main/resources/reference.conf | 3 +++ akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 9 ++++++--- .../src/main/scala/akka/cluster/ClusterSettings.scala | 1 + .../scala/akka/cluster/MultiNodeClusterSpec.scala | 1 + .../src/test/scala/akka/cluster/ClusterConfigSpec.scala | 1 + 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 141cbcbaec..fa6860a1a8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -27,6 +27,9 @@ akka { # in case of network partition. auto-down = off + # Enable or disable JMX MBeans for management of the cluster + jmx.enabled = on + # how long should the node wait before starting the periodic tasks maintenance tasks? 
      periodic-tasks-initial-delay = 1s

diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 647f335b8c..09feaeb656 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -160,8 +160,12 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
 
   system.registerOnTermination(shutdown())
 
-  private val clusterJmx = new ClusterJmx(this, log)
-  clusterJmx.createMBean()
+  private val clusterJmx: Option[ClusterJmx] =
+    if (settings.JmxEnabled) {
+      val jmx = new ClusterJmx(this, log)
+      jmx.createMBean()
+      Some(jmx)
+    } else None
 
   log.info("Cluster Node [{}] - has started up successfully", selfAddress)
 
@@ -237,7 +240,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
 
     scheduler.close()
 
-    clusterJmx.unregisterMBean()
+    clusterJmx foreach { _.unregisterMBean() }
 
     log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
   }
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index e37d4abc72..4212e59c1c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -36,6 +36,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
   final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS)
   final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
   final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
+  final val JmxEnabled: Boolean = getBoolean("akka.cluster.jmx.enabled")
   final val JoinTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
   final val UseDispatcher: String = getString("akka.cluster.use-dispatcher") match {
     case "" ⇒ Dispatchers.DefaultDispatcherId
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index af47d869dc..f4594543e6 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -33,6 +33,7 @@ object MultiNodeClusterSpec {
     akka.cluster {
       auto-join = on
       auto-down = off
+      jmx.enabled = off
       gossip-interval = 200 ms
       heartbeat-interval = 400 ms
       leader-actions-interval = 200 ms
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 2d7565f5f5..f8c5571a57 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -35,6 +35,7 @@ class ClusterConfigSpec extends AkkaSpec {
       JoinTimeout must be(60 seconds)
       AutoJoin must be(true)
       AutoDown must be(false)
+      JmxEnabled must be(true)
       UseDispatcher must be(Dispatchers.DefaultDispatcherId)
       GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)
       MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001)
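How the new setting could be exercised — a sketch only; nothing is assumed
beyond the akka.cluster.jmx.enabled key added above and the standard Typesafe
Config API:

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  // start a node with the cluster's JMX MBean disabled
  val config = ConfigFactory.parseString("akka.cluster.jmx.enabled = off")
    .withFallback(ConfigFactory.load())
  val system = ActorSystem("ClusterSystem", config)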
From 6cf638815f8d65052cdb5362a91699e3f9cf3901 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 08:09:46 +0200
Subject: [PATCH 10/16] Disable remote logging

---
 .../src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala  | 1 +
 .../src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala  | 1 +
 2 files changed, 2 insertions(+)

diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
index c92ff0eafb..0d122938ab 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -52,6 +52,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
         parallelism-max = 13
       }
       akka.scheduler.tick-duration = 33 ms
+      akka.remote.log-remote-lifecycle-events = off
      akka.remote.netty.execution-pool-size = 4
      #akka.remote.netty.reconnection-time-window = 1s
      akka.remote.netty.backoff-timeout = 500ms
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
index 768546a532..40f46ffbbc 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala
@@ -29,6 +29,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
       auto-join = off
     }
     akka.loglevel = INFO
+    akka.remote.log-remote-lifecycle-events = off
    """))
 }

From ab8a690c65fa67d5183bb8615f2fda1dc417d6a1 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 08:44:44 +0200
Subject: [PATCH 11/16] Use Either for LeaderChanged state, see #2518

---
 .../scala/akka/cluster/ClusterEvent.scala | 34 +++++++++++++------
 1 file changed, 23 insertions(+), 11 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index 17988fd7ab..60172ed6a4 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -173,8 +173,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
   import InternalClusterAction._
 
   var latestGossip: Gossip = Gossip()
-  var stashedLeaderChanged: Option[LeaderChanged] = None
-  var publishedLeaderChanged: Option[LeaderChanged] = None
+
+  // Keep track of LeaderChanged event. Should not be published until
+  // convergence, and it should only be published when leader actually
+  // changed to another node. 3 states:
+  // - None: No LeaderChanged detected yet, nothing published yet
+  // - Some(Left): Stashed LeaderChanged to be published later, when convergence
+  // - Some(Right): Latest published LeaderChanged
+  var leaderChangedState: Option[Either[LeaderChanged, LeaderChanged]] = None
 
   def receive = {
     case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip)
@@ -204,23 +210,26 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
     latestGossip = newGossip
     diff(oldGossip, newGossip) foreach { event ⇒
       event match {
-        case x @ LeaderChanged(_) if Some(x) == publishedLeaderChanged ⇒
+        case x @ LeaderChanged(_) if leaderChangedState == Some(Right(x)) ⇒
           // skip, this leader has already been published
 
         case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒
-          stashedLeaderChanged = None
-          publishedLeaderChanged = Some(x)
+          // leader changed and immediate convergence
+          leaderChangedState = Some(Right(x))
           eventStream publish x
 
         case x: LeaderChanged ⇒
           // publish later, when convergence
-          stashedLeaderChanged = Some(x)
+          leaderChangedState = Some(Left(x))
 
         case ConvergenceChanged(true) ⇒
-          stashedLeaderChanged foreach { x ⇒
-            publishedLeaderChanged = Some(x)
-            stashedLeaderChanged = None
-            eventStream publish x
-          }
+          // now that there is convergence, publish any stashed LeaderChanged event
+          leaderChangedState match {
+            case Some(Left(x)) ⇒
+              leaderChangedState = Some(Right(x))
+              eventStream publish x
+
+            case _ ⇒ // nothing stashed
+          }
           eventStream publish event
 
         case MemberUnreachable(m) ⇒
           eventStream publish event
           // notify DeathWatch about unreachable node
           eventStream publish AddressTerminated(m.address)
-        case _ ⇒ eventStream publish event
+
+        case _ ⇒
+          // all other events
+          eventStream publish event
       }
     }
   }
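The three-state encoding above is a small state machine. A standalone sketch of
the intended transitions, with plain strings standing in for the real
LeaderChanged events (illustrative only, not part of the patch):

  // None = nothing seen, Some(Left(x)) = stashed, Some(Right(x)) = published
  var state: Option[Either[String, String]] = None

  def onLeaderChanged(x: String, convergence: Boolean): Unit = state match {
    case Some(Right(`x`)) ⇒ // this leader was already published, skip
    case _ if convergence ⇒ state = Some(Right(x)) // publish immediately
    case _                ⇒ state = Some(Left(x)) // stash until convergence
  }

  def onConvergence(): Unit = state match {
    case Some(Left(x)) ⇒ state = Some(Right(x)) // publish the stashed event
    case _             ⇒ // nothing stashed
  }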
From a790f5bb32f18e7b5668f8db50b090a743e5d20b Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 08:50:12 +0200
Subject: [PATCH 12/16] Use named parameters to Deploy, avoid strange path
 param

---
 .../main/scala/akka/cluster/routing/ClusterRouterConfig.scala | 3 ++-
 .../src/main/scala/akka/routing/RemoteRouterConfig.scala      | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
index c99ae68704..99415e9838 100644
--- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
@@ -178,7 +178,8 @@ private[akka] class ClusterRouteeProvider(
       context.actorFor(RootActorPath(target) / settings.routeesPathElements)
     } else {
       val name = "c" + childNameCounter.incrementAndGet
-      val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target))
+      val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
+        scope = RemoteScope(target))
       context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
     }
     // must register each one, since registered routees are used in selectDeploymentTarget
diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala
index 7075aa5ea7..1d05dae3bc 100644
--- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala
+++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala
@@ -78,7 +78,8 @@ final class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContex
   override def createRoutees(nrOfInstances: Int): Unit = {
     val refs = IndexedSeq.fill(nrOfInstances) {
       val name = "c" + childNameCounter.incrementAndGet
-      val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next))
+      val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
+        scope = RemoteScope(nodeAddressIter.next))
 
       // attachChild means that the provider will treat this call as if possibly done out of the wrong
       // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal

From 6dbe1c00c49800455edee4fb6aa091b1a75531de Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 09:24:06 +0200
Subject: [PATCH 13/16] Further improvement of selectDeploymentTarget based on
 feedback

---
 .../scala/akka/cluster/routing/ClusterRouterConfig.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
index 99415e9838..d3a0b9cb88 100644
--- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala
@@ -202,12 +202,9 @@ private[akka] class ClusterRouteeProvider(
     } else {
       // find the node with the fewest routees
       val numberOfRouteesPerNode: Map[Address, Int] =
-        currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap) { (acc, x) ⇒
+        currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefault(_ ⇒ 0)) { (acc, x) ⇒
           val address = fullAddress(x)
-          acc.get(address) match {
-            case Some(count) ⇒ acc + (address -> (count + 1))
-            case None        ⇒ acc + (address -> 1)
-          }
+          acc + (address -> (acc(address) + 1))
         }
 
       val (address, count) = numberOfRouteesPerNode.minBy(_._2)
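The rewrite leans on Map.withDefault so that nodes without any routees still
count as zero — additions to such a map keep the default. A tiny standalone
sketch of the idiom, with made-up values:

  val nodes = Seq("nodeA", "nodeB", "nodeC")
  val routees = Seq("nodeA", "nodeA", "nodeB")

  // count routees per node; untouched nodes default to 0
  val perNode = routees.foldLeft(nodes.map(_ -> 0).toMap.withDefault(_ ⇒ 0)) {
    (acc, node) ⇒ acc + (node -> (acc(node) + 1))
  }
  // perNode == Map("nodeA" -> 2, "nodeB" -> 1, "nodeC" -> 0)
  val (leastLoaded, _) = perNode.minBy(_._2) // "nodeC"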
From bc34adf6246c2368974a31e97f824022cee7dff4 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 12:35:03 +0200
Subject: [PATCH 14/16] Test consistent hashing router with cluster router,
 see #944

* Found and fixed issue with NoRouter which is used in the combination
  of FromConfig and cluster.enabled=true
* Multi-node test that tests several of the possible combinations

---
 .../routing/ConsistentHashingRouter.scala          |   2 +-
 .../ClusterConsistentHashingRouterSpec.scala       | 182 ++++++++++++++++++
 2 files changed, 183 insertions(+), 1 deletion(-)
 create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala

diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala
index 3b9802d7fd..cdfd040ace 100644
--- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala
+++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala
@@ -205,7 +205,7 @@ case class ConsistentHashingRouter(
    * that can't be defined in configuration.
    */
   override def withFallback(other: RouterConfig): RouterConfig = other match {
-    case _: FromConfig ⇒ this
+    case _: FromConfig | _: NoRouter ⇒ this
     case otherRouter: ConsistentHashingRouter ⇒
       val useResizer = if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala
new file mode 100644
index 0000000000..c39edd8a13
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingRouterSpec.scala
@@ -0,0 +1,182 @@
+/**
+ *  Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster.routing
+
+import scala.concurrent.Await
+import scala.concurrent.util.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Address
+import akka.actor.Props
+import akka.cluster.MultiNodeClusterSpec
+import akka.pattern.ask
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.routing.ConsistentHashingRouter
+import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
+import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
+import akka.routing.CurrentRoutees
+import akka.routing.FromConfig
+import akka.routing.RouterRoutees
+import akka.testkit._
+
+object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
+
+  class Echo extends Actor {
+    def receive = {
+      case _ ⇒ sender ! self
+    }
+  }
+
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      common-router-settings = {
+        router = consistent-hashing
+        nr-of-instances = 10
+        cluster {
+          enabled = on
+          max-nr-of-instances-per-node = 2
+        }
+      }
+
+      akka.actor.deployment {
+        /router1 = ${common-router-settings}
+        /router3 = ${common-router-settings}
+        /router4 = ${common-router-settings}
+      }
+    """)).
+    withFallback(MultiNodeClusterSpec.clusterConfig))
+
+}
+
+class ClusterConsistentHashingRouterMultiJvmNode1 extends ClusterConsistentHashingRouterSpec
+class ClusterConsistentHashingRouterMultiJvmNode2 extends ClusterConsistentHashingRouterSpec
+class ClusterConsistentHashingRouterMultiJvmNode3 extends ClusterConsistentHashingRouterSpec
+
+abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterConsistentHashingRouterMultiJvmSpec)
+  with MultiNodeClusterSpec
+  with ImplicitSender with DefaultTimeout {
+  import ClusterConsistentHashingRouterMultiJvmSpec._
+
+  lazy val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1")
+
+  def currentRoutees(router: ActorRef) =
+    Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees
+
+  /**
+   * Fills in self address for local ActorRef
+   */
+  private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
+    case Address(_, _, None, None) ⇒ cluster.selfAddress
+    case a                         ⇒ a
+  }
+
+  "A cluster router with a consistent hashing router" must {
+    "start cluster with 2 nodes" taggedAs LongRunningTest in {
+      awaitClusterUp(first, second)
+      enterBarrier("after-1")
+    }
+
+    "create routees from configuration" in {
+      runOn(first) {
+        awaitCond {
+          // it may take some time until router receives cluster member events
+          currentRoutees(router1).size == 4
+        }
+        currentRoutees(router1).map(fullAddress).toSet must be(Set(address(first), address(second)))
+      }
+      enterBarrier("after-2")
+    }
+
+    "select destination based on hashKey" in {
+      runOn(first) {
+        router1 ! ConsistentHashableEnvelope(message = "A", hashKey = "a")
+        val destinationA = expectMsgType[ActorRef]
+        router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
+        expectMsg(destinationA)
+      }
+      enterBarrier("after-3")
+    }
+
+    "deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in {
+
+      awaitClusterUp(first, second, third)
+
+      runOn(first) {
+        awaitCond {
+          // it may take some time until router receives cluster member events
+          currentRoutees(router1).size == 6
+        }
+        currentRoutees(router1).map(fullAddress).toSet must be(roles.map(address).toSet)
+      }
+
+      enterBarrier("after-4")
+    }
+
+    "deploy programmatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
+      runOn(first) {
+        val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(),
+          settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2))), "router2")
+        awaitCond {
+          // it may take some time until router receives cluster member events
+          currentRoutees(router2).size == 6
+        }
+        currentRoutees(router2).map(fullAddress).toSet must be(roles.map(address).toSet)
+      }
+
+      enterBarrier("after-5")
+    }
+
+    "handle combination of configured router and programmatically defined hashMapping" taggedAs LongRunningTest in {
+      runOn(first) {
+        def hashMapping: ConsistentHashMapping = {
+          case s: String ⇒ s
+        }
+
+        val router3 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(hashMapping = hashMapping)), "router3")
+
+        assertHashMapping(router3)
+      }
+
+      enterBarrier("after-6")
+    }
+
+    "handle combination of configured router and programmatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in {
+      runOn(first) {
+        def hashMapping: ConsistentHashMapping = {
+          case s: String ⇒ s
+        }
+
+        val router4 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(
+          local = ConsistentHashingRouter(hashMapping = hashMapping),
+          settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1))), "router4")
+
+        assertHashMapping(router4)
+      }
+
+      enterBarrier("after-7")
+    }
+
+    def assertHashMapping(router: ActorRef): Unit = {
+      awaitCond {
+        // it may take some time until router receives cluster member events
+        currentRoutees(router).size == 6
+      }
+      currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
+
+      router ! "a"
+      val destinationA = expectMsgType[ActorRef]
+      router ! "a"
"a" + expectMsg(destinationA) + } + + } +} From c7b966b4e70d65e012fc76a1d649eacffc4f485b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 20 Sep 2012 12:58:51 +0200 Subject: [PATCH 15/16] Switch to ConistentHashingRouter in cluster sample --- .../sample/cluster/stats/StatsSample.scala | 34 ++++++++++++------- .../stats/StatsSampleSingleMasterSpec.scala | 3 +- .../cluster/stats/StatsSampleSpec.scala | 3 +- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 351b5d0a48..8a847d0b05 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -21,6 +21,7 @@ import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberUp import akka.cluster.MemberStatus import akka.routing.FromConfig +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope //#imports //#messages @@ -39,7 +40,10 @@ class StatsService extends Actor { val words = text.split(" ") val replyTo = sender // important to not close over sender val aggregator = context.actorOf(Props(new StatsAggregator(words.size, replyTo))) - words foreach { word ⇒ workerRouter.tell(word, aggregator) } + words foreach { word ⇒ + workerRouter.tell( + ConsistentHashableEnvelope(word, word), aggregator) + } } } @@ -64,9 +68,18 @@ class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor { //#worker class StatsWorker extends Actor { - // FIXME add a cache here to illustrate consistent hashing + var cache = Map.empty[String, Int] def receive = { - case word: String ⇒ sender ! word.length + case word: String ⇒ + val length = cache.get(word) match { + case Some(x) ⇒ x + case None ⇒ + val x = word.length + cache += (word -> x) + x + } + + sender ! 
   }
 }
 //#worker
@@ -124,8 +137,7 @@ object StatsSample {
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
       akka.actor.deployment {
         /statsService/workerRouter {
-          # FIXME use consistent hashing instead
-          router = round-robin
+          router = consistent-hashing
           nr-of-instances = 100
           cluster {
             enabled = on
@@ -153,8 +165,7 @@ object StatsSampleOneMaster {
     val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
       akka.actor.deployment {
         /statsFacade/statsService/workerRouter {
-          # FIXME use consistent hashing instead
-          router = round-robin
+          router = consistent-hashing
           nr-of-instances = 100
           cluster {
             enabled = on
@@ -225,10 +236,10 @@ abstract class StatsService2 extends Actor {
   //#router-lookup-in-code
   import akka.cluster.routing.ClusterRouterConfig
   import akka.cluster.routing.ClusterRouterSettings
-  import akka.routing.RoundRobinRouter
+  import akka.routing.ConsistentHashingRouter
 
   val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
-    ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings(
+    ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
       totalInstances = 100, routeesPath = "/user/statsWorker",
       allowLocalRoutees = true))),
     name = "workerRouter2")
@@ -240,11 +251,10 @@ abstract class StatsService3 extends Actor {
   //#router-deploy-in-code
   import akka.cluster.routing.ClusterRouterConfig
   import akka.cluster.routing.ClusterRouterSettings
-  // FIXME use ConsistentHashingRouter instead
-  import akka.routing.RoundRobinRouter
+  import akka.routing.ConsistentHashingRouter
 
   val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
-    ClusterRouterConfig(RoundRobinRouter(), ClusterRouterSettings(
+    ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
       totalInstances = 100, maxInstancesPerNode = 3,
       allowLocalRoutees = false))),
     name = "workerRouter3")
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
index e23504d084..b1d27cd7a3 100644
--- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala
@@ -35,8 +35,7 @@ object StatsSampleSingleMasterSpec extends MultiNodeConfig {
     #//#router-deploy-config
     akka.actor.deployment {
       /statsFacade/statsService/workerRouter {
-        # FIXME use consistent hashing instead
-        router = round-robin
+        router = consistent-hashing
         nr-of-instances = 100
         cluster {
           enabled = on
diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
index 7398aa025b..9f88597051 100644
--- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
+++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
@@ -31,8 +31,7 @@ object StatsSampleSpec extends MultiNodeConfig {
    #//#router-lookup-config
    akka.actor.deployment {
      /statsService/workerRouter {
-        # FIXME use consistent hashing instead
-        router = round-robin
+        router = consistent-hashing
        nr-of-instances = 100
        cluster {
          enabled = on

From 08121e4ae3e89f0c188ad664b9ca53c68cbbc338 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Thu, 20 Sep 2012 13:02:24 +0200
Subject: [PATCH 16/16] Minor correction: use fully qualified class name of
 ClusterActorRefProvider

---
 akka-docs/cluster/cluster-usage.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst
index 6087e1b637..b0ec8f08b7 100644
--- a/akka-docs/cluster/cluster-usage.rst
+++ b/akka-docs/cluster/cluster-usage.rst
@@ -36,7 +36,7 @@ Try it out:
    :language: none
 
 To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
-settings, but with ``ClusterActorRefProvider``.
+settings, but with ``akka.cluster.ClusterActorRefProvider``.
 The ``akka.cluster.seed-nodes`` and cluster extension should normally also be added to your
 ``application.conf`` file.
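As a closing illustration of what the corrected documentation asks for — a
minimal sketch, not taken from the patches themselves; the provider key is the
standard akka.actor.provider setting and the FQCN is the one from the patch
above:

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  // Sketch: enable cluster capabilities by installing the cluster-aware
  // ActorRefProvider via its fully qualified class name.
  val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    """).withFallback(ConfigFactory.load()))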