Merged with current master

This commit is contained in:
Viktor Klang 2011-12-13 10:04:43 +01:00
commit bf01045779
144 changed files with 6879 additions and 8250 deletions

View file

@ -1,106 +0,0 @@
ActorRegistry (Scala)
=====================
Module stability: **SOLID**
ActorRegistry: Finding Actors
-----------------------------
Actors can be looked up by using the ``akka.actor.Actor.registry: akka.actor.ActorRegistry``. Lookups for actors through this registry can be done by:
* uuid akka.actor.Uuid this uses the ``uuid`` field in the Actor class, returns the actor reference for the actor with specified uuid, if one exists, otherwise None
* id string this uses the ``id`` field in the Actor class, which can be set by the user (default is the class name), returns all actor references to actors with specified id
* specific actor class - returns an ``Array[Actor]`` with all actors of this exact class
* parameterized type - returns an ``Array[Actor]`` with all actors that are a subtype of this specific type
Actors are automatically registered in the ActorRegistry when they are started, removed or stopped. You can explicitly register and unregister ActorRef's by using the ``register`` and ``unregister`` methods. The ActorRegistry contains many convenience methods for looking up typed actors.
Here is a summary of the API for finding actors:
.. code-block:: scala
def actors: Array[ActorRef]
def actorFor(uuid: akka.actor.Uuid): Option[ActorRef]
def actorsFor(id : String): Array[ActorRef]
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): Array[ActorRef]
def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef]
// finding typed actors
def typedActors: Array[AnyRef]
def typedActorFor(uuid: akka.actor.Uuid): Option[AnyRef]
def typedActorsFor(id: String): Array[AnyRef]
def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef]
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef]
Examples of how to use them:
.. code-block:: scala
val actor = Actor.registry.actorFor(uuid)
val pojo = Actor.registry.typedActorFor(uuid)
.. code-block:: scala
val actors = Actor.registry.actorsFor(classOf[...])
val pojos = Actor.registry.typedActorsFor(classOf[...])
.. code-block:: scala
val actors = Actor.registry.actorsFor(id)
val pojos = Actor.registry.typedActorsFor(id)
.. code-block:: scala
val actors = Actor.registry.actorsFor[MyActorType]
val pojos = Actor.registry.typedActorsFor[MyTypedActorImpl]
The ActorRegistry also has a 'shutdownAll' and 'foreach' methods:
.. code-block:: scala
def foreach(f: (ActorRef) => Unit)
def foreachTypedActor(f: (AnyRef) => Unit)
def shutdownAll()
If you need to know when a new Actor is added or removed from the registry, you can use the subscription API. You can register an Actor that should be notified when an event happens in the ActorRegistry:
.. code-block:: scala
def addListener(listener: ActorRef)
def removeListener(listener: ActorRef)
The messages sent to this Actor are:
.. code-block:: scala
case class ActorRegistered(@BeanProperty address: String,@BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef) extends ActorRegistryEvent
case class TypedActorRegistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty actor: ActorRef, @BeanProperty proxy: AnyRef) extends ActorRegistryEvent
So your listener Actor needs to be able to handle these messages. Example:
.. code-block:: scala
import akka.actor._
import akka.event.EventHandler
class RegistryListener extends Actor {
def receive = {
case event: ActorRegistered =>
EventHandler.info(this, "Actor registered: %s - %s".format(
event.actor.actorClassName, event.actor.uuid))
case event: ActorUnregistered =>
// ...
}
}
The above actor can be added as listener of registry events:
.. code-block:: scala
import akka.actor._
import akka.actor.Actor._
val listener = actorOf[RegistryListener]
registry.addListener(listener)

View file

@ -50,8 +50,8 @@ be able to handle unknown messages then you need to have a default case as in
the example above. Otherwise an ``UnhandledMessageException`` will be
thrown and the actor is restarted when an unknown message is received.
Creating Actors
---------------
Creating Actors with default constructor
----------------------------------------
.. includecode:: code/ActorDocSpec.scala
:include: imports2,system-actorOf
@ -73,6 +73,15 @@ a top level actor, that is supervised by the system (internal guardian actor).
.. includecode:: code/ActorDocSpec.scala#context-actorOf
Actors are automatically started asynchronously when created.
When you create the ``Actor`` then it will automatically call the ``preStart``
callback method on the ``Actor`` trait. This is an excellent place to
add initialization code for the actor.
.. code-block:: scala
override def preStart() = {
... // initialization code
}
Creating Actors with non-default constructor
--------------------------------------------
@ -110,6 +119,7 @@ When spawning actors for specific sub-tasks from within an actor, it may be conv
introduce synchronization bugs and race conditions because the other actors
code will be scheduled concurrently to the enclosing actor. Unfortunately
there is not yet a way to detect these illegal accesses at compile time.
See also: :ref:`jmm-shared-state`
Actor API
@ -127,7 +137,7 @@ In addition, it offers:
* :obj:`sender` reference sender Actor of the last received message, typically used as described in :ref:`Actor.Reply`
* :obj:`context` exposes contextual information for the actor and the current message, such as:
* factory method to create child actors (:meth:`actorOf`)
* factory methods to create child actors (:meth:`actorOf`)
* system that the actor belongs to
* parent supervisor
* supervised children
@ -242,8 +252,8 @@ Messages are sent to an Actor through one of the following methods.
Message ordering is guaranteed on a per-sender basis.
Fire-forget
-----------
Tell: Fire-forget
-----------------
This is the preferred way of sending messages. No blocking waiting for a
message. This gives the best concurrency and scalability characteristics.
@ -260,11 +270,11 @@ to reply to the original sender, by using ``sender ! replyMsg``.
If invoked from an instance that is **not** an Actor the sender will be
:obj:`deadLetters` actor reference by default.
Send-And-Receive-Future
-----------------------
Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
will return a :class:`Future`:
will immediately return a :class:`Future`:
.. code-block:: scala
@ -277,15 +287,7 @@ To complete the future with an exception you need send a Failure message to the
This is not done automatically when an actor throws an exception while processing a
message.
.. code-block:: scala
try {
operation()
} catch {
case e: Exception =>
sender ! akka.actor.Status.Failure(e)
throw e
}
.. includecode:: code/ActorDocSpec.scala#reply-exception
If the actor does not complete the future, it will expire after the timeout period,
which is taken from one of the following locations in order of precedence:
@ -304,18 +306,20 @@ which is taken from one of the following locations in order of precedence:
See :ref:`futures-scala` for more information on how to await or query a
future.
The ``onComplete``, ``onResult``, or ``onTimeout`` methods of the ``Future`` can be
used to register a callback to get a notification when the Future completes.
Gives you a way to avoid blocking.
.. warning::
When using future callbacks, such as ``onComplete``, ``onSuccess``, and ``onFailure``,
inside actors you need to carefully avoid closing over the containing actors
reference, i.e. do not call methods or access mutable state on the enclosing actor
from within the callback. This would break the actor encapsulation and may
introduce synchronization bugs and race conditions because the callback
will be scheduled concurrently to the enclosing actor. Unfortunately
there is not yet a way to detect these illegal accesses at compile time.
Send-And-Receive-Eventually
---------------------------
inside actors you need to carefully avoid closing over the containing actors reference,
i.e. do not call methods or access mutable state on the enclosing actor from
within the callback. This would break the actor encapsulation and may introduce
synchronization bugs and race conditions because the callback will be scheduled
concurrently to the enclosing actor. Unfortunately there is not yet a way to detect
these illegal accesses at compile time.
See also: :ref:`jmm-shared-state`
The future returned from the ``?`` method can conveniently be passed around or
chained with further processing steps, but sometimes you just need the value,
@ -344,7 +348,7 @@ routers, load-balancers, replicators etc.
.. code-block:: scala
actor.forward(message)
myActor.forward(message)
Receive messages
@ -375,7 +379,7 @@ Reply to messages
If you want to have a handle for replying to a message, you can use
``sender``, which gives you an ActorRef. You can reply by sending to
that ActorRef with ``sender ! Message``. You can also store the ActorRef
that ActorRef with ``sender ! replyMsg``. You can also store the ActorRef
for replying later, or passing on to other actors. If there is no sender (a
message was sent without an actor or future context) then the sender
defaults to a 'dead-letter' actor ref.
@ -383,8 +387,8 @@ defaults to a 'dead-letter' actor ref.
.. code-block:: scala
case request =>
val result = process(request)
sender ! result // will have dead-letter actor as default
val result = process(request)
sender ! result // will have dead-letter actor as default
Initial receive timeout
=======================
@ -396,26 +400,6 @@ object.
.. includecode:: code/ActorDocSpec.scala#receive-timeout
Starting actors
===============
Actors are created & started by invoking the ``actorOf`` method.
.. code-block:: scala
val actor = actorOf[MyActor]
actor
When you create the ``Actor`` then it will automatically call the ``def
preStart`` callback method on the ``Actor`` trait. This is an excellent place to
add initialization code for the actor.
.. code-block:: scala
override def preStart() = {
... // initialization code
}
Stopping actors
===============
@ -442,17 +426,20 @@ take place. The ``Actor`` can use this callback to implement shutdown behavior.
... // clean up resources
}
All Actors are stopped when the ``ActorSystem`` is stopped.
Supervised actors are stopped when the supervisor is stopped, i.e. children are stopped
when parent is stopped.
PoisonPill
==========
----------
You can also send an actor the ``akka.actor.PoisonPill`` message, which will
stop the actor when the message is processed. ``PoisonPill`` is enqueued as
ordinary messages and will be handled after messages that were already queued
in the mailbox.
If the sender is a ``Future`` (e.g. the message is sent with ``?``), the
``Future`` will be completed with an
If the ``PoisonPill`` was sent with ``?``, the ``Future`` will be completed with an
``akka.actor.ActorKilledException("PoisonPill")``.
@ -465,7 +452,7 @@ Upgrade
-------
Akka supports hotswapping the Actors message loop (e.g. its implementation) at
runtime: Invoke the ``become`` method from within the Actor.
runtime: Invoke the ``context.become`` method from within the Actor.
Become takes a ``PartialFunction[Any, Unit]`` that implements
the new message handler. The hotswapped code is kept in a Stack which can be
@ -499,7 +486,7 @@ Downgrade
---------
Since the hotswapped code is pushed to a Stack you can downgrade the code as
well, all you need to do is to: Invoke the ``unbecome`` method from within the Actor.
well, all you need to do is to: Invoke the ``context.unbecome`` method from within the Actor.
This will pop the Stack and replace the Actor's implementation with the
``PartialFunction[Any, Unit]`` that is at the top of the Stack.
@ -509,7 +496,7 @@ Here's how you use the ``unbecome`` method:
.. code-block:: scala
def receive = {
case "revert" => unbecome()
case "revert" => context.unbecome()
}

View file

@ -26,7 +26,7 @@ class MyActor extends Actor {
}
//#my-actor
case class DoIt(msg: Message)
case class DoIt(msg: ImmutableMessage)
case class Message(s: String)
//#context-actorOf
@ -43,7 +43,7 @@ class FirstActor extends Actor {
sender ! replyMsg
self.stop()
}
def doSomeDangerousWork(msg: Message): String = { "done" }
def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" }
}) ! m
case replyMsg: String sender ! replyMsg
@ -54,9 +54,29 @@ class FirstActor extends Actor {
//#system-actorOf
object Main extends App {
val system = ActorSystem("MySystem")
val myActor = system.actorOf[FirstActor]
val myActor = system.actorOf[MyActor]
//#system-actorOf
}
class ReplyException extends Actor {
def receive = {
case _
//#reply-exception
try {
val result = operation()
sender ! result
} catch {
case e: Exception
sender ! akka.actor.Status.Failure(e)
throw e
}
//#reply-exception
}
def operation(): String = { "Hi" }
}
//#swapper
case object Swap
class Swapper extends Actor {
@ -169,7 +189,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"creating actor with Props" in {
//#creating-props
import akka.actor.Props
val dispatcher = system.dispatcherFactory.fromConfig("my-dispatcher")
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
//#creating-props
@ -230,6 +250,9 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
}
}
//#hot-swap-actor
val actor = system.actorOf(new HotSwapActor)
}
}

View file

@ -120,17 +120,8 @@ should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will bunch up to ``throughput`` message invocations together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it
explicitly then it uses the default value defined in the 'akka.conf'
configuration file:
.. code-block:: ruby
actor {
throughput = 5
}
If you don't define a the 'throughput' option in the configuration file then the default value of '5' will be used.
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available.

View file

@ -498,7 +498,7 @@ and in the following.
Event Tracing
-------------
The setting ``akka.actor.debug.fsm`` in ``akka.conf`` enables logging of an
The setting ``akka.actor.debug.fsm`` in `:ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances::
class MyFSM extends Actor with LoggingFSM[X, Z] {

View file

@ -244,7 +244,7 @@ In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` pr
Timeouts
--------
Waiting forever for a ``Future`` to be completed can be dangerous. It could cause your program to block indefinitly or produce a memory leak. ``Future`` has support for a timeout already builtin with a default of 5 seconds (taken from 'akka.conf'). A timeout is an instance of ``akka.actor.Timeout`` which contains an ``akka.util.Duration``. A ``Duration`` can be finite, which needs a length and unit type, or infinite. An infinite ``Timeout`` can be dangerous since it will never actually expire.
Waiting forever for a ``Future`` to be completed can be dangerous. It could cause your program to block indefinitly or produce a memory leak. ``Future`` has support for a timeout already builtin with a default of 5 seconds (taken from :ref:`configuration`). A timeout is an instance of ``akka.actor.Timeout`` which contains an ``akka.util.Duration``. A ``Duration`` can be finite, which needs a length and unit type, or infinite. An infinite ``Timeout`` can be dangerous since it will never actually expire.
A different ``Timeout`` can be supplied either explicitly or implicitly when a ``Future`` is created. An implicit ``Timeout`` has the benefit of being usable by a for-comprehension as well as being picked up by any methods looking for an implicit ``Timeout``, while an explicit ``Timeout`` can be used in a more controlled manner.

View file

@ -1,470 +0,0 @@
.. _http-module:
HTTP
====
.. sidebar:: Contents
.. contents:: :local:
Module stability: **SOLID**
When deploying in a servlet container:
--------------------------------------------
If you deploy Akka in a JEE container, don't forget to create an Akka initialization and cleanup hook:
.. code-block:: scala
package com.my //<--- your own package
import akka.util.AkkaLoader
import akka.cluster.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import javax.servlet.{ServletContextListener, ServletContextEvent}
/**
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
*<web-app>
* ...
* <listener>
* <listener-class>com.my.Initializer</listener-class>
* </listener>
* ...
*</web-app>
*/
class Initializer extends ServletContextListener {
lazy val loader = new AkkaLoader
def contextDestroyed(e: ServletContextEvent): Unit = loader.shutdown
def contextInitialized(e: ServletContextEvent): Unit =
loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService) //<--- Important
// loader.boot(true, new BootableActorLoaderService {}) // If you don't need akka-remote
}
For Java users, it's currently only possible to use BootableActorLoaderService, but you'll need to use: akka.actor.DefaultBootableActorLoaderService
Then you just declare it in your web.xml:
.. code-block:: xml
<web-app>
...
<listener>
<listener-class>your.package.Initializer</listener-class>
</listener>
...
</web-app>
Adapting your own Akka Initializer for the Servlet Container
------------------------------------------------------------
If you want to use akka-camel or any other modules that have their own "Bootable"'s you'll need to write your own Initializer, which is _ultra_ simple, see below for an example on how to include Akka-camel.
.. code-block:: scala
package com.my //<--- your own package
import akka.cluster.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import akka.camel.CamelService
import javax.servlet.{ServletContextListener, ServletContextEvent}
/**
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
*<web-app>
* ...
* <listener>
* <listener-class>com.my.Initializer</listener-class>
* </listener>
* ...
*</web-app>
*/
class Initializer extends ServletContextListener {
lazy val loader = new AkkaLoader
def contextDestroyed(e: ServletContextEvent): Unit = loader.shutdown
def contextInitialized(e: ServletContextEvent): Unit =
loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService with CamelService) //<--- Important
}
Using Akka with the Pinky REST/MVC framework
--------------------------------------------
Pinky has a slick Akka integration. Read more `here <http://wiki.github.com/pk11/pinky/release-13>`_
jetty-run in SBT
----------------
If you want to use jetty-run in SBT you need to exclude the version of Jetty that is bundled in akka-http:
.. code-block:: scala
override def ivyXML =
<dependencies>
<dependency org="com.typesafe.akka" name="akka-http" rev="AKKA_VERSION_GOES_HERE">
<exclude module="jetty"/>
</dependency>
</dependencies>
Mist - Lightweight Asynchronous HTTP
------------------------------------
The *Mist* layer was developed to provide a direct connection between the servlet container and Akka actors with the goal of handling the incoming HTTP request as quickly as possible in an asynchronous manner. The motivation came from the simple desire to treat REST calls as completable futures, that is, effectively passing the request along an actor message chain to be resumed at the earliest possible time. The primary constraint was to not block any existing threads and secondarily, not create additional ones. Mist is very simple and works both with Jetty Continuations as well as with Servlet API 3.0 (tested using Jetty-8.0.0.M1). When the servlet handles a request, a message is created typed to represent the method (e.g. Get, Post, etc.), the request is suspended and the message is sent (fire-and-forget) to the *root endpoint* actor. That's it. There are no POJOs required to host the service endpoints and the request is treated as any other. The message can be resumed (completed) using a number of helper methods that set the proper HTTP response status code.
Complete runnable example can be found here: `<https://github.com/buka/akka-mist-sample>`_
Endpoints
^^^^^^^^^
Endpoints are actors that handle request messages. Minimally there must be an instance of the *RootEndpoint* and then at least one more (to implement your services).
Preparations
^^^^^^^^^^^^
In order to use Mist you have to register the MistServlet in *web.xml* or do the analogous for the embedded server if running in Akka Microkernel:
.. code-block:: xml
<servlet>
<servlet-name>akkaMistServlet</servlet-name>
<servlet-class>akka.http.AkkaMistServlet</servlet-class>
<init-param> <!-- Optional, if empty or omitted, it will use the default in the akka.conf -->
<param-name>root-endpoint</param-name>
<param-value>address_of_root_endpoint_actor</param-value>
</init-param>
<!-- <async-supported>true</async-supported> Enable this for Servlet 3.0 support -->
</servlet>
<servlet-mapping>
<servlet-name>akkaMistServlet</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
Then you also have to add the following dependencies to your SBT build definition:
.. code-block:: scala
val jettyWebapp = "org.eclipse.jetty" % "jetty-webapp" % "8.0.0.M2" % "test"
val javaxServlet30 = "org.mortbay.jetty" % "servlet-api" % "3.0.20100224" % "provided"
Attention: You have to use SBT 0.7.5.RC0 or higher in order to be able to work with that Jetty version.
An Example
^^^^^^^^^^
Startup
*******
In this example, we'll use the built-in *RootEndpoint* class and implement our own service from that. Here the services are started in the boot loader and attached to the top level supervisor.
.. code-block:: scala
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 100),
//
// in this particular case, just boot the built-in default root endpoint
//
Supervise(
actorOf[RootEndpoint],
Permanent) ::
Supervise(
actorOf[SimpleAkkaAsyncHttpService],
Permanent)
:: Nil))
factory.newInstance.start
}
**Defining the Endpoint**
The service is an actor that mixes in the *Endpoint* trait. Here the dispatcher is taken from the Akka configuration file which allows for custom tuning of these actors, though naturally, any dispatcher can be used.
URI Handling
************
Rather than use traditional annotations to pair HTTP request and class methods, Mist uses hook and provide functions. This offers a great deal of flexibility in how a given endpoint responds to a URI. A hook function is simply a filter, returning a Boolean to indicate whether or not the endpoint will handle the URI. This can be as simple as a straight match or as fancy as you need. If a hook for a given URI returns true, the matching provide function is called to obtain an actor to which the message can be delivered. Notice in the example below, in one case, the same actor is returned and in the other, a new actor is created and returned. Note that URI hooking is non-exclusive and a message can be delivered to multiple actors (see next example).
Plumbing
********
Hook and provider functions are attached to a parent endpoint, in this case the root, by sending it the **Endpoint.Attach** message.
Finally, bind the *handleHttpRequest* function of the *Endpoint* trait to the actor's *receive* function and we're done.
.. code-block:: scala
class SimpleAkkaAsyncHttpService extends Actor with Endpoint {
final val ServiceRoot = "/simple/"
final val ProvideSameActor = ServiceRoot + "same"
final val ProvideNewActor = ServiceRoot + "new"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// there are different ways of doing this - in this case, we'll use a single hook function
// and discriminate in the provider; alternatively we can pair hooks & providers
//
def hook(uri: String): Boolean = ((uri == ProvideSameActor) || (uri == ProvideNewActor))
def provide(uri: String): ActorRef = {
if (uri == ProvideSameActor) same
else actorOf[BoringActor]
}
//
// this is where you want attach your endpoint hooks
//
override def preStart() = {
//
// we expect there to be one root and that it's already been started up
// obviously there are plenty of other ways to obtaining this actor
// the point is that we need to attach something (for starters anyway)
// to the root
//
val root = Actor.registry.actorsFor(classOf[RootEndpoint]).head
root ! Endpoint.Attach(hook, provide)
}
//
// since this actor isn't doing anything else (i.e. not handling other messages)
// just assign the receive func like so...
// otherwise you could do something like:
// def myrecv = {...}
// def receive = myrecv orElse _recv
//
def receive = handleHttpRequest
//
// this will be our "same" actor provided with ProvideSameActor endpoint is hit
//
lazy val same = actorOf[BoringActor]
}
Handling requests
*****************
Messages are handled just as any other that are received by your actor. The servlet requests and response are not hidden and can be accessed directly as shown below.
.. code-block:: scala
/**
* Define a service handler to respond to some HTTP requests
*/
class BoringActor extends Actor {
import java.util.Date
import javax.ws.rs.core.MediaType
var gets = 0
var posts = 0
var lastget: Option[Date] = None
var lastpost: Option[Date] = None
def receive = {
// handle a get request
case get: Get =>
// the content type of the response.
// similar to @Produces annotation
get.response.setContentType(MediaType.TEXT_HTML)
//
// "work"
//
gets += 1
lastget = Some(new Date)
//
// respond
//
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
get.OK(res)
// handle a post request
case post:Post =>
// the expected content type of the request
// similar to @Consumes
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
// the content type of the response.
// similar to @Produces annotation
post.response.setContentType(MediaType.TEXT_HTML)
// "work"
posts += 1
lastpost = Some(new Date)
// respond
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
post.OK(res)
} else {
post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
}
}
case other: RequestMethod =>
other.NotAllowed("Invalid method for this endpoint")
}
}
**Timeouts**
Messages will expire according to the default timeout (specified in akka.conf). Individual messages can also be updated using the *timeout* method. One thing that may seem unexpected is that when an expired request returns to the caller, it will have a status code of OK (200). Mist will add an HTTP header to such responses to help clients, if applicable. By default, the header will be named "Async-Timeout" with a value of "expired" - both of which are configurable.
Another Example - multiplexing handlers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
As noted above, hook functions are non-exclusive. This means multiple actors can handle the same request if desired. In this next example, the hook functions are identical (yes, the same one could have been reused) and new instances of both A and B actors will be created to handle the Post. A third mediator is inserted to coordinate the results of these actions and respond to the caller.
.. code-block:: scala
package sample.mist
import akka.actor._
import akka.actor.Actor._
import akka.http._
import javax.servlet.http.HttpServletResponse
class InterestingService extends Actor with Endpoint {
final val ServiceRoot = "/interesting/"
final val Multi = ServiceRoot + "multi/"
// use the configurable dispatcher
self.dispatcher = Endpoint.Dispatcher
//
// The "multi" endpoint shows forking off multiple actions per request
// It is triggered by POSTing to http://localhost:9998/interesting/multi/{foo}
// Try with/without a header named "Test-Token"
// Try with/without a form parameter named "Data"
def hookMultiActionA(uri: String): Boolean = uri startsWith Multi
def provideMultiActionA(uri: String): ActorRef = actorOf(new ActionAActor(complete))
def hookMultiActionB(uri: String): Boolean = uri startsWith Multi
def provideMultiActionB(uri: String): ActorRef = actorOf(new ActionBActor(complete))
//
// this is where you want attach your endpoint hooks
//
override def preStart() = {
//
// we expect there to be one root and that it's already been started up
// obviously there are plenty of other ways to obtaining this actor
// the point is that we need to attach something (for starters anyway)
// to the root
//
val root = Actor.registry.actorsFor(classOf[RootEndpoint]).head
root ! Endpoint.Attach(hookMultiActionA, provideMultiActionA)
root ! Endpoint.Attach(hookMultiActionB, provideMultiActionB)
}
//
// since this actor isn't doing anything else (i.e. not handling other messages)
// just assign the receive func like so...
// otherwise you could do something like:
// def myrecv = {...}
// def receive = myrecv orElse handleHttpRequest
//
def receive = handleHttpRequest
//
// this guy completes requests after other actions have occurred
//
lazy val complete = actorOf[ActionCompleteActor]
}
class ActionAActor(complete:ActorRef) extends Actor {
import javax.ws.rs.core.MediaType
def receive = {
// handle a post request
case post: Post =>
// the expected content type of the request
// similar to @Consumes
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
// the content type of the response.
// similar to @Produces annotation
post.response.setContentType(MediaType.TEXT_HTML)
// get the resource name
val name = post.request.getRequestURI.substring("/interesting/multi/".length)
if (name.length % 2 == 0) post.response.getWriter.write("<p>Action A verified request.</p>")
else post.response.getWriter.write("<p>Action A could not verify request.</p>")
// notify the next actor to coordinate the response
complete ! post
} else post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
}
}
}
class ActionBActor(complete:ActorRef) extends Actor {
import javax.ws.rs.core.MediaType
def receive = {
// handle a post request
case post: Post =>
// the expected content type of the request
// similar to @Consumes
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
// pull some headers and form params
def default(any: Any): String = ""
val token = post.getHeaderOrElse("Test-Token", default)
val data = post.getParameterOrElse("Data", default)
val (resp, status) = (token, data) match {
case ("", _) => ("No token provided", HttpServletResponse.SC_FORBIDDEN)
case (_, "") => ("No data", HttpServletResponse.SC_ACCEPTED)
case _ => ("Data accepted", HttpServletResponse.SC_OK)
}
// update the response body
post.response.getWriter.write(resp)
// notify the next actor to coordinate the response
complete ! (post, status)
} else post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
}
case other: RequestMethod =>
other.NotAllowed("Invalid method for this endpoint")
}
}
class ActionCompleteActor extends Actor {
import collection.mutable.HashMap
val requests = HashMap.empty[Int, Int]
def receive = {
case req: RequestMethod =>
if (requests contains req.hashCode) complete(req)
else requests += (req.hashCode -> 0)
case t: Tuple2[RequestMethod, Int] =>
if (requests contains t._1.hashCode) complete(t._1)
else requests += (t._1.hashCode -> t._2)
}
def complete(req: RequestMethod) = requests.remove(req.hashCode) match {
case Some(HttpServletResponse.SC_FORBIDDEN) => req.Forbidden("")
case Some(HttpServletResponse.SC_ACCEPTED) => req.Accepted("")
case Some(_) => req.OK("")
case _ => {}
}
}
Examples
^^^^^^^^
Using the Akka Mist module with OAuth
*************************************
`<https://gist.github.com/759501>`_
Using the Akka Mist module with the Facebook Graph API and WebGL
****************************************************************
Example project using Akka Mist with the Facebook Graph API and WebGL
`<https://github.com/buka/fbgl1>`_
Using Akka Mist on Amazon ElasticBeanstalk
******************************************
`<https://groups.google.com/group/akka-user/browse_thread/thread/ab7b5432f2fc4153>`_

View file

@ -8,7 +8,6 @@ Scala API
actors
typed-actors
actor-registry
futures
dataflow
agents
@ -18,6 +17,4 @@ Scala API
dispatchers
routing
fsm
http
testing
tutorial-chat-server

View file

@ -5,549 +5,4 @@
Software Transactional Memory (Scala)
#######################################
.. sidebar:: Contents
.. contents:: :local:
Overview of STM
===============
An `STM <http://en.wikipedia.org/wiki/Software_transactional_memory>`_ turns the
Java heap into a transactional data set with begin/commit/rollback
semantics. Very much like a regular database. It implements the first three
letters in ACID; ACI:
* Atomic
* Consistent
* Isolated
Generally, the STM is not needed very often when working with Akka. Some
use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating
their **internal local** state but need them to do that atomically in one big
transaction. Might not be often, but when you do need this then you are
screwed without it.
- When you want to share a datastructure across actors.
- When you need to use the persistence modules.
Akkas STM implements the concept in `Clojure's <clojure>`_ STM view on state in
general. Please take the time to read `this excellent document <clojure-state>`_
and view `this presentation <clojure-presentation>`_ by Rich Hickey (the genius
behind Clojure), since it forms the basis of Akkas view on STM and state in
general.
.. _clojure: http://clojure.org/
.. _clojure-state: http://clojure.org/state
.. _clojure-presentation: http://www.infoq.com/presentations/Value-Identity-State-Rich-Hickey
The STM is based on Transactional References (referred to as Refs). Refs are
memory cells, holding an (arbitrary) immutable value, that implement CAS
(Compare-And-Swap) semantics and are managed and enforced by the STM for
coordinated changes across many Refs. They are implemented using the excellent
`Multiverse STM <multiverse>`_.
.. _multiverse: http://multiverse.codehaus.org/overview.html
Working with immutable collections can sometimes give bad performance due to
extensive copying. Scala provides so-called persistent datastructures which
makes working with immutable collections fast. They are immutable but with
constant time access and modification. They use structural sharing and an insert
or update does not ruin the old structure, hence “persistent”. Makes working
with immutable composite types fast. The persistent datastructures currently
consist of a Map and Vector.
Simple example
==============
Here is a simple example of an incremental counter using STM. This shows
creating a ``Ref``, a transactional reference, and then modifying it within a
transaction, which is delimited by ``atomic``.
.. includecode:: code/StmDocSpec.scala#simple
Ref
---
Refs (transactional references) are mutable references to values and through the STM allow the safe sharing of mutable data. Refs separate identity from value. To ensure safety the value stored in a Ref should be immutable (they can of course contain refs themselves). The value referenced by a Ref can only be accessed or swapped within a transaction. If a transaction is not available, the call will be executed in its own transaction (the call will be atomic). This is a different approach than the Clojure Refs, where a missing transaction results in an error.
Creating a Ref
^^^^^^^^^^^^^^
You can create a Ref with or without an initial value.
.. code-block:: scala
import akka.stm._
// giving an initial value
val ref = Ref(0)
// specifying a type but no initial value
val ref = Ref[Int]
Accessing the value of a Ref
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use ``get`` to access the value of a Ref. Note that if no initial value has been given then the value is initially ``null``.
.. code-block:: scala
import akka.stm._
val ref = Ref(0)
atomic {
ref.get
}
// -> 0
If there is a chance that the value of a Ref is null then you can use ``opt``, which will create an Option, either Some(value) or None, or you can provide a default value with ``getOrElse``. You can also check for null using ``isNull``.
.. code-block:: scala
import akka.stm._
val ref = Ref[Int]
atomic {
ref.opt // -> None
ref.getOrElse(0) // -> 0
ref.isNull // -> true
}
Changing the value of a Ref
^^^^^^^^^^^^^^^^^^^^^^^^^^^
To set a new value for a Ref you can use ``set`` (or equivalently ``swap``), which sets the new value and returns the old value.
.. code-block:: scala
import akka.stm._
val ref = Ref(0)
atomic {
ref.set(5)
}
// -> 0
atomic {
ref.get
}
// -> 5
You can also use ``alter`` which accepts a function that takes the old value and creates a new value of the same type.
.. code-block:: scala
import akka.stm._
val ref = Ref(0)
atomic {
ref alter (_ + 5)
}
// -> 5
val inc = (i: Int) => i + 1
atomic {
ref alter inc
}
// -> 6
Refs in for-comprehensions
^^^^^^^^^^^^^^^^^^^^^^^^^^
Ref is monadic and can be used in for-comprehensions.
.. code-block:: scala
import akka.stm._
val ref = Ref(1)
atomic {
for (value <- ref) {
// do something with value
}
}
val anotherRef = Ref(3)
atomic {
for {
value1 <- ref
value2 <- anotherRef
} yield (value1 + value2)
}
// -> Ref(4)
val emptyRef = Ref[Int]
atomic {
for {
value1 <- ref
value2 <- emptyRef
} yield (value1 + value2)
}
// -> Ref[Int]
Transactions
------------
A transaction is delimited using ``atomic``.
.. code-block:: scala
atomic {
// ...
}
All changes made to transactional objects are isolated from other changes, all make it or non make it (so failure atomicity) and are consistent. With the AkkaSTM you automatically have the Oracle version of the SERIALIZED isolation level, lower isolation is not possible. To make it fully serialized, set the writeskew property that checks if a writeskew problem is allowed to happen.
Retries
^^^^^^^
A transaction is automatically retried when it runs into some read or write conflict, until the operation completes, an exception (throwable) is thrown or when there are too many retries. When a read or writeconflict is encountered, the transaction uses a bounded exponential backoff to prevent cause more contention and give other transactions some room to complete.
If you are using non transactional resources in an atomic block, there could be problems because a transaction can be retried. If you are using print statements or logging, it could be that they are called more than once. So you need to be prepared to deal with this. One of the possible solutions is to work with a deferred or compensating task that is executed after the transaction aborts or commits.
Unexpected retries
^^^^^^^^^^^^^^^^^^
It can happen for the first few executions that you get a few failures of execution that lead to unexpected retries, even though there is not any read or writeconflict. The cause of this is that speculative transaction configuration/selection is used. There are transactions optimized for a single transactional object, for 1..n and for n to unlimited. So based on the execution of the transaction, the system learns; it begins with a cheap one and upgrades to more expensive ones. Once it has learned, it will reuse this knowledge. It can be activated/deactivated using the speculative property on the TransactionFactory. In most cases it is best use the default value (enabled) so you get more out of performance.
Coordinated transactions and Transactors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you need coordinated transactions across actors or threads then see :ref:`transactors-scala`.
Configuring transactions
^^^^^^^^^^^^^^^^^^^^^^^^
It's possible to configure transactions. The ``atomic`` method can take an implicit or explicit ``TransactionFactory``, which can determine properties of the transaction. A default transaction factory is used if none is specified explicitly or there is no implicit ``TransactionFactory`` in scope.
Configuring transactions with an **implicit** ``TransactionFactory``:
.. code-block:: scala
import akka.stm._
implicit val txFactory = TransactionFactory(readonly = true)
atomic {
// read only transaction
}
Configuring transactions with an **explicit** ``TransactionFactory``:
.. code-block:: scala
import akka.stm._
val txFactory = TransactionFactory(readonly = true)
atomic(txFactory) {
// read only transaction
}
The following settings are possible on a TransactionFactory:
- ``familyName`` - Family name for transactions. Useful for debugging.
- ``readonly`` - Sets transaction as readonly. Readonly transactions are cheaper.
- ``maxRetries`` - The maximum number of times a transaction will retry.
- ``timeout`` - The maximum time a transaction will block for.
- ``trackReads`` - Whether all reads should be tracked. Needed for blocking operations.
- ``writeSkew`` - Whether writeskew is allowed. Disable with care.
- ``blockingAllowed`` - Whether explicit retries are allowed.
- ``interruptible`` - Whether a blocking transaction can be interrupted.
- ``speculative`` - Whether speculative configuration should be enabled.
- ``quickRelease`` - Whether locks should be released as quickly as possible (before whole commit).
- ``propagation`` - For controlling how nested transactions behave.
- ``traceLevel`` - Transaction trace level.
You can also specify the default values for some of these options in ``akka.conf``. Here they are with their default values:
::
stm {
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
max-retries = 1000
timeout = 5 # Default timeout for blocking transactions and transaction set (in unit defined by
# the time-unit property)
write-skew = true
blocking-allowed = false
interruptible = false
speculative = true
quick-release = true
propagation = "requires"
trace-level = "none"
}
You can also determine at which level a transaction factory is shared or not shared, which affects the way in which the STM can optimise transactions.
Here is a shared transaction factory for all instances of an actor.
.. code-block:: scala
import akka.actor._
import akka.stm._
object MyActor {
implicit val txFactory = TransactionFactory(readonly = true)
}
class MyActor extends Actor {
import MyActor.txFactory
def receive = {
case message: String =>
atomic {
// read only transaction
}
}
}
Here's a similar example with an individual transaction factory for each instance of an actor.
.. code-block:: scala
import akka.actor._
import akka.stm._
class MyActor extends Actor {
implicit val txFactory = TransactionFactory(readonly = true)
def receive = {
case message: String =>
atomic {
// read only transaction
}
}
}
Transaction lifecycle listeners
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
It's possible to have code that will only run on the successful commit of a transaction, or when a transaction aborts. You can do this by adding ``deferred`` or ``compensating`` blocks to a transaction.
.. code-block:: scala
import akka.stm._
atomic {
deferred {
// executes when transaction commits
}
compensating {
// executes when transaction aborts
}
}
Blocking transactions
^^^^^^^^^^^^^^^^^^^^^
You can block in a transaction until a condition is met by using an explicit ``retry``. To use ``retry`` you also need to configure the transaction to allow explicit retries.
Here is an example of using ``retry`` to block until an account has enough money for a withdrawal. This is also an example of using actors and STM together.
.. code-block:: scala
import akka.stm._
import akka.actor._
import akka.util.duration._
import akka.event.EventHandler
type Account = Ref[Double]
case class Transfer(from: Account, to: Account, amount: Double)
class Transferer extends Actor {
implicit val txFactory = TransactionFactory(blockingAllowed = true, trackReads = true, timeout = 60 seconds)
def receive = {
case Transfer(from, to, amount) =>
atomic {
if (from.get < amount) {
EventHandler.info(this, "not enough money - retrying")
retry
}
EventHandler.info(this, "transferring")
from alter (_ - amount)
to alter (_ + amount)
}
}
}
val account1 = Ref(100.0)
val account2 = Ref(100.0)
val transferer = Actor.actorOf(new Transferer)
transferer ! Transfer(account1, account2, 500.0)
// INFO Transferer: not enough money - retrying
atomic { account1 alter (_ + 2000) }
// INFO Transferer: transferring
atomic { account1.get }
// -> 1600.0
atomic { account2.get }
// -> 600.0
transferer.stop()
Alternative blocking transactions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also have two alternative blocking transactions, one of which can succeed first, with ``either-orElse``.
.. code-block:: scala
import akka.stm._
import akka.actor._
import akka.util.duration._
import akka.event.EventHandler
case class Branch(left: Ref[Int], right: Ref[Int], amount: Int)
class Brancher extends Actor {
implicit val txFactory = TransactionFactory(blockingAllowed = true, trackReads = true, timeout = 60 seconds)
def receive = {
case Branch(left, right, amount) =>
atomic {
either {
if (left.get < amount) {
EventHandler.info(this, "not enough on left - retrying")
retry
}
log.info("going left")
} orElse {
if (right.get < amount) {
EventHandler.info(this, "not enough on right - retrying")
retry
}
log.info("going right")
}
}
}
}
val ref1 = Ref(0)
val ref2 = Ref(0)
val brancher = Actor.actorOf(new Brancher)
brancher ! Branch(ref1, ref2, 1)
// INFO Brancher: not enough on left - retrying
// INFO Brancher: not enough on right - retrying
atomic { ref2 alter (_ + 1) }
// INFO Brancher: not enough on left - retrying
// INFO Brancher: going right
brancher.stop()
Transactional datastructures
----------------------------
Akka provides two datastructures that are managed by the STM.
- ``TransactionalMap``
- ``TransactionalVector``
``TransactionalMap`` and ``TransactionalVector`` look like regular mutable datastructures, they even implement the standard Scala 'Map' and 'RandomAccessSeq' interfaces, but they are implemented using persistent datastructures and managed references under the hood. Therefore they are safe to use in a concurrent environment. Underlying TransactionalMap is HashMap, an immutable Map but with near constant time access and modification operations. Similarly ``TransactionalVector`` uses a persistent Vector. See the Persistent Datastructures section below for more details.
Like managed references, ``TransactionalMap`` and ``TransactionalVector`` can only be modified inside the scope of an STM transaction.
*IMPORTANT*: There have been some problems reported when using transactional datastructures with 'lazy' initialization. Avoid that.
Here is how you create these transactional datastructures:
.. code-block:: scala
import akka.stm._
// assuming something like
case class User(name: String)
case class Address(location: String)
// using initial values
val map = TransactionalMap("bill" -> User("bill"))
val vector = TransactionalVector(Address("somewhere"))
// specifying types
val map = TransactionalMap[String, User]
val vector = TransactionalVector[Address]
``TransactionalMap`` and ``TransactionalVector`` wrap persistent datastructures with transactional references and provide a standard Scala interface. This makes them convenient to use.
Here is an example of using a ``Ref`` and a ``HashMap`` directly:
.. code-block:: scala
import akka.stm._
import scala.collection.immutable.HashMap
case class User(name: String)
val ref = Ref(HashMap[String, User]())
atomic {
val users = ref.get
val newUsers = users + ("bill" -> User("bill")) // creates a new HashMap
ref.swap(newUsers)
}
atomic {
ref.get.apply("bill")
}
// -> User("bill")
Here is the same example using ``TransactionalMap``:
.. code-block:: scala
import akka.stm._
case class User(name: String)
val users = TransactionalMap[String, User]
atomic {
users += "bill" -> User("bill")
}
atomic {
users("bill")
}
// -> User("bill")
Persistent datastructures
-------------------------
Akka's STM should only be used with immutable data. This can be costly if you have large datastructures and are using a naive copy-on-write. In order to make working with immutable datastructures fast enough Scala provides what are called Persistent Datastructures. There are currently two different ones:
* ``HashMap`` (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/HashMap.html>`__)
* ``Vector`` (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/Vector.html>`__)
They are immutable and each update creates a completely new version but they are using clever structural sharing in order to make them almost as fast, for both read and update, as regular mutable datastructures.
This illustration is taken from Rich Hickey's presentation. Copyright Rich Hickey 2009.
.. image:: ../images/clojure-trees.png
Ants simulation sample
----------------------
One fun and very enlightening visual demo of STM, actors and transactional references is the `Ant simulation sample <http://github.com/jboner/akka/tree/master/akka-samples/akka-sample-ants/>`_. I encourage you to run it and read through the code since it's a good example of using actors with STM.
Documentation of Akka STM has not been migrated to Akka 2.0-SNAPSHOT yet.

View file

@ -457,7 +457,7 @@ Accounting for Slow Test Systems
The tight timeouts you use during testing on your lightning-fast notebook will
invariably lead to spurious test failures on the heavily loaded Jenkins server
(or similar). To account for this situation, all maximum durations are
internally scaled by a factor taken from ``akka.conf``,
internally scaled by a factor taken from the :ref:`configuration`,
``akka.test.timefactor``, which defaults to 1.
Resolving Conflicts with Implicit ActorRef
@ -716,7 +716,7 @@ options:
* *Logging of message invocations on certain actors*
This is enabled by a setting in ``akka.conf`` — namely
This is enabled by a setting in the :ref:`configuration` — namely
``akka.actor.debug.receive`` — which enables the :meth:`loggable`
statement to be applied to an actors :meth:`receive` function::
@ -728,7 +728,7 @@ options:
The first argument to :meth:`LoggingReceive` defines the source to be used in the
logging events, which should be the current actor.
If the abovementioned setting is not given in ``akka.conf``, this method will
If the abovementioned setting is not given in the :ref:`configuration`, this method will
pass through the given :class:`Receive` function unmodified, meaning that
there is no runtime cost unless actually enabled.

View file

@ -3,248 +3,4 @@
Transactors (Scala)
===================
.. sidebar:: Contents
.. contents:: :local:
Module stability: **SOLID**
Why Transactors?
----------------
Actors are excellent for solving problems where you have many independent processes that can work in isolation and only interact with other Actors through message passing. This model fits many problems. But the actor model is unfortunately a terrible model for implementing truly shared state. E.g. when you need to have consensus and a stable view of state across many components. The classic example is the bank account where clients can deposit and withdraw, in which each operation needs to be atomic. For detailed discussion on the topic see `this JavaOne presentation <http://www.slideshare.net/jboner/state-youre-doing-it-wrong-javaone-2009>`_.
**STM** on the other hand is excellent for problems where you need consensus and a stable view of the state by providing compositional transactional shared state. Some of the really nice traits of STM are that transactions compose, and it raises the abstraction level from lock-based concurrency.
Akka's Transactors combine Actors and STM to provide the best of the Actor model (concurrency and asynchronous event-based programming) and STM (compositional transactional shared state) by providing transactional, compositional, asynchronous, event-based message flows.
If you need Durability then you should not use one of the in-memory data structures but one of the persistent ones.
Generally, the STM is not needed very often when working with Akka. Some use-cases (that we can think of) are:
- When you really need composable message flows across many actors updating their **internal local** state but need them to do that atomically in one big transaction. Might not often, but when you do need this then you are screwed without it.
- When you want to share a datastructure across actors.
- When you need to use the persistence modules.
Actors and STM
^^^^^^^^^^^^^^
You can combine Actors and STM in several ways. An Actor may use STM internally so that particular changes are guaranteed to be atomic. Actors may also share transactional datastructures as the STM provides safe shared state across threads.
It's also possible to coordinate transactions across Actors or threads so that either the transactions in a set all commit successfully or they all fail. This is the focus of Transactors and the explicit support for coordinated transactions in this section.
Coordinated transactions
------------------------
Akka provides an explicit mechanism for coordinating transactions across Actors. Under the hood it uses a ``CountDownCommitBarrier``, similar to a CountDownLatch.
Here is an example of coordinating two simple counter Actors so that they both increment together in coordinated transactions. If one of them was to fail to increment, the other would also fail.
.. code-block:: scala
import akka.transactor.Coordinated
import akka.stm.Ref
import akka.actor.{Actor, ActorRef}
case class Increment(friend: Option[ActorRef] = None)
case object GetCount
class Counter extends Actor {
val count = Ref(0)
def receive = {
case coordinated @ Coordinated(Increment(friend)) => {
friend foreach (_ ! coordinated(Increment()))
coordinated atomic {
count alter (_ + 1)
}
}
case GetCount => self.reply(count.get)
}
}
val counter1 = Actor.actorOf[Counter]
val counter2 = Actor.actorOf[Counter]
counter1 ! Coordinated(Increment(Some(counter2)))
...
(counter1 ? GetCount).as[Int] // Some(1)
counter1.stop()
counter2.stop()
To start a new coordinated transaction that you will also participate in, just create a ``Coordinated`` object:
.. code-block:: scala
val coordinated = Coordinated()
To start a coordinated transaction that you won't participate in yourself you can create a ``Coordinated`` object with a message and send it directly to an actor. The recipient of the message will be the first member of the coordination set:
.. code-block:: scala
actor ! Coordinated(Message)
To receive a coordinated message in an actor simply match it in a case statement:
.. code-block:: scala
def receive = {
case coordinated @ Coordinated(Message) => ...
}
To include another actor in the same coordinated transaction that you've created or received, use the apply method on that object. This will increment the number of parties involved by one and create a new ``Coordinated`` object to be sent.
.. code-block:: scala
actor ! coordinated(Message)
To enter the coordinated transaction use the atomic method of the coordinated object:
.. code-block:: scala
coordinated atomic {
// do something in transaction ...
}
The coordinated transaction will wait for the other transactions before committing. If any of the coordinated transactions fail then they all fail.
Transactor
----------
Transactors are actors that provide a general pattern for coordinating transactions, using the explicit coordination described above.
Here's an example of a simple transactor that will join a coordinated transaction:
.. code-block:: scala
import akka.transactor.Transactor
import akka.stm.Ref
case object Increment
class Counter extends Transactor {
val count = Ref(0)
override def atomically = {
case Increment => count alter (_ + 1)
}
}
You could send this Counter transactor a ``Coordinated(Increment)`` message. If you were to send it just an ``Increment`` message it will create its own ``Coordinated`` (but in this particular case wouldn't be coordinating transactions with any other transactors).
To coordinate with other transactors override the ``coordinate`` method. The ``coordinate`` method maps a message to a set of ``SendTo`` objects, pairs of ``ActorRef`` and a message. You can use the ``include`` and ``sendTo`` methods to easily coordinate with other transactors. The ``include`` method will send on the same message that was received to other transactors. The ``sendTo`` method allows you to specify both the actor to send to, and the message to send.
Example of coordinating an increment:
.. code-block:: scala
import akka.transactor.Transactor
import akka.stm.Ref
import akka.actor.ActorRef
case object Increment
class FriendlyCounter(friend: ActorRef) extends Transactor {
val count = Ref(0)
override def coordinate = {
case Increment => include(friend)
}
override def atomically = {
case Increment => count alter (_ + 1)
}
}
Using ``include`` to include more than one transactor:
.. code-block:: scala
override def coordinate = {
case Message => include(actor1, actor2, actor3)
}
Using ``sendTo`` to coordinate transactions but pass-on a different message than the one that was received:
.. code-block:: scala
override def coordinate = {
case Message => sendTo(someActor -> SomeOtherMessage)
case SomeMessage => sendTo(actor1 -> Message1, actor2 -> Message2)
}
To execute directly before or after the coordinated transaction, override the ``before`` and ``after`` methods. These methods also expect partial functions like the receive method. They do not execute within the transaction.
To completely bypass coordinated transactions override the ``normally`` method. Any message matched by ``normally`` will not be matched by the other methods, and will not be involved in coordinated transactions. In this method you can implement normal actor behavior, or use the normal STM atomic for local transactions.
Coordinating Typed Actors
-------------------------
It's also possible to use coordinated transactions with typed actors. You can explicitly pass around ``Coordinated`` objects, or use built-in support with the ``@Coordinated`` annotation and the ``Coordination.coordinate`` method.
To specify a method should use coordinated transactions add the ``@Coordinated`` annotation. **Note**: the ``@Coordinated`` annotation only works with methods that return Unit (one-way methods).
.. code-block:: scala
trait Counter {
@Coordinated def increment()
def get: Int
}
To coordinate transactions use a ``coordinate`` block:
.. code-block:: scala
coordinate {
counter1.increment()
counter2.increment()
}
Here's an example of using ``@Coordinated`` with a TypedActor to coordinate increments.
.. code-block:: scala
import akka.actor.TypedActor
import akka.stm.Ref
import akka.transactor.annotation.Coordinated
import akka.transactor.Coordination._
trait Counter {
@Coordinated def increment()
def get: Int
}
class CounterImpl extends TypedActor with Counter {
val ref = Ref(0)
def increment() { ref alter (_ + 1) }
def get = ref.get
}
...
val counter1 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl])
val counter2 = TypedActor.newInstance(classOf[Counter], classOf[CounterImpl])
coordinate {
counter1.increment()
counter2.increment()
}
TypedActor.stop(counter1)
TypedActor.stop(counter2)
The ``coordinate`` block will wait for the transactions to complete. If you do not want to wait then you can specify this explicitly:
.. code-block:: scala
coordinate(wait = false) {
counter1.increment()
counter2.increment()
}
Documentation of Akka Transactors has not been migrated to Akka 2.0-SNAPSHOT yet.

View file

@ -1,8 +0,0 @@
Tutorial: write a scalable, fault-tolerant, network chat server and client (Scala)
=============================================================================================
.. sidebar:: Contents
.. contents:: :local:
REWRITE ME

View file

@ -178,7 +178,7 @@ Messages and immutability
**IMPORTANT**: Messages can be any kind of object but have to be immutable (there is a workaround, see next section). Java or Scala cant enforce immutability (yet) so this has to be by convention. Primitives like String, int, Long are always immutable. Apart from these you have to create your own immutable objects to send as messages. If you pass on a reference to an instance that is mutable then this instance can be modified concurrently by two different Typed Actors and the Actor model is broken leaving you with NO guarantees and most likely corrupt data.
Akka can help you in this regard. It allows you to turn on an option for serializing all messages, e.g. all parameters to the Typed Actor effectively making a deep clone/copy of the parameters. This will make sending mutable messages completely safe. This option is turned on in the $AKKA_HOME/config/akka.conf config file like this:
Akka can help you in this regard. It allows you to turn on an option for serializing all messages, e.g. all parameters to the Typed Actor effectively making a deep clone/copy of the parameters. This will make sending mutable messages completely safe. This option is turned on in the :ref:`configuration` file like this:
.. code-block:: ruby