From ca75bbc2a00023b1a93b361c08662696f5941290 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 24 Sep 2012 11:51:21 +0200 Subject: [PATCH 1/7] say "IO support" in docs (avoid "module") --- akka-docs/rst/scala/io.rst | 6 +++--- akka-samples/akka-sample-hello/{README => README.md} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename akka-samples/akka-sample-hello/{README => README.md} (100%) diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 0b664f9c13..866fa8bffc 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -15,7 +15,7 @@ Components ByteString ^^^^^^^^^^ -A primary goal of Akka's IO module is to only communicate between actors with immutable objects. When dealing with network IO on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this IO module, so ``ByteString`` was developed. +A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network IO on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this IO support, so ``ByteString`` was developed. ``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice. @@ -138,13 +138,13 @@ Receiving messages from the ``IOManager``: IO.Iteratee ^^^^^^^^^^^ -Included with Akka's IO module is a basic implementation of ``Iteratee``\s. ``Iteratee``\s are an effective way of handling a stream of data without needing to wait for all the data to arrive. This is especially useful when dealing with non blocking IO since we will usually receive data in chunks which may not include enough information to process, or it may contain much more data then we currently need. +Included with Akka's IO support is a basic implementation of ``Iteratee``\s. ``Iteratee``\s are an effective way of handling a stream of data without needing to wait for all the data to arrive. This is especially useful when dealing with non blocking IO since we will usually receive data in chunks which may not include enough information to process, or it may contain much more data then we currently need. This ``Iteratee`` implementation is much more basic then what is usually found. There is only support for ``ByteString`` input, and enumerators aren't used. The reason for this limited implementation is to reduce the amount of explicit type signatures needed and to keep things simple. It is important to note that Akka's ``Iteratee``\s are completely optional, incoming data can be handled in any way, including other ``Iteratee`` libraries. ``Iteratee``\s work by processing the data that it is given and returning either the result (with any unused input) or a continuation if more input is needed. They are monadic, so methods like ``flatMap`` can be used to pass the result of an ``Iteratee`` to another. -The basic ``Iteratee``\s included in the IO module can all be found in the ScalaDoc under ``akka.actor.IO``, and some of them are covered in the example below. +The basic ``Iteratee``\s included in the IO support can all be found in the ScalaDoc under ``akka.actor.IO``, and some of them are covered in the example below. Examples -------- diff --git a/akka-samples/akka-sample-hello/README b/akka-samples/akka-sample-hello/README.md similarity index 100% rename from akka-samples/akka-sample-hello/README rename to akka-samples/akka-sample-hello/README.md From 4c9d1760a1b0c58da9b565a3678bb83d0b069f02 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 24 Sep 2012 13:15:58 +0200 Subject: [PATCH 2/7] Don't await the write of shutdownSignal to openChannels when shutting netty, see #2535 * The multi node tests triggered shutdown ordering issue * Not necessary to awaitUninterruptibly for the shutdownSignal to openChannels --- akka-remote/src/main/scala/akka/remote/netty/Server.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 895fea9212..16269a43a2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -72,7 +72,7 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { b.setCookie(settings.SecureCookie.get) b.build } - openChannels.write(netty.createControlEnvelope(shutdownSignal)).awaitUninterruptibly + openChannels.write(netty.createControlEnvelope(shutdownSignal)) openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources() From 0489c5daf195f48ba85133726cdd6d0a17a275c5 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 25 Sep 2012 08:25:24 +0200 Subject: [PATCH 3/7] document ActorDSL, see #2411 --- .../test/scala/akka/actor/ActorDSLSpec.scala | 15 +++++ akka-docs/rst/scala/actors.rst | 55 +++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 863742fc35..520c0de6bf 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -7,7 +7,9 @@ package akka.actor import language.postfixOps import akka.testkit.{ AkkaSpec, EventFilter } +//#import import akka.actor.ActorDSL._ +//#import import akka.event.Logging.Warning import scala.concurrent.{ Await, Future } import scala.concurrent.util.duration._ @@ -88,11 +90,13 @@ class ActorDSLSpec extends AkkaSpec { "A lightweight creator" must { "support creating regular actors" in { + //#simple-actor val a = actor(new Act { become { case "hello" ⇒ sender ! "hi" } }) + //#simple-actor implicit val i = inbox() a ! "hello" @@ -100,10 +104,12 @@ class ActorDSLSpec extends AkkaSpec { } "support setup/teardown" in { + //#simple-start-stop val a = actor(new Act { whenStarting { testActor ! "started" } whenStopping { testActor ! "stopped" } }) + //#simple-start-stop system stop a expectMsg("started") @@ -111,6 +117,7 @@ class ActorDSLSpec extends AkkaSpec { } "support restart" in { + //#failing-actor val a = actor(new Act { become { case "die" ⇒ throw new Exception @@ -118,6 +125,7 @@ class ActorDSLSpec extends AkkaSpec { whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) } whenRestarted { cause ⇒ testActor ! cause } }) + //#failing-actor EventFilter[Exception](occurrences = 1) intercept { a ! "die" @@ -129,10 +137,12 @@ class ActorDSLSpec extends AkkaSpec { "support superviseWith" in { val a = actor(new Act { val system = null // shadow the implicit system + //#supervise-with superviseWith(OneForOneStrategy() { case e: Exception if e.getMessage == "hello" ⇒ SupervisorStrategy.Stop case _: Exception ⇒ SupervisorStrategy.Resume }) + //#supervise-with val child = actor("child")(new Act { whenFailing { (_, _) ⇒ } become { @@ -157,6 +167,8 @@ class ActorDSLSpec extends AkkaSpec { "supported nested declaration" in { val system = this.system + //#nested-actor + // here we pass in the ActorRefFactory explicitly as an example val a = actor(system, "fred")(new Act { val b = actor("barney")(new Act { whenStarting { context.parent ! ("hello from " + self) } @@ -165,11 +177,13 @@ class ActorDSLSpec extends AkkaSpec { case x ⇒ testActor ! x } }) + //#nested-actor expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") lastSender must be(a) } "support Stash" in { + //#act-with-stash val a = actor(new ActWithStash { become { case 1 ⇒ stash() @@ -179,6 +193,7 @@ class ActorDSLSpec extends AkkaSpec { } } }) + //#act-with-stash a ! 1 a ! 2 diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 88559939e6..914c899a77 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -153,6 +153,61 @@ When spawning actors for specific sub-tasks from within an actor, it may be conv there is not yet a way to detect these illegal accesses at compile time. See also: :ref:`jmm-shared-state` +The Actor DSL +------------- + +Simple actors—for example one-off workers or even when trying things out in the +REPL—can be created more concisely using the :class:`Act` trait. The supporting +infrastructure is bundled in the following import: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#import + +This import is assumed for all code samples throughout this section. To defined +a simple actor, the following is sufficient: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#simple-actor + +Here, :meth:`actor` takes the role of either ``system.actorOf`` or +``context.actorOf``, depending on which context it is called in: it takes an +implicit :class:`ActorRefFactory`, which within an actor is available in the +form of the ``implicit val context: ActorContext``. Outside of an actor, you’ll +have to either declare an implicit :class:`ActorSystem`, or you can give the +factory explicitly (see further below). + +Life-cycle hooks are also exposed as DSL elements, where later invocations of +the methods shown below will replace the contents of the respective hooks: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#simple-start-stop + +The above is enough if the logical life-cycle of the actor matches the restart +cycles (i.e. ``whenStopping`` is executed before a restart and ``whenStarting`` +afterwards). If that is not desired, use the following two hooks: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#failing-actor + +It is also possible to create nested actors, i.e. grand-children, like this: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#nested-actor + +.. note:: + + In some cases it will be necessary to explicitly pass the + :class:`ActorRefFactory` to the :meth:`actor()` method (you will notice when + the compiler tells you about ambiguous implicits). + +The grand-child will be supervised by the child; the supervisor strategy for +this relationship can also be configured using a DSL element: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#supervise-with + +Last but not least there is a little bit of convenience magic built-in, which +detects if the runtime class of the statically given actor subtype extends the +:class:`Stash` trait (this is a complicated way of saying that ``new Act with +Stash`` would not work because its runtime erased type is just an anonymous +subtype of ``Act``). If you want to use this magic, simply extend +:class:`ActWithStash`: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#act-with-stash Actor API ========= From 3627b6cb4c418117b68d3ab28284866e45d15d85 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 25 Sep 2012 09:27:13 +0200 Subject: [PATCH 4/7] replace/add READMEs for the samples (keep it simple) --- akka-samples/akka-sample-camel/README | 28 ------------------ akka-samples/akka-sample-camel/README.md | 15 ++++++++++ akka-samples/akka-sample-cluster/README.md | 20 +++++++++++++ akka-samples/akka-sample-fsm/README | 28 ------------------ akka-samples/akka-sample-fsm/README.md | 15 ++++++++++ .../akka-sample-hello-kernel/README.md | 13 +++++++++ akka-samples/akka-sample-hello/README.md | 29 +++++-------------- akka-samples/akka-sample-multi-node/README.md | 15 ++++++++++ 8 files changed, 86 insertions(+), 77 deletions(-) delete mode 100644 akka-samples/akka-sample-camel/README create mode 100644 akka-samples/akka-sample-camel/README.md create mode 100644 akka-samples/akka-sample-cluster/README.md delete mode 100644 akka-samples/akka-sample-fsm/README create mode 100644 akka-samples/akka-sample-fsm/README.md create mode 100644 akka-samples/akka-sample-hello-kernel/README.md create mode 100644 akka-samples/akka-sample-multi-node/README.md diff --git a/akka-samples/akka-sample-camel/README b/akka-samples/akka-sample-camel/README deleted file mode 100644 index 10738ce0ee..0000000000 --- a/akka-samples/akka-sample-camel/README +++ /dev/null @@ -1,28 +0,0 @@ -Camel -=== - -Requirements ------------- - -To build and run Camel you need [Simple Build Tool][sbt] (sbt). - -Running -------- - -First time, 'sbt update' to get dependencies, then use 'sbt run'. -Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': -> cd $AKKA_HOME - -> % sbt - -> > project akka-sample-camel - -> > run - -> > Choose 1 or 2 depending on what sample you wish to run - -Notice ------- - -[akka]: http://akka.io -[sbt]: http://code.google.com/p/simple-build-tool/ \ No newline at end of file diff --git a/akka-samples/akka-sample-camel/README.md b/akka-samples/akka-sample-camel/README.md new file mode 100644 index 0000000000..d71806b0f9 --- /dev/null +++ b/akka-samples/akka-sample-camel/README.md @@ -0,0 +1,15 @@ +Camel Sample +============ + +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing + + akka-sample-camel/run + +and then choose which of the demonstrations you would like to run. + +You can read more in the [Akka docs](http://akka.io/docs). diff --git a/akka-samples/akka-sample-cluster/README.md b/akka-samples/akka-sample-cluster/README.md new file mode 100644 index 0000000000..3a6daaca2a --- /dev/null +++ b/akka-samples/akka-sample-cluster/README.md @@ -0,0 +1,20 @@ +Cluster Sample +============== + +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing + + akka-sample-cluster-experimental/run-main sample.cluster.simple.SimpleClusterApp 2551 + +and then from another terminal start more cluster nodes like this: + + akka-sample-cluster-experimental/run-main sample.cluster.simple.SimpleClusterApp + +Then you can start and stop cluster nodes and observe the messages printed by +the remaining ones, demonstrating cluster membership changes. + +You can read more in the [Akka docs](http://akka.io/docs). diff --git a/akka-samples/akka-sample-fsm/README b/akka-samples/akka-sample-fsm/README deleted file mode 100644 index 1391071f0b..0000000000 --- a/akka-samples/akka-sample-fsm/README +++ /dev/null @@ -1,28 +0,0 @@ -FSM -=== - -Requirements ------------- - -To build and run FSM you need [Simple Build Tool][sbt] (sbt). - -Running -------- - -First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'. -Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': -> cd $AKKA_HOME - -> % sbt - -> > project akka-sample-fsm - -> > run - -> > Choose 1 or 2 depending on what sample you wish to run - -Notice ------- - -[akka]: http://akka.io -[sbt]: http://code.google.com/p/simple-build-tool/ diff --git a/akka-samples/akka-sample-fsm/README.md b/akka-samples/akka-sample-fsm/README.md new file mode 100644 index 0000000000..223782c0f1 --- /dev/null +++ b/akka-samples/akka-sample-fsm/README.md @@ -0,0 +1,15 @@ +FSM Sample +========== + +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing + + akka-sample-fsm/run + +and then choose one of the two ways (one using `context.become` and one using FSM). + +You can read more in the [Akka docs](http://akka.io/docs). diff --git a/akka-samples/akka-sample-hello-kernel/README.md b/akka-samples/akka-sample-hello-kernel/README.md new file mode 100644 index 0000000000..845516d206 --- /dev/null +++ b/akka-samples/akka-sample-hello-kernel/README.md @@ -0,0 +1,13 @@ +Hello World Sample +================== + +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing + + akka-sample-hello/run + +You can read more in the [Akka docs](http://akka.io/docs). diff --git a/akka-samples/akka-sample-hello/README.md b/akka-samples/akka-sample-hello/README.md index 81a6db8d3e..ebce03e807 100644 --- a/akka-samples/akka-sample-hello/README.md +++ b/akka-samples/akka-sample-hello/README.md @@ -1,26 +1,13 @@ HELLO ===== -Requirements ------------- +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing -To build and run FSM you need [Simple Build Tool][sbt] (sbt). + akka-sample-hello/run -Running -------- - -First time, 'sbt update' to get dependencies, then to run Ants use 'sbt run'. -Here is an example. First type 'sbt' to start SBT interactively, the run 'update' and 'run': -> cd $AKKA_HOME - -> % sbt - -> > project akka-sample-hello - -> > run - -Notice ------- - -[akka]: http://akka.io -[sbt]: http://code.google.com/p/simple-build-tool/ +You can read more in the [Akka docs](http://akka.io/docs). diff --git a/akka-samples/akka-sample-multi-node/README.md b/akka-samples/akka-sample-multi-node/README.md new file mode 100644 index 0000000000..ed01383705 --- /dev/null +++ b/akka-samples/akka-sample-multi-node/README.md @@ -0,0 +1,15 @@ +Multi-Node Test Sample +====================== + +This sample is meant to be used by studying the code; it does not perform any +astounding functions when running it. If you want to run it, check out the akka +sources on your local hard drive, follow the [instructions for setting up Akka +with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). +When you start SBT within the checked-out akka source directory, you can run +this sample by typing + + akka-sample-multi-node-experimental/test + +(You might have to pass a system property containing `akka.test.tags.include=long-running`.) + +You can read more in the [Akka docs](http://akka.io/docs). From f3eadcd927bbb5fbf09368c72962e6007ddefc79 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 25 Sep 2012 10:58:40 +0200 Subject: [PATCH 5/7] Race in LeaderLeavingSpec, see #2549 * Since the leaving cluster is shutdown it's not possible to use the readView to assert that thing. * Removed the check, enough verification in other parts of the test anyway --- .../src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index eb0aa40591..394db2af77 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -56,9 +56,6 @@ abstract class LeaderLeavingSpec cluster.leave(oldLeaderAddress) enterBarrier("leader-left") - // verify that a NEW LEADER have taken over - awaitCond(!clusterView.isLeader) - // verify that the LEADER is shut down awaitCond(!cluster.isRunning) From be877a6197fb6030013ce8776200462c8bfe62d2 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 25 Sep 2012 12:18:44 +0200 Subject: [PATCH 6/7] export supervision tools in Act trait, and other review fixes - add more cross references to ActorDSL docs - improve SBT command line for running the multi-node test - correct small error in Restart Hooks section of actors.rst --- .../test/scala/akka/actor/ActorDSLSpec.scala | 4 +-- .../main/scala/akka/actor/dsl/Creators.scala | 30 +++++++++++++++++++ akka-docs/rst/java/untyped-actors.rst | 5 ++-- akka-docs/rst/scala/actors.rst | 18 ++++++----- .../scala/code/docs/actor/ActorDocSpec.scala | 8 +++++ akka-samples/akka-sample-multi-node/README.md | 2 +- 6 files changed, 55 insertions(+), 12 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 520c0de6bf..bb5ed0d4bd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -139,8 +139,8 @@ class ActorDSLSpec extends AkkaSpec { val system = null // shadow the implicit system //#supervise-with superviseWith(OneForOneStrategy() { - case e: Exception if e.getMessage == "hello" ⇒ SupervisorStrategy.Stop - case _: Exception ⇒ SupervisorStrategy.Resume + case e: Exception if e.getMessage == "hello" ⇒ Stop + case _: Exception ⇒ Resume }) //#supervise-with val child = actor("child")(new Act { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala index b81f733013..29dda88300 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Creators.scala @@ -54,6 +54,36 @@ trait Creators { this: ActorDSL.type ⇒ private[this] var postRestartFun: Throwable ⇒ Unit = null private[this] var strategy: SupervisorStrategy = null + /** + * @see [[akka.actor.OneForOneStrategy]] + */ + def OneForOneStrategy = akka.actor.OneForOneStrategy + + /** + * @see [[akka.actor.AllForOneStrategy]] + */ + def AllForOneStrategy = akka.actor.AllForOneStrategy + + /** + * @see [[akka.actor.SupervisorStrategy]] + */ + def Stop = SupervisorStrategy.Stop + + /** + * @see [[akka.actor.SupervisorStrategy]] + */ + def Restart = SupervisorStrategy.Restart + + /** + * @see [[akka.actor.SupervisorStrategy]] + */ + def Resume = SupervisorStrategy.Resume + + /** + * @see [[akka.actor.SupervisorStrategy]] + */ + def Escalate = SupervisorStrategy.Escalate + /** * Add the given behavior on top of the behavior stack for this actor. This * stack is cleared upon restart. Use `unbecome()` to pop an element off diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index f1cf49f7d1..40bc124ba0 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -200,8 +200,9 @@ Restart Hooks ------------- All actors are supervised, i.e. linked to another actor with a fault -handling strategy. Actors will be restarted in case an exception is thrown while -processing a message. This restart involves the hooks mentioned above: +handling strategy. Actors may be restarted in case an exception is thrown while +processing a message (see :ref:`supervision`). This restart involves the hooks +mentioned above: 1. The old actor is informed by calling :meth:`preRestart` with the exception which caused the restart and the message which triggered that exception; the diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 914c899a77..cb97262329 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -162,7 +162,7 @@ infrastructure is bundled in the following import: .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#import -This import is assumed for all code samples throughout this section. To defined +This import is assumed for all code samples throughout this section. To define a simple actor, the following is sufficient: .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#simple-actor @@ -174,14 +174,16 @@ form of the ``implicit val context: ActorContext``. Outside of an actor, you have to either declare an implicit :class:`ActorSystem`, or you can give the factory explicitly (see further below). -Life-cycle hooks are also exposed as DSL elements, where later invocations of -the methods shown below will replace the contents of the respective hooks: +Life-cycle hooks are also exposed as DSL elements (see `Start Hook`_ and `Stop +Hook`_ below), where later invocations of the methods shown below will replace +the contents of the respective hooks: .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#simple-start-stop The above is enough if the logical life-cycle of the actor matches the restart cycles (i.e. ``whenStopping`` is executed before a restart and ``whenStarting`` -afterwards). If that is not desired, use the following two hooks: +afterwards). If that is not desired, use the following two hooks (see `Restart +Hooks`_ below): .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#failing-actor @@ -196,7 +198,8 @@ It is also possible to create nested actors, i.e. grand-children, like this: the compiler tells you about ambiguous implicits). The grand-child will be supervised by the child; the supervisor strategy for -this relationship can also be configured using a DSL element: +this relationship can also be configured using a DSL element (supervision +directives are part of the :class:`Act` trait): .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#supervise-with @@ -301,8 +304,9 @@ Restart Hooks ------------- All actors are supervised, i.e. linked to another actor with a fault -handling strategy. Actors will be restarted in case an exception is thrown while -processing a message. This restart involves the hooks mentioned above: +handling strategy. Actors may be restarted in case an exception is thrown while +processing a message (see :ref:`supervision`). This restart involves the hooks +mentioned above: 1. The old actor is informed by calling :meth:`preRestart` with the exception which caused the restart and the message which triggered that exception; the diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index 0ce5f87728..244ab5f136 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -406,4 +406,12 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { lastSender must be === system.actorFor("/user") } + "using ActorDSL outside of akka.actor package" in { + import akka.actor.ActorDSL._ + actor(new Act { + superviseWith(OneForOneStrategy() { case _ ⇒ Stop; Restart; Resume; Escalate }) + superviseWith(AllForOneStrategy() { case _ ⇒ Stop; Restart; Resume; Escalate }) + }) + } + } diff --git a/akka-samples/akka-sample-multi-node/README.md b/akka-samples/akka-sample-multi-node/README.md index ed01383705..4a3149599f 100644 --- a/akka-samples/akka-sample-multi-node/README.md +++ b/akka-samples/akka-sample-multi-node/README.md @@ -8,7 +8,7 @@ with SBT](http://doc.akka.io/docs/akka/current/intro/getting-started.html). When you start SBT within the checked-out akka source directory, you can run this sample by typing - akka-sample-multi-node-experimental/test + akka-sample-multi-node-experimental/multi-jvm:test-only sample.multinode.MultiNodeSampleSpec (You might have to pass a system property containing `akka.test.tags.include=long-running`.) From 426120f50ce554c9858251354ad5fa10d4e7bf04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 25 Sep 2012 12:32:56 +0200 Subject: [PATCH 7/7] EventStream should publish to all matching traits. See #2525 --- .../scala/akka/event/EventStreamSpec.scala | 95 ++++++++++++++++++- .../src/main/scala/akka/event/EventBus.scala | 11 ++- .../scala/akka/util/SubclassifiedIndex.scala | 59 ++++++++---- 3 files changed, 138 insertions(+), 27 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index c027d805d1..5447a1eb74 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -5,13 +5,13 @@ package akka.event import language.postfixOps -import akka.testkit.AkkaSpec import scala.concurrent.util.duration._ import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ import akka.event.Logging.InitializeLogger import akka.pattern.gracefulStop +import akka.testkit.{ TestProbe, AkkaSpec } object EventStreamSpec { @@ -53,6 +53,12 @@ object EventStreamSpec { class B1 extends A class B2 extends A class C extends B1 + + trait T + trait AT extends T + trait BT extends T + class TA + class TAATBT extends TA with AT with BT } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -117,7 +123,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } - "manage sub-channels" in { + "manage sub-channels using classes" in { val a = new A val b1 = new B1 val b2 = new B2 @@ -143,6 +149,89 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } + "manage sub-channels using classes and traits (update on subscribe)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectMsgType[TA] must be === tm1 + a3.expectMsgType[TA] must be === tm2 + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a3.ref, classOf[TA]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on unsubscribe)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a3.ref, classOf[TA]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectNoMsg(1 second) + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on unsubscribe all)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2, a3, a4 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.subscribe(a3.ref, classOf[TA]) must be === true + es.subscribe(a4.ref, classOf[TAATBT]) must be === true + es.unsubscribe(a3.ref) + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + a3.expectNoMsg(1 second) + a4.expectMsgType[TAATBT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + es.unsubscribe(a4.ref, classOf[TAATBT]) must be === true + } + + "manage sub-channels using classes and traits (update on publish)" in { + val es = new EventStream(false) + val tm1 = new TA + val tm2 = new TAATBT + val a1, a2 = TestProbe() + + es.subscribe(a1.ref, classOf[AT]) must be === true + es.subscribe(a2.ref, classOf[BT]) must be === true + es.publish(tm1) + es.publish(tm2) + a1.expectMsgType[AT] must be === tm2 + a2.expectMsgType[BT] must be === tm2 + es.unsubscribe(a1.ref, classOf[AT]) must be === true + es.unsubscribe(a2.ref, classOf[BT]) must be === true + } + } private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { @@ -150,7 +239,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish - msg foreach (x ⇒ expectMsg(x)) + msg foreach (expectMsg(_)) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index cad7351bbb..63436723bd 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -139,7 +139,7 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.addValue(to, subscriber) if (diff.isEmpty) false else { - cache = cache ++ diff + cache ++= diff true } } @@ -148,16 +148,19 @@ trait SubchannelClassification { this: EventBus ⇒ val diff = subscriptions.removeValue(from, subscriber) if (diff.isEmpty) false else { - cache = cache ++ diff + removeFromCache(diff) true } } def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized { val diff = subscriptions.removeValue(subscriber) - if (diff.nonEmpty) cache = cache ++ diff + if (diff.nonEmpty) removeFromCache(diff) } + private def removeFromCache(changes: Seq[(Classifier, Set[Subscriber])]): Unit = + cache ++= changes map { case (c, s) ⇒ (c, cache.getOrElse(c, Set[Subscriber]()) -- s) } + def publish(event: Event): Unit = { val c = classify(event) val recv = @@ -166,7 +169,7 @@ trait SubchannelClassification { this: EventBus ⇒ if (cache contains c) cache(c) else { val diff = subscriptions.addKey(c) - cache = cache ++ diff + cache ++= diff cache(c) } } diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index f3623003ed..45a70be27c 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -21,9 +21,9 @@ object SubclassifiedIndex { class Nonroot[K, V](val key: K, _values: Set[V])(implicit sc: Subclassification[K]) extends SubclassifiedIndex[K, V](_values) { - override def addValue(key: K, value: V): Changes = { + override def innerAddValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive add-to-set - if (sc.isEqual(key, this.key)) addValue(value) else super.addValue(key, value) + if (sc.isEqual(key, this.key)) addValue(value) else super.innerAddValue(key, value) } private def addValue(value: V): Changes = { @@ -34,23 +34,24 @@ object SubclassifiedIndex { } else kids } - override def removeValue(key: K, value: V): Changes = { + override def innerRemoveValue(key: K, value: V): Changes = { // break the recursion on super when key is found and transition to recursive remove-from-set - if (sc.isEqual(key, this.key)) removeValue(value) else super.removeValue(key, value) + if (sc.isEqual(key, this.key)) removeValue(value) else super.innerRemoveValue(key, value) } override def removeValue(value: V): Changes = { val kids = subkeys flatMap (_ removeValue value) if (values contains value) { values -= value - kids :+ ((key, values)) + kids :+ ((key, Set(value))) } else kids } override def toString = subkeys.mkString("Nonroot(" + key + ", " + values + ",\n", ",\n", ")") - } + private[SubclassifiedIndex] def emptyMergeMap[K, V] = internalEmptyMergeMap.asInstanceOf[Map[K, Set[V]]] + private[this] val internalEmptyMergeMap = Map[AnyRef, Set[AnyRef]]().withDefault(_ ⇒ Set[AnyRef]()) } /** @@ -79,44 +80,58 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: * Add key to this index which inherits its value set from the most specific * super-class which is known. */ - def addKey(key: K): Changes = - subkeys collectFirst { - case n if sc.isEqual(n.key, key) ⇒ Nil - case n if sc.isSubclass(key, n.key) ⇒ n.addKey(key) - } getOrElse { - integrate(new Nonroot(key, values)) - List((key, values)) + def addKey(key: K): Changes = mergeChangesByKey(innerAddKey(key)) + + protected def innerAddKey(key: K): Changes = { + var found = false + val ch = subkeys flatMap { n ⇒ + if (sc.isEqual(key, n.key)) { + found = true + Nil + } else if (sc.isSubclass(key, n.key)) { + found = true + n.innerAddKey(key) + } else Nil } + if (!found) { + integrate(new Nonroot(key, values)) + Seq((key, values)) + } else ch + } /** * Add value to all keys which are subclasses of the given key. If the key * is not known yet, it is inserted as if using addKey. */ - def addValue(key: K, value: V): Changes = { + def addValue(key: K, value: V): Changes = mergeChangesByKey(innerAddValue(key, value)) + + protected def innerAddValue(key: K, value: V): Changes = { var found = false val ch = subkeys flatMap { n ⇒ if (sc.isSubclass(key, n.key)) { found = true - n.addValue(key, value) + n.innerAddValue(key, value) } else Nil } if (!found) { val v = values + value val n = new Nonroot(key, v) integrate(n) - n.addValue(key, value) :+ ((key, v)) + n.innerAddValue(key, value) :+ (key -> v) } else ch } /** * Remove value from all keys which are subclasses of the given key. + * @return The keys and values that have been removed. */ - def removeValue(key: K, value: V): Changes = { + def removeValue(key: K, value: V): Changes = mergeChangesByKey(innerRemoveValue(key, value)) + protected def innerRemoveValue(key: K, value: V): Changes = { var found = false val ch = subkeys flatMap { n ⇒ if (sc.isSubclass(key, n.key)) { found = true - n.removeValue(key, value) + n.innerRemoveValue(key, value) } else Nil } if (!found) { @@ -129,7 +144,7 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: /** * Remove value from all keys in the index. */ - def removeValue(value: V): Changes = subkeys flatMap (_ removeValue value) + def removeValue(value: V): Changes = mergeChangesByKey(subkeys flatMap (_ removeValue value)) override def toString = subkeys.mkString("SubclassifiedIndex(" + values + ",\n", ",\n", ")") @@ -148,4 +163,8 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: } } -} \ No newline at end of file + private def mergeChangesByKey(changes: Changes): Changes = + (emptyMergeMap[K, V] /: changes) { + case (m, (k, s)) ⇒ m.updated(k, m(k) ++ s) + }.toSeq +}