Merge branch 'master' into wip-2349-multi-node-and-multi-jvm-doc-ban

Conflicts:
	project/plugins.sbt
This commit is contained in:
Björn Antonsson 2012-09-21 17:00:34 +02:00
commit 78597ed7c1
317 changed files with 1302 additions and 805 deletions

3
.gitignore vendored
View file

@ -49,7 +49,8 @@ multiverse.log
.eprj
.*.swp
akka-docs/_build/
akka-docs/epilog_rst
akka-docs/rst_html
akka-docs/rst_latex
*.pyc
akka-docs/exts/
_akka_cluster/

View file

@ -2,7 +2,7 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) {
getSender().tell("got it!");
getSender().tell("got it!", getSelf());
getContext().getChildren();
}
}

View file

@ -12,6 +12,6 @@ public class NonPublicClass {
class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) {
getSender().tell(msg);
getSender().tell(msg, getSelf());
}
}

View file

@ -1,14 +1,10 @@
package akka.actor;
import akka.actor.ActorSystem;
import akka.japi.Creator;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import com.typesafe.config.ConfigFactory;
public class StashJavaAPI {
@ -16,7 +12,8 @@ public class StashJavaAPI {
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("StashJavaAPI", ConfigFactory.parseString(ActorWithStashSpec.testConf()));
system = ActorSystem.create("StashJavaAPI",
ConfigFactory.parseString(ActorWithStashSpec.testConf()));
}
@AfterClass
@ -27,10 +24,11 @@ public class StashJavaAPI {
@Test
public void mustBeAbleToUseStash() {
ActorRef ref = system.actorOf(new Props(StashJavaAPITestActor.class).withDispatcher("my-dispatcher"));
ActorRef ref = system.actorOf(new Props(StashJavaAPITestActor.class)
.withDispatcher("my-dispatcher"));
ref.tell("Hello", ref);
ref.tell("Hello", ref);
ref.tell(new Object());
ref.tell(new Object(), null);
}
}

View file

@ -4,10 +4,11 @@ import static org.junit.Assert.*;
public class StashJavaAPITestActor extends UntypedActorWithStash {
int count = 0;
public void onReceive(Object msg) {
if (msg instanceof String) {
if (count < 0) {
getSender().tell(new Integer(((String) msg).length()));
getSender().tell(new Integer(((String) msg).length()), getSelf());
} else if (count == 2) {
count = -1;
unstashAll();

View file

@ -3,14 +3,9 @@
*/
package akka.routing;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinRouter;
import akka.testkit.ExtractRoute;
public class CustomRouteTest {

View file

@ -371,8 +371,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor {
def receive = {
case 5 sender.tell("five")
case 0 sender.tell("null")
case 5 sender ! "five"
case 0 sender ! "null"
}
}))

View file

@ -32,7 +32,7 @@ object ConsistencySpec {
case step: Long
if (lastStep != (step - 1))
sender.tell("Test failed: Last step %s, this step %s".format(lastStep, step))
sender ! "Test failed: Last step %s, this step %s".format(lastStep, step)
var shouldBeFortyTwo = left.value + right.value
if (shouldBeFortyTwo != 42)

View file

@ -59,11 +59,11 @@ object Ticket669Spec {
}
override def preRestart(reason: scala.Throwable, msg: Option[Any]) {
sender.tell("failure1")
sender ! "failure1"
}
override def postStop() {
sender.tell("failure2")
sender ! "failure2"
}
}
}

View file

@ -86,7 +86,7 @@ object ActorModelSpec {
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; sender ! msg; busy.switchOff()
case TryReply(msg) ack; sender.tell(msg); busy.switchOff()
case TryReply(msg) ack; sender.tell(msg, null); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()

View file

@ -56,7 +56,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
def receive = {
case i: Int acc = i :: acc
case 'Result sender.tell(acc)
case 'Result sender ! acc
}
}).withDispatcher(dispatcherKey)).asInstanceOf[InternalActorRef]

View file

@ -535,7 +535,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
case _id: Int if (_id == id)
case x {
Thread sleep 100 * id
sender.tell(id)
sender ! id
}
}

View file

@ -6,9 +6,6 @@ package akka.dispatch;
import akka.util.Unsafe;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
abstract class AbstractMessageDispatcher {
final static long shutdownScheduleOffset;
final static long inhabitantsOffset;

View file

@ -504,7 +504,7 @@ private[akka] class ActorCell(
if (success) true
else {
val parent: Class[_] = clazz.getSuperclass
if (parent eq null) throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
if (parent eq null) throw new IllegalActorStateException(actorInstance.getClass + " is not an Actor since it have not mixed in the 'Actor' trait")
lookupAndSetField(parent, actor, name, value)
}
}

View file

@ -92,13 +92,14 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* actor.tell(message);
* </pre>
*/
@deprecated("use the two-arg variant (typically getSelf() as second arg)", "2.1")
final def tell(msg: Any): Unit = this.!(msg)(null: ActorRef)
/**
* Java API. <p/>
* Sends the specified message to the sender, i.e. fire-and-forget
* semantics, including the sender reference if possible (not supported on
* all senders).<p/>
* semantics, including the sender reference if possible (pass `null` if
* there is nobody to reply to).<p/>
* <pre>
* actor.tell(message, context);
* </pre>

View file

@ -195,7 +195,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
lock.unlock()
}
} else {
system.deadLetters.tell(DeadLetter(message, sender, self))
system.deadLetters ! DeadLetter(message, sender, self)
}
}
def sendSystemMessage(msg: SystemMessage): Unit = {
@ -209,7 +209,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
} else {
// FIXME: once we have guaranteed delivery of system messages, hook this in!
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to lock timeout"))
system.deadLetters.tell(DeadLetter(msg, self, self))
system.deadLetters ! DeadLetter(msg, self, self)
}
}
def isLocal = true

View file

@ -57,7 +57,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self, uid))
parent.tell(NullMessage) // read ScalaDoc of NullMessage to see why
parent ! NullMessage // read ScalaDoc of NullMessage to see why
}
// This call is expected to start off the actor by scheduling its mailbox.

View file

@ -194,7 +194,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))
finally try parent.tell(NullMessage) // read ScalaDoc of NullMessage to see why
finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why
finally try tellWatchersWeDied(a)
finally try unwatchWatchedActors(a)
finally {

View file

@ -75,7 +75,7 @@ trait AskSupport {
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated
actorRef.tell(message)
actorRef ! message
Future.failed[Any](new AskTimeoutException("Recipient[%s] had already been terminated." format actorRef))
case ref: InternalActorRef
if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef))

View file

@ -9,9 +9,8 @@ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.stm._
import concurrent.{ ExecutionContext, Future, Promise, Await }
import concurrent.util.Duration
import scala.concurrent.util.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
import scala.concurrent.util.{ FiniteDuration, Duration }
/**
* Used internally to send functions.

View file

@ -21,7 +21,6 @@ import akka.actor.Props;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
/**
* @author Martin Krasser
*/
@ -35,25 +34,29 @@ public class ConsumerJavaTestBase {
}
@Test
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() throws Exception {
new JavaTestKit(system) {{
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse()
throws Exception {
new JavaTestKit(system) {
{
String result = new EventFilter<String>(Exception.class) {
protected String run() {
FiniteDuration timeout = Duration.create(1, TimeUnit.SECONDS);
Camel camel = CamelExtension.get(system);
ExecutionContext executionContext = system.dispatcher();
try {
ActorRef ref = Await.result(
camel.activationFutureFor(system.actorOf(new Props(SampleErrorHandlingConsumer.class)), timeout, executionContext),
timeout);
return camel.template().requestBody("direct:error-handler-test-java", "hello", String.class);
}
catch (Exception e) {
@SuppressWarnings("unused")
ActorRef ref = Await.result(camel.activationFutureFor(
system.actorOf(new Props(SampleErrorHandlingConsumer.class)),
timeout, executionContext), timeout);
return camel.template().requestBody(
"direct:error-handler-test-java", "hello", String.class);
} catch (Exception e) {
return e.getMessage();
}
}
}.occurrences(1).exec();
assertEquals("error: hello", result);
}};
}
};
}
}

View file

@ -208,7 +208,7 @@ public class CustomRouteTestBase {
@Override
public void onReceive(Object message) {
this.getProducerTemplate().sendBody(to, "test");
getSender().tell(Ack.getInstance());
getSender().tell(Ack.getInstance(), getSelf());
}
}
}

View file

@ -42,7 +42,7 @@ public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
@Override
public void preRestart(Throwable reason, Option<Object> message){
getSender().tell(new Status.Failure(reason));
getSender().tell(new Status.Failure(reason), getSelf());
}
}

View file

@ -19,7 +19,7 @@ public class SampleUntypedConsumer extends UntypedConsumerActor {
CamelMessage msg = (CamelMessage)message;
String body = msg.getBodyAs(String.class, getCamelContext());
String header = msg.getHeaderAs("test", String.class,getCamelContext());
sender().tell(String.format("%s %s", body, header));
sender().tell(String.format("%s %s", body, header), getSelf());
}
}

View file

@ -36,9 +36,6 @@ akka {
# how often should the node send out gossip information?
gossip-interval = 1s
# how often should the node send out heartbeats?
heartbeat-interval = 1s
# how often should the leader perform maintenance tasks?
leader-actions-interval = 1s
@ -76,6 +73,9 @@ akka {
# akka.cluster.ClusterSettings parameters
implementation-class = "akka.cluster.AccrualFailureDetector"
# how often should the node send out heartbeats?
heartbeat-interval = 1s
# defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high

View file

@ -23,6 +23,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
Duration(getMilliseconds("akka.cluster.failure-detector.min-std-deviation"), MILLISECONDS)
final val FailureDetectorAcceptableHeartbeatPause: FiniteDuration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.failure-detector.heartbeat-interval"), MILLISECONDS)
final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case AddressFromURIString(addr) addr
@ -30,7 +31,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val SeedNodeTimeout: FiniteDuration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
final val PeriodicTasksInitialDelay: FiniteDuration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val HeartbeatInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
final val LeaderActionsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val PublishStatsInterval: FiniteDuration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS)

View file

@ -8,7 +8,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{STMultiNodeSpec, MultiNodeSpec}
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.testkit._
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
@ -35,11 +35,11 @@ object MultiNodeClusterSpec {
auto-down = off
jmx.enabled = off
gossip-interval = 200 ms
heartbeat-interval = 400 ms
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms
publish-stats-interval = 0 s # always, when it happens
failure-detector.heartbeat-interval = 400 ms
}
akka.remote.log-remote-lifecycle-events = off
akka.test {

View file

@ -48,6 +48,7 @@
</div>
<ul class="nav">
<li><a href="http://akka.io/docs">Documentation</a></li>
<li><a href="http://akka.io/faq">FAQ</a></li>
<li><a href="http://akka.io/downloads">Download</a></li>
<li><a href="http://groups.google.com/group/akka-user">Mailing List</a></li>
<li><a href="http://github.com/akka/akka">Code</a></li>
@ -111,15 +112,17 @@
<ul>
<li><h5>Akka</h5></li>
<li><a href="http://akka.io/docs">Documentation</a></li>
<li><a href="http://akka.io/faq">FAQ</a></li>
<li><a href="http://akka.io/downloads">Downloads</a></li>
<li><a href="http://akka.io/news">News</a></li>
<li><a href="http://letitcrash.com">Blog</a></li>
</ul>
<ul>
<li><h5>Contribute</h5></li>
<li><a href="http://akka.io/community">Community Projects</a></li>
<li><a href="http://github.com/akka/akka">Source Code</a></li>
<li><a href="http://groups.google.com/group/akka-user">Mailing List</a></li>
<li><a href="http://www.assembla.com/spaces/akka/tickets">Report a Bug</a></li>
<li><a href="http://doc.akka.io/docs/akka/current/project/issue-tracking.html">Report a Bug</a></li>
</ul>
<ul>
<li><h5>Company</h5></li>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 KiB

View file

@ -16,10 +16,12 @@ The Akka cluster is a separate jar file. Make sure that you have the following d
.. parsed-literal::
"com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT"
"com.typesafe.akka" %% "akka-cluster" % "@version@" @crossString@
If you are using the latest nightly build you should pick a timestamped Akka version from `<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/>`_. Don't use ``SNAPSHOT``. Note that the Scala version |scalaVersion|
is part of the artifactId.
If you are using the latest nightly build you should pick a timestamped Akka
version from
`<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster-experimental_@binVersion@/>`_.
We recommend against using ``SNAPSHOT`` in order to obtain stable builds.
A Simple Cluster Example
^^^^^^^^^^^^^^^^^^^^^^^^
@ -32,7 +34,7 @@ Try it out:
1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``:
.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf
:language: none
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
@ -47,7 +49,7 @@ ip-addresses or host names of the machines in ``application.conf`` instead of ``
2. Add the following main program to your project, place it in ``src/main/scala``:
.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
.. literalinclude:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala
:language: scala
@ -167,15 +169,15 @@ added or removed to the cluster dynamically.
In this example the following imports are used:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#imports
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#imports
Messages:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#messages
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#messages
The backend worker that performs the transformation job:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#backend
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#backend
Note that the ``TransformationBackend`` actor subscribes to cluster events to detect new,
potential, frontend nodes, and send them a registration message so that they know
@ -183,7 +185,7 @@ that they can use the backend worker.
The frontend that receives user jobs and delegates to one of the registered backend workers:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#frontend
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/transformation/TransformationSample.scala#frontend
Note that the ``TransformationFrontend`` actor watch the registered backend
to be able to remove it from its list of availble backend workers.
@ -212,6 +214,57 @@ frontend nodes and 3 backend nodes::
.. note:: The above example should probably be designed as two separate, frontend/backend, clusters, when there is a `cluster client for decoupling clusters <https://www.assembla.com/spaces/akka/tickets/1165>`_.
Failure Detector
^^^^^^^^^^^^^^^^
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
unreachable from the rest of the cluster. The heartbeat arrival times is interpreted
by an implementation of
`The Phi Accrual Failure Detector <http://ddg.jaist.ac.jp/pub/HDY+04.pdf>`_.
The suspicion level of failure is given by a value called *phi*.
The basic idea of the phi failure detector is to express the value of *phi* on a scale that
is dynamically adjusted to reflect current network conditions.
The value of *phi* is calculated as::
phi = -log10(1 - F(timeSinceLastHeartbeat)
where F is the cumulative distribution function of a normal distribution with mean
and standard deviation estimated from historical heartbeat inter-arrival times.
In the :ref:`cluster_configuration` you can adjust the ``akka.cluster.failure-detector.threshold``
to define when a *phi* value is considered to be a failure.
A low ``threshold`` is prone to generate many false positives but ensures
a quick detection in the event of a real crash. Conversely, a high ``threshold``
generates fewer mistakes but needs more time to detect actual crashes. The
default ``threshold`` is 8 and is appropriate for most situations. However in
cloud environments, such as Amazon EC2, the value could be increased to 12 in
order to account for network issues that sometimes occur on such platforms.
The following chart illustrates how *phi* increase with increasing time since the
previous heartbeat.
.. image:: images/phi1.png
Phi is calculated from the mean and standard deviation of historical
inter arrival times. The previous chart is an example for standard deviation
of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper,
i.e. it's possible to determine failure more quickly. The curve looks like this for
a standard deviation of 100 ms.
.. image:: images/phi2.png
To be able to survive sudden abnormalities, such as garbage collection pauses and
transient network failures the failure detector is configured with a margin,
``akka.cluster.failure-detector.acceptable-heartbeat-pause``. You may want to
adjust the :ref:`cluster_configuration` of this depending on you environment.
This is how the curve looks like for ``acceptable-heartbeat-pause`` configured to
3 seconds.
.. image:: images/phi3.png
Cluster Aware Routers
^^^^^^^^^^^^^^^^^^^^^
@ -224,7 +277,7 @@ routees are added to the router, according to the configuration.
When using a router with routees looked up on the cluster member nodes, i.e. the routees
are already running, the configuration for a router looks like this:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala#router-lookup-config
It's the relative actor path defined in ``routees-path`` that identify what actor to lookup.
@ -234,12 +287,12 @@ added to the router when nodes join the cluster.
The same type of router could also have been defined in code:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-lookup-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-lookup-in-code
When using a router with routees created and deployed on the cluster member nodes
the configuration for a router looks like this:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala#router-deploy-config
``nr-of-instances`` defines total number of routees in the cluster, but the number of routees
@ -249,7 +302,7 @@ the cluster.
The same type of router could also have been defined in code:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-deploy-in-code
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#router-deploy-in-code
See :ref:`cluster_configuration` section for further descriptions of the settings.
@ -267,19 +320,19 @@ the average number of characters per word when all results have been collected.
In this example we use the following imports:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#imports
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#imports
Messages:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#messages
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#messages
The worker that counts number of characters in each word:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#worker
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#worker
The service that receives text from users and splits it up into words, delegates to workers and aggregates:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#service
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#service
Note, nothing cluster specific so far, just plain actors.
@ -290,7 +343,7 @@ or with create and deploy of routees. Remember, routees are the workers in this
We start with the router setup with lookup of routees. All nodes start ``StatsService`` and
``StatsWorker`` actors and the router is configured with ``routees-path``:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-lookup
This means that user requests can be sent to ``StatsService`` on any node and it will use
``StatsWorker`` on all nodes. There can only be one worker per node, but that worker could easily
@ -312,7 +365,7 @@ The above setup is nice for this example, but we will also take a look at how to
a single master node that creates and deploys workers. To keep track of a single
master we need one additional actor:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#facade
The ``StatsFacade`` receives text from users and delegates to the current ``StatsService``, the single
master. It listens to cluster events to create or lookup the ``StatsService`` depending on if
@ -322,7 +375,7 @@ i.e. it can change when new nodes join or when current leader leaves.
All nodes start ``StatsFacade`` and the router is now configured like this:
.. includecode:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy
.. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala#start-router-deploy
This example is included in ``akka-samples/akka-sample-cluster``
@ -407,7 +460,7 @@ There are several configuration properties for the cluster. We refer to the foll
reference file for more information:
.. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-cluster/src/main/resources/reference.conf
:language: none
Cluster Scheduler

View file

Before

Width:  |  Height:  |  Size: 38 KiB

After

Width:  |  Height:  |  Size: 38 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Before After
Before After

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

View file

@ -64,7 +64,7 @@ public class DangerousJavaActor extends UntypedActor {
public Future<String> call() throws Exception {
return f;
}
}));
}), getSelf());
}
if ("block for me".equals(m)) {
getSender().tell(breaker
@ -74,7 +74,7 @@ public class DangerousJavaActor extends UntypedActor {
public String call() throws Exception {
return dangerousCall();
}
}));
}), getSelf());
}
}
}

View file

@ -22,5 +22,6 @@ class Java {
final Deadline deadline = Duration.create(10, "seconds").fromNow();
final Duration rest = deadline.timeLeft();
//#deadline
rest.toString();
}
}

View file

@ -7,7 +7,7 @@ import sys, os
# -- General configuration -----------------------------------------------------
sys.path.append(os.path.abspath('_sphinx/exts'))
sys.path.append(os.path.abspath('../_sphinx/exts'))
extensions = ['sphinx.ext.todo', 'includecode']
templates_path = ['_templates']
@ -17,29 +17,25 @@ exclude_patterns = ['_build', 'pending', 'disabled']
project = u'Akka'
copyright = u'2011, Typesafe Inc'
version = '2.1-SNAPSHOT'
release = '2.1-SNAPSHOT'
version = '@version@'
release = '@version@'
pygments_style = 'simple'
highlight_language = 'scala'
add_function_parentheses = False
show_authors = True
f = open('epilog_rst', 'U')
rst_epilog = "\n" + f.read()
f.close()
# -- Options for HTML output ---------------------------------------------------
html_theme = 'akka'
html_theme_path = ['_sphinx/themes']
html_favicon = '_sphinx/static/favicon.ico'
html_theme_path = ['../_sphinx/themes']
html_favicon = '../_sphinx/static/favicon.ico'
html_title = 'Akka Documentation'
html_logo = '_sphinx/static/logo.png'
html_logo = '../_sphinx/static/logo.png'
#html_favicon = None
html_static_path = ['_sphinx/static']
html_static_path = ['../_sphinx/static']
html_last_updated_fmt = '%b %d, %Y'
#html_sidebars = {}
@ -63,7 +59,7 @@ epub_language = "en"
epub_publisher = epub_author
epub_identifier = "http://doc.akka.io/docs/akka/snapshot/"
epub_scheme = "URL"
epub_cover = ("_sphinx/static/akka.png", "")
epub_cover = ("../_sphinx/static/akka.png", "")
# -- Options for LaTeX output --------------------------------------------------

View file

@ -46,8 +46,8 @@ multi-JVM testing (Simplified for clarity):
lazy val buildSettings = Defaults.defaultSettings ++ SbtMultiJvm.multiJvmSettings ++ Seq(
organization := "com.typesafe.akka",
version := "2.1-SNAPSHOT",
scalaVersion := "|scalaVersion|",
version := "@version@",
scalaVersion := "@scalaVersion@",
crossPaths := false
)

View file

Before

Width:  |  Height:  |  Size: 41 KiB

After

Width:  |  Height:  |  Size: 41 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 56 KiB

Before After
Before After

View file

@ -131,8 +131,9 @@ When an Actor Terminates
Once an actor terminates, i.e. fails in a way which is not handled by a
restart, stops itself or is stopped by its supervisor, it will free up its
resources, draining all remaining messages from its mailbox into the systems
“dead letter mailbox”. The mailbox is then replaced within the actor reference
with a system mailbox, redirecting all new messages “into the drain”. This
“dead letter mailbox” which will forward them to the EventStream as DeadLetters.
The mailbox is then replaced within the actor reference with a system mailbox,
redirecting all new messages to the EventStream as DeadLetters. This
is done on a best effort basis, though, so do not rely on it in order to
construct “guaranteed delivery”.

View file

@ -162,13 +162,13 @@ updates for the 2.1 release.*
Creating Actors
^^^^^^^^^^^^^^^
An actor system is typically started by creating actors above the guardian
An actor system is typically started by creating actors beneath the guardian
actor using the :meth:`ActorSystem.actorOf` method and then using
:meth:`ActorContext.actorOf` from within the created actors to spawn the actor
tree. These methods return a reference to the newly created actor. Each actor
has direct access to references for its parent, itself and its children. These
references may be sent within messages to other actors, enabling those to reply
directly.
has direct access (through its ``ActorContext``) to references for its parent,
itself and its children. These references may be sent within messages to other actors,
enabling those to reply directly.
Looking up Actors by Concrete Path
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

View file

@ -142,7 +142,7 @@ before or after using them to construct an actor system:
.. parsed-literal::
Welcome to Scala version |scalaVersion| (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_27).
Welcome to Scala version @scalaVersion@ (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_27).
Type in expressions to have them evaluated.
Type :help for more information.
@ -337,41 +337,41 @@ Each Akka module has a reference configuration file with the default values.
akka-actor
~~~~~~~~~~
.. literalinclude:: ../../akka-actor/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-actor/src/main/resources/reference.conf
:language: none
akka-remote
~~~~~~~~~~~
.. literalinclude:: ../../akka-remote/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-remote/src/main/resources/reference.conf
:language: none
akka-testkit
~~~~~~~~~~~~
.. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-testkit/src/main/resources/reference.conf
:language: none
akka-transactor
~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-transactor/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-transactor/src/main/resources/reference.conf
:language: none
akka-agent
~~~~~~~~~~
.. literalinclude:: ../../akka-agent/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-agent/src/main/resources/reference.conf
:language: none
akka-zeromq
~~~~~~~~~~~
.. literalinclude:: ../../akka-zeromq/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-zeromq/src/main/resources/reference.conf
:language: none
akka-file-mailbox
~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
:language: none

View file

Before

Width:  |  Height:  |  Size: 72 KiB

After

Width:  |  Height:  |  Size: 72 KiB

Before After
Before After

View file

@ -66,4 +66,51 @@ This means that:
5) ``A2`` can see messages from ``A1`` interleaved with messages from ``A3``
6) Since there is no guaranteed delivery, none, some or all of the messages may arrive to ``A2``
.. _deadletters:
Dead Letters
============
Messages which cannot be delivered (and for which this can be ascertained) will
be delivered to a synthetic actor called ``/deadLetters``. This delivery
happens on a best-effort basis; it may fail even within the local JVM (e.g.
during actor termination). Messages sent via unreliable network transports will
be lost without turning up as dead letters.
How do I Receive Dead Letters?
------------------------------
An actor can subscribe to class :class:`akka.actor.DeadLetter` on the event
stream, see :ref:`event-stream-java` (Java) or :ref:`event-stream-scala`
(Scala) for how to do that. The subscribed actor will then receive all dead
letters published in the (local) system from that point onwards. Dead letters
are not propagated over the network, if you want to collect them in one place
you will have to subscribe one actor per network node and forward them
manually. Also consider that dead letters are generated at that node which can
determine that a send operation is failed, which for a remote send can be the
local system (if no network connection can be established) or the remote one
(if the actor you are sending to does not exist at that point in time).
What Should I Use Dead Letters For?
-----------------------------------
The dead letter service follows the same rules with respect to delivery
guarantees as all other message sends, hence it cannot be used to implement
guaranteed delivery. The main use is for debugging, especially if an actor send
does not arrive consistently (where usually inspecting the dead letters will
tell you that the sender or recipient was set wrong somewhere along the way).
Dead Letters Which are (Usually) not Worrisome
----------------------------------------------
Every time an actor does not terminate by its own decision, there is a chance
that some messages which it sends to itself are lost. There is one which
happens quite easily in complex shutdown scenarios that is usually benign:
seeing a :class:`akka.dispatch.Terminate` message dropped means that two
termination requests were given, but of course only one can succeed. In the
same vein, you might see :class:`akka.actor.Terminated` messages from children
while stopping a hierarchy of actors turning up in dead letters if the parent
is still watching the child when the parent terminates.
.. _Erlang documentation: http://www.erlang.org/faq/academic.html

View file

@ -49,7 +49,7 @@ recommended way in this case is to add a level of supervision.
Akka implements a specific form called “parental supervision”. Actors can only
be created by other actors—where the top-level actor is provided by the
library—and each created actor is supervised by its parent. This restriction
makes the formation of actor supervision hierarchies explicit and encourages
makes the formation of actor supervision hierarchies implicit and encourages
sound design decisions. It should be noted that this also guarantees that
actors cannot be orphaned or attached to supervisors from the outside, which
might otherwise catch them unawares. In addition, this yields a natural and
@ -141,21 +141,25 @@ re-processed.
The precise sequence of events during a restart is the following:
* suspend the actor
* call the old instances :meth:`supervisionStrategy.handleSupervisorFailing`
method (defaults to suspending all children)
* call the old instances :meth:`preRestart` hook (defaults to sending
#. suspend the actor (which means that it will not process normal messages until
resumed), and recursively suspend all children
#. call the old instances :meth:`preRestart` hook (defaults to sending
termination requests to all children and calling :meth:`postStop`)
* wait for all children stopped during :meth:`preRestart` to actually terminate
* call the old instances :meth:`supervisionStrategy.handleSupervisorRestarted`
method (defaults to sending restart request to all remaining children)
* create new actor instance by invoking the originally provided factory again
* invoke :meth:`postRestart` on the new instance
* resume the actor
#. wait for all children which were requested to terminate (using
``context.stop()``) during :meth:`preRestart` to actually terminate
#. create new actor instance by invoking the originally provided factory again
#. invoke :meth:`postRestart` on the new instance (which by default also calls :meth:`preStart`)
#. send restart request to all children (they will follow the same process
recursively, from step 2)
#. resume the actor
What Lifecycle Monitoring Means
-------------------------------
.. note::
Lifecycle Monitoring in Akka is usually referred to as ``DeathWatch``
In contrast to the special relationship between parent and child described
above, each actor may monitor any other actor. Since actors emerge from
creation fully alive and restarts are not visible outside of the affected
@ -166,8 +170,10 @@ reacts to failure.
Lifecycle monitoring is implemented using a :class:`Terminated` message to be
received by the monitoring actor, where the default behavior is to throw a
special :class:`DeathPactException` if not otherwise handled. One important
property is that the message will be delivered irrespective of the order in
special :class:`DeathPactException` if not otherwise handled. In order to
start listening for :class:`Terminated` messages is to use ``ActorContext.watch(targetActorRef)``
and then ``ActorContext.unwatch(targetActorRef)`` to stop listening for that.
One important property is that the message will be delivered irrespective of the order in
which the monitoring request and targets termination occur, i.e. you still get
the message even if at the time of registration the target is already dead.

View file

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 22 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 14 KiB

After

Width:  |  Height:  |  Size: 14 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 119 KiB

After

Width:  |  Height:  |  Size: 119 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 22 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 71 KiB

After

Width:  |  Height:  |  Size: 71 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 64 KiB

After

Width:  |  Height:  |  Size: 64 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 53 KiB

After

Width:  |  Height:  |  Size: 53 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 110 KiB

After

Width:  |  Height:  |  Size: 110 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 82 KiB

After

Width:  |  Height:  |  Size: 82 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 84 KiB

After

Width:  |  Height:  |  Size: 84 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 134 KiB

After

Width:  |  Height:  |  Size: 134 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 28 KiB

After

Width:  |  Height:  |  Size: 28 KiB

Before After
Before After

View file

Before

Width:  |  Height:  |  Size: 124 KiB

After

Width:  |  Height:  |  Size: 124 KiB

Before After
Before After

View file

@ -29,18 +29,21 @@ Akka Maven repository.
Modules
-------
Akka is very modular and has many JARs for containing different features.
Akka is very modular and consists of several JARs containing different features.
- ``akka-actor-2.1-SNAPSHOT.jar`` -- Classic Actors, Typed Actors, IO Actor etc.
- ``akka-remote-2.1-SNAPSHOT.jar`` -- Remote Actors
- ``akka-testkit-2.1-SNAPSHOT.jar`` -- Toolkit for testing Actor systems
- ``akka-kernel-2.1-SNAPSHOT.jar`` -- Akka microkernel for running a bare-bones mini application server
- ``akka-transactor-2.1-SNAPSHOT.jar`` -- Transactors - transactional actors, integrated with Scala STM
- ``akka-agent-2.1-SNAPSHOT.jar`` -- Agents, integrated with Scala STM
- ``akka-camel-2.1-SNAPSHOT.jar`` -- Apache Camel integration
- ``akka-zeromq-2.1-SNAPSHOT.jar`` -- ZeroMQ integration
- ``akka-slf4j-2.1-SNAPSHOT.jar`` -- SLF4J Event Handler Listener
- ``akka-<storage-system>-mailbox-2.1-SNAPSHOT.jar`` -- Akka durable mailboxes
- ``akka-actor`` -- Classic Actors, Typed Actors, IO Actor etc.
- ``akka-remote`` -- Remote Actors
- ``akka-testkit`` -- Toolkit for testing Actor systems
- ``akka-kernel`` -- Akka microkernel for running a bare-bones mini application server
- ``akka-transactor`` -- Transactors - transactional actors, integrated with Scala STM
- ``akka-agent`` -- Agents, integrated with Scala STM
- ``akka-camel`` -- Apache Camel integration
- ``akka-zeromq`` -- ZeroMQ integration
- ``akka-slf4j`` -- SLF4J Event Handler Listener
- ``akka-filebased-mailbox`` -- Akka durable mailbox (find more among community projects)
The filename of the actual JAR is for example ``@jarName@`` (and analog for
the other modules).
How to see the JARs dependencies of each Akka module is described in the
:ref:`dependencies` section.
@ -84,26 +87,16 @@ The simplest way to get started with Akka and Maven is to check out the
`Akka/Maven template <http://typesafe.com/resources/getting-started/typesafe-stack/downloading-installing.html#template-projects-for-scala-akka-and-play>`_
project.
Summary of the essential parts for using Akka with Maven:
1) Add this repository to your ``pom.xml``:
.. code-block:: xml
<repository>
<id>typesafe</id>
<name>Typesafe Repository</name>
<url>http://repo.typesafe.com/typesafe/releases/</url>
</repository>
2) Add the Akka dependencies. For example, here is the dependency for Akka Actor 2.1-SNAPSHOT:
Since Akka is published to Maven Central (for versions since 2.1-M2), is it
enough to add the Akka dependencies to the POM. For example, here is the
dependency for akka-actor:
.. code-block:: xml
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor</artifactId>
<version>2.1-SNAPSHOT</version>
<artifactId>akka-actor_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
**Note**: for snapshot versions both ``SNAPSHOT`` and timestamped versions are published.
@ -128,11 +121,12 @@ SBT installation instructions on `https://github.com/harrah/xsbt/wiki/Setup <htt
version := "1.0"
scalaVersion := "|scalaVersion|"
scalaVersion := "@scalaVersion@"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.1-SNAPSHOT"
libraryDependencies +=
"com.typesafe.akka" %% "akka-actor" % "@version@" @crossString@
Using Akka with Eclipse
@ -167,3 +161,4 @@ If you have questions you can get help on the `Akka Mailing List <http://groups.
You can also ask for `commercial support <http://typesafe.com>`_.
Thanks for being a part of the Akka community.

View file

@ -147,7 +147,7 @@ public class FSMDocTestBase {
@Override
public void transition(State old, State next) {
if (old == State.ACTIVE) {
getTarget().tell(new Batch(drainQueue()));
getTarget().tell(new Batch(drainQueue()), getSelf());
}
}
@ -175,11 +175,11 @@ public class FSMDocTestBase {
public void mustBunch() {
final ActorRef buncher = system.actorOf(new Props(MyFSM.class));
final TestProbe probe = new TestProbe(system);
buncher.tell(new SetTarget(probe.ref()));
buncher.tell(new Queue(1));
buncher.tell(new Queue(2));
buncher.tell(flush);
buncher.tell(new Queue(3));
buncher.tell(new SetTarget(probe.ref()), null);
buncher.tell(new Queue(1), null);
buncher.tell(new Queue(2), null);
buncher.tell(flush, null);
buncher.tell(new Queue(3), null);
final Batch b = probe.expectMsgClass(Batch.class);
assert b.objects.size() == 2;
assert b.objects.contains(1);

Some files were not shown because too many files have changed in this diff Show more