diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 3038e3ebc3..0a21cd4c3d 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -248,14 +248,17 @@ trait Actor { /** * User overridable callback. *

- * Is called when an Actor is started by invoking 'actor'. + * Is called when an Actor is started. + * Actors are automatically started asynchronously when created. + * Empty default implementation. */ def preStart() {} /** * User overridable callback. *

- * Is called when 'actor.stop()' is invoked. + * Is called asynchronously after 'actor.stop()' is invoked. + * Empty default implementation. */ def postStop() {} diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ea5b89671a..b7f2afc8f7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -196,7 +196,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext /** * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or default specified in - * akka-actor-reference.conf + * reference.conf */ protected[akka] def shutdownTimeout: Duration diff --git a/akka-docs/general/index.rst b/akka-docs/general/index.rst index 687892b177..f3cc0d2a7d 100644 --- a/akka-docs/general/index.rst +++ b/akka-docs/general/index.rst @@ -5,9 +5,9 @@ General :maxdepth: 2 jmm + message-send-semantics configuration event-handler slf4j addressing supervision - guaranteed-delivery diff --git a/akka-docs/general/guaranteed-delivery.rst b/akka-docs/general/message-send-semantics.rst similarity index 51% rename from akka-docs/general/guaranteed-delivery.rst rename to akka-docs/general/message-send-semantics.rst index 550d84376c..d9488d1f2b 100644 --- a/akka-docs/general/guaranteed-delivery.rst +++ b/akka-docs/general/message-send-semantics.rst @@ -1,13 +1,14 @@ -.. _guaranteed-delivery: +.. _message-send-semantics: -##################### - Guaranteed Delivery -##################### +####################### + Message send semantics +####################### -Guaranteed Delivery -=================== + +Guaranteed Delivery? +==================== Akka does *not* support guaranteed delivery. @@ -34,9 +35,35 @@ in Erlang and requires the user to model his application around. You can read more about this approach in the `Erlang documentation`_ (section 10.9 and 10.10), Akka follows it closely. -Bottom line; you as a developer knows what guarantees you need in your +Bottom line: you as a developer know what guarantees you need in your application and can solve it fastest and most reliable by explicit ``ACK`` and ``RETRY`` (if you really need it, most often you don't). Using Akka's Durable Mailboxes could help with this. -.. _Erlang documentation: http://www.erlang.org/faq/academic.html +Delivery semantics +================== + +At-most-once +------------ + +Actual transports may provide stronger semantics, +but at-most-once is the semantics you should expect. +The alternatives would be once-and-only-once, which is extremely costly, +or at-least-once which essentially requires idempotency of message processing, +which is a user-level concern. + +Ordering is preserved on a per-sender basis +------------------------------------------- + +Actor ``A1` sends messages ``M1``, ``M2``, ``M3`` to ``A2`` +Actor ``A3`` sends messages ``M4``, ``M5``, ``M6`` to ``A2`` + +This means that: + 1) If ``M1`` is delivered it must be delivered before ``M2`` and ``M3`` + 2) If ``M2`` is delivered it must be delivered before ``M3`` + 3) If ``M4`` is delivered it must be delivered before ``M5`` and ``M6`` + 4) If ``M5`` is delivered it must be delivered before ``M6`` + 5) ``A2`` can see messages from ``A1`` interleaved with messages from ``A3`` + 6) Since there is no guaranteed delivery, none, some or all of the messages may arrive to ``A2`` + +.. _Erlang documentation: http://www.erlang.org/faq/academic.html \ No newline at end of file diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index d4ce659db9..b5cd58ef70 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -28,17 +28,12 @@ its syntax from Erlang. Creating Actors =============== -Actors can be created either by: - -* Extending the Actor class and implementing the receive method. -* Create an anonymous actor using one of the actor methods. - Defining an Actor class ----------------------- Actor classes are implemented by extending the Actor class and implementing the -``receive`` method. The ``receive`` method should define a series of case +:meth:`receive` method. The :meth:`receive` method should define a series of case statements (which has the type ``PartialFunction[Any, Unit]``) that defines which messages your Actor can handle, using standard Scala pattern matching, along with the implementation of how the messages should be processed. @@ -46,21 +41,22 @@ along with the implementation of how the messages should be processed. Here is an example: .. includecode:: code/ActorDocSpec.scala - :include: imports,my-actor + :include: imports1,my-actor Please note that the Akka Actor ``receive`` message loop is exhaustive, which is different compared to Erlang and Scala Actors. This means that you need to provide a pattern match for all messages that it can accept and if you want to be able to handle unknown messages then you need to have a default case as in -the example above. - +the example above. Otherwise an ``UnhandledMessageException`` will be +thrown and the actor is restarted when an unknown message is received. Creating Actors --------------- -.. includecode:: code/ActorDocSpec.scala#creating-actorOf +.. includecode:: code/ActorDocSpec.scala + :include: imports2,system-actorOf -The call to ``actorOf`` returns an instance of ``ActorRef``. This is a handle to +The call to :meth:`actorOf` returns an instance of ``ActorRef``. This is a handle to the ``Actor`` instance which you can use to interact with the ``Actor``. The ``ActorRef`` is immutable and has a one to one relationship with the Actor it represents. The ``ActorRef`` is also serializable and network-aware. This means @@ -68,6 +64,15 @@ that you can serialize it, send it over the wire and use it on a remote host and it will still be representing the same Actor on the original node, across the network. +In the above example the actor was created from the system. It is also possible +to create actors from other actors with the actor ``context``. The difference is +how the supervisor hierarchy is arranged. When using the context the current actor +will be supervisor of the created child actor. When using the system it will be +a top level actor, that is supervised by the system (internal guardian actor). + +.. includecode:: code/ActorDocSpec.scala#context-actorOf + +Actors are automatically started asynchronously when created. Creating Actors with non-default constructor -------------------------------------------- @@ -81,23 +86,21 @@ Here is an example: .. includecode:: code/ActorDocSpec.scala#creating-constructor +Creating Actors with Props +-------------------------- + +``Props`` is a configuration object to specify additional things for the actor to +be created, such as the ``MessageDispatcher``. + +.. includecode:: code/ActorDocSpec.scala#creating-props + + Creating Actors using anonymous classes -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +--------------------------------------- -When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class:: +When spawning actors for specific sub-tasks from within an actor, it may be convenient to include the code to be executed directly in place, using an anonymous class. - def receive = { - case m: DoIt => - actorOf(new Actor { - def receive = { - case DoIt(msg) => - val replyMsg = doSomeDangerousWork(msg) - self.reply(replyMsg) - self.stop() - } - def doSomeDangerousWork(msg: Message) = { ... } - }).start() ! m - } +.. includecode:: code/ActorDocSpec.scala#anonymous-actor .. warning:: @@ -108,41 +111,38 @@ When spawning actors for specific sub-tasks from within an actor, it may be conv code will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. -Running a block of code asynchronously --------------------------------------- -Here we create a light-weight actor-based thread, that can be used to spawn off -a task. Code blocks spawned up like this are always implicitly started, shut -down and made eligible for garbage collection. The actor that is created "under -the hood" is not reachable from the outside and there is no way of sending -messages to it. It being an actor is only an implementation detail. It will only -run the block in an event-based thread and exit once the block has run to -completion. - -.. code-block:: scala - - spawn { - ... // do stuff - } - - -Actor Internal API -================== +Actor API +========= The :class:`Actor` trait defines only one abstract method, the above mentioned -:meth:`receive`. In addition, it offers two convenience methods -:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as -described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s -:class:`ActorRef` object. If the current actor behavior does not match a -received message, :meth:`unhandled` is called, which by default throws an -:class:`UnhandledMessageException`. +:meth:`receive`, which implements the behavior of the actor. + +If the current actor behavior does not match a received message, :meth:`unhandled` +is called, which by default throws an :class:`UnhandledMessageException`. + +In addition, it offers: + +* :obj:`self` reference to the :class:`ActorRef` of the actor +* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply` +* :obj:`context` exposes contextual information for the actor and the current message, such as: + + * factory method to create child actors (:meth:`actorOf`) + * system that the actor belongs to + * parent supervisor + * supervised children + * hotswap behavior stack as described in :ref:`Actor.HotSwap` + +You can import the members in the :obj:`context` to avoid prefixing access with ``context.`` + +.. includecode:: code/ActorDocSpec.scala#import-context The remaining visible methods are user-overridable life-cycle hooks which are described in the following:: def preStart() {} - def preRestart(cause: Throwable, message: Option[Any]) {} - def postRestart(cause: Throwable) {} + def preRestart(reason: Throwable, message: Option[Any]) { postStop() } + def postRestart(reason: Throwable) { preStart() } def postStop() {} The implementations shown above are the defaults provided by the :class:`Actor` @@ -156,7 +156,7 @@ Right after starting the actor, its :meth:`preStart` method is invoked. :: - override def preStart { + override def preStart() { // registering with other actors someService ! Register(self) } @@ -165,9 +165,9 @@ Right after starting the actor, its :meth:`preStart` method is invoked. Restart Hooks ------------- -A supervised actor, i.e. one which is linked to another actor with a fault -handling strategy, will be restarted in case an exception is thrown while -processing a message. This restart involves four of the hooks mentioned above: +All actors are supervised, i.e. linked to another actor with a fault +handling strategy. Actors will be restarted in case an exception is thrown while +processing a message. This restart involves the hooks mentioned above: 1. The old actor is informed by calling :meth:`preRestart` with the exception which caused the restart and the message which triggered that exception; the @@ -175,17 +175,18 @@ processing a message. This restart involves four of the hooks mentioned above: message, e.g. when a supervisor does not trap the exception and is restarted in turn by its supervisor. This method is the best place for cleaning up, preparing hand-over to the fresh actor instance, etc. -2. The initial factory from the ``Actor.actorOf`` call is used + By default it calls :meth:`postStop`. +2. The initial factory from the ``actorOf`` call is used to produce the fresh instance. -3. The new actor’s :meth:`preStart` method is invoked, just as in the normal - start-up case. -4. The new actor’s :meth:`postRestart` method is called with the exception - which caused the restart. +3. The new actor’s :meth:`postRestart` method is invoked with the exception + which caused the restart. By default the :meth:`preStart` + is called, just as in the normal start-up case. An actor restart replaces only the actual actor object; the contents of the mailbox and the hotswap stack are unaffected by the restart, so processing of -messages will resume after the :meth:`postRestart` hook returns. Any message +messages will resume after the :meth:`postRestart` hook returns. The message +that triggered the exception will not be received again. Any message sent to an actor while it is being restarted will be queued to its mailbox as usual. @@ -194,16 +195,15 @@ Stop Hook After stopping an actor, its :meth:`postStop` hook is called, which may be used e.g. for deregistering this actor from other services. This hook is guaranteed -to run after message queuing has been disabled for this actor, i.e. sending -messages would fail with an :class:`IllegalActorStateException`. +to run after message queuing has been disabled for this actor, i.e. messages +sent to a stopped actor will be redirected to the :obj:`deadLetters` of the +:obj:`ActorSystem`. Identifying Actors ================== -An actor is identified by its address. If no address is associated with an actor -then a unique identifier is used instead. The address of an actor can be -accessed using ``self.address``. +FIXME Actor Path documentation Messages and immutability @@ -236,20 +236,9 @@ Send messages Messages are sent to an Actor through one of the following methods. * ``!`` means “fire-and-forget”, e.g. send a message asynchronously and return - immediately. + immediately. Also know as ``tell``. * ``?`` sends a message asynchronously and returns a :class:`Future` - representing a possible reply. - -.. note:: - - There used to be two more “bang” methods, which are now removed in Akka 2.0: - - * ``!!`` was similar to the current ``(actor ? msg).as[T]``; deprecation - followed from the change of timeout handling described below. - * ``!!![T]`` was similar to the current ``(actor ? msg).mapTo[T]``, with the - same change in the handling of :class:`Future`’s timeout as for ``!!``, but - additionally the old method could defer possible type cast problems into - seemingly unrelated parts of the code base. + representing a possible reply. Also know as ``ask``. Message ordering is guaranteed on a per-sender basis. @@ -265,13 +254,11 @@ message. This gives the best concurrency and scalability characteristics. If invoked from within an Actor, then the sending actor reference will be implicitly passed along with the message and available to the receiving Actor -in its ``channel: UntypedChannel`` member field. The target actor can use this -to reply to the original sender, e.g. by using the ``self.reply(message: Any)`` -method. +in its ``sender: ActorRef`` member field. The target actor can use this +to reply to the original sender, by using ``sender ! replyMsg``. -If invoked from an instance that is **not** an Actor there will be no implicit -sender passed along with the message and you will get an -IllegalActorStateException when calling ``self.reply(...)``. +If invoked from an instance that is **not** an Actor the sender will be +:obj:`deadLetters` actor reference by default. Send-And-Receive-Future ----------------------- @@ -284,25 +271,49 @@ will return a :class:`Future`: val future = actor ? "hello" The receiving actor should reply to this message, which will complete the -future with the reply message as value; if the actor throws an exception while -processing the invocation, this exception will also complete the future. If the -actor does not complete the future, it will expire after the timeout period, -which is taken from one of the following three locations in order of -precedence: +future with the reply message as value; ``sender ! result``. + +To complete the future with an exception you need send a Failure message to the sender. +This is not done automatically when an actor throws an exception while processing a +message. + +.. code-block:: scala + + try { + operation() + } catch { + case e: Exception => + sender ! akka.actor.Status.Failure(e) + throw e + } + +If the actor does not complete the future, it will expire after the timeout period, +which is taken from one of the following locations in order of precedence: #. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)`` -#. implicit argument of type :class:`Actor.Timeout`, e.g. +#. implicit argument of type :class:`akka.actor.Timeout`, e.g. :: + + import akka.actor.Timeout + import akka.util.duration._ - implicit val timeout = Actor.Timeout(12 millis) + implicit val timeout = Timeout(12 millis) val future = actor ? "hello" -#. default timeout from ``akka.conf`` - See :ref:`futures-scala` for more information on how to await or query a future. +.. warning:: + + When using future callbacks, such as ``onComplete``, ``onResult``, and ``onTimeout``, + inside actors you need to carefully avoid closing over the containing actor’s + reference, i.e. do not call methods or access mutable state on the enclosing actor + from within the callback. This would break the actor encapsulation and may + introduce synchronization bugs and race conditions because the callback + will be scheduled concurrently to the enclosing actor. Unfortunately + there is not yet a way to detect these illegal accesses at compile time. + Send-And-Receive-Eventually --------------------------- @@ -321,17 +332,7 @@ type, it will throw the exception or a :class:`ClassCastException` (if you want to get :obj:`None` in the latter case, use :meth:`Future.asSilently[T]`). In case of a timeout, :obj:`None` is returned. -.. code-block:: scala - - (actor ? msg).as[String] match { - case Some(answer) => ... - case None => ... - } - - val resultOption = (actor ? msg).as[String] - if (resultOption.isDefined) ... else ... - - for (x <- (actor ? msg).as[Int]) yield { 2 * x } +.. includecode:: code/ActorDocSpec.scala#using-ask Forward message --------------- @@ -363,25 +364,15 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause which the message can be matched against the different case clauses using Scala pattern matching. Here is an example: -.. code-block:: scala +.. includecode:: code/ActorDocSpec.scala + :include: imports1,my-actor - class MyActor extends Actor { - def receive = { - case "Hello" => - log.info("Received 'Hello'") - - case _ => - throw new RuntimeException("unknown message") - } - } +.. _Actor.Reply: Reply to messages ================= -Reply using the sender ----------------------- - If you want to have a handle for replying to a message, you can use ``sender``, which gives you an ActorRef. You can reply by sending to that ActorRef with ``sender ! Message``. You can also store the ActorRef @@ -403,16 +394,7 @@ received within a certain time. To receive this timeout you have to set the ``receiveTimeout`` property and declare a case handing the ReceiveTimeout object. -.. code-block:: scala - - context.receiveTimeout = Some(30000L) // 30 seconds - - def receive = { - case "Hello" => - log.info("Received 'Hello'") - case ReceiveTimeout => - throw new RuntimeException("received timeout") - } +.. includecode:: code/ActorDocSpec.scala#receive-timeout Starting actors =============== @@ -438,12 +420,19 @@ add initialization code for the actor. Stopping actors =============== -Actors are stopped by invoking the ``stop`` method. +Actors are stopped by invoking the ``stop`` method of the ``ActorRef``. +The actual termination of the actor is performed asynchronously, i.e. +``stop`` may return before the actor is stopped. .. code-block:: scala actor.stop() +Processing of the current message, if any, will continue before the actor is stopped, +but additional messages in the mailbox will not be processed. By default these +messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that +depends on the mailbox implementation. + When stop is called then a call to the ``def postStop`` callback method will take place. The ``Actor`` can use this callback to implement shutdown behavior. @@ -458,7 +447,9 @@ PoisonPill ========== You can also send an actor the ``akka.actor.PoisonPill`` message, which will -stop the actor when the message is processed. +stop the actor when the message is processed. ``PoisonPill`` is enqueued as +ordinary messages and will be handled after messages that were already queued +in the mailbox. If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an @@ -486,22 +477,7 @@ pushed and popped. To hotswap the Actor behavior using ``become``: -.. code-block:: scala - - def angry: Receive = { - case "foo" => context reply "I am already angry?" - case "bar" => become(happy) - } - - def happy: Receive = { - case "bar" => context reply "I am already happy :-)" - case "foo" => become(angry) - } - - def receive = { - case "foo" => become(angry) - case "bar" => become(happy) - } +.. includecode:: code/ActorDocSpec.scala#hot-swap-actor The ``become`` method is useful for many different things, but a particular nice example of it is in example where it is used to implement a Finite State Machine @@ -511,35 +487,12 @@ example of it is in example where it is used to implement a Finite State Machine Here is another little cute example of ``become`` and ``unbecome`` in action: -.. code-block:: scala - - case object Swap - class Swapper extends Actor { - def receive = { - case Swap => - println("Hi") - become { - case Swap => - println("Ho") - unbecome() // resets the latest 'become' (just for fun) - } - } - } - - val swap = actorOf[Swapper] - - swap ! Swap // prints Hi - swap ! Swap // prints Ho - swap ! Swap // prints Hi - swap ! Swap // prints Ho - swap ! Swap // prints Hi - swap ! Swap // prints Ho - +.. includecode:: code/ActorDocSpec.scala#swapper Encoding Scala Actors nested receives without accidentally leaking memory ------------------------------------------------------------------------- -See this `Unnested receive example `_. +See this `Unnested receive example `_. Downgrade @@ -555,7 +508,7 @@ Here's how you use the ``unbecome`` method: .. code-block:: scala - def receive: Receive = { + def receive = { case "revert" => unbecome() } @@ -601,16 +554,11 @@ messages on that mailbox, will be there as well. What happens to the actor ------------------------- -If an exception is thrown and the actor is supervised, the actor object itself -is discarded and a new instance is created. This new instance will now be used -in the actor references to this actor (so this is done invisible to the -developer). - -If the actor is _not_ supervised, but its lifeCycle is set to Permanent -(default), it will just keep on processing messages as if nothing had happened. - -If the actor is _not_ supervised, but its lifeCycle is set to Temporary, it will -be stopped immediately. +If an exception is thrown, the actor instance is discarded and a new instance is +created. This new instance will now be used in the actor references to this actor +(so this is done invisible to the developer). Note that this means that current +state of the failing actor instance is lost if you don't store and restore it in +``preRestart`` and ``postRestart`` callbacks. Extending Actors using PartialFunction chaining @@ -620,32 +568,4 @@ A bit advanced but very useful way of defining a base message handler and then extend that, either through inheritance or delegation, is to use ``PartialFunction.orElse`` chaining. -In generic base Actor: - -.. code-block:: scala - - import akka.actor.Actor.Receive - - abstract class GenericActor extends Actor { - // to be defined in subclassing actor - def specificMessageHandler: Receive - - // generic message handler - def genericMessageHandler: Receive = { - case event => printf("generic: %s\n", event) - } - - def receive = specificMessageHandler orElse genericMessageHandler - } - -In subclassing Actor: - -.. code-block:: scala - - class SpecificActor extends GenericActor { - def specificMessageHandler = { - case event: MyMsg => printf("specific: %s\n", event.subject) - } - } - - case class MyMsg(subject: String) +.. includecode:: code/ActorDocSpec.scala#receive-orElse diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index b796e32971..b8a827b9bf 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -1,16 +1,19 @@ -package akka.docs.stm +package akka.docs.actor + +//#imports1 +import akka.actor.Actor +import akka.event.Logging +//#imports1 + +//#imports2 +import akka.actor.ActorSystem +//#imports2 import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit._ import akka.util.duration._ -//#imports -import akka.actor.Actor -import akka.event.Logging - -//#imports - //#my-actor class MyActor extends Actor { val log = Logging(context.system, this) @@ -21,12 +24,110 @@ class MyActor extends Actor { } //#my-actor +case class DoIt(msg: Message) +case class Message(s: String) + +//#context-actorOf +class FirstActor extends Actor { + val myActor = context.actorOf[MyActor] + //#context-actorOf + //#anonymous-actor + def receive = { + case m: DoIt ⇒ + context.actorOf(new Actor { + def receive = { + case DoIt(msg) ⇒ + val replyMsg = doSomeDangerousWork(msg) + sender ! replyMsg + self.stop() + } + def doSomeDangerousWork(msg: Message): String = { "done" } + }) ! m + + case replyMsg: String ⇒ sender ! replyMsg + } + //#anonymous-actor +} + +//#system-actorOf +object Main extends App { + val system = ActorSystem("MySystem") + val myActor = system.actorOf[FirstActor] + //#system-actorOf +} +//#swapper +case object Swap +class Swapper extends Actor { + import context._ + val log = Logging(system, this) + + def receive = { + case Swap ⇒ + log.info("Hi") + become { + case Swap ⇒ + log.info("Ho") + unbecome() // resets the latest 'become' (just for fun) + } + } +} + +object SwapperApp extends App { + val system = ActorSystem("SwapperSystem") + val swap = system.actorOf[Swapper] + swap ! Swap // logs Hi + swap ! Swap // logs Ho + swap ! Swap // logs Hi + swap ! Swap // logs Ho + swap ! Swap // logs Hi + swap ! Swap // logs Ho +} +//#swapper + +//#receive-orElse +import akka.actor.Actor.Receive + +abstract class GenericActor extends Actor { + // to be defined in subclassing actor + def specificMessageHandler: Receive + + // generic message handler + def genericMessageHandler: Receive = { + case event ⇒ printf("generic: %s\n", event) + } + + def receive = specificMessageHandler orElse genericMessageHandler +} + +class SpecificActor extends GenericActor { + def specificMessageHandler = { + case event: MyMsg ⇒ printf("specific: %s\n", event.subject) + } +} + +case class MyMsg(subject: String) +//#receive-orElse + class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { + "import context" in { + //#import-context + class FirstActor extends Actor { + import context._ + val myActor = actorOf[MyActor] + def receive = { + case x ⇒ myActor ! x + } + } + //#import-context + + val first = system.actorOf(new FirstActor) + first.stop() + + } + "creating actor with AkkaSpec.actorOf" in { - //#creating-actorOf val myActor = system.actorOf[MyActor] - //#creating-actorOf // testing the actor @@ -62,4 +163,73 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { myActor.stop() } + + "creating actor with Props" in { + //#creating-props + import akka.actor.Props + val dispatcher = system.dispatcherFactory.fromConfig("my-dispatcher") + val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") + //#creating-props + + myActor.stop() + } + + "using ask" in { + //#using-ask + class MyActor extends Actor { + def receive = { + case x: String ⇒ sender ! x.toUpperCase + case n: Int ⇒ sender ! (n + 1) + } + } + + val myActor = system.actorOf(new MyActor) + implicit val timeout = system.settings.ActorTimeout + val future = myActor ? "hello" + future.as[String] match { + case Some(answer) ⇒ //... + case None ⇒ //... + } + val result: Option[Int] = for (x ← (myActor ? 3).as[Int]) yield { 2 * x } + //#using-ask + + myActor.stop() + } + + "using receiveTimeout" in { + //#receive-timeout + import akka.actor.ReceiveTimeout + import akka.util.duration._ + class MyActor extends Actor { + context.receiveTimeout = Some(30 seconds) + def receive = { + case "Hello" ⇒ //... + case ReceiveTimeout ⇒ throw new RuntimeException("received timeout") + } + } + //#receive-timeout + } + + "using hot-swap" in { + //#hot-swap-actor + class HotSwapActor extends Actor { + import context._ + def angry: Receive = { + case "foo" ⇒ sender ! "I am already angry?" + case "bar" ⇒ become(happy) + } + + def happy: Receive = { + case "bar" ⇒ sender ! "I am already happy :-)" + case "foo" ⇒ become(angry) + } + + def receive = { + case "foo" ⇒ become(angry) + case "bar" ⇒ become(happy) + } + } + //#hot-swap-actor + } + } diff --git a/akka-docs/scala/code/UnnestedReceives.scala b/akka-docs/scala/code/UnnestedReceives.scala new file mode 100644 index 0000000000..ff84d992e0 --- /dev/null +++ b/akka-docs/scala/code/UnnestedReceives.scala @@ -0,0 +1,48 @@ +package akka.docs.actor + +import akka.actor._ +import akka.actor.Actor._ +import scala.collection.mutable.ListBuffer + +/** + * Requirements are as follows: + * The first thing the actor needs to do, is to subscribe to a channel of events, + * Then it must replay (process) all "old" events + * Then it has to wait for a GoAhead signal to begin processing the new events + * It mustn't "miss" events that happen between catching up with the old events and getting the GoAhead signal + */ +class UnnestedReceives extends Actor { + import context.become + //If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)] + val queue = new ListBuffer[Any]() + + //This message processes a message/event + def process(msg: Any): Unit = println("processing: " + msg) + //This method subscribes the actor to the event bus + def subscribe() {} //Your external stuff + //This method retrieves all prior messages/events + def allOldMessages() = List() + + override def preStart { + //We override preStart to be sure that the first message the actor gets is + //'Replay, that message will start to be processed _after_ the actor is started + self ! 'Replay + //Then we subscribe to the stream of messages/events + subscribe() + } + + def receive = { + case 'Replay ⇒ //Our first message should be a 'Replay message, all others are invalid + allOldMessages() foreach process //Process all old messages/events + become { //Switch behavior to look for the GoAhead signal + case 'GoAhead ⇒ //When we get the GoAhead signal we process all our buffered messages/events + queue foreach process + queue.clear + become { //Then we change behaviour to process incoming messages/events as they arrive + case msg ⇒ process(msg) + } + case msg ⇒ //While we haven't gotten the GoAhead signal, buffer all incoming messages + queue += msg //Here you have full control, you can handle overflow etc + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 33ba83dd73..4122c539df 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -29,7 +29,7 @@ class RemoteExtensionSettings(val config: Config, val systemName: String) extend val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) - // TODO cluster config will go into akka-cluster-reference.conf when we enable that module + // TODO cluster config will go into akka-cluster/reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_, systemName)) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 87a213e0eb..991e9ca887 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -32,7 +32,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000) getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000) - // TODO cluster config will go into akka-cluster-reference.conf when we enable that module + // TODO cluster config will go into akka-cluster/reference.conf when we enable that module //akka.cluster getString("akka.cluster.name") must equal("default-cluster") getString("akka.cluster.nodename") must equal("node1") diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index fc33c1a3c5..bc596dc126 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -5,14 +5,11 @@ package akka import sbt._ -import Keys._ - +import sbt.Keys._ import com.typesafe.sbtmultijvm.MultiJvmPlugin +import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions } import com.typesafe.sbtscalariform.ScalariformPlugin - -import MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions } -import ScalariformPlugin.{ format, formatPreferences, formatSourceDirectories } - +import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys import java.lang.Boolean.getBoolean object AkkaBuild extends Build { @@ -225,14 +222,14 @@ object AkkaBuild extends Build { dependencies = Seq(actor), settings = defaultSettings ) - + lazy val helloSample = Project( id = "akka-sample-hello", base = file("akka-samples/akka-sample-hello"), dependencies = Seq(actor), settings = defaultSettings ) - + lazy val tutorials = Project( id = "akka-tutorials", base = file("akka-tutorials"), @@ -263,7 +260,7 @@ object AkkaBuild extends Build { settings = defaultSettings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, - formatSourceDirectories in Test <<= unmanagedSourceDirectories in Test + unmanagedSourceDirectories in ScalariformKeys.format in Test <<= unmanagedSourceDirectories in Test ) ) @@ -335,9 +332,9 @@ object AkkaBuild extends Build { testOptions in Test += Tests.Argument("-oF") ) - lazy val formatSettings = ScalariformPlugin.settings ++ Seq( - formatPreferences in Compile := formattingPreferences, - formatPreferences in Test := formattingPreferences + lazy val formatSettings = ScalariformPlugin.scalariformSettings ++ Seq( + ScalariformKeys.preferences in Compile := formattingPreferences, + ScalariformKeys.preferences in Test := formattingPreferences ) def formattingPreferences = { @@ -348,9 +345,9 @@ object AkkaBuild extends Build { .setPreference(AlignSingleLineCaseStatements, true) } - lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.formatSettings) ++ Seq( - compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (format in MultiJvm), - formatPreferences in MultiJvm := formattingPreferences + lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.scalariformSettings) ++ Seq( + compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (ScalariformKeys.format in MultiJvm), + ScalariformKeys.preferences in MultiJvm := formattingPreferences ) // reStructuredText docs diff --git a/project/Unidoc.scala b/project/Unidoc.scala index c8f13b02ac..7fffd98a27 100644 --- a/project/Unidoc.scala +++ b/project/Unidoc.scala @@ -42,7 +42,7 @@ object Unidoc { } def unidocTask: Initialize[Task[File]] = { - (compilers, cacheDirectory, unidocSources, unidocClasspath, unidocDirectory, scaladocOptions in Compile, streams) map { + (compilers, cacheDirectory, unidocSources, unidocClasspath, unidocDirectory, scalacOptions in doc, streams) map { (compilers, cache, sources, classpath, target, options, s) => { val scaladoc = new Scaladoc(100, compilers.scalac) scaladoc.cached(cache / "unidoc", "main", sources, classpath, target, options, s.log) diff --git a/project/build.properties b/project/build.properties index f2ccdfa377..f4ff7a5afa 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.11.0 +sbt.version=0.11.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 7140718543..e298278e00 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -3,10 +3,11 @@ resolvers += Classpaths.typesafeResolver addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.7") -addSbtPlugin("com.typesafe.sbtscalariform" % "sbt-scalariform" % "0.1.4") +addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.3.1") resolvers ++= Seq( "less is" at "http://repo.lessis.me", "coda" at "http://repo.codahale.com") -addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.0") +addSbtPlugin("me.lessis" % "ls-sbt" % "0.1.1") +