Merge pull request #774 from akka/wip-2502-cluster-java-patriknw

Java version of Cluster usage documentation, see #2502
This commit is contained in:
Patrik Nordwall 2012-10-05 00:35:48 -07:00
commit 89c1f66b1f
35 changed files with 1801 additions and 70 deletions

View file

@ -105,8 +105,6 @@ case object NoScopeGiven extends NoScopeGiven {
/**
* Deployer maps actor paths to actor deployments.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAccess: DynamicAccess) {

View file

@ -32,7 +32,41 @@ object ClusterEvent {
unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent
leader: Option[Address] = None) extends ClusterDomainEvent {
/**
* Java API
* Read only
*/
def getMembers: java.lang.Iterable[Member] = {
import scala.collection.JavaConverters._
members.asJava
}
/**
* Java API
* Read only
*/
def getUnreachable: java.util.Set[Member] = {
import scala.collection.JavaConverters._
unreachable.asJava
}
/**
* Java API
* Read only
*/
def getSeenBy: java.util.Set[Address] = {
import scala.collection.JavaConverters._
seenBy.asJava
}
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* Marker interface for member related events.
@ -88,11 +122,6 @@ object ClusterEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Current snapshot of cluster member metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
/**
* Cluster convergence state changed.
*/
@ -101,7 +130,20 @@ object ClusterEvent {
/**
* Leader of the cluster members changed. Only published after convergence.
*/
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
/**
* Java API
* @return address of current leader, or null if none
*/
def getLeader: Address = leader orNull
}
/**
* INTERNAL API
*
* Current snapshot of cluster member metrics. Published to subscribers.
*/
case class ClusterMetricsChanged(nodes: Set[NodeMetrics]) extends ClusterDomainEvent
/**
* INTERNAL API

View file

@ -22,6 +22,8 @@ import java.lang.reflect.Method
import java.lang.System.{ currentTimeMillis newTimestamp }
/**
* INTERNAL API.
*
* This strategy is primarily for load-balancing of nodes. It controls metrics sampling
* at a regular frequency, prepares highly variable data for further analysis by other entities,
* and publishes the latest cluster metrics data around the node ring to assist in determining
@ -31,10 +33,6 @@ import java.lang.System.{ currentTimeMillis ⇒ newTimestamp }
*
* Calculation of statistical data for each monitored process is delegated to the
* [[akka.cluster.DataStream]] for exponential smoothing, with additional decay factor.
*
* INTERNAL API.
*
* @author Helena Edelson
*/
private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Actor with ActorLogging {
@ -119,8 +117,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
* Samples the latest metrics for the node, updates metrics statistics in
* [[akka.cluster.MetricsGossip]], and publishes the change to the event bus.
*
* INTERNAL API
*
* @see [[akka.cluster.ClusterMetricsCollector.collect( )]]
*/
def collect(): Unit = {
@ -164,7 +160,6 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
* INTERNAL API
*
* @param nodes metrics per node
* @author Helena Edelson
*/
private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetrics] = Set.empty) {
@ -220,8 +215,8 @@ private[cluster] case class MetricsGossip(rateOfDecay: Int, nodes: Set[NodeMetri
}
/**
* Envelope adding a sender address to the gossip.
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: MetricsGossip) extends ClusterMessage
@ -245,8 +240,6 @@ private[cluster] case class MetricsGossipEnvelope(from: Address, gossip: Metrics
* @param timestamp the most recent time of sampling
*
* @param startTime the time of initial sampling for this data stream
*
* @author Helena Edelson
*/
private[cluster] case class DataStream(decay: Int, ewma: ScalaNumber, startTime: Long, timestamp: Long)
extends ClusterMessage with MetricNumericConverter {
@ -277,9 +270,9 @@ private[cluster] case class DataStream(decay: Int, ewma: ScalaNumber, startTime:
}
/**
* Companion object of DataStream class.
* INTERNAL API
*
* @author Helena Edelson
* Companion object of DataStream class.
*/
private[cluster] object DataStream {
@ -297,8 +290,6 @@ private[cluster] object DataStream {
*
* @param average the data stream of the metric value, for trending over time. Metrics that are already
* averages (e.g. system load average) or finite (e.g. as total cores), are not trended.
*
* @author Helena Edelson
*/
private[cluster] case class Metric(name: String, value: Option[ScalaNumber], average: Option[DataStream])
extends ClusterMessage with MetricNumericConverter {
@ -348,9 +339,9 @@ private[cluster] case class Metric(name: String, value: Option[ScalaNumber], ave
}
/**
* Companion object of Metric class.
* INTERNAL API
*
* @author Helena Edelson
* Companion object of Metric class.
*/
private[cluster] object Metric extends MetricNumericConverter {
@ -372,6 +363,8 @@ private[cluster] object Metric extends MetricNumericConverter {
}
/**
* INTERNAL API
*
* The snapshot of current sampled health metrics for any monitored process.
* Collected and gossipped at regular intervals for dynamic cluster management strategies.
*
@ -386,8 +379,6 @@ private[cluster] object Metric extends MetricNumericConverter {
* @param timestamp the time of sampling
*
* @param metrics the array of sampled [[akka.actor.Metric]]
*
* @author Helena Edelson
*/
private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Metric] = Set.empty[Metric]) extends ClusterMessage {
@ -409,12 +400,10 @@ private[cluster] case class NodeMetrics(address: Address, timestamp: Long, metri
}
/**
* Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
* a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
*
* INTERNAL API
*
* @author Helena Edelson
* Encapsulates evaluation of validity of metric values, conversion of an actual metric value to
* a [[akka.cluster.Metric]] for consumption by subscribed cluster entities.
*/
private[cluster] trait MetricNumericConverter {
@ -439,18 +428,16 @@ private[cluster] trait MetricNumericConverter {
}
/**
* INTERNAL API
*
* Loads JVM metrics through JMX monitoring beans. If Hyperic SIGAR is on the classpath, this
* loads wider and more accurate range of metrics in combination with SIGAR's native OS library.
*
* FIXME switch to Scala reflection
*
* INTERNAL API
*
* @param sigar the optional org.hyperic.Sigar instance
*
* @param address The [[akka.actor.Address]] of the node being sampled
*
* @author Helena Edelson
*/
private[cluster] class MetricsCollector private (private val sigar: Option[AnyRef], address: Address) extends MetricNumericConverter {
@ -562,9 +549,8 @@ private[cluster] class MetricsCollector private (private val sigar: Option[AnyRe
}
/**
* INTERNAL API
* Companion object of MetricsCollector class.
*
* @author Helena Edelson
*/
private[cluster] object MetricsCollector {
def apply(address: Address, log: LoggingAdapter, dynamicAccess: DynamicAccess): MetricsCollector =

View file

@ -87,7 +87,7 @@ object Member {
*
* Can be one of: Joining, Up, Leaving, Exiting and Down.
*/
sealed trait MemberStatus extends ClusterMessage {
abstract class MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
@ -102,4 +102,34 @@ object MemberStatus {
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus
/**
* JAVA API
*/
def joining: MemberStatus = Joining
/**
* JAVA API
*/
def up: MemberStatus = Up
/**
* JAVA API
*/
def leaving: MemberStatus = Leaving
/**
* JAVA API
*/
def exiting: MemberStatus = Exiting
/**
* JAVA API
*/
def down: MemberStatus = Down
/**
* JAVA API
*/
def removed: MemberStatus = Removed
}

View file

@ -0,0 +1,496 @@
.. _cluster_usage_java:
######################
Cluster Usage (Java)
######################
.. note:: This module is :ref:`experimental <experimental>`. This document describes how to use the features implemented so far. More features are coming in Akka Coltrane. Track progress of the Coltrane milestone in `Assembla <http://www.assembla.com/spaces/akka/tickets>`_ and the `Roadmap <https://docs.google.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E/edit?hl=en_US>`_.
For introduction to the Akka Cluster concepts please see :ref:`cluster`.
Preparing Your Project for Clustering
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project::
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
If you are using the latest nightly build you should pick a timestamped Akka
version from
`<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster-experimental_@binVersion@/>`_.
We recommend against using ``SNAPSHOT`` in order to obtain stable builds.
A Simple Cluster Example
^^^^^^^^^^^^^^^^^^^^^^^^
The following small program together with its configuration starts an ``ActorSystem``
with the Cluster extension enabled. It joins the cluster and logs some membership events.
Try it out:
1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-java`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
The ``akka.cluster.seed-nodes`` and cluster extension should normally also be added to your
``application.conf`` file.
The seed nodes are configured contact points for initial, automatic, join of the cluster.
Note that if you are going to start the nodes on different machines you need to specify the
ip-addresses or host names of the machines in ``application.conf`` instead of ``127.0.0.1``
2. Add the following main program to your project, place it in ``src/main/java``:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterApp.java
:language: java
3. Start the first seed node. Open a sbt session in one terminal window and run::
run-main sample.cluster.simple.japi.SimpleClusterApp 2551
2551 corresponds to the port of the first seed-nodes element in the configuration.
In the log output you see that the cluster node has been started and changed status to 'Up'.
4. Start the second seed node. Open a sbt session in another terminal window and run::
run-main sample.cluster.simple.japi.SimpleClusterApp 2552
2552 corresponds to the port of the second seed-nodes element in the configuration.
In the log output you see that the cluster node has been started and joins the other seed node
and becomes a member of the cluster. It's status changed to 'Up'.
Switch over to the first terminal window and see in the log output that the member joined.
5. Start another node. Open a sbt session in yet another terminal window and run::
run-main sample.cluster.simple.japi.SimpleClusterApp
Now you don't need to specify the port number, and it will use a random available port.
It joins one of the configured seed nodes. Look at the log output in the different terminal
windows.
Start even more nodes in the same way, if you like.
6. Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows.
The other nodes will detect the failure after a while, which you can see in the log
output in the other terminals.
Look at the source code of the program again. What it does is to create an actor
and register it as subscriber of certain cluster events. It gets notified with
an snapshot event, ``CurrentClusterState`` that holds full state information of
the cluster. After that it receives events for changes that happen in the cluster.
Automatic vs. Manual Joining
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You may decide if joining to the cluster should be done automatically or manually.
By default it is automatic and you need to define the seed nodes in configuration
so that a new node has an initial contact point. When a new node is started it
sends a message to all seed nodes and then sends join command to the one that
answers first. If no one of the seed nodes replied (might not be started yet)
it retries this procedure until successful or shutdown.
There is one thing to be aware of regarding the seed node configured as the
first element in the ``seed-nodes`` configuration list.
The seed nodes can be started in any order and it is not necessary to have all
seed nodes running, but the first seed node must be started when initially
starting a cluster, otherwise the other seed-nodes will not become initialized
and no other node can join the cluster. Once more than two seed nodes have been
started it is no problem to shut down the first seed node. If it goes down it
must be manually joined to the cluster again.
Automatic joining of the first seed node is not possible, it would only join
itself. It is only the first seed node that has this restriction.
You can disable automatic joining with configuration::
akka.cluster.auto-join = off
Then you need to join manually, using :ref:`cluster_jmx_java` or :ref:`cluster_command_line_java`.
You can join to any node in the cluster. It doesn't have to be configured as
seed node. If you are not using auto-join there is no need to configure
seed nodes at all.
Joining can also be performed programatically with ``Cluster.get(system).join(address)``.
Automatic vs. Manual Downing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When a member is considered by the failure detector to be unreachable the
leader is not allowed to perform its duties, such as changing status of
new joining members to 'Up'. The status of the unreachable member must be
changed to 'Down'. This can be performed automatically or manually. By
default it must be done manually, using using :ref:`cluster_jmx_java` or
:ref:`cluster_command_line_java`.
It can also be performed programatically with ``Cluster.get(system).down(address)``.
You can enable automatic downing with configuration::
akka.cluster.auto-down = on
Be aware of that using auto-down implies that two separate clusters will
automatically be formed in case of network partition. That might be
desired by some applications but not by others.
.. _cluster_subscriber_java:
Subscribe to Cluster Events
^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can subscribe to change notifications of the cluster membership by using
``Cluster.get(system).subscribe(subscriber, to)``. A snapshot of the full state,
``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first event, followed by events for incremental updates.
There are several types of change events, consult the API documentation
of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent``
for details about the events.
Worker Dial-in Example
----------------------
Let's take a look at an example that illustrates how workers, here named *backend*,
can detect and register to new master nodes, here named *frontend*.
The example application provides a service to transform text. When some text
is sent to one of the frontend services, it will be delegated to one of the
backend workers, which performs the transformation job, and sends the result back to
the original client. New backend nodes, as well as new frontend nodes, can be
added or removed to the cluster dynamically.
In this example the following imports are used:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java#imports
Messages:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationMessages.java#messages
The backend worker that performs the transformation job:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationBackend.java#backend
Note that the ``TransformationBackend`` actor subscribes to cluster events to detect new,
potential, frontend nodes, and send them a registration message so that they know
that they can use the backend worker.
The frontend that receives user jobs and delegates to one of the registered backend workers:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/transformation/japi/TransformationFrontend.java#frontend
Note that the ``TransformationFrontend`` actor watch the registered backend
to be able to remove it from its list of availble backend workers.
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of watched
actor.
This example is included in ``akka-samples/akka-sample-cluster``
and you can try by starting nodes in different terminal windows. For example, starting 2
frontend nodes and 3 backend nodes::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.transformation.japi.TransformationFrontendMain 2551
run-main sample.cluster.transformation.japi.TransformationBackendMain 2552
run-main sample.cluster.transformation.japi.TransformationBackendMain
run-main sample.cluster.transformation.japi.TransformationBackendMain
run-main sample.cluster.transformation.japi.TransformationFrontendMain
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
Failure Detector
^^^^^^^^^^^^^^^^
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
unreachable from the rest of the cluster. The heartbeat arrival times is interpreted
by an implementation of
`The Phi Accrual Failure Detector <http://ddg.jaist.ac.jp/pub/HDY+04.pdf>`_.
The suspicion level of failure is given by a value called *phi*.
The basic idea of the phi failure detector is to express the value of *phi* on a scale that
is dynamically adjusted to reflect current network conditions.
The value of *phi* is calculated as::
phi = -log10(1 - F(timeSinceLastHeartbeat))
where F is the cumulative distribution function of a normal distribution with mean
and standard deviation estimated from historical heartbeat inter-arrival times.
In the :ref:`cluster_configuration_java` you can adjust the ``akka.cluster.failure-detector.threshold``
to define when a *phi* value is considered to be a failure.
A low ``threshold`` is prone to generate many false positives but ensures
a quick detection in the event of a real crash. Conversely, a high ``threshold``
generates fewer mistakes but needs more time to detect actual crashes. The
default ``threshold`` is 8 and is appropriate for most situations. However in
cloud environments, such as Amazon EC2, the value could be increased to 12 in
order to account for network issues that sometimes occur on such platforms.
The following chart illustrates how *phi* increase with increasing time since the
previous heartbeat.
.. image:: images/phi1.png
Phi is calculated from the mean and standard deviation of historical
inter arrival times. The previous chart is an example for standard deviation
of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper,
i.e. it's possible to determine failure more quickly. The curve looks like this for
a standard deviation of 100 ms.
.. image:: images/phi2.png
To be able to survive sudden abnormalities, such as garbage collection pauses and
transient network failures the failure detector is configured with a margin,
``akka.cluster.failure-detector.acceptable-heartbeat-pause``. You may want to
adjust the :ref:`cluster_configuration_java` of this depending on you environment.
This is how the curve looks like for ``acceptable-heartbeat-pause`` configured to
3 seconds.
.. image:: images/phi3.png
Cluster Aware Routers
^^^^^^^^^^^^^^^^^^^^^
All :ref:`routers <routing-java>` can be made aware of member nodes in the cluster, i.e.
deploying new routees or looking up routees on nodes in the cluster.
When a node becomes unavailble or leaves the cluster the routees of that node are
automatically unregistered from the router. When new nodes join the cluster additional
routees are added to the router, according to the configuration.
When using a router with routees looked up on the cluster member nodes, i.e. the routees
are already running, the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
It's the relative actor path defined in ``routees-path`` that identify what actor to lookup.
``nr-of-instances`` defines total number of routees in the cluster, but there will not be
more than one per node. Setting ``nr-of-instances`` to a high value will result in new routees
added to the router when nodes join the cluster.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java#router-lookup-in-code
When using a router with routees created and deployed on the cluster member nodes
the configuration for a router looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
``nr-of-instances`` defines total number of routees in the cluster, but the number of routees
per node, ``max-nr-of-instances-per-node``, will not be exceeded. Setting ``nr-of-instances``
to a high value will result in creating and deploying additional routees when new nodes join
the cluster.
The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java#router-deploy-in-code
See :ref:`cluster_configuration_java` section for further descriptions of the settings.
Router Example
--------------
Let's take a look at how to use cluster aware routers.
The example application provides a service to calculate statistics for a text.
When some text is sent to the service it splits it into words, and delegates the task
to count number of characters in each word to a separate worker, a routee of a router.
The character count for each word is sent back to an aggregator that calculates
the average number of characters per word when all results have been collected.
In this example we use the following imports:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java#imports
Messages:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsMessages.java#messages
The worker that counts number of characters in each word:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsWorker.java#worker
The service that receives text from users and splits it up into words, delegates to workers and aggregates:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java#service
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsAggregator.java#aggregator
Note, nothing cluster specific so far, just plain actors.
We can use these actors with two different types of router setup. Either with lookup of routees,
or with create and deploy of routees. Remember, routees are the workers in this case.
We start with the router setup with lookup of routees. All nodes start ``StatsService`` and
``StatsWorker`` actors and the router is configured with ``routees-path``:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleMain.java#start-router-lookup
This means that user requests can be sent to ``StatsService`` on any node and it will use
``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily
fan out to local children if more parallelism is needed.
This example is included in ``akka-samples/akka-sample-cluster``
and you can try by starting nodes in different terminal windows. For example, starting 3
service nodes and 1 client::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.stats.japi.StatsSampleMain 2551
run-main sample.cluster.stats.japi.StatsSampleMain 2552
run-main sample.cluster.stats.japi.StatsSampleClientMain
run-main sample.cluster.stats.japi.StatsSampleMain
The above setup is nice for this example, but we will also take a look at how to use
a single master node that creates and deploys workers. To keep track of a single
master we need one additional actor:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsFacade.java#facade
The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single
master. It listens to cluster events to create or lookup the ``StatsService`` depending on if
it is on the same same node or on another node. We run the master on the same node as the leader of
the cluster members, which is nothing more than the address currently sorted first in the member ring,
i.e. it can change when new nodes join or when current leader leaves.
All nodes start ``StatsFacade`` and the router is now configured like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleOneMasterMain.java#start-router-deploy
This example is included in ``akka-samples/akka-sample-cluster``
and you can try by starting nodes in different terminal windows. For example, starting 3
service nodes and 1 client::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.stats.japi.StatsSampleOneMasterMain 2551
run-main sample.cluster.stats.japi.StatsSampleOneMasterMain 2552
run-main sample.cluster.stats.japi.StatsSampleOneMasterClientMain
run-main sample.cluster.stats.japi.StatsSampleOneMasterMain
.. note:: The above example, especially the last part, will be simplified when the cluster handles automatic actor partitioning.
.. _cluster_jmx_java:
JMX
^^^
Information and management of the cluster is available as JMX MBeans with the root name ``akka.Cluster``.
The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM.
From JMX you can:
* see what members that are part of the cluster
* see status of this node
* join this node to another node in cluster
* mark any node in the cluster as down
* tell any node in the cluster to leave
Member nodes are identified with their address, in format `akka://actor-system-name@hostname:port`.
.. _cluster_command_line_java:
Command Line Management
^^^^^^^^^^^^^^^^^^^^^^^
The cluster can be managed with the script `bin/akka-cluster` provided in the
Akka distribution.
Run it without parameters to see instructions about how to use the script::
Usage: bin/akka-cluster <node-hostname:jmx-port> <command> ...
Supported commands are:
join <node-url> - Sends request a JOIN node with the specified URL
leave <node-url> - Sends a request for node with URL to LEAVE the cluster
down <node-url> - Sends a request for marking node with URL as DOWN
member-status - Asks the member node for its current status
cluster-status - Asks the cluster for its current status (member ring,
unavailable nodes, meta data etc.)
leader - Asks the cluster who the current leader is
is-singleton - Checks if the cluster is a singleton cluster (single
node cluster)
is-available - Checks if the member node is available
is-running - Checks if the member node is running
has-convergence - Checks if there is a cluster convergence
Where the <node-url> should be on the format of 'akka://actor-system-name@hostname:port'
Examples: bin/akka-cluster localhost:9999 is-available
bin/akka-cluster localhost:9999 join akka://MySystem@darkstar:2552
bin/akka-cluster localhost:9999 cluster-status
To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes,
as described in `Monitoring and Management Using JMX Technology <http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html>`_
Example of system properties to enable remote monitoring and management::
java -Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false
.. _cluster_configuration_java:
Configuration
^^^^^^^^^^^^^
There are several configuration properties for the cluster. We refer to the following
reference file for more information:
.. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf
:language: none
Cluster Scheduler
-----------------
It is recommended that you change the ``tick-duration`` to 33 ms or less
of the default scheduler when using cluster, if you don't need to have it
configured to a longer duration for other reasons. If you don't do this
a dedicated scheduler will be used for periodic tasks of the cluster, which
introduce the extra overhead of another thread.
::
# shorter tick-duration of default scheduler when using cluster
akka.scheduler.tick-duration.tick-duration = 33ms

View file

@ -1,9 +1,9 @@
.. _cluster_usage:
.. _cluster_usage_scala:
###############
Cluster Usage
###############
#######################
Cluster Usage (Scala)
#######################
.. note:: This module is :ref:`experimental <experimental>`. This document describes how to use the features implemented so far. More features are coming in Akka Coltrane. Track progress of the Coltrane milestone in `Assembla <http://www.assembla.com/spaces/akka/tickets>`_ and the `Roadmap <https://docs.google.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E/edit?hl=en_US>`_.
@ -12,11 +12,9 @@ For introduction to the Akka Cluster concepts please see :ref:`cluster`.
Preparing Your Project for Clustering
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project::
.. parsed-literal::
"com.typesafe.akka" %% "akka-cluster" % "@version@" @crossString@
"com.typesafe.akka" %% "akka-cluster-experimental" % "@version@" @crossString@
If you are using the latest nightly build you should pick a timestamped Akka
version from
@ -115,7 +113,7 @@ You can disable automatic joining with configuration::
akka.cluster.auto-join = off
Then you need to join manually, using :ref:`cluster_jmx` or :ref:`cluster_command_line`.
Then you need to join manually, using :ref:`cluster_jmx_scala` or :ref:`cluster_command_line_scala`.
You can join to any node in the cluster. It doesn't have to be configured as
seed node. If you are not using auto-join there is no need to configure
seed nodes at all.
@ -130,8 +128,8 @@ When a member is considered by the failure detector to be unreachable the
leader is not allowed to perform its duties, such as changing status of
new joining members to 'Up'. The status of the unreachable member must be
changed to 'Down'. This can be performed automatically or manually. By
default it must be done manually, using using :ref:`cluster_jmx` or
:ref:`cluster_command_line`.
default it must be done manually, using using :ref:`cluster_jmx_scala` or
:ref:`cluster_command_line_scala`.
It can also be performed programatically with ``Cluster(system).down(address)``.
@ -143,7 +141,7 @@ Be aware of that using auto-down implies that two separate clusters will
automatically be formed in case of network partition. That might be
desired by some applications but not by others.
.. _cluster_subscriber:
.. _cluster_subscriber_scala:
Subscribe to Cluster Events
^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -230,12 +228,12 @@ is dynamically adjusted to reflect current network conditions.
The value of *phi* is calculated as::
phi = -log10(1 - F(timeSinceLastHeartbeat)
phi = -log10(1 - F(timeSinceLastHeartbeat))
where F is the cumulative distribution function of a normal distribution with mean
and standard deviation estimated from historical heartbeat inter-arrival times.
In the :ref:`cluster_configuration` you can adjust the ``akka.cluster.failure-detector.threshold``
In the :ref:`cluster_configuration_scala` you can adjust the ``akka.cluster.failure-detector.threshold``
to define when a *phi* value is considered to be a failure.
A low ``threshold`` is prone to generate many false positives but ensures
@ -261,7 +259,7 @@ a standard deviation of 100 ms.
To be able to survive sudden abnormalities, such as garbage collection pauses and
transient network failures the failure detector is configured with a margin,
``akka.cluster.failure-detector.acceptable-heartbeat-pause``. You may want to
adjust the :ref:`cluster_configuration` of this depending on you environment.
adjust the :ref:`cluster_configuration_scala` of this depending on you environment.
This is how the curve looks like for ``acceptable-heartbeat-pause`` configured to
3 seconds.
@ -306,7 +304,7 @@ The same type of router could also have been defined in code:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-deploy-in-code
See :ref:`cluster_configuration` section for further descriptions of the settings.
See :ref:`cluster_configuration_scala` section for further descriptions of the settings.
Router Example
@ -355,6 +353,10 @@ This example is included in ``akka-samples/akka-sample-cluster``
and you can try by starting nodes in different terminal windows. For example, starting 3
service nodes and 1 client::
sbt
project akka-sample-cluster-experimental
run-main sample.cluster.stats.StatsSample 2551
run-main sample.cluster.stats.StatsSample 2552
@ -404,7 +406,7 @@ Set up your project according to the instructions in :ref:`multi-node-testing` a
add the ``sbt-multi-jvm`` plugin and the dependency to ``akka-remote-tests-experimental``.
First, as described in :ref:`multi-node-testing`, we need some scaffolding to configure the ``MultiNodeSpec``.
Define the participating roles and their :ref:`cluster_configuration` in an object extending ``MultiNodeConfig``:
Define the participating roles and their :ref:`cluster_configuration_scala` in an object extending ``MultiNodeConfig``:
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala
:include: MultiNodeConfig
@ -434,7 +436,7 @@ From the test you interact with the cluster using the ``Cluster`` extension, e.g
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#join
Notice how the `testActor` from :ref:`testkit <akka-testkit>` is added as :ref:`subscriber <cluster_subscriber>`
Notice how the `testActor` from :ref:`testkit <akka-testkit>` is added as :ref:`subscriber <cluster_subscriber_scala>`
to cluster changes and then waiting for certain events, such as in this case all members becoming 'Up'.
The above code was running for all roles (JVMs). ``runOn`` is a convenient utility to declare that a certain block
@ -451,7 +453,7 @@ the actor system for a specific role. This can also be used to grab the ``akka.a
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#addresses
.. _cluster_jmx:
.. _cluster_jmx_scala:
JMX
^^^
@ -469,7 +471,7 @@ From JMX you can:
Member nodes are identified with their address, in format `akka://actor-system-name@hostname:port`.
.. _cluster_command_line:
.. _cluster_command_line_scala:
Command Line Management
^^^^^^^^^^^^^^^^^^^^^^^
@ -510,7 +512,7 @@ Example of system properties to enable remote monitoring and management::
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false
.. _cluster_configuration:
.. _cluster_configuration_scala:
Configuration
^^^^^^^^^^^^^

View file

@ -5,4 +5,5 @@ Cluster
:maxdepth: 2
cluster
cluster-usage
cluster-usage-java
cluster-usage-scala

View file

@ -0,0 +1,28 @@
package sample.cluster.simple.japi;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.ClusterDomainEvent;
public class SimpleClusterApp {
public static void main(String[] args) {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
// Create an Akka system
ActorSystem system = ActorSystem.create("ClusterSystem");
// Create an actor that handles cluster domain events
ActorRef clusterListener = system.actorOf(new Props(
SimpleClusterListener.class), "clusterListener");
// Add subscription of cluster events
Cluster.get(system).subscribe(clusterListener,
ClusterDomainEvent.class);
}
}

View file

@ -0,0 +1,41 @@
package sample.cluster.simple.japi;
import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent.ClusterDomainEvent;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberJoined;
import akka.cluster.ClusterEvent.MemberUnreachable;
import akka.cluster.ClusterEvent.MemberUp;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
log.info("Current members: {}", state.members());
} else if (message instanceof MemberJoined) {
MemberJoined mJoined = (MemberJoined) message;
log.info("Member joined: {}", mJoined);
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof MemberUnreachable) {
MemberUnreachable mUnreachable = (MemberUnreachable) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof ClusterDomainEvent) {
// ignore
} else {
unhandled(message);
}
}
}

View file

@ -0,0 +1,56 @@
package sample.cluster.stats.japi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import sample.cluster.stats.japi.StatsMessages.JobFailed;
import sample.cluster.stats.japi.StatsMessages.StatsResult;
import scala.concurrent.util.Duration;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
//#aggregator
public class StatsAggregator extends UntypedActor {
final int expectedResults;
final ActorRef replyTo;
final List<Integer> results = new ArrayList<Integer>();
public StatsAggregator(int expectedResults, ActorRef replyTo) {
this.expectedResults = expectedResults;
this.replyTo = replyTo;
}
@Override
public void preStart() {
getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
}
@Override
public void onReceive(Object message) {
if (message instanceof Integer) {
Integer wordCount = (Integer) message;
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results)
sum += c;
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), getSelf());
getContext().stop(getSelf());
}
} else if (message == ReceiveTimeout.getInstance()) {
replyTo.tell(new JobFailed("Service unavailable, try again later"),
getSelf());
getContext().stop(getSelf());
} else {
unhandled(message);
}
}
}
//#aggregator

View file

@ -0,0 +1,84 @@
package sample.cluster.stats.japi;
import sample.cluster.stats.japi.StatsMessages.JobFailed;
import sample.cluster.stats.japi.StatsMessages.StatsJob;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.LeaderChanged;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
//#facade
public class StatsFacade extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
ActorRef currentMaster = null;
boolean currentMasterCreatedByMe = false;
//subscribe to cluster changes, MemberEvent
@Override
public void preStart() {
cluster.subscribe(getSelf(), LeaderChanged.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof StatsJob && currentMaster == null) {
getSender()
.tell(new JobFailed("Service unavailable, try again later"),
getSelf());
} else if (message instanceof StatsJob) {
StatsJob job = (StatsJob) message;
currentMaster.forward(job, getContext());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
updateCurrentMaster(state.getLeader());
} else if (message instanceof LeaderChanged) {
LeaderChanged leaderChanged = (LeaderChanged) message;
updateCurrentMaster(leaderChanged.getLeader());
} else {
unhandled(message);
}
}
void updateCurrentMaster(Address leaderAddress) {
if (leaderAddress == null)
return;
if (leaderAddress.equals(cluster.selfAddress())) {
if (!currentMasterCreatedByMe) {
log.info("Creating new statsService master at [{}]", leaderAddress);
currentMaster = getContext().actorOf(
new Props(StatsService.class), "statsService");
currentMasterCreatedByMe = true;
}
} else {
if (currentMasterCreatedByMe) {
getContext().stop(currentMaster);
}
log.info("Using statsService master at [{}]", leaderAddress);
currentMaster = getContext().actorFor(
getSelf().path().toStringWithAddress(leaderAddress)
+ "/statsService");
currentMasterCreatedByMe = false;
}
}
}
//#facade

View file

@ -0,0 +1,55 @@
package sample.cluster.stats.japi;
import java.io.Serializable;
//#messages
public interface StatsMessages {
public static class StatsJob implements Serializable {
private final String text;
public StatsJob(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
public static class StatsResult implements Serializable {
private final double meanWordLength;
public StatsResult(double meanWordLength) {
this.meanWordLength = meanWordLength;
}
public double getMeanWordLength() {
return meanWordLength;
}
@Override
public String toString() {
return "meanWordLength: " + meanWordLength;
}
}
public static class JobFailed implements Serializable {
private final String reason;
public JobFailed(String reason) {
this.reason = reason;
}
public String getReason() {
return reason;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
}
//#messages

View file

@ -0,0 +1,98 @@
package sample.cluster.stats.japi;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import sample.cluster.stats.japi.StatsMessages.JobFailed;
import sample.cluster.stats.japi.StatsMessages.StatsJob;
import sample.cluster.stats.japi.StatsMessages.StatsResult;
import scala.concurrent.forkjoin.ThreadLocalRandom;
import scala.concurrent.util.Duration;
import scala.concurrent.util.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
public class StatsSampleClient extends UntypedActor {
final String servicePath;
final Cancellable tickTask;
final Set<Address> nodes = new HashSet<Address>();
Cluster cluster = Cluster.get(getContext().system());
public StatsSampleClient(String servicePath) {
this.servicePath = servicePath;
FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
tickTask = getContext()
.system()
.scheduler()
.schedule(interval, interval, getSelf(), "tick",
getContext().dispatcher());
}
//subscribe to cluster changes, MemberEvent
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberEvent.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
tickTask.cancel();
}
@Override
public void onReceive(Object message) {
if (message.equals("tick") && !nodes.isEmpty()) {
// just pick any one
List<Address> nodesList = new ArrayList<Address>(nodes);
Address address = nodesList.get(ThreadLocalRandom.current().nextInt(
nodesList.size()));
ActorRef service = getContext().actorFor(address + servicePath);
service.tell(new StatsJob("this is the text that will be analyzed"),
getSelf());
} else if (message instanceof StatsResult) {
StatsResult result = (StatsResult) message;
System.out.println(result);
} else if (message instanceof JobFailed) {
JobFailed failed = (JobFailed) message;
System.out.println(failed);
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
nodes.clear();
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
nodes.add(member.address());
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
nodes.add(mUp.member().address());
} else if (message instanceof MemberEvent) {
MemberEvent other = (MemberEvent) message;
nodes.remove(other.member().address());
} else {
unhandled(message);
}
}
}

View file

@ -0,0 +1,20 @@
package sample.cluster.stats.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
public class StatsSampleClientMain {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("ClusterSystem");
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new StatsSampleClient("/user/statsService");
}
}), "client");
}
}

View file

@ -0,0 +1,37 @@
package sample.cluster.stats.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.ConfigFactory;
public class StatsSampleMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
//#start-router-lookup
ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.parseString(
"akka.actor.deployment { \n" +
" /statsService/workerRouter { \n" +
" router = consistent-hashing \n" +
" nr-of-instances = 100 \n" +
" cluster { \n" +
" enabled = on \n" +
" routees-path = \"/user/statsWorker\" \n" +
" allow-local-routees = on \n" +
" } \n" +
" } \n" +
"} \n")
.withFallback(ConfigFactory.load()));
system.actorOf(new Props(StatsWorker.class), "statsWorker");
system.actorOf(new Props(StatsService.class), "statsService");
//#start-router-lookup
}
}

View file

@ -0,0 +1,21 @@
package sample.cluster.stats.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
public class StatsSampleOneMasterClientMain {
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create("ClusterSystem");
system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new StatsSampleClient("/user/statsFacade");
}
}), "client");
}
}

View file

@ -0,0 +1,37 @@
package sample.cluster.stats.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.ConfigFactory;
public class StatsSampleOneMasterMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
//#start-router-deploy
ActorSystem system = ActorSystem.create("ClusterSystem",
ConfigFactory.parseString(
"akka.actor.deployment { \n" +
" /statsFacade/statsService/workerRouter { \n" +
" router = consistent-hashing \n" +
" nr-of-instances = 100 \n" +
" cluster { \n" +
" enabled = on \n" +
" max-nr-of-instances-per-node = 3 \n" +
" allow-local-routees = off \n" +
" } \n" +
" } \n" +
"} \n")
.withFallback(ConfigFactory.load()));
system.actorOf(new Props(StatsFacade.class), "statsFacade");
//#start-router-deploy
}
}

View file

@ -0,0 +1,83 @@
package sample.cluster.stats.japi;
import sample.cluster.stats.japi.StatsMessages.StatsJob;
//#imports
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.cluster.routing.ClusterRouterConfig;
import akka.cluster.routing.ClusterRouterSettings;
import akka.routing.ConsistentHashingRouter;
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope;
import akka.routing.FromConfig;
//#imports
//#service
public class StatsService extends UntypedActor {
ActorRef workerRouter = getContext().actorOf(
new Props(StatsWorker.class).withRouter(FromConfig.getInstance()),
"workerRouter");
@Override
public void onReceive(Object message) {
if (message instanceof StatsJob) {
StatsJob job = (StatsJob) message;
if (job.getText().equals("")) {
unhandled(message);
} else {
final String[] words = job.getText().split(" ");
final ActorRef replyTo = getSender();
// create actor that collects replies from workers
ActorRef aggregator = getContext().actorOf(
new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new StatsAggregator(words.length, replyTo);
}
}));
// send each word to a worker
for (String word : words) {
workerRouter.tell(new ConsistentHashableEnvelope(word, word),
aggregator);
}
}
} else {
unhandled(message);
}
}
}
//#service
//not used, only for documentation
abstract class StatsService2 extends UntypedActor {
//#router-lookup-in-code
int totalInstances = 100;
String routeesPath = "/user/statsWorker";
boolean allowLocalRoutees = true;
ActorRef workerRouter = getContext().actorOf(
new Props(StatsWorker.class).withRouter(new ClusterRouterConfig(
new ConsistentHashingRouter(0), new ClusterRouterSettings(
totalInstances, routeesPath, allowLocalRoutees))),
"workerRouter2");
//#router-lookup-in-code
}
//not used, only for documentation
abstract class StatsService3 extends UntypedActor {
//#router-deploy-in-code
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
ActorRef workerRouter = getContext().actorOf(
new Props(StatsWorker.class).withRouter(new ClusterRouterConfig(
new ConsistentHashingRouter(0), new ClusterRouterSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees))),
"workerRouter3");
//#router-deploy-in-code
}

View file

@ -0,0 +1,30 @@
package sample.cluster.stats.japi;
import java.util.HashMap;
import java.util.Map;
import akka.actor.UntypedActor;
//#worker
public class StatsWorker extends UntypedActor {
Map<String, Integer> cache = new HashMap<String, Integer>();
@Override
public void onReceive(Object message) {
if (message instanceof String) {
String word = (String) message;
Integer length = cache.get(word);
if (length == null) {
length = word.length();
cache.put(word, length);
}
getSender().tell(length, getSelf());
} else {
unhandled(message);
}
}
}
//#worker

View file

@ -0,0 +1,65 @@
package sample.cluster.transformation.japi;
import static sample.cluster.transformation.japi.TransformationMessages.BACKEND_REGISTRATION;
import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
import sample.cluster.transformation.japi.TransformationMessages.TransformationResult;
//#imports
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
//#imports
//#backend
public class TransformationBackend extends UntypedActor {
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberEvent
@Override
public void preStart() {
cluster.subscribe(getSelf(), MemberEvent.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender()
.tell(new TransformationResult(job.getText().toUpperCase()),
getSelf());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
} else {
unhandled(message);
}
}
//try to register to all nodes, even though there
// might not be any frontend on all nodes
void register(Member member) {
getContext().actorFor(member.address() + "/user/frontend").tell(
BACKEND_REGISTRATION, getSelf());
}
}
//#backend

View file

@ -0,0 +1,20 @@
package sample.cluster.transformation.japi;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class TransformationBackendMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem");
system.actorOf(new Props(TransformationBackend.class), "backend");
}
}

View file

@ -0,0 +1,48 @@
package sample.cluster.transformation.japi;
import static sample.cluster.transformation.japi.TransformationMessages.BACKEND_REGISTRATION;
import java.util.ArrayList;
import java.util.List;
import sample.cluster.transformation.japi.TransformationMessages.JobFailed;
import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
//#frontend
public class TransformationFrontend extends UntypedActor {
List<ActorRef> backends = new ArrayList<ActorRef>();
int jobCounter = 0;
@Override
public void onReceive(Object message) {
if ((message instanceof TransformationJob) && backends.isEmpty()) {
TransformationJob job = (TransformationJob) message;
getSender().tell(
new JobFailed("Service unavailable, try again later", job),
getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
jobCounter++;
backends.get(jobCounter % backends.size())
.forward(job, getContext());
} else if (message.equals(BACKEND_REGISTRATION)) {
getContext().watch(getSender());
backends.add(getSender());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
backends.remove(terminated.getActor());
} else {
unhandled(message);
}
}
}
//#frontend

View file

@ -0,0 +1,45 @@
package sample.cluster.transformation.japi;
import java.util.concurrent.TimeUnit;
import sample.cluster.transformation.japi.TransformationMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.util.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import static akka.pattern.Patterns.ask;
public class TransformationFrontendMain {
public static void main(String[] args) throws Exception {
// Override the configuration of the port
// when specified as program argument
if (args.length > 0)
System.setProperty("akka.remote.netty.port", args[0]);
ActorSystem system = ActorSystem.create("ClusterSystem");
ActorRef frontend = system.actorOf(new Props(
TransformationFrontend.class), "frontend");
Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
for (int n = 1; n <= 120; n++) {
ask(frontend, new TransformationJob("hello-" + n), timeout)
.onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
// wait a while until next request,
// to avoid flooding the console with output
Thread.sleep(2000);
}
system.shutdown();
}
}

View file

@ -0,0 +1,63 @@
package sample.cluster.transformation.japi;
import java.io.Serializable;
//#messages
public interface TransformationMessages {
public static class TransformationJob implements Serializable {
private final String text;
public TransformationJob(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
public static class TransformationResult implements Serializable {
private final String text;
public TransformationResult(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return "TransformationResult(" + text + ")";
}
}
public static class JobFailed implements Serializable {
private final String reason;
private final TransformationJob job;
public JobFailed(String reason, TransformationJob job) {
this.reason = reason;
this.job = job;
}
public String getReason() {
return reason;
}
public TransformationJob getJob() {
return job;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
public static final String BACKEND_REGISTRATION = "BackendRegistration";
}
//#messages

View file

@ -24,7 +24,7 @@ object SimpleClusterApp {
log.info("Member is Up: {}", member)
case MemberUnreachable(member)
log.info("Member detected as unreachable: {}", member)
case _ // ignore
case _: ClusterDomainEvent // ignore
}
}), name = "clusterListener")

View file

@ -39,6 +39,7 @@ class StatsService extends Actor {
case StatsJob(text) if text != ""
val words = text.split(" ")
val replyTo = sender // important to not close over sender
// create actor that collects replies from workers
val aggregator = context.actorOf(Props(
new StatsAggregator(words.size, replyTo)))
words foreach { word
@ -121,7 +122,7 @@ class StatsFacade extends Actor with ActorLogging {
currentMaster foreach { context.stop(_) }
log.info("Using statsService master at [{}]", leaderAddress)
currentMaster = Some(context.actorFor(
context.self.path.toStringWithAddress(leaderAddress) + "/statsService"))
self.path.toStringWithAddress(leaderAddress) + "/statsService"))
currentMasterCreatedByMe = false
}
}

View file

@ -42,6 +42,8 @@ object TransformationFrontend {
(frontend ? TransformationJob("hello-" + n)) onSuccess {
case result println(result)
}
// wait a while until next request,
// to avoid flooding the console with output
Thread.sleep(2000)
}
system.shutdown()

View file

@ -0,0 +1,120 @@
package sample.cluster.stats.japi
import language.postfixOps
import scala.concurrent.util.duration._
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import sample.cluster.stats.japi.StatsMessages._
import akka.remote.testkit.MultiNodeConfig
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
object StatsSampleJapiSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
val first = role("first")
val second = role("second")
val third = role("thrid")
// this configuration will be used for all nodes
// note that no fixed host names and ports are used
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
routees-path = "/user/statsWorker"
allow-local-routees = on
}
}
}
"""))
}
// need one concrete test class per node
class StatsSampleJapiSpecMultiJvmNode1 extends StatsSampleJapiSpec
class StatsSampleJapiSpecMultiJvmNode2 extends StatsSampleJapiSpec
class StatsSampleJapiSpecMultiJvmNode3 extends StatsSampleJapiSpec
abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConfig)
with WordSpec with MustMatchers with BeforeAndAfterAll
with ImplicitSender {
import StatsSampleJapiSpecConfig._
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
"The japi stats sample" must {
"illustrate how to startup cluster" in within(10 seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
Cluster(system) join firstAddress
system.actorOf(Props[StatsWorker], "statsWorker")
system.actorOf(Props[StatsService], "statsService")
expectMsgAllOf(
MemberUp(Member(firstAddress, MemberStatus.Up)),
MemberUp(Member(secondAddress, MemberStatus.Up)),
MemberUp(Member(thirdAddress, MemberStatus.Up)))
Cluster(system).unsubscribe(testActor)
testConductor.enter("all-up")
}
"show usage of the statsService from one node" in within(5 seconds) {
runOn(second) {
val service = system.actorFor(node(third) / "user" / "statsService")
service ! new StatsJob("this is the text that will be analyzed")
val meanWordLength = expectMsgPF() {
case r: StatsResult r.getMeanWordLength
}
meanWordLength must be(3.875 plusOrMinus 0.001)
}
testConductor.enter("done-2")
}
//#test-statsService
"show usage of the statsService from all nodes" in within(5 seconds) {
val service = system.actorFor(node(third) / "user" / "statsService")
service ! new StatsJob("this is the text that will be analyzed")
val meanWordLength = expectMsgPF() {
case r: StatsResult r.getMeanWordLength
}
meanWordLength must be(3.875 plusOrMinus 0.001)
testConductor.enter("done-2")
}
}
}

View file

@ -0,0 +1,105 @@
package sample.cluster.stats.japi
import language.postfixOps
import scala.concurrent.util.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import sample.cluster.stats.japi.StatsMessages._
object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
val first = role("first")
val second = role("second")
val third = role("thrid")
// this configuration will be used for all nodes
// note that no fixed host names and ports are used
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
akka.actor.deployment {
/statsFacade/statsService/workerRouter {
router = consistent-hashing
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = off
}
}
}
"""))
}
// need one concrete test class per node
class StatsSampleSingleMasterJapiSpecMultiJvmNode1 extends StatsSampleSingleMasterJapiSpec
class StatsSampleSingleMasterJapiSpecMultiJvmNode2 extends StatsSampleSingleMasterJapiSpec
class StatsSampleSingleMasterJapiSpecMultiJvmNode3 extends StatsSampleSingleMasterJapiSpec
abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSampleSingleMasterJapiSpecConfig)
with WordSpec with MustMatchers with BeforeAndAfterAll with ImplicitSender {
import StatsSampleSingleMasterJapiSpecConfig._
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
"The japi stats sample with single master" must {
"illustrate how to startup cluster" in within(10 seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
Cluster(system) join node(first).address
system.actorOf(Props[StatsFacade], "statsFacade")
expectMsgAllOf(
MemberUp(Member(node(first).address, MemberStatus.Up)),
MemberUp(Member(node(second).address, MemberStatus.Up)),
MemberUp(Member(node(third).address, MemberStatus.Up)))
Cluster(system).unsubscribe(testActor)
testConductor.enter("all-up")
}
"show usage of the statsFacade" in within(5 seconds) {
val facade = system.actorFor(RootActorPath(node(third).address) / "user" / "statsFacade")
// eventually the service should be ok,
// worker nodes might not be up yet
awaitCond {
facade ! new StatsJob("this is the text that will be analyzed")
expectMsgPF() {
case unavailble: JobFailed false
case r: StatsResult
r.getMeanWordLength must be(3.875 plusOrMinus 0.001)
true
}
}
testConductor.enter("done")
}
}
}

View file

@ -0,0 +1,124 @@
package sample.cluster.transformation.japi
import language.postfixOps
import scala.concurrent.util.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.Props
import akka.cluster.Cluster
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import sample.cluster.transformation.japi.TransformationMessages._
object TransformationSampleJapiSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
val frontend1 = role("frontend1")
val frontend2 = role("frontend2")
val backend1 = role("backend1")
val backend2 = role("backend2")
val backend3 = role("backend3")
// this configuration will be used for all nodes
// note that no fixed host names and ports are used
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
"""))
}
// need one concrete test class per node
class TransformationSampleJapiSpecMultiJvmNode1 extends TransformationSampleJapiSpec
class TransformationSampleJapiSpecMultiJvmNode2 extends TransformationSampleJapiSpec
class TransformationSampleJapiSpecMultiJvmNode3 extends TransformationSampleJapiSpec
class TransformationSampleJapiSpecMultiJvmNode4 extends TransformationSampleJapiSpec
class TransformationSampleJapiSpecMultiJvmNode5 extends TransformationSampleJapiSpec
abstract class TransformationSampleJapiSpec extends MultiNodeSpec(TransformationSampleJapiSpecConfig)
with WordSpec with MustMatchers with BeforeAndAfterAll with ImplicitSender {
import TransformationSampleJapiSpecConfig._
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
"The japi transformation sample" must {
"illustrate how to start first frontend" in {
runOn(frontend1) {
// this will only run on the 'first' node
Cluster(system) join node(frontend1).address
val transformationFrontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
transformationFrontend ! new TransformationJob("hello")
expectMsgPF() {
// no backends yet, service unavailble
case f: JobFailed
}
}
// this will run on all nodes
// use barrier to coordinate test steps
testConductor.enter("frontend1-started")
}
"illustrate how a backend automatically registers" in within(15 seconds) {
runOn(backend1) {
Cluster(system) join node(frontend1).address
system.actorOf(Props[TransformationBackend], name = "backend")
}
testConductor.enter("backend1-started")
runOn(frontend1) {
assertServiceOk
}
testConductor.enter("frontend1-backend1-ok")
}
"illustrate how more nodes registers" in within(15 seconds) {
runOn(frontend2) {
Cluster(system) join node(frontend1).address
system.actorOf(Props[TransformationFrontend], name = "frontend")
}
runOn(backend2, backend3) {
Cluster(system) join node(backend1).address
system.actorOf(Props[TransformationBackend], name = "backend")
}
testConductor.enter("all-started")
runOn(frontend1, frontend2) {
assertServiceOk
}
testConductor.enter("all-ok")
}
}
def assertServiceOk: Unit = {
val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend")
// eventually the service should be ok,
// backends might not have registered initially
awaitCond {
transformationFrontend ! new TransformationJob("hello")
expectMsgPF() {
case unavailble: JobFailed false
case r: TransformationResult
r.getText must be("HELLO")
true
}
}
}
}

View file

@ -118,7 +118,6 @@ object CallingThreadDispatcher {
* is then executed. It is possible to suspend an actor from within its call
* stack.
*
* @author Roland Kuhn
* @since 1.1
*/
class CallingThreadDispatcher(

View file

@ -16,7 +16,6 @@ import akka.pattern.ask
* overrides the dispatcher to CallingThreadDispatcher and sets the receiveTimeout to None. Otherwise,
* it acts just like a normal ActorRef. You may retrieve a reference to the underlying actor to test internal logic.
*
* @author Roland Kuhn
* @since 1.1
*/
class TestActorRef[T <: Actor](

View file

@ -31,7 +31,6 @@ import scala.concurrent.util.FiniteDuration
* assert (fsm.underlyingActor.getLog == IndexedSeq(FSMLogEntry(1, null, "hallo")))
* </code></pre>
*
* @author Roland Kuhn
* @since 1.2
*/
class TestFSMRef[S, D, T <: Actor](

View file

@ -637,7 +637,6 @@ trait TestKitBase {
* are scaled using their Duration.dilated method, which uses the
* TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor".
*
* @author Roland Kuhn
* @since 1.1
*/
class TestKit(_system: ActorSystem) extends { implicit val system = _system } with TestKitBase

View file

@ -17,10 +17,7 @@ import scala.concurrent.util.Duration
/**
* Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
*
* @author Roland Kuhn
*/
object TestActorRefSpec {
var counter = 4