Merge branch 'master' into 1063-docs-routing-he

Conflicts:
	akka-docs/java/routing.rst
	akka-docs/scala/routing.rst
This commit is contained in:
Henrik Engstrom 2011-12-15 20:15:32 +01:00
commit 26d49fecb9
57 changed files with 1633 additions and 877 deletions

View file

@ -99,7 +99,7 @@ object TypedActorSpec {
} }
def futureComposePigdogFrom(foo: Foo): Future[String] = { def futureComposePigdogFrom(foo: Foo): Future[String] = {
implicit val timeout = TypedActor.system.settings.ActorTimeout implicit val timeout = TypedActor.context.system.settings.ActorTimeout
foo.futurePigdog(500).map(_.toUpperCase) foo.futurePigdog(500).map(_.toUpperCase)
} }

View file

@ -39,7 +39,8 @@ akka {
# Timeout for ActorSystem.actorOf # Timeout for ActorSystem.actorOf
creation-timeout = 20s creation-timeout = 20s
# frequency with which stopping actors are prodded in case they had to be removed from their parents # frequency with which stopping actors are prodded in case they had to be
# removed from their parents
reaper-interval = 5s reaper-interval = 5s
# Default timeout for Future based invocations # Default timeout for Future based invocations
@ -83,9 +84,9 @@ akka {
} }
target { target {
# Alternatively to giving nr-of-instances you can specify the full paths of # Alternatively to giving nr-of-instances you can specify the full
# those actors which should be routed to. This setting takes precedence over # paths of those actors which should be routed to. This setting takes
# nr-of-instances # precedence over nr-of-instances
paths = [] paths = []
} }
@ -94,8 +95,10 @@ akka {
default-dispatcher { default-dispatcher {
# Must be one of the following # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor # the same type),
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg
# visible constructor
type = "Dispatcher" type = "Dispatcher"
# Name used in log messages and thread names. # Name used in log messages and thread names.
@ -129,22 +132,25 @@ akka {
# Specifies the bounded capacity of the task queue (< 1 == unbounded) # Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-size = -1 task-queue-size = -1
# Specifies which type of task queue will be used, can be "array" or "linked" (default) # Specifies which type of task queue will be used, can be "array" or
# "linked" (default)
task-queue-type = "linked" task-queue-type = "linked"
# Allow core threads to time out # Allow core threads to time out
allow-core-timeout = on allow-core-timeout = on
# Throughput defines the number of messages that are processed in a batch before the # Throughput defines the number of messages that are processed in a batch
# thread is returned to the pool. Set to 1 for as fair as possible. # before the thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 5 throughput = 5
# Throughput deadline for Dispatcher, set to 0 or negative for no deadline # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
throughput-deadline-time = 0ms throughput-deadline-time = 0ms
# If negative (or zero) then an unbounded mailbox is used (default) # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property # If positive then a bounded mailbox is used and the capacity is set using the
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care # property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to
# deadlock, use with care
# The following are only used for Dispatcher and only if mailbox-capacity > 0 # The following are only used for Dispatcher and only if mailbox-capacity > 0
mailbox-capacity = -1 mailbox-capacity = -1
@ -154,7 +160,8 @@ akka {
} }
debug { debug {
# enable function of Actor.loggable(), which is to log any received message at DEBUG level # enable function of Actor.loggable(), which is to log any received message at
# DEBUG level
receive = off receive = off
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like)
@ -170,8 +177,8 @@ akka {
event-stream = off event-stream = off
} }
# Entries for pluggable serializers and their bindings. If a binding for a specific class is not found, # Entries for pluggable serializers and their bindings. If a binding for a specific
# then the default serializer (Java serialization) is used. # class is not found, then the default serializer (Java serialization) is used.
serializers { serializers {
# java = "akka.serialization.JavaSerializer" # java = "akka.serialization.JavaSerializer"
# proto = "akka.testing.ProtobufSerializer" # proto = "akka.testing.ProtobufSerializer"
@ -193,13 +200,16 @@ akka {
} }
# Used to set the behavior of the scheduler. # Used to set the behavior of the scheduler.
# Changing the default values may change the system behavior drastically so make sure you know what you're doing! # Changing the default values may change the system behavior drastically so make sure
# you know what you're doing!
# #
scheduler { scheduler {
# The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler in the system. # The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler
# in the system.
# HWT does not execute the scheduled tasks on exact time. # HWT does not execute the scheduled tasks on exact time.
# It will, on every tick, check if there are any tasks behind the schedule and execute them. # It will, on every tick, check if there are any tasks behind the schedule and execute them.
# You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration. # You can increase or decrease the accuracy of the execution timing by specifying smaller
# or larger tick duration.
# If you are scheduling a lot of tasks you should consider increasing the ticks per wheel. # If you are scheduling a lot of tasks you should consider increasing the ticks per wheel.
# For more information see: http://www.jboss.org/netty/ # For more information see: http://www.jboss.org/netty/
tickDuration = 100ms tickDuration = 100ms

View file

@ -25,48 +25,60 @@ import akka.util.Duration
*/ */
trait Scheduler { trait Scheduler {
/** /**
* Schedules a message to be sent repeatedly with an initial delay and frequency. * Schedules a message to be sent repeatedly with an initial delay and
* E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set * frequency. E.g. if you would like a message to be sent immediately and
* delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) * thereafter every 500ms you would set delay = Duration.Zero and frequency
* = Duration(500, TimeUnit.MILLISECONDS)
* *
* Java & Scala API * Java & Scala API
*/ */
def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable def schedule(
initialDelay: Duration,
frequency: Duration,
receiver: ActorRef,
message: Any): Cancellable
/** /**
* Schedules a function to be run repeatedly with an initial delay and a frequency. * Schedules a function to be run repeatedly with an initial delay and a
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * frequency. E.g. if you would like the function to be run after 2 seconds
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) * and thereafter every 100ms you would set delay = Duration(2, TimeUnit.SECONDS)
* and frequency = Duration(100, TimeUnit.MILLISECONDS)
* *
* Scala API * Scala API
*/ */
def schedule(initialDelay: Duration, frequency: Duration)(f: Unit): Cancellable def schedule(
initialDelay: Duration, frequency: Duration)(f: Unit): Cancellable
/** /**
* Schedules a function to be run repeatedly with an initial delay and a frequency. * Schedules a function to be run repeatedly with an initial delay and
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set * a frequency. E.g. if you would like the function to be run after 2
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) * seconds and thereafter every 100ms you would set delay = Duration(2,
* TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
* *
* Java API * Java API
*/ */
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable def schedule(
initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable
/** /**
* Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. * Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
* *
* Java & Scala API * Java & Scala API
*/ */
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable
/** /**
* Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. * Schedules a message to be sent once with a delay, i.e. a time period that has
* to pass before the message is sent.
* *
* Java & Scala API * Java & Scala API
*/ */
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable
/** /**
* Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. * Schedules a function to be run once with a delay, i.e. a time period that has
* to pass before the function is run.
* *
* Scala API * Scala API
*/ */
@ -95,4 +107,4 @@ trait Cancellable {
*/ */
def isCancelled: Boolean def isCancelled: Boolean
} }
//#cancellable //#cancellable

View file

@ -51,6 +51,8 @@ trait TypedActorFactory {
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or * the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class * all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader) typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader)
@ -59,6 +61,8 @@ trait TypedActorFactory {
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class * all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader) typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader)
@ -67,6 +71,8 @@ trait TypedActorFactory {
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class * all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader) typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader)
@ -75,6 +81,8 @@ trait TypedActorFactory {
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class * all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader) typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader)
@ -83,55 +91,21 @@ trait TypedActorFactory {
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class * all interfaces (Class.getInterfaces) if it's not an interface class
*
* Scala API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: T, props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, loader) typedActor.createProxyAndTypedActor(actorFactory, interface, impl, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), loader)
/** /**
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*
* Scala API
*/ */
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = def typedActorOf[R <: AnyRef, T <: R: ClassManifest](props: Props = Props(), name: String = null): R = {
typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, None, loader) val clazz = implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]]
typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), clazz.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader)
} }
/** /**
@ -172,6 +146,8 @@ trait TypedActorFactory {
} }
object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider { object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): TypedActorExtension = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system) def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system)
@ -255,7 +231,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
} }
private val selfReference = new ThreadLocal[AnyRef] private val selfReference = new ThreadLocal[AnyRef]
private val currentSystem = new ThreadLocal[ActorSystem] private val currentContext = new ThreadLocal[ActorContext]
/** /**
* Returns the reference to the proxy when called inside a method call in a TypedActor * Returns the reference to the proxy when called inside a method call in a TypedActor
@ -281,23 +257,30 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
} }
/** /**
* Returns the akka system (for a TypedActor) when inside a method call in a TypedActor. * Returns the ActorContext (for a TypedActor) when inside a method call in a TypedActor.
*/ */
def system = currentSystem.get match { def context = currentContext.get match {
case null throw new IllegalStateException("Calling TypedActor.system outside of a TypedActor implementation method!") case null throw new IllegalStateException("Calling TypedActor.context outside of a TypedActor implementation method!")
case some some case some some
} }
/** /**
* Returns the default dispatcher (for a TypedActor) when inside a method call in a TypedActor. * Returns the default dispatcher (for a TypedActor) when inside a method call in a TypedActor.
*/ */
implicit def dispatcher = system.dispatcher implicit def dispatcher = context.dispatcher
/** /**
* Implementation of TypedActor as an Actor * Implementation of TypedActor as an Actor
*/ */
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor { private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor {
val me = createInstance val me = try {
TypedActor.selfReference set proxyVar.get
TypedActor.currentContext set context
createInstance
} finally {
TypedActor.selfReference set null
TypedActor.currentContext set null
}
override def preStart(): Unit = me match { override def preStart(): Unit = me match {
case l: PreStart l.preStart() case l: PreStart l.preStart()
@ -331,7 +314,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
def receive = { def receive = {
case m: MethodCall case m: MethodCall
TypedActor.selfReference set proxyVar.get TypedActor.selfReference set proxyVar.get
TypedActor.currentSystem set context.system TypedActor.currentContext set context
try { try {
if (m.isOneWay) m(me) if (m.isOneWay) m(me)
else { else {
@ -351,7 +334,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
} }
} finally { } finally {
TypedActor.selfReference set null TypedActor.selfReference set null
TypedActor.currentSystem set null TypedActor.currentContext set null
} }
} }
} }

View file

@ -30,11 +30,13 @@ object Await {
trait Awaitable[+T] { trait Awaitable[+T] {
/** /**
* Should throw java.util.concurrent.TimeoutException if times out * Should throw java.util.concurrent.TimeoutException if times out
* This method should not be called directly.
*/ */
def ready(atMost: Duration)(implicit permit: CanAwait): this.type def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/** /**
* Throws exceptions if cannot produce a T within the specified time * Throws exceptions if cannot produce a T within the specified time
* This method should not be called directly.
*/ */
def result(atMost: Duration)(implicit permit: CanAwait): T def result(atMost: Duration)(implicit permit: CanAwait): T
} }
@ -45,6 +47,9 @@ object Await {
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
} }
/**
* Futures is the Java API for Futures and Promises
*/
object Futures { object Futures {
/** /**
@ -57,6 +62,16 @@ object Futures {
*/ */
def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher) def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher)
/**
* Java API, creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher)
/**
* Java API, Creates an already completed Promise with the specified result
*/
def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher)
/** /**
* Java API. * Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate

View file

@ -3,9 +3,10 @@
*/ */
package akka.serialization package akka.serialization
import akka.actor.{ ExtensionId, ExtensionIdProvider, ActorSystemImpl } import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ActorSystemImpl }
object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider { object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider {
override def get(system: ActorSystem): Serialization = super.get(system)
override def lookup = SerializationExtension override def lookup = SerializationExtension
override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system) override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system)
} }

View file

@ -20,12 +20,3 @@ Compares:
- Request-reply - Request-reply
- Fire-forget with default dispatcher - Fire-forget with default dispatcher
- Fire-forget with Hawt dispatcher - Fire-forget with Hawt dispatcher
Performance benchmark
---------------------
Benchmarking Akka against:
- Scala Library Actors
- Raw Java concurrency
- Jetlang (Java actors lib) `<http://github.com/jboner/akka-bench>`_

View file

@ -4,10 +4,8 @@ Additional Information
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2
articles
benchmarks benchmarks
recipies recipies
external-sample-projects
companies-using-akka companies-using-akka
third-party-integrations third-party-integrations
language-bindings language-bindings

View file

@ -4,21 +4,14 @@ Other Language Bindings
JRuby JRuby
----- -----
High level concurrency using Akka actors and JRuby. Read more here: `<https://github.com/iconara/mikka>`_.
`<https://github.com/danielribeiro/RubyOnAkka>`_
If you are using STM with JRuby then you need to unwrap the Multiverse control flow exception as follows:
.. code-block:: ruby
begin
... atomic stuff
rescue NativeException => e
raise e.cause if e.cause.java_class.package.name.include? "org.multiverse"
end
Groovy/Groovy++ Groovy/Groovy++
--------------- ---------------
`<https://gist.github.com/620439>`_ Read more here: `<https://gist.github.com/620439>`_.
Clojure
-------
Read more here: `<http://blog.darevay.com/2011/06/clojure-and-akka-a-match-made-in/>`_.

View file

@ -4,19 +4,14 @@ Third-party Integrations
The Play! Framework The Play! Framework
------------------- -------------------
Dustin Whitney has done an Akka integration module for the `Play! framework <http://www.playframework.org/>`_. Play 2.0 is based upon Akka. Uses all its eventing and threading using Akka actors and futures.
Detailed instructions here: `<http://github.com/dwhitney/akka/blob/master/README.textile>`_. Read more here: `<http://www.playframework.org/2.0>`_.
There are three screencasts: Scalatra
--------
- Using Play! with Akka STM: `<http://vimeo.com/10764693>`_ Scalatra has Akka integration.
- Using Play! with Akka Actors: `<http://vimeo.com/10792173>`_
- Using Play! with Akka Remote Actors: `<http://vimeo.com/10793443>`_
The Pinky REST/MVC Framework Read more here: `<https://github.com/scalatra/scalatra/blob/feature/newakka/akka/src/main/scala/org/scalatra/akka/AkkaSupport.scala>`_
----------------------------
Peter Hausel has done an Akka integration module for the `Pinky framework <http://wiki.github.com/pk11/pinky/>`_.
Read more here: `<http://wiki.github.com/pk11/pinky/release-13>`_

View file

@ -1,12 +1,15 @@
.. _cluster: .. _cluster:
######### ######################
Cluster Cluster Specification
######### ######################
*This document describes the new clustering coming in Akka 2.1* .. sidebar:: Contents
.. contents:: :local:
.. note:: *This document describes the new clustering coming in Akka 2.1 (not 2.0)*
Intro Intro
===== =====

View file

@ -65,7 +65,8 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf(Props[Worker]) val workers = Vector.fill(nrOfWorkers)(actorOf(Props[Worker])
// wrap them with a load-balancing router // wrap them with a load-balancing router
val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") val router = Routing.actorOf(
RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
loadBalancerActor(CyclicIterator(workers)) loadBalancerActor(CyclicIterator(workers))
//#create-workers //#create-workers

View file

@ -0,0 +1,114 @@
.. _actor-systems:
Actor Systems
=============
Actors are objects which encapsulate state and behavior, they communicate
exclusively by exchanging messages which are placed into the recipients
mailbox. In a sense, actors are the most strigent form of object-oriented
programming, but it serves better to view them as persons: while modeling a
solution with actors, envision a group of people and assign sub-tasks to them,
arrange their functions into an organizational structure and think about how to
escalate failure (all with the benefit of not actually dealing with people,
which means that we need not concern ourselves with their emotional state or
moral issues). The result can then serve as a mental scaffolding for building
the software implementation.
Hierarchical Structure
----------------------
Like in an economic organization, actors naturally form hierarchies. One actor,
which is to oversee a certain function in the program might want to split up
its task into smaller, more manageable pieces. For this purpose it starts child
actors which it supervises. While the details of supervision are explained
:ref:`here <supervision>`, we shall concentrate on the underlying concepts in
this section. The only prerequisite is to know that each actor has exactly one
supervisor, which is the actor that created it.
The quintessential feature of actor systems is that tasks are split up and
delegated until they become small enough to be handled in one piece. In doing
so, not only is the task itself clearly structured, but the resulting actors
can be reasoned about in terms of which messages they should process, how they
should react nominally and how failure should be handled. If one actor does not
have the means for dealing with a certain situation, it sends a corresponding
failure message to its supervisor, asking for help. The recursive structure
then allows to handle failure at the right level.
Compare this to layered software design which easily devolves into defensive
programming with the aim of not leaking any failure out: if the problem is
communicated to the right person, a better solution can be found than if
trying to keep everything “under the carpet”.
Now, the difficulty in designing such a system is how to decide who should
supervise what. There is of course no single best solution, but there are a few
guide lines which might be helpful:
- If one actor manages the work another actor is doing, e.g. by passing on
sub-tasks, then the manager should supervise the child. The reason is that
the manager knows which kind of failures are expected and how to handle
them.
- If one actor carries very important data (i.e. its state shall not be lost
if avoidable), this actor should source out any possibly dangerous sub-tasks
to children it supervises and handle failures of these children as
appropriate. Depending on the nature of the requests, it may be best to
create a new child for each request, which simplifies state management for
collecting the replies. This is known as the “Error Kernel Pattern” from
Erlang.
- If one actor depends on another actor for carrying out its duty, it should
watch that other actors liveness and act upon receiving a termination
notice. This is different from supervision, as the watching party has no
influence on the supervision strategy, and it should be noted that a
functional dependency alone is not a criterion for deciding where to place a
certain child actor in the hierarchy.
There are of course always exceptions to these rules, but no matter whether you
follow the rules or break them, you should always have a reason.
Configuration Container
-----------------------
The actor system as a collaborating ensemble of actors is the natural unit for
managing shared facilities like scheduling services, configuration, logging,
etc. Several actor systems with different configuration may co-exist within the
same JVM without problems, there is no global shared state within Akka itself.
Couple this with the transparent communication between actor systems—within one
node or across a network connection—to see that actor systems themselves can be
used as building blocks in a functional hierarchy.
Actor Best Practices
--------------------
#. Actors should be like nice co-workers: do their job efficiently without
bothering everyone else needlessly and avoid hogging resources. Translated
to programming this means to process events and generate responses (or more
requests) in an event-driven manner. Actors should not block (i.e. passively
wait while occupying a Thread) on some external entity, which might be a
lock, a network socket, etc. The blocking operations should be done in some
special-cased thread which sends messages to the actors which shall act on
them.
#. Do not pass mutable objects between actors. In order to ensure that, prefer
immutable messages. If the encapsulation of actors is broken by exposing
their mutable state to the outside, you are back in normal Java concurrency
land with all the drawbacks.
#. Actors are made to be containers for behavior and state, embracing this
means to not routinely send behavior within messages (which may be tempting
using Scala closures). One of the risks is to accidentally share mutable
state between actors, and this violation of the actor model unfortunately
breaks all the properties which make programming in actors such a nice
experience.
What you should not concern yourself with
-----------------------------------------
An actor system manages the resources it is configured to use in order to run
the actors which it contains. There may be millions of actors within one such
system, after all the mantra is to view them as abundant and they weigh in at
an overhead of only roughly 300 bytes per instance. Naturally, the exact order
in which messages are processed in large systems is not controllable by the
application author, but this is also not intended. Take a step back and relax
while Akka does the heavy lifting under the hood.

View file

@ -0,0 +1,142 @@
.. _actors-general:
What is an Actor?
=================
The previous section about :ref:`actor-systems` explained how actors form
hierarchies and are the smallest unit when building an application. This
section looks at one such actor in isolation, explaining the concepts you
encounter while implementing it. For more an in depth reference with all the
details please refer to :ref:`actors-scala` and :ref:`untyped-actors-java`.
An actor is a container for `State`_, `Behavior`_, a `Mailbox`_, `Children`_
and a `Fault Handling Strategy`_. All of this is encapsulated behind an `Actor
Reference`_. Finally, this happens `When and Actor Terminates`_.
Actor Reference
---------------
As detailed below, an actor object needs to be shielded from the outside in
order to benefit from the actor model. Therefore, actors are represented to the
outside using actor references, which are objects that can be passed around
freely and without restriction. This split into inner and outer object enables
transparency for all the desired operations: restarting an actor without
needing to update references elsewhere, placing the actual actor object on
remote hosts, sending messages to actors in completely different applications.
But the most important aspect is that it is not possible to look inside an
actor and get hold of its state from the outside, unless the actor unwisely
publishes this information itself.
State
-----
Actor objects will typically contain some variables which reflect possible
states the actor may be in. This can be an explicit state machine (e.g. using
the :ref:`fsm` module), or it could be a counter, set of listeners, pending
requests, etc. These data are what make an actor valuable, and they must be
protected from corruption by other actors. The good news is that Akka actors
conceptually each have their own light-weight thread, which is completely
shielded from the rest of the system. This means that instead of having to
synchronize access using locks you can just write your actor code without
worrying about concurrency at all.
Behind the scenes Akka will run sets of actors on sets of real threads, where
typically many actors share one thread, and subsequent invocations of one actor
may end up being processed on different threads. Akka ensures that this
implementation detail does not affect the single-threadedness of handling the
actors state.
Because the internal state is vital to an actors operations, having
inconsistent state is fatal. Thus, when the actor fails and is restarted by its
supervisor, the state will be created from scratch, like upon first creating
the actor. This is to enable the ability of self-healing of the system.
Behavior
--------
Every time a message is processed, it is matched against the current behavior
of the actor. Behavior means a function which defines the actions to be taken
in reaction to the message at that point in time, say forward a request if the
client is authorized, deny it otherwise. This behavior may change over time,
e.g. because different clients obtain authorization over time, or because the
actor may go into an “out-of-service” mode and later come back. These changes
are achieved by either encoding them in state variables which are read from the
behavior logic, or the function itself may be swapped out at runtime, see the
``become`` and ``unbecome`` operations. However, the initial behavior defined
during construction of the actor object is special in the sense that a restart
of the actor will reset its behavior to this initial one.
Mailbox
-------
An actors purpose is the processing of messages, and these messages were sent
to the actor from other actors (or from outside the actor system). The piece
which connects sender and receiver is the actors mailbox: each actor has
exactly one mailbox to which all senders enqueue their messages. Enqueuing
happens in the time-order of send operations, which means that messages sent
from different actors may not have a defined order at runtime due to the
apparent randomness of distributing actors across threads. Sending multiple
messages to the same target from the same actor, on the other hand, will
enqueue them in the same order.
There are different mailbox implementations to choose from, the default being a
FIFO: the order of the messages processed by the actor matches the order in
which they were enqueued. This is usually a good default, but applications may
need to prioritize some messages over others. In this case, a priority mailbox
will enqueue not always at the end but at a position as given by the message
priority, which might even be at the front. While using such a queue, the order
of messages processed will naturally be defined by the queues algorithm and in
general not be FIFO.
An important feature in which Akka differs from some other actor model
implementations is that the current behavior must always handle the next
dequeued message, there is no scanning the mailbox for the next matching one.
Failure to handle a message will typically be treated as a failure, unless this
behavior is overridden.
Children
--------
Each actor is potentially a supervisor: if it creates children for delegating
sub-tasks, it will automatically supervise them. The list of children is
maintained within the actors context and the actor has access to it.
Modifications to the list are done by creating (``context.actorOf(...)``) or
stopping (``context.stop(child)``) children and these actions are reflected
immediately. The actual creation and termination actions happen behind the
scenes in an asynchronous way, so they do not “block” their supervisor.
Fault Handling Strategy
-----------------------
The final piece of an actor is its strategy for handling faults of its
children. To keep it simple and robust, this is declared outside of the actors
code and has no access to the actors state. Fault handling is then done
transparently by Akka, applying one of the strategies described in
:ref:`supervision` for each incoming failure. As this strategy is fundamental
to how an actor system is structured, it cannot be changed once an actor has
been created.
Considering that there is only one such strategy for each actor, this means
that if different strategies apply to the various children of an actor, the
children should be grouped beneath intermediate supervisors with matching
strategies, preferring once more the structuring of actor systems according to
the splitting of tasks into sub-tasks.
When an Actor Terminates
------------------------
Once an actor terminates, i.e. fails in a way which is not handled by a
restart, stops itself or is stopped by its supervisor, it will free up its
resources, draining all remaining messages from its mailbox into the systems
“dead letter mailbox”. The mailbox is then replaced within the actor reference
with a that system mailbox, redirecting all new messages “into the drain”. This
is done on a best effort basis, though, so do not rely on it in order to
construct “guaranteed delivery”.
The reason for not just silently dumping the messages was inspired by our
tests: we register the TestEventListener on the event bus to which the dead
letters are forwarded, and that will log a warning for every dead letter
received—this has been very helpful for deciphering test failures more quickly.
It is conceivable that this feature may also be of use for other purposes.

View file

@ -2,9 +2,10 @@ Actor References, Paths and Addresses
===================================== =====================================
This chapter describes how actors are identified and located within a possibly This chapter describes how actors are identified and located within a possibly
distributed actor system. It ties into the central idea that actor systems form distributed actor system. It ties into the central idea that
intrinsic supervision hierarchies as well as that communication between actors :ref:`actor-systems` form intrinsic supervision hierarchies as well as that
is transparent with respect to their placement across multiple network nodes. communication between actors is transparent with respect to their placement
across multiple network nodes.
What is an Actor Reference? What is an Actor Reference?
--------------------------- ---------------------------
@ -84,7 +85,7 @@ actors in the hierarchy from the root up. Examples are::
Here, ``akka`` is the default remote protocol for the 2.0 release, and others Here, ``akka`` is the default remote protocol for the 2.0 release, and others
are pluggable. The interpretation of the host & port part (i.e. are pluggable. The interpretation of the host & port part (i.e.
``serv.example.com:5678`` in the example) depends on the transport mechanism ``serv.example.com:5678`` in the example) depends on the transport mechanism
used, but it should abide by the URI structural rules. used, but it must abide by the URI structural rules.
Logical Actor Paths Logical Actor Paths
^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^
@ -99,7 +100,7 @@ Physical Actor Paths
^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^
While the logical actor path describes the functional location within one actor While the logical actor path describes the functional location within one actor
system, configuration-based transparent remoting means that an actor may be system, configuration-based remote deployment means that an actor may be
created on a different network host as its parent, i.e. within a different created on a different network host as its parent, i.e. within a different
actor system. In this case, following the actor path from the root guardian up actor system. In this case, following the actor path from the root guardian up
entails traversing the network, which is a costly operation. Therefore, each entails traversing the network, which is a costly operation. Therefore, each
@ -144,7 +145,7 @@ Creating Actors
An actor system is typically started by creating actors above the guardian An actor system is typically started by creating actors above the guardian
actor using the :meth:`ActorSystem.actorOf` method and then using actor using the :meth:`ActorSystem.actorOf` method and then using
:meth:`ActorContext.actorOf` from within the created actors to spawn the actor :meth:`ActorContext.actorOf` from within the created actors to spawn the actor
tree. These methods return a reference to the newly created actors. Each actor tree. These methods return a reference to the newly created actor. Each actor
has direct access to references for its parent, itself and its children. These has direct access to references for its parent, itself and its children. These
references may be sent within messages to other actors, enabling those to reply references may be sent within messages to other actors, enabling those to reply
directly. directly.
@ -152,17 +153,17 @@ directly.
Looking up Actors by Concrete Path Looking up Actors by Concrete Path
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In addition, actor references may be looked up using the In addition, actor references may be looked up using the
:meth:`ActorSystem.actorFor` method, which returns an (unverified) local, :meth:`ActorSystem.actorFor` method, which returns an (unverified) local,
remote or clustered actor reference. Sending messages to such a reference or remote or clustered actor reference. Sending messages to such a reference or
attempting to observe its livelyhood will traverse the actor hierarchy of the attempting to observe its livelyhood will traverse the actor hierarchy of the
actor system from top to bottom by passing messages from parent to child until actor system from top to bottom by passing messages from parent to child until
either the target is reached or failure is certain, i.e. a name in the path either the target is reached or failure is certain, i.e. a name in the path
does not exist (in practice this process will be optimized using caches, but it does not exist (in practice this process will be optimized using caches, but it
still has added cost compared to using the physical actor path, can for example still has added cost compared to using the physical actor path, which can for
to obtained from the sender reference included in replies from that actor). The example to obtained from the sender reference included in replies from that
messages passed are handled automatically by Akka, so this process is not actor). The messages passed are handled automatically by Akka, so this process
visible to client code. is not visible to client code.
Absolute vs. Relative Paths Absolute vs. Relative Paths
``````````````````````````` ```````````````````````````
@ -177,12 +178,20 @@ example send a message to a specific sibling::
context.actorFor("../brother") ! msg context.actorFor("../brother") ! msg
Absolute paths may of course also be looked up on `context` in the usual way, i.e.
.. code-block:: scala
context.actorFor("/user/serviceA") ! msg
will work as expected.
Querying the Logical Actor Hierarchy Querying the Logical Actor Hierarchy
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Since the actor system forms a file-system like hierarchy, matching on paths is Since the actor system forms a file-system like hierarchy, matching on paths is
possible in the same was as supported by Unix shells: you may replace (parts possible in the same was as supported by Unix shells: you may replace (parts
of) path element names with wildcards (`"*"` and `"?"`) to formulate a of) path element names with wildcards (`«*»` and `«?»`) to formulate a
selection which may match zero or more actual actors. Because the result is not selection which may match zero or more actual actors. Because the result is not
a single actor reference, it has a different type :class:`ActorSelection` and a single actor reference, it has a different type :class:`ActorSelection` and
does not support the full set of operations an :class:`ActorRef` does. does not support the full set of operations an :class:`ActorRef` does.
@ -261,8 +270,13 @@ other actors are found. The next level consists of the following:
those which are used in the implementation of :meth:`ActorRef.ask`. those which are used in the implementation of :meth:`ActorRef.ask`.
- ``"/remote"`` is an artificial path below which all actors reside whose - ``"/remote"`` is an artificial path below which all actors reside whose
supervisors are remote actor references supervisors are remote actor references
Future extensions:
- ``"/service"`` is an artificial path below which actors can be presented by - ``"/service"`` is an artificial path below which actors can be presented by
means of configuration, i.e. deployed at system start-up or just-in-time means of configuration, i.e. deployed at system start-up or just-in-time
(triggered by look-up) or “mounting” other actors by path—local or remote—to (triggered by look-up)
give them logical names. - ``"/alias"`` is an artificial path below which other actors may be “mounted”
(as in the Unix file-system sense) by path—local or remote—to give them
logical names.

View file

@ -6,7 +6,6 @@ import org.scalatest.matchers.MustMatchers
//#imports //#imports
import akka.actor.ActorSystem import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
//#imports //#imports
class ConfigDocSpec extends WordSpec with MustMatchers { class ConfigDocSpec extends WordSpec with MustMatchers {
@ -15,7 +14,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers {
//#custom-config //#custom-config
val customConf = ConfigFactory.parseString(""" val customConf = ConfigFactory.parseString("""
akka.actor.deployment { akka.actor.deployment {
/user/my-service { /my-service {
router = round-robin router = round-robin
nr-of-instances = 3 nr-of-instances = 3
} }
@ -27,7 +26,5 @@ class ConfigDocSpec extends WordSpec with MustMatchers {
//#custom-config //#custom-config
system.shutdown() system.shutdown()
} }
} }

View file

@ -57,46 +57,57 @@ Defining the configuration file
Each Akka module has a reference configuration file with the default values. Each Akka module has a reference configuration file with the default values.
*akka-actor:* akka-actor
~~~~~~~~~~
.. literalinclude:: ../../akka-actor/src/main/resources/reference.conf .. literalinclude:: ../../akka-actor/src/main/resources/reference.conf
:language: none :language: none
*akka-remote:* akka-remote
~~~~~~~~~~~
.. literalinclude:: ../../akka-remote/src/main/resources/reference.conf .. literalinclude:: ../../akka-remote/src/main/resources/reference.conf
:language: none :language: none
*akka-testkit:* akka-testkit
~~~~~~~~~~~~
.. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf .. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf
:language: none :language: none
*akka-beanstalk-mailbox:* akka-beanstalk-mailbox
~~~~~~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/reference.conf
:language: none :language: none
*akka-file-mailbox:* akka-file-mailbox
~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
:language: none :language: none
*akka-mongo-mailbox:* akka-mongo-mailbox
~~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/reference.conf
:language: none :language: none
*akka-redis-mailbox:* akka-redis-mailbox
~~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-redis-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-redis-mailbox/src/main/resources/reference.conf
:language: none :language: none
*akka-zookeeper-mailbox:* akka-zookeeper-mailbox
~~~~~~~~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/reference.conf
:language: none :language: none
Custom application.conf
-----------------------
A custom ``application.conf`` might look like this:: A custom ``application.conf`` might look like this::
# In this file you can override any option defined in the reference files. # In this file you can override any option defined in the reference files.

View file

@ -4,8 +4,11 @@ General
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2
actor-systems
actors
supervision
addressing
remoting
jmm jmm
message-send-semantics message-send-semantics
configuration configuration
addressing
supervision

View file

@ -0,0 +1,64 @@
.. _remoting:
Location Transparency
=====================
The previous section describes how actor paths are used to enable location
transparency. This special feature deserves some extra explanation, because the
related term “transparent remoting” was used quite differently in the context
of programming languages, platforms and technologies.
Distributed by Default
----------------------
Everything in Akka is designed to work in a distributed setting: all
interactions of actors use purely message passing and everything is
asynchronous. This effort has been undertaken to ensure that all functions are
available equally when running within a single JVM or on a cluster of hundreds
of machines. The key for enabling this is to go from remote to local by way of
optimization instead of trying to go from local to remote by way of
generalization. See `this classic paper
<http://labs.oracle.com/techrep/1994/abstract-29.html>`_ for a detailed
discussion on why the second approach is bound to fail.
Ways in which Transparency is Broken
------------------------------------
What is true of Akka need not be true of the application which uses it, since
designing for distributed execution poses some restrictions on what is
possible. The most obvious one is that all messages sent over the wire must be
serializable. While being a little less obvious this includes closures which
are used as actor factories (i.e. within :class:`Props`) if the actor is to be
created on a remote node.
Another consequence is that everything needs to be aware of all interactions
being fully asynchronous, which in a computer network might mean that it may
take several minutes for a message to reach its recipient (depending on
configuration). It also means that the probability for a message to be lost is
much higher than within one JVM, where it is close to zero (still: no hard
guarantee!).
How is Remoting Used?
---------------------
We took the idea of transparency to the limit in that there is no API for the
remoting layer of Akka: it is purely driven by configuration. Just write your
application according to the principles outlined in the previous sections, then
specify remote deployment of actor sub-trees in the configuration file. This
way, your application can be scaled out without having to touch the code.
Marking Points for Scaling Up with Routers
------------------------------------------
In addition to being able to run different parts of an actor system on
different nodes of a cluster, it is also possible to scale up onto more cores
by multiplying actor sub-trees which support parallelization (think for example
a search engine processing different queries in parallel). The clones can then
be routed to in different fashions, e.g. round-robin. The only thing necessary
to achieve this is that the developer needs to declare a certain actor as
“withRouter”, the in its stead a router actor will be created which will spawn
up a configurable number of children of the desired type and route to them in
the configured fashion. Once such a router has been declared, its configuration
can be freely overridden from the configuration file, including mixing it with
the remote deployment of (some of) the children. Read more about
this in :ref:`routing-scala` and :ref:`routing-java`.

View file

@ -1,5 +1,7 @@
Supervision .. _supervision:
===========
Supervision and Monitoring
==========================
This chapter outlines the concept behind supervision, the primitives offered This chapter outlines the concept behind supervision, the primitives offered
and their semantics. For details on how that translates into real code, please and their semantics. For details on how that translates into real code, please
@ -8,12 +10,13 @@ refer to the corresponding chapters for Scala and Java APIs.
What Supervision Means What Supervision Means
---------------------- ----------------------
Supervision describes a dependency relationship between actors: the supervisor As described in :ref:`actor-systems` supervision describes a dependency
delegates tasks to subordinates and therefore must respond to their failures. relationship between actors: the supervisor delegates tasks to subordinates and
When a subordinate detects a failure (i.e. throws an exception), it suspends therefore must respond to their failures. When a subordinate detects a failure
itself and all its subordinates and sends a message to its supervisor, (i.e. throws an exception), it suspends itself and all its subordinates and
signaling failure. Depending on the nature of the work to be supervised and sends a message to its supervisor, signaling failure. Depending on the nature
the nature of the failure, the supervisor has four basic choices: of the work to be supervised and the nature of the failure, the supervisor has
four basic choices:
#. Resume the subordinate, keeping its accumulated internal state #. Resume the subordinate, keeping its accumulated internal state
#. Restart the subordinate, clearing out its accumulated internal state #. Restart the subordinate, clearing out its accumulated internal state
@ -27,7 +30,8 @@ three: resuming an actor resumes all its subordinates, restarting an actor
entails restarting all its subordinates, similarly stopping an actor will also entails restarting all its subordinates, similarly stopping an actor will also
stop all its subordinates. It should be noted that the default behavior of an stop all its subordinates. It should be noted that the default behavior of an
actor is to stop all its children before restarting, but this can be overridden actor is to stop all its children before restarting, but this can be overridden
using the :meth:`preRestart` hook. using the :meth:`preRestart` hook; the recursive restart applies to all
children left after this hook has been executed.
Each supervisor is configured with a function translating all possible failure Each supervisor is configured with a function translating all possible failure
causes (i.e. exceptions) into one of the four choices given above; notably, causes (i.e. exceptions) into one of the four choices given above; notably,
@ -46,7 +50,7 @@ makes the formation of actor supervision hierarchies explicit and encourages
sound design decisions. It should be noted that this also guarantees that sound design decisions. It should be noted that this also guarantees that
actors cannot be orphaned or attached to supervisors from the outside, which actors cannot be orphaned or attached to supervisors from the outside, which
might otherwise catch them unawares. In addition, this yields a natural and might otherwise catch them unawares. In addition, this yields a natural and
clean shutdown procedure for (parts of) actor applications. clean shutdown procedure for (sub-trees of) actor applications.
What Restarting Means What Restarting Means
--------------------- ---------------------
@ -90,12 +94,11 @@ it may react to the other actors termination, in contrast to supervision whic
reacts to failure. reacts to failure.
Lifecycle monitoring is implemented using a :class:`Terminated` message to be Lifecycle monitoring is implemented using a :class:`Terminated` message to be
received by the behavior of the monitoring actor, where the default behavior is received by the monitoring actor, where the default behavior is to throw a
to throw a special :class:`DeathPactException` if not otherwise handled. One special :class:`DeathPactException` if not otherwise handled. One important
important property is that the message will be delivered irrespective of the property is that the message will be delivered irrespective of the order in
order in which the monitoring request and targets termination occur, i.e. you which the monitoring request and targets termination occur, i.e. you still get
still get the message even if at the time of registration the target is already the message even if at the time of registration the target is already dead.
dead.
Monitoring is particularly useful if a supervisor cannot simply restart its Monitoring is particularly useful if a supervisor cannot simply restart its
children and has to stop them, e.g. in case of errors during actor children and has to stop them, e.g. in case of errors during actor
@ -104,6 +107,6 @@ them or schedule itself to retry this at a later time.
Another common use case is that an actor needs to fail in the absence of an Another common use case is that an actor needs to fail in the absence of an
external resource, which may also be one of its own children. If a third party external resource, which may also be one of its own children. If a third party
terminates a child by way of the ``stop()`` method or sending a terminates a child by way of the ``system.stop(child)`` method or sending a
:class:`PoisonPill`, the supervisor might well be affected. :class:`PoisonPill`, the supervisor might well be affected.

View file

@ -117,10 +117,9 @@ modules are:
- ``akka-kernel`` -- Akka microkernel for running a bare-bones mini application server - ``akka-kernel`` -- Akka microkernel for running a bare-bones mini application server
- ``akka-durable-mailboxes`` -- Durable mailboxes: file-based, MongoDB, Redis, Zookeeper - ``akka-durable-mailboxes`` -- Durable mailboxes: file-based, MongoDB, Redis, Beanstalk and Zookeeper
- ``akka-amqp`` -- AMQP integration
.. - ``akka-amqp`` -- AMQP integration
.. - ``akka-stm-2.0-SNAPSHOT.jar`` -- STM (Software Transactional Memory), transactors and transactional datastructures .. - ``akka-stm-2.0-SNAPSHOT.jar`` -- STM (Software Transactional Memory), transactors and transactional datastructures
.. - ``akka-camel-2.0-SNAPSHOT.jar`` -- Apache Camel Actors integration (it's the best way to have your Akka application communicate with the rest of the world) .. - ``akka-camel-2.0-SNAPSHOT.jar`` -- Apache Camel Actors integration (it's the best way to have your Akka application communicate with the rest of the world)
.. - ``akka-camel-typed-2.0-SNAPSHOT.jar`` -- Apache Camel Typed Actors integration .. - ``akka-camel-typed-2.0-SNAPSHOT.jar`` -- Apache Camel Typed Actors integration

View file

@ -141,8 +141,7 @@ modules are:
- ``akka-durable-mailboxes`` -- Durable mailboxes: file-based, MongoDB, Redis, Zookeeper - ``akka-durable-mailboxes`` -- Durable mailboxes: file-based, MongoDB, Redis, Zookeeper
- ``akka-amqp`` -- AMQP integration .. - ``akka-amqp`` -- AMQP integration
.. - ``akka-stm-2.0-SNAPSHOT.jar`` -- STM (Software Transactional Memory), transactors and transactional datastructures .. - ``akka-stm-2.0-SNAPSHOT.jar`` -- STM (Software Transactional Memory), transactors and transactional datastructures
.. - ``akka-camel-2.0-SNAPSHOT.jar`` -- Apache Camel Actors integration (it's the best way to have your Akka application communicate with the rest of the world) .. - ``akka-camel-2.0-SNAPSHOT.jar`` -- Apache Camel Actors integration (it's the best way to have your Akka application communicate with the rest of the world)
.. - ``akka-camel-typed-2.0-SNAPSHOT.jar`` -- Apache Camel Typed Actors integration .. - ``akka-camel-typed-2.0-SNAPSHOT.jar`` -- Apache Camel Typed Actors integration

View file

@ -44,16 +44,12 @@ Modules
Akka is very modular and has many JARs for containing different features. Akka is very modular and has many JARs for containing different features.
- ``akka-actor-2.0-SNAPSHOT.jar`` -- Standard Actors - ``akka-actor-2.0-SNAPSHOT.jar`` -- Standard Actors, Typed Actors and much more
- ``akka-typed-actor-2.0-SNAPSHOT.jar`` -- Typed Actors
- ``akka-remote-2.0-SNAPSHOT.jar`` -- Remote Actors - ``akka-remote-2.0-SNAPSHOT.jar`` -- Remote Actors
- ``akka-stm-2.0-SNAPSHOT.jar`` -- STM (Software Transactional Memory), transactors and transactional datastructures
- ``akka-slf4j-2.0-SNAPSHOT.jar`` -- SLF4J Event Handler Listener - ``akka-slf4j-2.0-SNAPSHOT.jar`` -- SLF4J Event Handler Listener
- ``akka-testkit-2.0-SNAPSHOT.jar`` -- Toolkit for testing Actors - ``akka-testkit-2.0-SNAPSHOT.jar`` -- Toolkit for testing Actors
- ``akka-camel-2.0-SNAPSHOT.jar`` -- Apache Camel Actors integration (it's the best way to have your Akka application communicate with the rest of the world)
- ``akka-camel-typed-2.0-SNAPSHOT.jar`` -- Apache Camel Typed Actors integration
- ``akka-spring-2.0-SNAPSHOT.jar`` -- Spring framework integration
- ``akka-kernel-2.0-SNAPSHOT.jar`` -- Akka microkernel for running a bare-bones mini application server - ``akka-kernel-2.0-SNAPSHOT.jar`` -- Akka microkernel for running a bare-bones mini application server
- ``akka-<storage-system>-mailbox-2.0-SNAPSHOT.jar`` -- Akka durable mailboxes
How to see the JARs dependencies of each Akka module is described in the How to see the JARs dependencies of each Akka module is described in the
:ref:`dependencies` section. Worth noting is that ``akka-actor`` has zero :ref:`dependencies` section. Worth noting is that ``akka-actor`` has zero

View file

@ -1,49 +1,59 @@
Examples of use-cases for Akka
============================== .. _use-cases:
################################
Examples of use-cases for Akka
################################
We see Akka being adopted by many large organizations in a big range of industries
all from investment and merchant banking, retail and social media, simulation,
gaming and betting, automobile and traffic systems, health care, data analytics
and much more. Any system that have the need for high-throughput and low latency
is a good candidate for using Akka.
There is a great discussion on use-cases for Akka with some good write-ups by production There is a great discussion on use-cases for Akka with some good write-ups by production
users `here <http://stackoverflow.com/questions/4493001/good-use-case-for-akka/4494512#4494512>`_ users `here <http://stackoverflow.com/questions/4493001/good-use-case-for-akka/4494512#4494512>`_
Here are some of the areas where Akka is being deployed into production Here are some of the areas where Akka is being deployed into production
----------------------------------------------------------------------- =======================================================================
**Transaction processing (Online Gaming, Finance/Banking, Trading, Statistics, Betting, Social Media, Telecom)** Transaction processing (Online Gaming, Finance/Banking, Trading, Statistics, Betting, Social Media, Telecom)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ------------------------------------------------------------------------------------------------------------
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA
**Service backend (any industry, any app)** Service backend (any industry, any app)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ---------------------------------------
Service REST, SOAP, Cometd, WebSockets etc Service REST, SOAP, Cometd, WebSockets etc
Act as message hub / integration layer Act as message hub / integration layer
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA
**Concurrency/parallelism (any app)** Concurrency/parallelism (any app)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ---------------------------------
Correct Correct
Simple to work with and understand Simple to work with and understand
Just add the jars to your existing JVM project (use Scala, Java, Groovy or JRuby) Just add the jars to your existing JVM project (use Scala, Java, Groovy or JRuby)
**Simulation** Simulation
^^^^^^^^^^^^^^ ----------
Master/Worker, Compute Grid, MapReduce etc. Master/Worker, Compute Grid, MapReduce etc.
**Batch processing (any industry)** Batch processing (any industry)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -------------------------------
Camel integration to hook up with batch data sources Camel integration to hook up with batch data sources
Actors divide and conquer the batch workloads Actors divide and conquer the batch workloads
**Communications Hub (Telecom, Web media, Mobile media)** Communications Hub (Telecom, Web media, Mobile media)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -----------------------------------------------------
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA
**Gaming and Betting (MOM, online gaming, betting)** Gaming and Betting (MOM, online gaming, betting)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ------------------------------------------------
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA
**Business Intelligence/Data Mining/general purpose crunching** Business Intelligence/Data Mining/general purpose crunching
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -----------------------------------------------------------
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA
**Complex Event Stream Processing** Complex Event Stream Processing
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -------------------------------
Scale up, scale out, fault-tolerance / HA Scale up, scale out, fault-tolerance / HA

View file

@ -5,21 +5,23 @@
What is Akka? What is Akka?
############### ###############
.. sidebar:: Contents
**Simpler Scalability, Fault-Tolerance, Concurrency & Remoting through Actors** .. contents:: :local:
**Scalable real-time transaction processing**
We believe that writing correct concurrent, fault-tolerant and scalable We believe that writing correct concurrent, fault-tolerant and scalable
applications is too hard. Most of the time it's because we are using the wrong applications is too hard. Most of the time it's because we are using the wrong
tools and the wrong level of abstraction. Akka is here to change that. Using the tools and the wrong level of abstraction. Akka is here to change that. Using the
Actor Model together with Software Transactional Memory we raise the abstraction Actor Model we raise the abstraction level and provide a better platform to build
level and provide a better platform to build correct concurrent and scalable correct concurrent and scalable applications. For fault-tolerance we adopt the
applications. For fault-tolerance we adopt the Let it crash/Embrace failure "Let it crash" model which have been used with great success in the telecom industry to build
model which have been used with great success in the telecom industry to build
applications that self-heals, systems that never stop. Actors also provides the applications that self-heals, systems that never stop. Actors also provides the
abstraction for transparent distribution and the basis for truly scalable and abstraction for transparent distribution and the basis for truly scalable and
fault-tolerant applications. Akka is Open Source and available under the Apache fault-tolerant applications.
2 License.
Akka is Open Source and available under the Apache 2 License.
Download from http://akka.io/downloads/ Download from http://akka.io/downloads/
@ -43,18 +45,11 @@ Fault Tolerance
Fault tolerance through supervisor hierarchies with "let-it-crash" Fault tolerance through supervisor hierarchies with "let-it-crash"
semantics. Excellent for writing highly fault-tolerant systems that never stop, semantics. Excellent for writing highly fault-tolerant systems that never stop,
systems that self-heal. systems that self-heal. Supervisor hierarchies can span over multiple JVMs to
provide truly fault-tolerant systems.
See :ref:`fault-tolerance-scala` and :ref:`fault-tolerance-java` See :ref:`fault-tolerance-scala` and :ref:`fault-tolerance-java`
Transactors
-----------
Transactors combine actors and STM (Software Transactional Memory) into transactional actors.
It allows you to compose atomic message flows with automatic retry and rollback.
See :ref:`transactors-scala` and :ref:`transactors-java`
Remote Actors Remote Actors
------------- -------------
@ -63,6 +58,14 @@ management.
See :ref:`remote-actors-scala` and :ref:`remote-actors-java`. See :ref:`remote-actors-scala` and :ref:`remote-actors-java`.
Transactors
-----------
Transactors combine actors and STM (Software Transactional Memory) into transactional actors.
It allows you to compose atomic message flows with automatic retry and rollback.
See :ref:`transactors-scala` and :ref:`transactors-java`
Scala and Java APIs Scala and Java APIs
=================== ===================
@ -73,9 +76,37 @@ Akka has both a :ref:`scala-api` and a :ref:`java-api`.
Akka can be used in two different ways Akka can be used in two different ways
====================================== ======================================
- As a library: used by a web app, to be put into WEB-INF/lib or as a regular - As a library: used by a web app, to be put into ``WEB-INF/lib`` or as a regular
JAR on your classpath. JAR on your classpath.
- As a microkernel: stand-alone kernel to drop your application into. - As a microkernel: stand-alone kernel to drop your application into.
See the :ref:`deployment-scenarios` for details. See the :ref:`deployment-scenarios` for details.
Typesafe Stack
==============
Akka is now also part of the `Typesafe Stack <http://typesafe.com/stack>`_.
The Typesafe Stack is a modern software platform that makes it easy for developers
to build scalable software applications. It combines the Scala programming language,
Akka, the Play! web framework and robust developer tools in a simple package that
integrates seamlessly with existing Java infrastructure.
The Typesafe Stack is all fully open source.
Typesafe Console
================
On top of the Typesafe Stack we have also have commercial product called Typesafe
Console which provides the following features:
#. Management through Dashboard, JMX and REST
#. Dapper-style tracing of messages across components and remote nodes
#. Real-time statistics
#. Very low overhead monitoring agents (should always be on in production)
#. Consolidation of statistics and logging information to a single node
#. Storage of statistics data for later processing
#. Provisioning and rolling upgrades
Read more `here <http://typesafe.com/products/typesafe-subscription>`_.

View file

@ -4,6 +4,8 @@ Why Akka?
What features can the Akka platform offer, over the competition? What features can the Akka platform offer, over the competition?
---------------------------------------------------------------- ----------------------------------------------------------------
Akka provides scalable real-time transaction processing.
Akka is an unified runtime and programming model for: Akka is an unified runtime and programming model for:
- Scale up (Concurrency) - Scale up (Concurrency)
@ -25,39 +27,21 @@ even if you're only running it on one machine. Akka also supplies a wide array
of concurrency-paradigms, allowing for users to choose the right tool for the of concurrency-paradigms, allowing for users to choose the right tool for the
job. job.
The integration possibilities for Akka Actors are immense through the Apache
Camel integration. We have Transactors for coordinated concurrent transactions,
as well as Agents and Dataflow concurrency.
What's a good use-case for Akka? What's a good use-case for Akka?
-------------------------------- --------------------------------
(Web, Cloud, Application) Services - Actors lets you manage service failures We see Akka being adopted by many large organizations in a big range of industries
(Supervisors), load management (back-off strategies, timeouts and all from investment and merchant banking, retail and social media, simulation,
processing-isolation), both horizontal and vertical scalability (add more cores gaming and betting, automobile and traffic systems, health care, data analytics
and/or add more machines). Think payment processing, invoicing, order matching, and much more. Any system that have the need for high-throughput and low latency
datacrunching, messaging. Really any highly transactional systems like banking, is a good candidate for using Akka.
betting, games.
Actors lets you manage service failures (Supervisors), load management (back-off
strategies, timeouts and processing-isolation), both horizontal and vertical
scalability (add more cores and/or add more machines).
Here's what some of the Akka users have to say about how they are using Akka: Here's what some of the Akka users have to say about how they are using Akka:
http://stackoverflow.com/questions/4493001/good-use-case-for-akka http://stackoverflow.com/questions/4493001/good-use-case-for-akka
All this in the ApacheV2-licensed open source project.
Akka Atmos
----------
And that's all in the ApacheV2-licensed open source project. On top of that we
have a commercial product called Akka Atmos which provides the following
features:
#. Management through Dashboard, JMX and REST
#. Dapper-style tracing of messages across components and remote nodes
#. A configurable alert system
#. Real-time statistics
#. Very low overhead monitoring agents (should always be on in production)
#. Consolidation of statistics and logging information to a single node
#. Storage of statistics data for later processing
#. Provisioning and rolling upgrades
Read more `here <http://typesafe.com/products/typesafe-subscription>`_.

View file

@ -0,0 +1,5 @@
package akka.docs.actor
import org.scalatest.junit.JUnitSuite
class TypedActorDocTest extends TypedActorDocTestBase with JUnitSuite

View file

@ -0,0 +1,150 @@
package akka.docs.actor;
//#imports
import akka.dispatch.*;
import akka.actor.*;
import akka.japi.*;
import akka.util.Duration;
import java.util.concurrent.TimeUnit;
//#imports
import java.lang.Exception;
import org.junit.Test;
import static org.junit.Assert.*;
public class TypedActorDocTestBase {
Object someReference = null;
ActorSystem system = null;
//#typed-actor-iface
public static interface Squarer {
//#typed-actor-iface-methods
void squareDontCare(int i); //fire-forget
Future<Integer> square(int i); //non-blocking send-request-reply
Option<Integer> squareNowPlease(int i);//blocking send-request-reply
int squareNow(int i); //blocking send-request-reply
//#typed-actor-iface-methods
}
//#typed-actor-iface
//#typed-actor-impl
static class SquarerImpl implements Squarer {
private String name;
public SquarerImpl() {
this.name = "default";
}
public SquarerImpl(String name) {
this.name = name;
}
//#typed-actor-impl-methods
public void squareDontCare(int i) {
int sq = i * i; //Nobody cares :(
}
public Future<Integer> square(int i) {
return Futures.successful(i * i, TypedActor.dispatcher());
}
public Option<Integer> squareNowPlease(int i) {
return Option.some(i * i);
}
public int squareNow(int i) {
return i * i;
}
//#typed-actor-impl-methods
}
//#typed-actor-impl
@Test public void mustGetTheTypedActorExtension() {
try {
//#typed-actor-extension-tools
//Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem
//Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);
//Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);
//Returns the current ActorContext,
// method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();
//Returns the external proxy of the current Typed Actor,
// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();
//Returns a contextual instance of the Typed Actor Extension
//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());
//#typed-actor-extension-tools
} catch (Exception e) {
//dun care
}
}
@Test public void createATypedActor() {
try {
//#typed-actor-create1
Squarer mySquarer =
TypedActor.get(system).typedActorOf(Squarer.class, SquarerImpl.class, new Props());
//#typed-actor-create1
//#typed-actor-create2
Squarer otherSquarer =
TypedActor.get(system).typedActorOf(Squarer.class,
new Creator<SquarerImpl>() {
public SquarerImpl create() { return new SquarerImpl("foo"); }
},
new Props(),
"name");
//#typed-actor-create2
//#typed-actor-calls
//#typed-actor-call-oneway
mySquarer.squareDontCare(10);
//#typed-actor-call-oneway
//#typed-actor-call-future
Future<Integer> fSquare = mySquarer.square(10); //A Future[Int]
//#typed-actor-call-future
//#typed-actor-call-option
Option<Integer> oSquare = mySquarer.squareNowPlease(10); //Option[Int]
//#typed-actor-call-option
//#typed-actor-call-strict
int iSquare = mySquarer.squareNow(10); //Int
//#typed-actor-call-strict
//#typed-actor-calls
assertEquals(100, Await.result(fSquare, Duration.create(3, TimeUnit.SECONDS)).intValue());
assertEquals(100, oSquare.get().intValue());
assertEquals(100, iSquare);
//#typed-actor-stop
TypedActor.get(system).stop(mySquarer);
//#typed-actor-stop
//#typed-actor-poisonpill
TypedActor.get(system).poisonPill(otherSquarer);
//#typed-actor-poisonpill
} catch(Exception e) {
//Ignore
}
}
}

View file

@ -1,50 +0,0 @@
Guice Integration
=================
All Typed Actors support dependency injection using `Guice <http://code.google.com/p/google-guice/>`_ annotations (such as @Inject etc.).
The TypedActorManager class understands Guice and will do the wiring for you.
External Guice modules
----------------------
You can also plug in external Guice modules and have not-actors wired up as part of the configuration.
Here is an example:
.. code-block:: java
import static akka.config.Supervision.*;
import static akka.config.SupervisorConfig.*;
TypedActorConfigurator manager = new TypedActorConfigurator();
manager.configure(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 1000),
new SuperviseTypedActor[] {
new SuperviseTypedActor(
Foo.class,
FooImpl.class,
temporary(),
1000),
new SuperviseTypedActor(
Bar.class,
BarImpl.class,
permanent(),
1000)
})
.addExternalGuiceModule(new AbstractModule() {
protected void configure() {
bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON);
}})
.configure()
.inject()
.supervise();
Retrieve the external Guice dependency
--------------------------------------
The external dependency can be retrieved like this:
.. code-block:: java
Ext ext = manager.getExternalDependency(Ext.class);

View file

@ -12,9 +12,7 @@ Java API
scheduler scheduler
futures futures
dataflow dataflow
transactors
fault-tolerance fault-tolerance
dispatchers dispatchers
routing routing
guice-integration
extending-akka extending-akka

View file

@ -1,6 +1,3 @@
.. _routing-java:
Routing (Java) Routing (Java)
============== ==============

View file

@ -1,199 +1,166 @@
Typed Actors (Java) Typed Actors (Scala)
=================== ====================
.. sidebar:: Contents .. sidebar:: Contents
.. contents:: :local: .. contents:: :local:
Module stability: **SOLID** Akka Typed Actors is an implementation of the `Active Objects <http://en.wikipedia.org/wiki/Active_object>`_ pattern.
Essentially turning method invocations into asynchronous dispatch instead of synchronous that has been the default way since Smalltalk came out.
The Typed Actors are implemented through `Typed Actors <http://en.wikipedia.org/wiki/Active_object>`_. It uses AOP through `AspectWerkz <http://aspectwerkz.codehaus.org/>`_ to turn regular POJOs into asynchronous non-blocking Actors with semantics of the Actor Model. Each method dispatch is turned into a message that is put on a queue to be processed by the Typed Actor sequentially one by one. Typed Actors consist of 2 "parts", a public interface and an implementation, and if you've done any work in "enterprise" Java, this will be very familiar to you. As with normal Actors you have an external API (the public interface instance) that will delegate methodcalls asynchronously to
a private instance of the implementation.
If you are using the `Spring Framework <http://springsource.org>`_ then take a look at Akka's `Spring integration <spring-integration>`_. The advantage of Typed Actors vs. Actors is that with TypedActors you have a static contract, and don't need to define your own messages, the downside is that it places some limitations on what you can do and what you can't, i.e. you can't use become/unbecome.
** WARNING: ** Do not configure to use a WorkStealingDispatcher with your TypedActors, it just isn't safe with how TypedActors currently are implemented. This limitation will most likely be removed in the future. Typed Actors are implemented using `JDK Proxies <http://docs.oracle.com/javase/6/docs/api/java/lang/reflect/Proxy.html>`_ which provide a pretty easy-worked API to intercept method calls.
.. note::
Just as with regular Akka Untyped Actors, Typed Actors process one call at a time.
The tools of the trade
----------------------
Before we create our first Typed Actor we should first go through the tools that we have at our disposal,
it's located in ``akka.actor.TypedActor``.
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-extension-tools
.. warning::
Same as not exposing ``this`` of an Akka Actor, it's important not to expose ``this`` of a Typed Actor,
instead you should pass the external proxy reference, which is obtained from within your Typed Actor as
``TypedActor.self()``, this is your external identity, as the ``ActorRef`` is the external identity of
an Akka Actor.
Creating Typed Actors Creating Typed Actors
--------------------- ---------------------
**IMPORTANT:** The Typed Actors class must have access modifier 'public' and can't be a non-static inner class. To create a Typed Actor you need to have one or more interfaces, and one implementation.
Akka turns POJOs with interface and implementation into asynchronous (Typed) Actors. Akka is using `AspectWerkzs Proxy <http://blogs.codehaus.org/people/jboner/archives/000914_awproxy_proxy_on_steriods.html>`_ implementation, which is the `most performant <http://docs.codehaus.org/display/AW/AOP+Benchmark>`_ proxy implementation there exists. Our example interface:
In order to create a Typed Actor you have to subclass the TypedActor base class. .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: imports,typed-actor-iface
:exclude: typed-actor-iface-methods
Here is an example. Our example implementation of that interface:
If you have a POJO with an interface implementation separation like this: .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: imports,typed-actor-impl
:exclude: typed-actor-impl-methods
.. code-block:: java The most trivial way of creating a Typed Actor instance
of our ``Squarer``:
interface RegistrationService { .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
void register(User user, Credentials cred); :include: typed-actor-create1
User getUserFor(String username);
}
.. code-block:: java First type is the type of the proxy, the second type is the type of the implementation.
If you need to call a specific constructor you do it like this:
import akka.actor.TypedActor; .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-create2
public class RegistrationServiceImpl extends TypedActor implements RegistrationService { Since you supply a ``Props``, you can specify which dispatcher to use, what the default timeout should be used and more.
public void register(User user, Credentials cred) { Now, our ``Squarer`` doesn't have any methods, so we'd better add those.
... // register user
}
public User getUserFor(String username) { .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
... // fetch user by username :include: imports,typed-actor-iface
return user;
}
}
Then you can create an Typed Actor out of it by creating it through the 'TypedActor' factory like this: Alright, now we've got some methods we can call, but we need to implement those in ``SquarerImpl``.
.. code-block:: java .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: imports,typed-actor-impl
RegistrationService service = Excellent, now we have an interface and an implementation of that interface,
(RegistrationService) TypedActor.newInstance(RegistrationService.class, RegistrationServiceImpl.class, 1000); and we know how to create a Typed Actor from that, so let's look at calling these methods.
// The last parameter defines the timeout for Future calls
Creating Typed Actors with non-default constructor Method dispatch semantics
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -------------------------
To create a typed actor that takes constructor arguments use a variant of 'newInstance' or 'newRemoteInstance' that takes an instance of a 'TypedActorFactory' in which you can create the TypedActor in any way you like. If you use this method then make sure that no one can get a reference to the actor instance. Touching actor state directly is bypassing the whole actor dispatching mechanism and create race conditions which can lead to corrupt data. Methods returning:
Here is an example: * ``void`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell``
* ``akka.dispatch.Future<?>`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
.. code-block:: java * ``scala.Option<?>`` or ``akka.japi.Option<?>`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
and return None if no answer was produced within the timout, or scala.Some/akka.japi.Some containing the result otherwise.
Service service = TypedActor.newInstance(classOf[Service], new TypedActorFactory() { Any exception that was thrown during this call will be rethrown.
public TypedActor create() { * Any other type of value will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
return new ServiceWithConstructorArgsImpl("someString", 500L)); throwing ``java.util.concurrent.TimeoutException`` if there was a timeout or rethrow any exception that was thrown during this call.
});
Configuration factory class
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Using a configuration object:
.. code-block:: java
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import akka.actor.TypedActorConfiguration;
import akka.util.FiniteDuration;
TypedActorConfiguration config = new TypedActorConfiguration()
.timeout(new FiniteDuration(3000, MILLISECONDS));
RegistrationService service = (RegistrationService) TypedActor.newInstance(RegistrationService.class, config);
However, often you will not use these factory methods but declaratively define the Typed Actors as part of a supervisor hierarchy. More on that in the :ref:`fault-tolerance-java` section.
Sending messages
----------------
Messages are sent simply by invoking methods on the POJO, which is proxy to the "real" POJO now. The arguments to the method are bundled up atomically into an message and sent to the receiver (the actual POJO instance).
One-way message send
^^^^^^^^^^^^^^^^^^^^
Methods that return void are turned into fire-and-forget semantics by asynchronously firing off the message and return immediately. In the example above it would be the 'register' method, so if this method is invoked then it returns immediately:
.. code-block:: java
// method invocation returns immediately and method is invoke asynchronously using the Actor Model semantics
service.register(user, creds);
Request-reply message send
^^^^^^^^^^^^^^^^^^^^^^^^^^
Methods that return something (e.g. non-void methods) are turned into send-and-receive-eventually semantics by asynchronously firing off the message and wait on the reply using a Future.
.. code-block:: java
// method invocation is asynchronously dispatched using the Actor Model semantics,
// but it blocks waiting on a Future to be resolved in the background
User user = service.getUser(username);
Generally it is preferred to use fire-forget messages as much as possible since they will never block, e.g. consume a resource by waiting. But sometimes they are neat to use since they:
# Simulates standard Java method dispatch, which is more intuitive for most Java developers
# Are a neat to model request-reply
# Are useful when you need to do things in a defined order
The same holds for the 'request-reply-with-future' described below.
Request-reply-with-future message send
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Methods that return a 'akka.dispatch.Future<TYPE>' are turned into send-and-receive-with-future semantics by asynchronously firing off the message and returns immediately with a Future. You need to use the 'future(...)' method in the TypedActor base class to resolve the Future that the client code is waiting on.
Here is an example:
.. code-block:: java
public class MathTypedActorImpl extends TypedActor implements MathTypedActor {
public Future<Integer> square(int value) {
return future(value * value);
}
}
MathTypedActor math = TypedActor.typedActorOf(MathTypedActor.class, MathTypedActorImpl.class);
// This method will return immediately when called, caller should wait on the Future for the result
Future<Integer> future = math.square(10);
future.await();
Integer result = future.get();
Stopping Typed Actors
---------------------
Once Typed Actors have been created with one of the TypedActor.newInstance methods they need to be stopped with TypedActor.stop to free resources allocated by the created Typed Actor (this is not needed when the Typed Actor is supervised).
.. code-block:: java
// Create Typed Actor
RegistrationService service = (RegistrationService) TypedActor.newInstance(RegistrationService.class);
// ...
// Free Typed Actor resources
TypedActor.stop(service);
When the Typed Actor defines a shutdown callback method (:ref:`fault-tolerance-java`) it will be invoked on TypedActor.stop.
How to use the TypedActorContext for runtime information access
---------------------------------------------------------------
The 'akka.actor.TypedActorContext' class Holds 'runtime type information' (RTTI) for the Typed Actor. This context is a member field in the TypedActor base class and holds for example the current sender reference, the current sender future etc.
Here is an example how you can use it to in a 'void' (e.g. fire-forget) method to implement request-reply by using the sender reference:
.. code-block:: java
class PingImpl implements Ping extends TypedActor {
public void hit(int count) {
Pong pong = (Pong) getContext().getSender();
pong.hit(count++);
}
}
If the sender, sender future etc. is not available, then these methods will return 'null' so you should have a way of dealing with that scenario.
Messages and immutability Messages and immutability
------------------------- -------------------------
**IMPORTANT**: Messages can be any kind of object but have to be immutable (there is a workaround, see next section). Java or Scala cant enforce immutability (yet) so this has to be by convention. Primitives like String, int, Long are always immutable. Apart from these you have to create your own immutable objects to send as messages. If you pass on a reference to an instance that is mutable then this instance can be modified concurrently by two different Typed Actors and the Actor model is broken leaving you with NO guarantees and most likely corrupt data. While Akka cannot enforce that the parameters to the methods of your Typed Actors are immutable,
we *strongly* recommend that parameters passed are immutable.
Akka can help you in this regard. It allows you to turn on an option for serializing all messages, e.g. all parameters to the Typed Actor effectively making a deep clone/copy of the parameters. This will make sending mutable messages completely safe. This option is turned on in the :ref:`configuration` file like this: One-way message send
^^^^^^^^^^^^^^^^^^^^
.. code-block:: ruby .. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-call-oneway
akka { As simple as that! The method will be executed on another thread; asynchronously.
actor {
serialize-messages = on # does a deep clone of messages to ensure immutability
}
}
This will make a deep clone (using Java serialization) of all parameters. Request-reply message send
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-call-option
This will block for as long as the timeout that was set in the ``Props`` of the Typed Actor,
if needed. It will return ``None`` if a timeout occurs.
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-call-strict
This will block for as long as the timeout that was set in the ``Props` of the Typed Actor,
if needed. It will throw a ``java.util.concurrent.TimeoutException`` if a timeout occurs.
Request-reply-with-future message send
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-call-future
This call is asynchronous, and the Future returned can be used for asynchronous composition.
Stopping Typed Actors
---------------------
Since Akka's Typed Actors are backed by Akka Actors they must be stopped when they aren't needed anymore.
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-stop
This asynchronously stops the Typed Actor associated with the specified proxy ASAP.
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java
:include: typed-actor-poisonpill
This asynchronously stops the Typed Actor associated with the specified proxy
after it's done with all calls that were made prior to this call.
Typed Actor Hierarchies
-----------------------
Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext``
you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
This also works for creating child Typed Actors in regular Akka Actors.
Lifecycle callbacks
-------------------
By having your Typed Actor implementation class implement any and all of the following:
* ``TypedActor.PreStart``
* ``TypedActor.PostStop``
* ``TypedActor.PreRestart``
* ``TypedActor.PostRestart``
You can hook into the lifecycle of your Typed Actor.

View file

@ -5,4 +5,9 @@
Camel Camel
####### #######
The Akka Camel module has not been migrated to Akka 2.0-SNAPSHOT yet. .. note::
The Akka Camel module has not been migrated to Akka 2.0-SNAPSHOT yet.
It might not make it into Akka 2.0 final but will then hopefully be
re-introduce in an upcoming release. It might also be backported to
2.0 final.

View file

@ -19,13 +19,13 @@ resides on crashes, then when you restart the node, the actor will be able to
continue processing as if nothing had happened; with all pending messages still continue processing as if nothing had happened; with all pending messages still
in its mailbox. in its mailbox.
None of these mailboxes implements transactions for current message. It's possible None of these mailboxes implements transactions for current message. It's possible
if the actor crashes after receiving a message, but before completing processing of if the actor crashes after receiving a message, but before completing processing of
it, that the message could be lost. it, that the message could be lost.
.. warning:: **IMPORTANT** .. warning:: **IMPORTANT**
None of these mailboxes work with blocking message send, e.g. the message None of these mailboxes work with blocking message send, i.e. the message
send operations that are relying on futures; ``?`` or ``ask``. If the node send operations that are relying on futures; ``?`` or ``ask``. If the node
has crashed and then restarted, the thread that was blocked waiting for the has crashed and then restarted, the thread that was blocked waiting for the
reply is gone and there is no way we can deliver the message. reply is gone and there is no way we can deliver the message.
@ -42,11 +42,15 @@ We'll walk through each one of these in detail in the sections below.
You can easily implement your own mailbox. Look at the existing implementations for inspiration. You can easily implement your own mailbox. Look at the existing implementations for inspiration.
Soon Akka will also have: We are also discussing adding some of these durable mailboxes:
- ``AmqpBasedMailbox`` -- AMQP based mailbox (default RabbitMQ) - ``AmqpBasedMailbox`` -- AMQP based mailbox (default RabbitMQ)
- ``JmsBasedMailbox`` -- JMS based mailbox (default ActiveMQ) - ``JmsBasedMailbox`` -- JMS based mailbox (default ActiveMQ)
- ``CassandraBasedMailbox`` -- Cassandra based mailbox
- ``CamelBasedMailbox`` -- Camel based mailbox
- ``SqlBasedMailbox`` -- SQL based mailbox for general RDBMS (Postgres, MySQL, Oracle etc.)
Let us know if you have a wish for a certain priority order.
.. _DurableMailbox.General: .. _DurableMailbox.General:
@ -57,7 +61,7 @@ The durable mailboxes and their configuration options reside in the
``akka.actor.mailbox`` package. ``akka.actor.mailbox`` package.
You configure durable mailboxes through the dispatcher. The You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using. actor is oblivious to which type of mailbox it is using.
Here is an example in Scala: Here is an example in Scala:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
@ -186,13 +190,13 @@ MongoDB-based Durable Mailboxes
=============================== ===============================
This mailbox is backed by `MongoDB <http://mongodb.org>`_. This mailbox is backed by `MongoDB <http://mongodb.org>`_.
MongoDB is a fast, lightweight and scalable document-oriented database. It contains a number of MongoDB is a fast, lightweight and scalable document-oriented database. It contains a number of
features cohesive to a fast, reliable & durable queueing mechanism which the Akka Mailbox takes advantage of. features cohesive to a fast, reliable & durable queueing mechanism which the Akka Mailbox takes advantage of.
Akka's implementations of MongoDB mailboxes are built on top of the purely asynchronous MongoDB driver Akka's implementations of MongoDB mailboxes are built on top of the purely asynchronous MongoDB driver
(often known as `Hammersmith <http://github.com/bwmcadams/hammersmith>`_ and ``com.mongodb.async``) (often known as `Hammersmith <http://github.com/bwmcadams/hammersmith>`_ and ``com.mongodb.async``)
and as such are purely callback based with a Netty network layer. This makes them extremely fast & and as such are purely callback based with a Netty network layer. This makes them extremely fast &
lightweight versus building on other MongoDB implementations such as lightweight versus building on other MongoDB implementations such as
`mongo-java-driver <http://github.com/mongodb/mongo-java-driver>`_ and `Casbah <http://github.com/mongodb/casbah>`_. `mongo-java-driver <http://github.com/mongodb/mongo-java-driver>`_ and `Casbah <http://github.com/mongodb/casbah>`_.
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
@ -206,15 +210,15 @@ Java::
akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType() akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType()
You will need to configure the URI for the MongoDB server, using the URI Format specified in the You will need to configure the URI for the MongoDB server, using the URI Format specified in the
`MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in `MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in
the ``akka.actor.mailbox.mongodb`` section in the :ref:`configuration`. the ``akka.actor.mailbox.mongodb`` section in the :ref:`configuration`.
.. literalinclude:: ../../akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/reference.conf .. literalinclude:: ../../akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/reference.conf
:language: none :language: none
You must specify a hostname (and optionally port) and at *least* a Database name. If you specify a You must specify a hostname (and optionally port) and at *least* a Database name. If you specify a
collection name, it will be used as a 'prefix' for the collections Akka creates to store mailbox messages. collection name, it will be used as a 'prefix' for the collections Akka creates to store mailbox messages.
Otherwise, collections will be prefixed with ``mailbox.`` Otherwise, collections will be prefixed with ``mailbox.``
It is also possible to configure the timeout thresholds for Read and Write operations in the ``timeout`` block. It is also possible to configure the timeout thresholds for Read and Write operations in the ``timeout`` block.

View file

@ -26,11 +26,11 @@ command (on a unix-based system):
bin/akka sample.kernel.hello.HelloKernel bin/akka sample.kernel.hello.HelloKernel
Use Ctrl-C to interrupt and exit the microkernel. Use ``Ctrl-C`` to interrupt and exit the microkernel.
On a Windows machine you can also use the bin/akka.bat script. On a Windows machine you can also use the bin/akka.bat script.
The code for the Hello Kernel example (see the HelloKernel class for an example The code for the Hello Kernel example (see the ``HelloKernel`` class for an example
of creating a Bootable): of creating a Bootable):
.. includecode:: ../../akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala .. includecode:: ../../akka-samples/akka-sample-hello-kernel/src/main/scala/sample/kernel/hello/HelloKernel.scala

View file

@ -5,4 +5,9 @@
Spring Integration Spring Integration
#################### ####################
The Akka Spring module has not been migrated to Akka 2.0-SNAPSHOT yet. .. note::
The Akka Spring module has not been migrated to Akka 2.0-SNAPSHOT yet.
It might not make it into Akka 2.0 final but will then hopefully be
re-introduce in an upcoming release. It might also be backported to
2.0 final.

View file

@ -1,9 +1,10 @@
.. _support: .. _support:
`Support <http://typesafe.com>`__ `Commercial Support <http://typesafe.com>`__
========================================= ============================================
`Typesafe <http://typesafe.com>`_ Commercial support is provided by `Typesafe <http://typesafe.com>`_.
Akka is now part of the `Typesafe Stack <http://typesafe.com/stack>`_.
`Mailing List <http://groups.google.com/group/akka-user>`_ `Mailing List <http://groups.google.com/group/akka-user>`_
========================================================== ==========================================================

View file

@ -10,11 +10,7 @@ Akka Snapshot
============= =============
Automatically published Scaladoc API for the latest SNAPSHOT version of Akka can Automatically published Scaladoc API for the latest SNAPSHOT version of Akka can
be found here: be found here: http://akka.io/api/akka/snapshot
- Akka - http://akka.io/api/akka/snapshot
- Akka Modules - http://akka.io/api/akka-modules/snapshot
Release Versions Release Versions

View file

@ -0,0 +1,154 @@
package akka.docs.actor
//#imports
import akka.dispatch.{ Promise, Future, Await }
import akka.util.duration._
import akka.actor.{ ActorContext, TypedActor, Props }
//#imports
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit._
//#typed-actor-iface
trait Squarer {
//#typed-actor-iface-methods
def squareDontCare(i: Int): Unit //fire-forget
def square(i: Int): Future[Int] //non-blocking send-request-reply
def squareNowPlease(i: Int): Option[Int] //blocking send-request-reply
def squareNow(i: Int): Int //blocking send-request-reply
//#typed-actor-iface-methods
}
//#typed-actor-iface
//#typed-actor-impl
class SquarerImpl(val name: String) extends Squarer {
def this() = this("default")
//#typed-actor-impl-methods
import TypedActor.dispatcher //So we can create Promises
def squareDontCare(i: Int): Unit = i * i //Nobody cares :(
def square(i: Int): Future[Int] = Promise successful i * i
def squareNowPlease(i: Int): Option[Int] = Some(i * i)
def squareNow(i: Int): Int = i * i
//#typed-actor-impl-methods
}
//#typed-actor-impl
//#typed-actor-supercharge
trait Foo {
def doFoo(times: Int): Unit = println("doFoo(" + times + ")")
}
trait Bar {
import TypedActor.dispatcher //So we have an implicit dispatcher for our Promise
def doBar(str: String): Future[String] = Promise successful str.toUpperCase
}
class FooBar extends Foo with Bar
//#typed-actor-supercharge
class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"get the TypedActor extension" in {
val someReference: AnyRef = null
try {
//#typed-actor-extension-tools
import akka.actor.TypedActor
//Returns the Typed Actor Extension
val extension = TypedActor(system) //system is an instance of ActorSystem
//Returns whether the reference is a Typed Actor Proxy or not
TypedActor(system).isTypedActor(someReference)
//Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor(system).getActorRefFor(someReference)
//Returns the current ActorContext,
// method only valid within methods of a TypedActor implementation
val c: ActorContext = TypedActor.context
//Returns the external proxy of the current Typed Actor,
// method only valid within methods of a TypedActor implementation
val s: Squarer = TypedActor.self[Squarer]
//Returns a contextual instance of the Typed Actor Extension
//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor(TypedActor.context)
//#typed-actor-extension-tools
} catch {
case e: Exception //dun care
}
}
"create a typed actor" in {
//#typed-actor-create1
val mySquarer: Squarer =
TypedActor(system).typedActorOf[Squarer, SquarerImpl]()
//#typed-actor-create1
//#typed-actor-create2
val otherSquarer: Squarer =
TypedActor(system).typedActorOf(classOf[Squarer],
new SquarerImpl("foo"),
Props(),
"name")
//#typed-actor-create2
//#typed-actor-calls
//#typed-actor-call-oneway
mySquarer.squareDontCare(10)
//#typed-actor-call-oneway
//#typed-actor-call-future
val fSquare = mySquarer.square(10) //A Future[Int]
//#typed-actor-call-future
//#typed-actor-call-option
val oSquare = mySquarer.squareNowPlease(10) //Option[Int]
//#typed-actor-call-option
//#typed-actor-call-strict
val iSquare = mySquarer.squareNow(10) //Int
//#typed-actor-call-strict
//#typed-actor-calls
Await.result(fSquare, 3 seconds) must be === 100
oSquare must be === Some(100)
iSquare must be === 100
//#typed-actor-stop
TypedActor(system).stop(mySquarer)
//#typed-actor-stop
//#typed-actor-poisonpill
TypedActor(system).poisonPill(otherSquarer)
//#typed-actor-poisonpill
}
"supercharge" in {
//#typed-actor-supercharge-usage
val awesomeFooBar = TypedActor(system).typedActorOf[Foo with Bar, FooBar]()
awesomeFooBar.doFoo(10)
val f = awesomeFooBar.doBar("yes")
TypedActor(system).poisonPill(awesomeFooBar)
//#typed-actor-supercharge-usage
Await.result(f, 3 seconds) must be === "YES"
}
}

View file

@ -0,0 +1,44 @@
package akka.docs.testkit
//#plain-spec
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.ImplicitSender
object MySpec {
class EchoActor extends Actor {
def receive = {
case x sender ! x
}
}
}
//#implicit-sender
class MySpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpec with MustMatchers with BeforeAndAfterAll {
//#implicit-sender
def this() = this(ActorSystem("MySpec"))
import MySpec._
override def afterAll {
system.shutdown()
}
"An Echo actor" must {
"send back messages unchanged" in {
val echo = system.actorOf(Props[EchoActor])
echo ! "hello world"
expectMsg("hello world")
}
}
}
//#plain-spec

View file

@ -0,0 +1,238 @@
package akka.docs.testkit
//#imports-test-probe
import akka.testkit.TestProbe
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.util.duration._
//#imports-test-probe
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.actor.ActorRef
import akka.actor.Props
object TestkitDocSpec {
case object Say42
case object Unknown
class MyActor extends Actor {
def receive = {
case Say42 sender ! 42
case "some work" sender ! "some result"
}
}
//#my-double-echo
class MyDoubleEcho extends Actor {
var dest1: ActorRef = _
var dest2: ActorRef = _
def receive = {
case (d1: ActorRef, d2: ActorRef)
dest1 = d1
dest2 = d2
case x
dest1 ! x
dest2 ! x
}
}
//#my-double-echo
import akka.testkit.TestProbe
//#test-probe-forward-actors
class Source(target: ActorRef) extends Actor {
def receive = {
case "start" target ! "work"
}
}
class Destination extends Actor {
def receive = {
case x // Do something..
}
}
//#test-probe-forward-actors
class LoggingActor extends Actor {
//#logging-receive
import akka.event.LoggingReceive
implicit def system = context.system
def receive = LoggingReceive(this) {
case msg // Do something...
}
//#logging-receive
}
}
class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
import TestkitDocSpec._
"demonstrate usage of TestActorRef" in {
//#test-actor-ref
import akka.testkit.TestActorRef
val actorRef = TestActorRef[MyActor]
val actor = actorRef.underlyingActor
//#test-actor-ref
}
"demonstrate usage of TestFSMRef" in {
//#test-fsm-ref
import akka.testkit.TestFSMRef
import akka.actor.FSM
import akka.util.duration._
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
startWith(1, "")
when(1) {
case Ev("go") goto(2) using "go"
}
when(2) {
case Ev("back") goto(1) using "back"
}
})
assert(fsm.stateName == 1)
assert(fsm.stateData == "")
fsm ! "go" // being a TestActorRef, this runs also on the CallingThreadDispatcher
assert(fsm.stateName == 2)
assert(fsm.stateData == "go")
fsm.setState(stateName = 1)
assert(fsm.stateName == 1)
assert(fsm.timerActive_?("test") == false)
fsm.setTimer("test", 12, 10 millis, true)
assert(fsm.timerActive_?("test") == true)
fsm.cancelTimer("test")
assert(fsm.timerActive_?("test") == false)
//#test-fsm-ref
}
"demonstrate testing of behavior" in {
//#test-behavior
import akka.testkit.TestActorRef
import akka.util.duration._
import akka.dispatch.Await
val actorRef = TestActorRef(new MyActor)
// hypothetical message stimulating a '42' answer
val result = Await.result((actorRef ? Say42), 5 seconds).asInstanceOf[Int]
result must be(42)
//#test-behavior
}
"demonstrate unhandled message" in {
//#test-unhandled
import akka.testkit.TestActorRef
import akka.actor.UnhandledMessageException
val ref = TestActorRef[MyActor]
intercept[UnhandledMessageException] { ref(Unknown) }
//#test-unhandled
}
"demonstrate expecting exceptions" in {
//#test-expecting-exceptions
import akka.testkit.TestActorRef
val actorRef = TestActorRef(new Actor {
def receive = {
case boom throw new IllegalArgumentException("boom")
}
})
intercept[IllegalArgumentException] { actorRef("hello") }
//#test-expecting-exceptions
}
"demonstrate within" in {
type Worker = MyActor
//#test-within
import akka.actor.Props
import akka.util.duration._
val worker = system.actorOf(Props[Worker])
within(200 millis) {
worker ! "some work"
expectMsg("some result")
expectNoMsg // will block for the rest of the 200ms
Thread.sleep(300) // will NOT make this block fail
}
//#test-within
}
"demonstrate dilated duration" in {
//#duration-dilation
import akka.util.duration._
import akka.testkit._
10.milliseconds.dilated
//#duration-dilation
}
"demonstrate usage of probe" in {
//#test-probe
val probe1 = TestProbe()
val probe2 = TestProbe()
val actor = system.actorOf(Props[MyDoubleEcho])
actor ! (probe1.ref, probe2.ref)
actor ! "hello"
probe1.expectMsg(50 millis, "hello")
probe2.expectMsg(50 millis, "hello")
//#test-probe
//#test-special-probe
case class Update(id: Int, value: String)
val probe = new TestProbe(system) {
def expectUpdate(x: Int) = {
expectMsgPF() {
case Update(id, _) if id == x true
}
sender ! "ACK"
}
}
//#test-special-probe
}
"demonstrate probe reply" in {
import akka.testkit.TestProbe
import akka.util.duration._
//#test-probe-reply
val probe = TestProbe()
val future = probe.ref ? "hello"
probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
probe.sender ! "world"
assert(future.isCompleted && future.value == Some(Right("world")))
//#test-probe-reply
}
"demonstrate probe forward" in {
import akka.testkit.TestProbe
import akka.actor.Props
//#test-probe-forward
val probe = TestProbe()
val source = system.actorOf(Props(new Source(probe.ref)))
val dest = system.actorOf(Props[Destination])
source ! "start"
probe.expectMsg("work")
probe.forward(dest)
//#test-probe-forward
}
"demonstrate " in {
//#calling-thread-dispatcher
import akka.testkit.CallingThreadDispatcher
val dispatcher = new CallingThreadDispatcher(system.dispatcherFactory.prerequisites)
val ref = system.actorOf(Props[MyActor].withDispatcher(dispatcher))
//#calling-thread-dispatcher
}
}

View file

@ -29,7 +29,7 @@ Scala's Delimited Continuations plugin is required to use the Dataflow API. To e
import sbt._ import sbt._
class MyAkkaProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AutoCompilerPlugins { class MyAkkaProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AutoCompilerPlugins {
val continuationsPlugin = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0") val continuationsPlugin = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.1")
override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable")
} }
@ -72,6 +72,7 @@ Dataflow is implemented in Akka using Scala's Delimited Continuations. To use th
.. code-block:: scala .. code-block:: scala
import Future.flow import Future.flow
implicit val dispatcher = ...
val a = Future( ... ) val a = Future( ... )
val b = Future( ... ) val b = Future( ... )
@ -81,13 +82,14 @@ Dataflow is implemented in Akka using Scala's Delimited Continuations. To use th
c << (a() + b()) c << (a() + b())
} }
val result = c.get() val result = Await.result(c, timeout)
The ``flow`` method also returns a ``Future`` for the result of the contained expression, so the previous example could also be written like this: The ``flow`` method also returns a ``Future`` for the result of the contained expression, so the previous example could also be written like this:
.. code-block:: scala .. code-block:: scala
import Future.flow import Future.flow
implicit val dispatcher = ...
val a = Future( ... ) val a = Future( ... )
val b = Future( ... ) val b = Future( ... )
@ -96,7 +98,7 @@ The ``flow`` method also returns a ``Future`` for the result of the contained ex
a() + b() a() + b()
} }
val result = c.get() val result = Await.result(c, timeout)
Examples Examples
-------- --------
@ -149,6 +151,7 @@ Example in Akka:
import akka.dispatch._ import akka.dispatch._
import Future.flow import Future.flow
implicit val dispatcher = ...
val x, y, z = Promise[Int]() val x, y, z = Promise[Int]()
@ -193,6 +196,7 @@ Example in Akka:
import akka.dispatch._ import akka.dispatch._
import Future.flow import Future.flow
implicit val dispatcher = ...
def ints(n: Int, max: Int): List[Int] = { def ints(n: Int, max: Int): List[Int] = {
if (n == max) Nil if (n == max) Nil
@ -221,6 +225,7 @@ Example in Akka:
import akka.dispatch._ import akka.dispatch._
import Future.flow import Future.flow
implicit val dispatcher = ...
// create four 'Int' data flow variables // create four 'Int' data flow variables
val x, y, z, v = Promise[Int]() val x, y, z, v = Promise[Int]()

View file

@ -1,3 +1,5 @@
.. _fsm:
### ###
FSM FSM
### ###

View file

@ -12,8 +12,6 @@ Scala API
scheduler scheduler
futures futures
dataflow dataflow
agents
transactors
fault-tolerance fault-tolerance
dispatchers dispatchers
routing routing

View file

@ -15,11 +15,6 @@ Testing Actor Systems
.. module:: akka-testkit .. module:: akka-testkit
:synopsis: Tools for Testing Actor Systems :synopsis: Tools for Testing Actor Systems
.. moduleauthor:: Roland Kuhn .. moduleauthor:: Roland Kuhn
.. versionadded:: 1.0
.. versionchanged:: 1.1
added :class:`TestActorRef`
.. versionchanged:: 1.2
added :class:`TestFSMRef`
As with any piece of software, automated tests are a very important part of the As with any piece of software, automated tests are a very important part of the
development cycle. The actor model presents a different view on how units of development cycle. The actor model presents a different view on how units of
@ -74,12 +69,7 @@ Having access to the actual :class:`Actor` object allows application of all
traditional unit testing techniques on the contained methods. Obtaining a traditional unit testing techniques on the contained methods. Obtaining a
reference is done like this: reference is done like this:
.. code-block:: scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-actor-ref
import akka.testkit.TestActorRef
val actorRef = TestActorRef[MyActor]
val actor = actorRef.underlyingActor
Since :class:`TestActorRef` is generic in the actor type it returns the Since :class:`TestActorRef` is generic in the actor type it returns the
underlying actor with its proper static type. From this point on you may bring underlying actor with its proper static type. From this point on you may bring
@ -92,35 +82,9 @@ Testing Finite State Machines
If your actor under test is a :class:`FSM`, you may use the special If your actor under test is a :class:`FSM`, you may use the special
:class:`TestFSMRef` which offers all features of a normal :class:`TestActorRef` :class:`TestFSMRef` which offers all features of a normal :class:`TestActorRef`
and in addition allows access to the internal state:: and in addition allows access to the internal state:
import akka.testkit.TestFSMRef .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-fsm-ref
import akka.util.duration._
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
startWith(1, "")
when (1) {
case Ev("go") => goto(2) using "go"
}
when (2) {
case Ev("back") => goto(1) using "back"
}
})
assert (fsm.stateName == 1)
assert (fsm.stateData == "")
fsm ! "go" // being a TestActorRef, this runs also on the CallingThreadDispatcher
assert (fsm.stateName == 2)
assert (fsm.stateData == "go")
fsm.setState(stateName = 1)
assert (fsm.stateName == 1)
assert (fsm.timerActive_?("test") == false)
fsm.setTimer("test", 12, 10 millis, true)
assert (fsm.timerActive_?("test") == true)
fsm.cancelTimer("test")
assert (fsm.timerActive_?("test") == false)
Due to a limitation in Scalas type inference, there is only the factory method Due to a limitation in Scalas type inference, there is only the factory method
shown above, so you will probably write code like ``TestFSMRef(new MyFSM)`` shown above, so you will probably write code like ``TestFSMRef(new MyFSM)``
@ -150,11 +114,7 @@ usual. This trick is made possible by the :class:`CallingThreadDispatcher`
described below; this dispatcher is set implicitly for any actor instantiated described below; this dispatcher is set implicitly for any actor instantiated
into a :class:`TestActorRef`. into a :class:`TestActorRef`.
.. code-block:: scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-behavior
val actorRef = TestActorRef(new MyActor).start()
val result = (actorRef ? Say42).as[Int] // hypothetical message stimulating a '42' answer
result must be (Some(42))
As the :class:`TestActorRef` is a subclass of :class:`LocalActorRef` with a few As the :class:`TestActorRef` is a subclass of :class:`LocalActorRef` with a few
special extras, also aspects like linking to a supervisor and restarting work special extras, also aspects like linking to a supervisor and restarting work
@ -172,7 +132,7 @@ One more special aspect which is overridden for single-threaded tests is the
To summarize: :class:`TestActorRef` overwrites two fields: it sets the To summarize: :class:`TestActorRef` overwrites two fields: it sets the
dispatcher to :obj:`CallingThreadDispatcher.global` and it sets the dispatcher to :obj:`CallingThreadDispatcher.global` and it sets the
:obj:`receiveTimeout` to zero. :obj:`receiveTimeout` to None.
The Way In-Between The Way In-Between
------------------ ------------------
@ -180,15 +140,13 @@ The Way In-Between
If you want to test the actor behavior, including hotswapping, but without If you want to test the actor behavior, including hotswapping, but without
involving a dispatcher and without having the :class:`TestActorRef` swallow involving a dispatcher and without having the :class:`TestActorRef` swallow
any thrown exceptions, then there is another mode available for you: just use any thrown exceptions, then there is another mode available for you: just use
the :class:`TestActorRef` as a partial function, the calls to the :meth:`apply` method :class:`TestActorRef`, which will be forwarded to the
:meth:`isDefinedAt` and :meth:`apply` will be forwarded to the underlying underlying actor:
actor:
.. code-block:: scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-unhandled
val ref = TestActorRef[MyActor] The above sample assumes the default behavior for unhandled messages, i.e.
ref.isDefinedAt('unknown) must be (false) that the actor doesn't swallow all messages and doesn't override :meth:`unhandled`.
intercept[IllegalActorStateException] { ref(RequestReply) }
Use Cases Use Cases
--------- ---------
@ -221,33 +179,13 @@ stimuli at varying injection points and arrange results to be sent from
different emission points, but the basic principle stays the same in that a different emission points, but the basic principle stays the same in that a
single procedure drives the test. single procedure drives the test.
The :class:`TestKit` trait contains a collection of tools which makes this The :class:`TestKit` class contains a collection of tools which makes this
common task easy: common task easy.
.. code-block:: scala .. includecode:: code/akka/docs/testkit/PlainWordSpec.scala#plain-spec
import akka.testkit.TestKit When using ``with ImplicitSender`` the :class:`TestKit` contains an actor named :obj:`testActor`
import org.scalatest.WordSpec which is implicitly used as sender reference when dispatching messages from the test
import org.scalatest.matchers.MustMatchers
class MySpec extends WordSpec with MustMatchers with TestKit {
"An Echo actor" must {
"send back messages unchanged" in {
val echo = Actor.actorOf(Props[EchoActor])
echo ! "hello world"
expectMsg("hello world")
}
}
}
The :class:`TestKit` contains an actor named :obj:`testActor` which is
implicitly used as sender reference when dispatching messages from the test
procedure. This enables replies to be received by this internal actor, whose procedure. This enables replies to be received by this internal actor, whose
only function is to queue them so that interrogation methods like only function is to queue them so that interrogation methods like
:meth:`expectMsg` can examine them. The :obj:`testActor` may also be passed to :meth:`expectMsg` can examine them. The :obj:`testActor` may also be passed to
@ -256,6 +194,9 @@ is a whole set of examination methods, e.g. receiving all consecutive messages
matching certain criteria, receiving a whole sequence of fixed messages or matching certain criteria, receiving a whole sequence of fixed messages or
classes, receiving nothing for some time, etc. classes, receiving nothing for some time, etc.
To avoid memory leaks it is important that you shutdown the :class:`ActorSystem`
when the test is finished, as illustrated in the :meth:`afterAll` method above.
.. note:: .. note::
The test actor shuts itself down by default after 5 seconds (configurable) The test actor shuts itself down by default after 5 seconds (configurable)
@ -385,15 +326,11 @@ with message flows:
Expecting Exceptions Expecting Exceptions
-------------------- --------------------
One case which is not handled by the :obj:`testActor` is if an exception is Testing that an expected exception is thrown while processing a message sent to
thrown while processing the message sent to the actor under test. This can be the actor under test can be done by using a :class:`TestActorRef` :meth:`apply` based
tested by using a :class:`Future` based invocation:: invocation:
// assuming ScalaTest ShouldMatchers .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-expecting-exceptions
evaluating {
(someActor ? badOperation).await.get
} should produce [UnhandledMessageException]
.. _TestKit.within: .. _TestKit.within:
@ -426,21 +363,7 @@ It should be noted that if the last message-receiving assertion of the block is
latencies. This means that while individual contained assertions still use the latencies. This means that while individual contained assertions still use the
maximum time bound, the overall block may take arbitrarily longer in this case. maximum time bound, the overall block may take arbitrarily longer in this case.
.. code-block:: scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-within
class SomeSpec extends WordSpec with MustMatchers with TestKit {
"A Worker" must {
"send timely replies" in {
val worker = ActorSystem().actorOf(...)
within (500 millis) {
worker ! "some work"
expectMsg("some result")
expectNoMsg // will block for the rest of the 500ms
Thread.sleep(1000) // will NOT make this block fail
}
}
}
}
.. note:: .. note::
@ -460,25 +383,18 @@ invariably lead to spurious test failures on the heavily loaded Jenkins server
internally scaled by a factor taken from the :ref:`configuration`, internally scaled by a factor taken from the :ref:`configuration`,
``akka.test.timefactor``, which defaults to 1. ``akka.test.timefactor``, which defaults to 1.
You can scale other durations with the same factor by using the implicit conversion
in ``akka.testkit`` package object to add dilated function to :class:`Duration`.
.. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#duration-dilation
Resolving Conflicts with Implicit ActorRef Resolving Conflicts with Implicit ActorRef
------------------------------------------ ------------------------------------------
If you want the sender of messages inside your TestKit-based tests to be the `testActor``` If you want the sender of messages inside your TestKit-based tests to be the ``testActor``
simply mix in `ÌmplicitSender`` into your test. simply mix in ``ÌmplicitSender`` into your test.
.. code-block:: scala .. includecode:: code/akka/docs/testkit/PlainWordSpec.scala#implicit-sender
class SomeSpec extends WordSpec with MustMatchers with TestKit with ImplicitSender {
"A Worker" must {
"send timely replies" in {
val worker = ActorSystem().actorOf(...)
within (500 millis) {
worker ! "some work" // testActor is the "sender" for this message
expectMsg("some result")
}
}
}
}
Using Multiple Probe Actors Using Multiple Probe Actors
--------------------------- ---------------------------
@ -489,42 +405,16 @@ at the :obj:`testActor` when using the :class:`TestKit` as a mixin. Another
approach is to use it for creation of simple probe actors to be inserted in the approach is to use it for creation of simple probe actors to be inserted in the
message flows. To make this more powerful and convenient, there is a concrete message flows. To make this more powerful and convenient, there is a concrete
implementation called :class:`TestProbe`. The functionality is best explained implementation called :class:`TestProbe`. The functionality is best explained
using a small example:: using a small example:
class MyDoubleEcho extends Actor { .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala
var dest1 : ActorRef = _ :include: imports-test-probe,my-double-echo,test-probe
var dest2 : ActorRef = _
def receive = {
case (d1: ActorRef, d2: ActorRef) =>
dest1 = d1
dest2 = d2
case x =>
dest1 ! x
dest2 ! x
}
}
val probe1 = TestProbe()
val probe2 = TestProbe()
val actor = ActorSystem().actorOf(Props[MyDoubleEcho])
actor ! (probe1.ref, probe2.ref)
actor ! "hello"
probe1.expectMsg(50 millis, "hello")
probe2.expectMsg(50 millis, "hello")
Probes may also be equipped with custom assertions to make your test code even Probes may also be equipped with custom assertions to make your test code even
more concise and clear:: more concise and clear:
case class Update(id : Int, value : String) .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala
:include: test-special-probe
val probe = new TestProbe {
def expectUpdate(x : Int) = {
expectMsg {
case Update(id, _) if id == x => true
}
reply("ACK")
}
}
You have complete flexibility here in mixing and matching the :class:`TestKit` You have complete flexibility here in mixing and matching the :class:`TestKit`
facilities with your own checks and choosing an intuitive name for it. In real facilities with your own checks and choosing an intuitive name for it. In real
@ -535,13 +425,9 @@ Replying to Messages Received by Probes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The probes keep track of the communications channel for replies, if possible, The probes keep track of the communications channel for replies, if possible,
so they can also reply:: so they can also reply:
val probe = TestProbe() .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#test-probe-reply
val future = probe.ref ? "hello"
probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
probe.reply("world")
assert (future.isCompleted && future.as[String] == "world")
Forwarding Messages Received by Probes Forwarding Messages Received by Probes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -550,15 +436,10 @@ Given a destination actor ``dest`` which in the nominal actor network would
receive a message from actor ``source``. If you arrange for the message to be receive a message from actor ``source``. If you arrange for the message to be
sent to a :class:`TestProbe` ``probe`` instead, you can make assertions sent to a :class:`TestProbe` ``probe`` instead, you can make assertions
concerning volume and timing of the message flow while still keeping the concerning volume and timing of the message flow while still keeping the
network functioning:: network functioning:
val probe = TestProbe() .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala
val system = ActorSystem() :include: test-probe-forward-actors,test-probe-forward
val source = system.actorOf(Props(new Source(probe)))
val dest = system.actorOf(Props[Destination])
source ! "start"
probe.expectMsg("work")
probe.forward(dest)
The ``dest`` actor will receive the same message invocation as if no test probe The ``dest`` actor will receive the same message invocation as if no test probe
had intervened. had intervened.
@ -572,7 +453,7 @@ described :ref:`above <TestKit.within>` is local to each probe. Hence, probes
do not react to each other's deadlines or to the deadline set in an enclosing do not react to each other's deadlines or to the deadline set in an enclosing
:class:`TestKit` instance:: :class:`TestKit` instance::
class SomeTest extends TestKit { class SomeTest extends TestKit(_system: ActorSystem) with ImplicitSender {
val probe = TestProbe() val probe = TestProbe()
@ -599,25 +480,9 @@ so long as all intervening actors run on this dispatcher.
How to use it How to use it
------------- -------------
Just set the dispatcher as you normally would, either from within the actor Just set the dispatcher as you normally would:
.. code-block:: scala .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#calling-thread-dispatcher
import akka.testkit.CallingThreadDispatcher
class MyActor extends Actor {
self.dispatcher = CallingThreadDispatcher.global
...
}
or from the client code
.. code-block:: scala
val ref = system.actorOf(Props[MyActor].withDispatcher(CallingThreadDispatcher.global))
As the :class:`CallingThreadDispatcher` does not have any configurable state,
you may always use the (lazily) preallocated one as shown in the examples.
How it works How it works
------------ ------------
@ -719,13 +584,11 @@ options:
This is enabled by a setting in the :ref:`configuration` — namely This is enabled by a setting in the :ref:`configuration` — namely
``akka.actor.debug.receive`` — which enables the :meth:`loggable` ``akka.actor.debug.receive`` — which enables the :meth:`loggable`
statement to be applied to an actors :meth:`receive` function:: statement to be applied to an actors :meth:`receive` function:
import akka.event.LoggingReceive .. includecode:: code/akka/docs/testkit/TestkitDocSpec.scala#logging-receive
def receive = LoggingReceive(this) {
case msg => ...
}
.
The first argument to :meth:`LoggingReceive` defines the source to be used in the The first argument to :meth:`LoggingReceive` defines the source to be used in the
logging events, which should be the current actor. logging events, which should be the current actor.
@ -755,12 +618,12 @@ All these messages are logged at ``DEBUG`` level. To summarize, you can enable
full logging of actor activities using this configuration fragment:: full logging of actor activities using this configuration fragment::
akka { akka {
loglevel = "DEBUG" loglevel = DEBUG
actor { actor {
debug { debug {
receive = "true" receive = on
autoreceive = "true" autoreceive = on
lifecycle = "true" lifecycle = on
} }
} }
} }

View file

@ -5,186 +5,171 @@ Typed Actors (Scala)
.. contents:: :local: .. contents:: :local:
The Typed Actors are implemented through `Typed Actors <http://en.wikipedia.org/wiki/Active_object>`_. It uses AOP through `AspectWerkz <http://aspectwerkz.codehaus.org/>`_ to turn regular POJOs into asynchronous non-blocking Actors with semantics of the Actor Model. Each method dispatch is turned into a message that is put on a queue to be processed by the Typed Actor sequentially one by one. Akka Typed Actors is an implementation of the `Active Objects <http://en.wikipedia.org/wiki/Active_object>`_ pattern.
Essentially turning method invocations into asynchronous dispatch instead of synchronous that has been the default way since Smalltalk came out.
If you are using the `Spring Framework <http://springsource.org>`_ then take a look at Akka's `Spring integration <spring-integration>`_. Typed Actors consist of 2 "parts", a public interface and an implementation, and if you've done any work in "enterprise" Java, this will be very familiar to you. As with normal Actors you have an external API (the public interface instance) that will delegate methodcalls asynchronously to
a private instance of the implementation.
**WARNING:** Do not configure to use a ``BalancingDispatcher`` with your ``TypedActors``, it just isn't safe with how ``TypedActors`` currently are implemented. This limitation will most likely be removed in the future. The advantage of Typed Actors vs. Actors is that with TypedActors you have a static contract, and don't need to define your own messages, the downside is that it places some limitations on what you can do and what you can't, i.e. you can't use become/unbecome.
Typed Actors are implemented using `JDK Proxies <http://docs.oracle.com/javase/6/docs/api/java/lang/reflect/Proxy.html>`_ which provide a pretty easy-worked API to intercept method calls.
.. note::
Just as with regular Akka Actors, Typed Actors process one call at a time.
The tools of the trade
----------------------
Before we create our first Typed Actor we should first go through the tools that we have at our disposal,
it's located in ``akka.actor.TypedActor``.
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-extension-tools
.. warning::
Same as not exposing ``this`` of an Akka Actor, it's important not to expose ``this`` of a Typed Actor,
instead you should pass the external proxy reference, which is obtained from within your Typed Actor as
``TypedActor.self``, this is your external identity, as the ``ActorRef`` is the external identity of
an Akka Actor.
Creating Typed Actors Creating Typed Actors
--------------------- ---------------------
**IMPORTANT:** The Typed Actors class must have access modifier 'public' (which is default) and can't be an inner class (unless it is an inner class in an 'object'). To create a Typed Actor you need to have one or more interfaces, and one implementation.
Akka turns POJOs with interface and implementation into asynchronous (Typed) Actors. Akka is using `AspectWerkzs Proxy <http://blogs.codehaus.org/people/jboner/archives/000914_awproxy_proxy_on_steriods.html>`_ implementation, which is the `most performant <http://docs.codehaus.org/display/AW/AOP+Benchmark>`_ proxy implementation there exists. Our example interface:
In order to create a Typed Actor you have to subclass the ``TypedActor`` base class. .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: imports,typed-actor-iface
:exclude: typed-actor-iface-methods
Here is an example. Our example implementation of that interface:
If you have a POJO with an interface implementation separation like this: .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: imports,typed-actor-impl
:exclude: typed-actor-impl-methods
.. code-block:: scala The most trivial way of creating a Typed Actor instance
of our Squarer:
import akka.actor.TypedActor .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-create1
trait RegistrationService { First type is the type of the proxy, the second type is the type of the implementation.
def register(user: User, cred: Credentials): Unit If you need to call a specific constructor you do it like this:
def getUserFor(username: String): User
}
.. code-block:: scala .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-create2
public class RegistrationServiceImpl extends TypedActor with RegistrationService { Since you supply a Props, you can specify which dispatcher to use, what the default timeout should be used and more.
def register(user: User, cred: Credentials) { Now, our Squarer doesn't have any methods, so we'd better add those.
... // register user
}
def getUserFor(username: String): User = { .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
... // fetch user by username :include: imports,typed-actor-iface
user
}
}
Then you can create an Typed Actor out of it by creating it through the ``TypedActor`` factory like this: Alright, now we've got some methods we can call, but we need to implement those in SquarerImpl.
.. code-block:: scala .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: imports,typed-actor-impl
val service = TypedActor.newInstance(classOf[RegistrationService], classOf[RegistrationServiceImpl], 1000) Excellent, now we have an interface and an implementation of that interface,
// The last parameter defines the timeout for Future calls and we know how to create a Typed Actor from that, so let's look at calling these methods.
Creating Typed Actors with non-default constructor Method dispatch semantics
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -------------------------
To create a typed actor that takes constructor arguments use a variant of ``newInstance`` or ``newRemoteInstance`` that takes a call-by-name block in which you can create the Typed Actor in any way you like. Methods returning:
Here is an example: * ``Unit`` will be dispatched with ``fire-and-forget`` semantics, exactly like ``ActorRef.tell``
* ``akka.dispatch.Future[_]`` will use ``send-request-reply`` semantics, exactly like ``ActorRef.ask``
.. code-block:: scala * ``scala.Option[_]`` or ``akka.japi.Option<?>`` will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
and return None if no answer was produced within the timout, or scala.Some/akka.japi.Some containing the result otherwise.
val service = TypedActor.newInstance(classOf[Service], new ServiceWithConstructorArgs("someString", 500L)) Any exception that was thrown during this call will be rethrown.
* Any other type of value will use ``send-request-reply`` semantics, but *will* block to wait for an answer,
Configuration factory class throwing ``java.util.concurrent.TimeoutException`` if there was a timeout or rethrow any exception that was thrown during this call.
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Using a configuration object:
.. code-block:: scala
import akka.actor.TypedActorConfiguration
import akka.util.Duration
import akka.util.duration._
val config = TypedActorConfiguration()
.timeout(3000 millis)
val service = TypedActor.newInstance(classOf[RegistrationService], classOf[RegistrationServiceImpl], config)
However, often you will not use these factory methods but declaratively define the Typed Actors as part of a supervisor hierarchy. More on that in the :ref:`fault-tolerance-scala` section.
Sending messages
----------------
Messages are sent simply by invoking methods on the POJO, which is proxy to the "real" POJO now. The arguments to the method are bundled up atomically into an message and sent to the receiver (the actual POJO instance).
One-way message send
^^^^^^^^^^^^^^^^^^^^
Methods that return void are turned into fire-and-forget semantics by asynchronously firing off the message and return immediately. In the example above it would be the 'register' method, so if this method is invoked then it returns immediately:
.. code-block:: java
// method invocation returns immediately and method is invoke asynchronously using the Actor Model semantics
service.register(user, creds)
Request-reply message send
^^^^^^^^^^^^^^^^^^^^^^^^^^
Methods that return something (e.g. non-void methods) are turned into send-and-receive-eventually semantics by asynchronously firing off the message and wait on the reply using a Future.
.. code-block:: scala
// method invocation is asynchronously dispatched using the Actor Model semantics,
// but it blocks waiting on a Future to be resolved in the background
val user = service.getUser(username)
Generally it is preferred to use fire-forget messages as much as possible since they will never block, e.g. consume a resource by waiting. But sometimes they are neat to use since they:
* Simulates standard Java method dispatch, which is more intuitive for most Java developers
* Are a neat to model request-reply
* Are useful when you need to do things in a defined order
Request-reply-with-future message send
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Methods that return a ``akka.dispatch.Future<TYPE>`` are turned into send-and-receive-with-future semantics by asynchronously firing off the message and returns immediately with a Future. You need to use the ``future(...)`` method in the ``TypedActor`` base class to resolve the Future that the client code is waiting on.
Here is an example:
.. code-block:: scala
class MathTypedActorImpl extends TypedActor with MathTypedActor {
def square(x: Int): Future[Integer] = future(x * x)
}
// create the ping actor
val math = TypedActor.newInstance(classOf[MathTyped], classOf[MathTypedImpl])
// This method will return immediately when called, caller should wait on the Future for the result
val future = math.square(10)
future.await
val result: Int = future.get
Stopping Typed Actors
---------------------
Once Typed Actors have been created with one of the ``TypedActor.newInstance`` methods they need to be stopped with ``TypedActor.stop`` to free resources allocated by the created Typed Actor (this is not needed when the Typed Actor is supervised).
.. code-block:: scala
// Create Typed Actor
val service = TypedActor.newInstance(classOf[RegistrationService], classOf[RegistrationServiceImpl], 1000)
// ...
// Free Typed Actor resources
TypedActor.stop(service)
When the Typed Actor defines a shutdown callback method (:ref:`fault-tolerance-scala`) it will be invoked on ``TypedActor.stop``.
How to use the TypedActorContext for runtime information access
---------------------------------------------------------------
The ``akka.actor.TypedActorContext`` class Holds 'runtime type information' (RTTI) for the Typed Actor. This context is a member field in the ``TypedActor`` base class and holds for example the current sender reference, the current sender future etc.
Here is an example how you can use it to in a 'void' (e.g. fire-forget) method to implement request-reply by using the sender reference:
.. code-block:: scala
class PingImpl extends TypedActor with Ping {
def hit(count: Int) {
val pong = context.getSender.asInstanceOf[Pong]
pong.hit(count++)
}
}
If the sender, sender future etc. is not available, then these methods will return ``null`` so you should have a way of dealing with that scenario.
Messages and immutability Messages and immutability
------------------------- -------------------------
**IMPORTANT**: Messages can be any kind of object but have to be immutable (there is a workaround, see next section). Java or Scala cant enforce immutability (yet) so this has to be by convention. Primitives like String, int, Long are always immutable. Apart from these you have to create your own immutable objects to send as messages. If you pass on a reference to an instance that is mutable then this instance can be modified concurrently by two different Typed Actors and the Actor model is broken leaving you with NO guarantees and most likely corrupt data. While Akka cannot enforce that the parameters to the methods of your Typed Actors are immutable,
we *strongly* recommend that parameters passed are immutable.
Akka can help you in this regard. It allows you to turn on an option for serializing all messages, e.g. all parameters to the Typed Actor effectively making a deep clone/copy of the parameters. This will make sending mutable messages completely safe. This option is turned on in the :ref:`configuration` file like this: One-way message send
^^^^^^^^^^^^^^^^^^^^
.. code-block:: ruby .. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-call-oneway
akka { As simple as that! The method will be executed on another thread; asynchronously.
actor {
# does a deep clone of messages to ensure immutability
serialize-messages = on
}
}
This will make a deep clone (using Java serialization) of all parameters. Request-reply message send
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-call-option
This will block for as long as the timeout that was set in the Props of the Typed Actor,
if needed. It will return ``None`` if a timeout occurs.
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-call-strict
This will block for as long as the timeout that was set in the Props of the Typed Actor,
if needed. It will throw a ``java.util.concurrent.TimeoutException`` if a timeout occurs.
Request-reply-with-future message send
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-call-future
This call is asynchronous, and the Future returned can be used for asynchronous composition.
Stopping Typed Actors
---------------------
Since Akkas Typed Actors are backed by Akka Actors they must be stopped when they aren't needed anymore.
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-stop
This asynchronously stops the Typed Actor associated with the specified proxy ASAP.
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala
:include: typed-actor-poisonpill
This asynchronously stops the Typed Actor associated with the specified proxy
after it's done with all calls that were made prior to this call.
Typed Actor Hierarchies
-----------------------
Since you can obtain a contextual Typed Actor Extension by passing in an ``ActorContext``
you can create child Typed Actors by invoking ``typedActorOf(..)`` on that.
This also works for creating child Typed Actors in regular Akka Actors.
Lifecycle callbacks
-------------------
By having your Typed Actor implementation class implement any and all of the following:
* ``TypedActor.PreStart``
* ``TypedActor.PostStop``
* ``TypedActor.PreRestart``
* ``TypedActor.PostRestart``
You can hook into the lifecycle of your Typed Actor.
Supercharging
-------------
Here's an example on how you can use traits to mix in behavior in your Typed Actors.
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala#typed-actor-supercharge-usage

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._ import akka.actor._
object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.settings.config) def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.settings.config)
} }

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._ import akka.actor._
object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.settings.config) def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.settings.config)
} }

View file

@ -10,7 +10,8 @@ akka {
mailbox { mailbox {
mongodb { mongodb {
# Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes # Any specified collection name will be used as a prefix for
# collections that use durable mongo mailboxes.
# Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections
uri = "mongodb://localhost/akka.mailbox" uri = "mongodb://localhost/akka.mailbox"

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._ import akka.actor._
object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.settings.config) def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.settings.config)
} }

View file

@ -7,6 +7,7 @@ import com.typesafe.config.Config
import akka.actor._ import akka.actor._
object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.settings.config) def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.settings.config)
} }

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._ import akka.actor._
object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system)
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.settings.config) def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.settings.config)
} }

View file

@ -13,20 +13,22 @@ akka {
default { default {
# if this is set to a valid remote address, the named actor will be deployed at that node # if this is set to a valid remote address, the named actor will be deployed
# e.g. "akka://sys@host:port" # at that node e.g. "akka://sys@host:port"
remote = "" remote = ""
target { target {
# A list of hostnames and ports for instantiating the children of a non-direct router # A list of hostnames and ports for instantiating the children of a
# non-direct router
# The format should be on "akka://sys@host:port", where: # The format should be on "akka://sys@host:port", where:
# - sys is the remote actor system name # - sys is the remote actor system name
# - hostname can be either hostname or IP address the remote actor should connect to # - hostname can be either hostname or IP address the remote actor
# should connect to
# - port should be the port for the remote server on the other node # - port should be the port for the remote server on the other node
# The number of actor instances to be spawned is still taken from the nr-of-instances # The number of actor instances to be spawned is still taken from the
# setting as for local routers; the instances will be distributed round-robin among the # nr-of-instances setting as for local routers; the instances will be
# given nodes. # distributed round-robin among the given nodes.
nodes = [] nodes = []
} }
@ -53,9 +55,10 @@ akka {
failure-detector { failure-detector {
# defines the failure detector threshold # defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures a # A low threshold is prone to generate many wrong suspicions but ensures
# quick detection in the event of a real crash. Conversely, a high threshold # a quick detection in the event of a real crash. Conversely, a high
# generates fewer mistakes but needs more time to detect actual crashes # threshold generates fewer mistakes but needs more time to detect
# actual crashes
threshold = 8 threshold = 8
max-sample-size = 1000 max-sample-size = 1000
@ -73,10 +76,12 @@ akka {
} }
server { server {
# The hostname or ip to bind the remoting to, InetAddress.getLocalHost.getHostAddress is used if empty # The hostname or ip to bind the remoting to,
# InetAddress.getLocalHost.getHostAddress is used if empty
hostname = "" hostname = ""
# The default remote server port clients should connect to. Default is 2552 (AKKA) # The default remote server port clients should connect to.
# Default is 2552 (AKKA)
port = 2552 port = 2552
# Increase this if you want to be able to send messages with large payloads # Increase this if you want to be able to send messages with large payloads
@ -85,10 +90,12 @@ akka {
# Timeout duration # Timeout duration
connection-timeout = 120s connection-timeout = 120s
# Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? # Should the remote server require that it peers share the same secure-cookie
# (defined in the 'remote' section)?
require-cookie = off require-cookie = off
# Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. # Enable untrusted mode for full security of server managed actors, allows
# untrusted clients to connect.
untrusted-mode = off untrusted-mode = off
# Sets the size of the connection backlog # Sets the size of the connection backlog
@ -97,11 +104,13 @@ akka {
client { client {
buffering { buffering {
# Should message buffering on remote client error be used (buffer flushed on successful reconnect) # Should message buffering on remote client error be used (buffer flushed
# on successful reconnect)
retry-message-send-on-failure = off retry-message-send-on-failure = off
# If negative (or zero) then an unbounded mailbox is used (default) # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property # If positive then a bounded mailbox is used and the capacity is set using
# the property
capacity = -1 capacity = -1
} }
reconnect-delay = 5s reconnect-delay = 5s

View file

@ -7,10 +7,14 @@
akka { akka {
test { test {
# factor by which to scale timeouts during tests, e.g. to account for shared build system load # factor by which to scale timeouts during tests, e.g. to account for shared
# build system load
timefactor = 1.0 timefactor = 1.0
# duration of EventFilter.intercept waits after the block is finished until all required messages are received
# duration of EventFilter.intercept waits after the block is finished until
# all required messages are received
filter-leeway = 3s filter-leeway = 3s
# duration to wait in expectMsg and friends outside of within() block by default # duration to wait in expectMsg and friends outside of within() block by default
single-expect-default = 3s single-expect-default = 3s
} }

View file

@ -9,6 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl } import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl }
object TestKitExtension extends ExtensionId[TestKitSettings] { object TestKitExtension extends ExtensionId[TestKitSettings] {
override def get(system: ActorSystem): TestKitSettings = super.get(system)
def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.settings.config) def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.settings.config)
} }

View file

@ -79,12 +79,11 @@ public class Pi {
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof Work) { if (message instanceof Work) {
Work work = (Work) message; Work work = (Work) message;
double result = calculatePiFor(work.getStart(), work.getNrOfElements()); double result = calculatePiFor(work.getStart(), work.getNrOfElements());
getSender().tell(new Result(result)); getSender().tell(new Result(result));
} else {
} else throw new IllegalArgumentException("Unknown message [" + message + "]"); throw new IllegalArgumentException("Unknown message [" + message + "]");
}
} }
} }
//#worker //#worker
@ -108,9 +107,9 @@ public class Pi {
this.latch = latch; this.latch = latch;
//#create-router //#create-router
router = this.getContext().actorOf(new Props().withCreator( router = this.getContext().actorOf(
Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)), new Props(Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)),
"pi"); "pi");
//#create-router //#create-router
} }
@ -139,8 +138,8 @@ public class Pi {
@Override @Override
public void postStop() { public void postStop() {
System.out.println(String.format( System.out.println(String.format(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
pi, (System.currentTimeMillis() - start))); pi, (System.currentTimeMillis() - start)));
latch.countDown(); latch.countDown();
} }
} }

View file

@ -42,7 +42,8 @@ object Pi extends App {
//#worker //#worker
//#master //#master
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) class Master(
nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
extends Actor { extends Actor {
var pi: Double = _ var pi: Double = _
@ -50,7 +51,8 @@ object Pi extends App {
var start: Long = _ var start: Long = _
//#create-router //#create-router
val router = context.actorOf(Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), "pi") val router = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), "pi")
//#create-router //#create-router
//#master-receive //#master-receive
@ -72,9 +74,8 @@ object Pi extends App {
} }
override def postStop() { override def postStop() {
println( println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" .format(pi, (System.currentTimeMillis - start)))
.format(pi, (System.currentTimeMillis - start)))
latch.countDown() latch.countDown()
} }
} }