Merge branch 'master' into wip-stash

This commit is contained in:
Philipp Haller 2012-02-06 14:52:26 +01:00
commit 2f99078d4d
37 changed files with 453 additions and 514 deletions

View file

@ -15,24 +15,6 @@ import scala.reflect.BeanInfo
import com.google.protobuf.Message
import akka.pattern.ask
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def includeManifest: Boolean = true
def identifier = 2
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(
"Can't serialize a non-protobuf message using protobuf [" + obj + "]")
obj.asInstanceOf[Message].toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = {
if (!clazz.isDefined) throw new IllegalArgumentException(
"Need a protobuf message class to be able to serialize bytes using protobuf")
clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
}
}
object SerializeSpec {
val serializationConf = ConfigFactory.parseString("""
@ -40,11 +22,12 @@ object SerializeSpec {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
test = "akka.serialization.TestSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"]
}
}
}
@ -56,6 +39,21 @@ object SerializeSpec {
case class Person(name: String, age: Int, address: Address) { def this() = this("", 0, null) }
case class Record(id: Int, person: Person)
class SimpleMessage(s: String) extends TestSerializble
class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s)
trait AnotherInterface extends TestSerializble
class AnotherMessage extends AnotherInterface
class ExtendedAnotherMessage extends AnotherMessage
class PlainMessage
class ExtendedPlainMessage extends PlainMessage
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -72,7 +70,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
"have correct bindings" in {
ser.bindings(addr.getClass.getName) must be("java")
ser.bindings("akka.actor.ProtobufProtocol$MyMessage") must be("proto")
ser.bindings(classOf[PlainMessage].getName) must be("test")
}
"serialize Address" in {
@ -145,6 +143,37 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
a.shutdown()
}
}
"resove serializer by direct interface" in {
val msg = new SimpleMessage("foo")
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
"resove serializer by interface implemented by super class" in {
val msg = new ExtendedSimpleMessage("foo", 17)
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
"resove serializer by indirect interface" in {
val msg = new AnotherMessage
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
"resove serializer by indirect interface implemented by super class" in {
val msg = new ExtendedAnotherMessage
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
"resove serializer for message with binding" in {
val msg = new PlainMessage
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
"resove serializer for message extending class with with binding" in {
val msg = new ExtendedPlainMessage
ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer])
}
}
}
@ -158,13 +187,11 @@ object VerifySerializabilitySpec {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}
}
@ -209,3 +236,20 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
system stop a
}
}
trait TestSerializble
class TestSerializer extends Serializer {
def includeManifest: Boolean = false
def identifier = 9999
def toBinary(o: AnyRef): Array[Byte] = {
Array.empty[Byte]
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None,
classLoader: Option[ClassLoader] = None): AnyRef = {
null
}
}

View file

@ -5,6 +5,8 @@
*/
package akka.jsr166y;
import akka.util.Unsafe;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@ -1521,23 +1523,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException se) {
try {
return java.security.AccessController.doPrivileged
(new java.security
.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
java.lang.reflect.Field f = sun.misc
.Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (sun.misc.Unsafe) f.get(null);
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
return Unsafe.instance;
}
}

View file

@ -7,6 +7,24 @@ package akka
import akka.actor.newUuid
import java.net.{ InetAddress, UnknownHostException }
object AkkaException {
val hostname = try InetAddress.getLocalHost.getHostAddress catch { case e: UnknownHostException "unknown host" }
def toStringWithStackTrace(throwable: Throwable): String = throwable match {
case null "Unknown Throwable: was 'null'"
case ae: AkkaException ae.toLongString
case e "%s:%s\n%s" format (e.getClass.getName, e.getMessage, stackTraceToString(e))
}
def stackTraceToString(throwable: Throwable): String = {
val trace = throwable.getStackTrace
val sb = new StringBuilder
for (i 0 until trace.length)
sb.append("\tat %s\n" format trace(i))
sb.toString
}
}
/**
* Akka base Exception. Each Exception gets:
* <ul>
@ -19,26 +37,12 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti
val uuid = "%s_%s".format(AkkaException.hostname, newUuid)
override lazy val toString =
"%s: %s\n[%s]".format(getClass.getName, message, uuid)
"%s:%s\n[%s]".format(getClass.getName, message, uuid)
lazy val toLongString =
"%s: %s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString)
"%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString)
def this(msg: String) = this(msg, null);
def stackTraceToString = {
val trace = getStackTrace
val sb = new StringBuilder
for (i 0 until trace.length)
sb.append("\tat %s\n" format trace(i))
sb.toString
}
}
object AkkaException {
val hostname = try {
InetAddress.getLocalHost.getHostAddress
} catch {
case e: UnknownHostException "unknown"
}
def stackTraceToString = AkkaException.stackTraceToString(this)
}

View file

@ -4,15 +4,21 @@
package akka.routing
import akka.actor._
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import akka.util.Duration
import akka.util.duration._
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.pattern.pipe
import akka.pattern.AskSupport
import com.typesafe.config.Config
import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import akka.jsr166y.ThreadLocalRandom
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
* send a message to on (or more) of these actors.
@ -447,23 +453,16 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
}
trait RandomLike { this: RouterConfig
import java.security.SecureRandom
def nrOfInstances: Int
def routees: Iterable[String]
private val random = new ThreadLocal[SecureRandom] {
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = {
routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees)
def getNext(): ActorRef = {
val _routees = routeeProvider.routees
_routees(random.get.nextInt(_routees.size))
_routees(ThreadLocalRandom.current.nextInt(_routees.size))
}
{
@ -711,11 +710,13 @@ object ScatterGatherFirstCompletedRouter {
}
/**
* Simple router that broadcasts the message to all routees, and replies with the first response.
* <br>
* <br/>
* You have to defin the 'within: Duration' parameter (f.e: within = 10 seconds).
* <br/>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <br/>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
@ -727,6 +728,9 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
override val resizer: Option[Resizer] = None)
extends RouterConfig with ScatterGatherFirstCompletedLike {
if (within <= Duration.Zero) throw new IllegalArgumentException(
"[within: Duration] can not be zero or negative, was [" + within + "]")
/**
* Constructor that sets nrOfInstances to be created.
* Java API

View file

@ -10,6 +10,8 @@ import scala.util.DynamicVariable
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging
case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -65,6 +67,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
import Serialization._
val settings = new Settings(system.settings.config)
val log = Logging(system, getClass.getName)
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
@ -111,10 +114,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
}
/**
* Returns the configured Serializer for the given Class, falls back to the Serializer named "default"
* Returns the configured Serializer for the given Class, falls back to the Serializer named "default".
* It traverses interfaces and super classes to find any configured Serializer that match
* the class name.
*/
def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups
serializerMap.get(clazz.getName).getOrElse(serializers("default"))
def serializerFor(clazz: Class[_]): Serializer =
if (bindings.isEmpty) {
// quick path to default when no bindings are registered
serializers("default")
} else {
def resolve(c: Class[_]): Option[Serializer] =
serializerMap.get(c.getName) match {
case null
val classes = c.getInterfaces ++ Option(c.getSuperclass)
classes.view map resolve collectFirst { case Some(x) x }
case x Some(x)
}
serializerMap.get(clazz.getName) match {
case null
val ser = resolve(clazz).getOrElse(serializers("default"))
// memorize the lookups for performance
serializerMap.putIfAbsent(clazz.getName, ser) match {
case null
log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
ser
case some some
}
case ser ser
}
}
/**
* Tries to load the specified Serializer by the FQN
@ -146,9 +176,15 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
}
/**
* serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class
* serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class
*/
lazy val serializerMap: Map[String, Serializer] = bindings mapValues serializers
private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = {
val serializerMap = new ConcurrentHashMap[String, Serializer]
for ((k, v) bindings) {
serializerMap.put(k, serializers(v))
}
serializerMap
}
/**
* Maps from a Serializer Identity (Int) to a Serializer instance (optimization)

View file

@ -397,6 +397,18 @@ When this in done we can run our application directly inside Maven::
Yippee! It is working.
Overriding Configuration Externally
-----------------------------------
The sample project includes an ``application.conf`` file in the resources directory:
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf
If you uncomment the two lines, you should see a change in performance,
hopefully for the better. It should be noted that overriding only works if a
router type is given, so just uncommenting ``nr-of-instances`` does not work;
see :ref:`routing-java` for more details.
Conclusion
----------

View file

@ -421,6 +421,18 @@ arguments to the JVM on the ``Arguments`` page, for instance to define where :re
Once you finished your run configuration, click ``Run``. You should see the same output in the ``Console`` window.
You can use the same configuration for debugging the application, by choosing ``Run/Debug History`` or just ``Debug As``.
Overriding Configuration Externally
-----------------------------------
The sample project includes an ``application.conf`` file in the resources directory:
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf
If you uncomment the two lines, you should see a change in performance,
hopefully for the better. It should be noted that overriding only works if a
router type is given, so just uncommenting ``nr-of-instances`` does not work;
see :ref:`routing-java` for more details.
Conclusion
----------

View file

@ -442,6 +442,18 @@ When this in done we can run our application directly inside SBT::
Yippee! It is working.
Overriding Configuration Externally
===================================
The sample project includes an ``application.conf`` file in the resources directory:
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf
If you uncomment the two lines, you should see a change in performance,
hopefully for the better. It should be noted that overriding only works if a
router type is given, so just uncommenting ``nr-of-instances`` does not work;
see :ref:`routing-scala` for more details.
Conclusion
==========

View file

@ -68,4 +68,18 @@ public class RouterViaProgramExample {
router3.tell(new ExampleActor.Message(i));
}
}
private class CompileCheckJavaDocsForRouting extends UntypedActor {
@Override
public void onReceive(Object o) {
//#reply-with-parent
getSender().tell("reply", getContext().parent()); // replies go to router
//#reply-with-parent
//#reply-with-self
getSender().tell("reply", getSelf()); // replies go to this actor
//#reply-with-self
}
}
}

View file

@ -91,14 +91,17 @@ public class SerializationDocTestBase {
serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
serialization-bindings {
java = ["java.lang.String",
"app.my.Customer"]
proto = ["com.google.protobuf.Message"]
myown = ["my.own.BusinessObject",
"something.equally.Awesome",
"akka.docs.serialization.MyOwnSerializable"
"java.lang.Boolean"]
}
}

View file

@ -8,10 +8,10 @@ package akka.docs.transactor;
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
public class CoordinatedCounter extends UntypedActor {
private Ref.View<Integer> count = Stm.newRef(0);
private Ref.View<Integer> count = STM.newRef(0);
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
@ -24,7 +24,7 @@ public class CoordinatedCounter extends UntypedActor {
}
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
STM.increment(count, 1);
}
});
}

View file

@ -7,14 +7,14 @@ package akka.docs.transactor;
//#class
import akka.transactor.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
public class Counter extends UntypedTransactor {
Ref.View<Integer> count = Stm.newRef(0);
Ref.View<Integer> count = STM.newRef(0);
public void atomically(Object message) {
if (message instanceof Increment) {
Stm.increment(count, 1);
STM.increment(count, 1);
}
}

View file

@ -9,10 +9,10 @@ import akka.actor.*;
import akka.transactor.*;
import java.util.Set;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
public class FriendlyCounter extends UntypedTransactor {
Ref.View<Integer> count = Stm.newRef(0);
Ref.View<Integer> count = STM.newRef(0);
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
@ -25,7 +25,7 @@ public class FriendlyCounter extends UntypedTransactor {
public void atomically(Object message) {
if (message instanceof Increment) {
Stm.increment(count, 1);
STM.increment(count, 1);
}
}

View file

@ -18,7 +18,7 @@ Execution Contexts
------------------
In order to execute callbacks and operations, Futures need something called an ``ExecutionContext``,
which is very similar to a `java.util.concurrent.Executor``. if you have an ``ActorSystem`` in scope,
which is very similar to a ``java.util.concurrent.Executor``. if you have an ``ActorSystem`` in scope,
it will use its default dispatcher as the ``ExecutionContext``, or you can use the factory methods provided
by the ``ExecutionContexts`` class to wrap ``Executors`` and ``ExecutorServices``, or even create your own.
@ -199,7 +199,7 @@ which allows for ordering like in the following sample:
Auxiliary methods
-----------------
``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`
``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future``
if the first ``Future`` fails.
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
@ -230,7 +230,7 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil
so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way
it will behave as if we hadn't used the ``recover`` method.
You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``,
You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap`` has to ``map``,
and is use like this:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java

View file

@ -25,7 +25,7 @@ is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
Routers Explained
Routers In Action
^^^^^^^^^^^^^^^^^
This is an example of how to create a router that is defined in configuration:
@ -45,8 +45,11 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
instead of any programmatically sent parameters.*
*It is also worth pointing out that if you define the ``router`` in the
configuration file then this value will be used instead of any programmatically
sent parameters. The decision whether to create a router at all, on the other
hand, must be taken within the code, i.e. you cannot make something a router by
external configuration alone (see below for details).*
Once you have the router actor it is just to send messages to it as you would to any actor:
@ -56,6 +59,44 @@ Once you have the router actor it is just to send messages to it as you would to
The router will apply its behavior to the message it receives and forward it to the routees.
How Routing is Designed within Akka
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Routers behave like single actors, but they should also not hinder scalability.
This apparent contradiction is solved by making routers be represented by a
special :class:`RoutedActorRef`, which dispatches incoming messages destined
for the routees without actually invoking the router actors behavior (and thus
avoiding its mailbox; the single router actors task is to manage all aspects
related to the lifecycle of the routees). This means that the code which decides
which route to take is invoked concurrently from all possible senders and hence
must be thread-safe, it cannot live the simple and happy life of code within an
actor.
There is one part in the above paragraph which warrants some more background
explanation: Why does a router need a “head” which is actual parent to all the
routees? The initial design tried to side-step this issue, but location
transparency as well as mandatory parental supervision required a redesign.
Each of the actors which the router spawns must have its unique identity, which
translates into a unique actor path. Since the router has only one given name
in its parents context, another level in the name space is needed, which
according to the addressing semantics implies the existence of an actor with
the routers name. This is not only necessary for the internal messaging
involved in creating, restarting and terminating actors, it is also needed when
the pooled actors need to converse with other actors and receive replies in a
deterministic fashion. Since each actor knows its own external representation
as well as that of its parent, the routees decide where replies should be sent
when reacting to a message:
.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#reply-with-parent
.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#reply-with-self
It is apparent now why routing needs to be enabled in code rather than being
possible to “bolt on” later: whether or not an actor is routed means a change
to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above.
Router usage
^^^^^^^^^^^^

View file

@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor
.. note::
Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``,
this means that you'll need to list the specific classes.
You only need to specify the name of an interface or abstract base class if the messages implements
that. E.g. ``com.google.protobuf.Message`` for protobuf serialization.
Protobuf
--------
Akka provides a ``Serializer`` for `protobuf <http://code.google.com/p/protobuf/>`_ messages.
To use that you need to add the following to the configuration::
akka {
actor {
serializers {
proto = "akka.serialization.ProtobufSerializer"
}
serialization-bindings {
proto = ["com.google.protobuf.Message"]
}
}
}
Verification
------------
@ -74,6 +92,7 @@ here's some examples:
For more information, have a look at the ``ScalaDoc`` for ``akka.serialization._``
Customization
=============

View file

@ -609,11 +609,11 @@ Java
For Java there is a special helper object with Java-friendly methods::
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
These methods can also be statically imported::
import static scala.concurrent.stm.japi.Stm.*;
import static scala.concurrent.stm.japi.STM.*;
Other imports that are needed are in the stm package, particularly ``Ref``::
@ -671,7 +671,7 @@ v1.3::
v2.0::
import static scala.concurrent.stm.japi.Stm.atomic;
import static scala.concurrent.stm.japi.STM.atomic;
import java.util.concurrent.Callable;
atomic(new Runnable() {
@ -756,7 +756,7 @@ Java
As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more
easily used from Java. ``Ref`` could be used, but requires explicit threading of
transactions. There are helper methods in ``japi.Stm`` for creating ``Ref.View``
transactions. There are helper methods in ``japi.STM`` for creating ``Ref.View``
references.
v1.3::
@ -765,7 +765,7 @@ v1.3::
v2.0::
Ref.View<Integer> ref = Stm.newRef(0);
Ref.View<Integer> ref = STM.newRef(0);
The ``set`` and ``get`` methods work the same way for both versions.
@ -780,7 +780,7 @@ v2.0::
ref.set(1); // set new value
There are also ``transform``, ``getAndTransform``, and ``transformAndGet``
methods in ``japi.Stm`` which accept ``scala.runtime.AbstractFunction1``.
methods in ``japi.STM`` which accept ``japi.STM.Transformer`` objects.
There are ``increment`` helper methods for ``Ref.View<Integer>`` and
``Ref.View<Long>`` references.
@ -821,7 +821,7 @@ Java
Rather than using the ``deferred`` and ``compensating`` methods in
``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in
``scala.concurrent.stm.japi.Stm``, which behave in the same way and accept
``scala.concurrent.stm.japi.STM``, which behave in the same way and accept
``Runnable``.
Transactional Datastructures
@ -830,12 +830,12 @@ Transactional Datastructures
In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional
datastructures.
There are helper methods for creating these from Java in ``japi.Stm``:
There are helper methods for creating these from Java in ``japi.STM``:
``newTMap``, ``newTSet``, and ``newTArray``. These datastructures implement the
``scala.collection`` interfaces and can also be used from Java with Scala's
``JavaConversions``. There are helper methods that apply the conversions,
returning ``java.util`` ``Map``, ``Set``, and ``List``: ``newMap``, ``newSet``,
and ``newList``.
and ``newArrayAsList``.
More to be written

View file

@ -11,7 +11,7 @@ import akka.event.Logging
//#imports1
import akka.dispatch.Future
import akka.actor.ActorSystem
import akka.actor.{ ActorRef, ActorSystem }
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit._
@ -356,4 +356,29 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
//#ask-pipeTo
}
"replying with own or other sender" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case ref: ActorRef
//#reply-with-sender
sender.tell("reply", context.parent) // replies will go back to parent
sender.!("reply")(context.parent) // alternative syntax (beware of the parens!)
//#reply-with-sender
case x
//#reply-without-sender
sender ! x // replies will go to this actor
//#reply-without-sender
}
}))
implicit val me = testActor
actor ! 42
expectMsg(42)
lastSender must be === actor
actor ! me
expectMsg("reply")
lastSender must be === system.actorFor("/user")
expectMsg("reply")
lastSender must be === system.actorFor("/user")
}
}

View file

@ -104,14 +104,17 @@ class SerializationDocSpec extends AkkaSpec {
serializers {
default = "akka.serialization.JavaSerializer"
java = "akka.serialization.JavaSerializer"
proto = "akka.serialization.ProtobufSerializer"
myown = "akka.docs.serialization.MyOwnSerializer"
}
serialization-bindings {
java = ["java.lang.String",
"app.my.Customer"]
proto = ["com.google.protobuf.Message"]
myown = ["my.own.BusinessObject",
"something.equally.Awesome",
"akka.docs.serialization.MyOwnSerializable"
"java.lang.Boolean"]
}
}

View file

@ -22,7 +22,7 @@ which is very similar to a `java.util.concurrent.Executor``. if you have an ``Ac
it will use its default dispatcher as the ``ExecutionContext``, or you can use the factory methods provided
by the ``ExecutionContext`` companion object to wrap ``Executors`` and ``ExecutorServices``, or even create your own.
Use with Actors
Use With Actors
---------------
There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``),
@ -105,6 +105,9 @@ but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: flat-map
Composing futures using nested combinators it can sometimes become quite complicated and hard read, in these cases using Scala's
'for comprehensions' usually yields more readable code. See next section for examples.
If you need to do conditional propagation, you can use ``filter``:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
@ -113,7 +116,7 @@ If you need to do conditional propagation, you can use ``filter``:
For Comprehensions
^^^^^^^^^^^^^^^^^^
Since ``Future`` has a ``map``, ``filter` and ``flatMap`` method it can be easily used in a 'for comprehension':
Since ``Future`` has a ``map``, ``filter`` and ``flatMap`` method it can be easily used in a 'for comprehension':
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: for-comprehension
@ -188,7 +191,7 @@ as the start-value, you can use ``reduce``, it works like this:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: reduce
Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed,`
Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed,
you can also parallelize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.
Callbacks
@ -206,22 +209,22 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: onComplete
Ordering
--------
Define Ordering
---------------
Since callbacks are executed in any order and potentially in parallel,
it can be tricky at the times when you need sequential ordering of operations.
But there's a solution! And it's name is ``andThen``, and it creates a new Future with
the specified callback, a Future that will have the same result as the Future it's called on,
But there's a solution and it's name is ``andThen``. It creates a new ``Future`` with
the specified callback, a ``Future`` that will have the same result as the ``Future`` it's called on,
which allows for ordering like in the following sample:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: and-then
Auxiliary methods
Auxiliary Methods
-----------------
``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`
``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future``
if the first ``Future`` fails.
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
@ -252,7 +255,7 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil
so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way
it will behave as if we hadn't used the ``recover`` method.
You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``,
You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap`` has to ``map``,
and is use like this:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala

View file

@ -25,7 +25,7 @@ is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
Routers Explained
Routers In Action
^^^^^^^^^^^^^^^^^
This is an example of how to create a router that is defined in configuration:
@ -45,8 +45,11 @@ You can also give the router already created routees as in:
When you create a router programatically you define the number of routees *or* you pass already created routees to it.
If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded.
*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used
instead of any programmatically sent parameters.*
*It is also worth pointing out that if you define the ``router`` in the
configuration file then this value will be used instead of any programmatically
sent parameters. The decision whether to create a router at all, on the other
hand, must be taken within the code, i.e. you cannot make something a router by
external configuration alone (see below for details).*
Once you have the router actor it is just to send messages to it as you would to any actor:
@ -56,6 +59,44 @@ Once you have the router actor it is just to send messages to it as you would to
The router will apply its behavior to the message it receives and forward it to the routees.
How Routing is Designed within Akka
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Routers behave like single actors, but they should also not hinder scalability.
This apparent contradiction is solved by making routers be represented by a
special :class:`RoutedActorRef`, which dispatches incoming messages destined
for the routees without actually invoking the router actors behavior (and thus
avoiding its mailbox; the single router actors task is to manage all aspects
related to the lifecycle of the routees). This means that the code which decides
which route to take is invoked concurrently from all possible senders and hence
must be thread-safe, it cannot live the simple and happy life of code within an
actor.
There is one part in the above paragraph which warrants some more background
explanation: Why does a router need a “head” which is actual parent to all the
routees? The initial design tried to side-step this issue, but location
transparency as well as mandatory parental supervision required a redesign.
Each of the actors which the router spawns must have its unique identity, which
translates into a unique actor path. Since the router has only one given name
in its parents context, another level in the name space is needed, which
according to the addressing semantics implies the existence of an actor with
the routers name. This is not only necessary for the internal messaging
involved in creating, restarting and terminating actors, it is also needed when
the pooled actors need to converse with other actors and receive replies in a
deterministic fashion. Since each actor knows its own external representation
as well as that of its parent, the routees decide where replies should be sent
when reacting to a message:
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-with-sender
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-without-sender
It is apparent now why routing needs to be enabled in code rather than being
possible to “bolt on” later: whether or not an actor is routed means a change
to the actor hierarchy, changing the actor paths of all children of the router.
The routees especially do need to know that they are routed to in order to
choose the sender reference for any messages they dispatch as shown above.
Router usage
^^^^^^^^^^^^

View file

@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor
.. note::
Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``,
this means that you'll need to list the specific classes.
You only need to specify the name of an interface or abstract base class if the messages implements
that. E.g. ``com.google.protobuf.Message`` for protobuf serialization.
Protobuf
--------
Akka provides a ``Serializer`` for `protobuf <http://code.google.com/p/protobuf/>`_ messages.
To use that you need to add the following to the configuration::
akka {
actor {
serializers {
proto = "akka.serialization.ProtobufSerializer"
}
serialization-bindings {
proto = ["com.google.protobuf.Message"]
}
}
}
Verification
------------

View file

@ -32,11 +32,7 @@ case class RemoteClientError(
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteClientError@" +
remoteAddress +
": Error[" +
(if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") +
"]"
"RemoteClientError@" + remoteAddress + ": Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
}
case class RemoteClientDisconnected(
@ -78,13 +74,9 @@ case class RemoteClientWriteFailed(
@BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteClientWriteFailed@" +
remoteAddress +
": MessageClass[" +
(if (request ne null) request.getClass.getName else "no message") +
"] Error[" +
(if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") +
"]"
"RemoteClientWriteFailed@" + remoteAddress +
": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") +
"] Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
}
/**
@ -111,11 +103,7 @@ case class RemoteServerError(
@BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteServerError@" +
remote +
": Error[" +
(if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") +
"]"
"RemoteServerError@" + remote + "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
}
case class RemoteServerClientConnected(
@ -123,11 +111,8 @@ case class RemoteServerClientConnected(
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientConnected@" +
remote +
": Client[" +
(if (clientAddress.isDefined) clientAddress.get else "no address") +
"]"
"RemoteServerClientConnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
}
case class RemoteServerClientDisconnected(
@ -135,11 +120,8 @@ case class RemoteServerClientDisconnected(
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientDisconnected@" +
remote +
": Client[" +
(if (clientAddress.isDefined) clientAddress.get else "no address") +
"]"
"RemoteServerClientDisconnected@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
}
case class RemoteServerClientClosed(
@ -147,11 +129,8 @@ case class RemoteServerClientClosed(
@BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientClosed@" +
remote +
": Client[" +
(if (clientAddress.isDefined) clientAddress.get else "no address") +
"]"
"RemoteServerClientClosed@" + remote +
": Client[" + clientAddress.getOrElse("no address") + "]"
}
case class RemoteServerWriteFailed(
@ -161,15 +140,10 @@ case class RemoteServerWriteFailed(
@BeanProperty remoteAddress: Option[Address]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteServerWriteFailed@" +
remote +
": ClientAddress[" +
remoteAddress +
"] MessageClass[" +
(if (request ne null) request.getClass.getName else "no message") +
"] Error[" +
(if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") +
"]"
"RemoteServerWriteFailed@" + remote +
": ClientAddress[" + remoteAddress +
"] MessageClass[" + (if (request ne null) request.getClass.getName else "no message") +
"] Error[" + AkkaException.toStringWithStackTrace(cause) + "]"
}
/**

View file

@ -6,23 +6,24 @@ trait AbstractRemoteActorMultiJvmSpec {
def NrOfNodes: Int
def commonConfig: Config
def PortRangeStart = 1990
def NodeRange = 1 to NrOfNodes
def PortRange = PortRangeStart to NrOfNodes
private[this] val remotes: IndexedSeq[String] = {
val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq)
nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost")
}
def akkaSpec(idx: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(idx), 9991+idx)
def akkaURIs(count: Int): String = {
0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString ","
}
val nodeConfigs = ((1 to NrOfNodes).toList zip remotes) map {
case (idx, host) =>
val nodeConfigs = (NodeRange.toList zip remotes) map {
case (port, host) =>
ConfigFactory.parseString("""
akka {
remote.netty.hostname="%s"
remote.netty.port = "%d"
}""".format(host, 9990+idx, idx)) withFallback commonConfig
}""".format(host, PortRangeStart + port, port)) withFallback commonConfig
}
def akkaSpec(port: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(port), PortRangeStart + 1 + port)
def akkaURIs(count: Int): String = 0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString ","
}

View file

@ -30,6 +30,4 @@ object AkkaRemoteSpec {
abstract class AkkaRemoteSpec(config: Config)
extends AkkaSpec(config.withFallback(AkkaRemoteSpec.testConf))
with MultiJvmSync {
}
with MultiJvmSync

View file

@ -76,7 +76,7 @@ class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRe
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = NrOfNodes - 1

View file

@ -76,7 +76,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(Scatter
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
barrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true)
val connectionCount = NrOfNodes - 1

View file

@ -25,7 +25,8 @@ object ZkClient extends Watcher {
private def waitForServer() {
// SI-1672
val r = try {
zk.exists("/", false); true
zk.exists("/", false)
true
} catch {
case _: KeeperException.ConnectionLossException =>
Thread.sleep(10000)
@ -35,9 +36,7 @@ object ZkClient extends Watcher {
}
waitForServer()
try {
zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
} catch {
try zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) catch {
case _: KeeperException.NodeExistsException =>
}
@ -46,9 +45,7 @@ object ZkClient extends Watcher {
private def block(num: Int) {
val start = System.currentTimeMillis
while (true) {
if (System.currentTimeMillis - start > timeoutMs)
throw new InterruptedException("Timed out blocking in zk")
if (System.currentTimeMillis - start > timeoutMs) throw new InterruptedException("Timed out blocking in zk")
ZkClient.this.synchronized {
val children = zk.getChildren(root, true)
if (children.size < num) {
@ -60,16 +57,12 @@ object ZkClient extends Watcher {
}
def enter() {
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
block(count)
}
final def leave() {
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL)
zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
block(2*count)
}
}

View file

@ -1,147 +0,0 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm.japi
import java.util.concurrent.Callable
import java.util.{ List JList, Map JMap, Set JSet }
import scala.collection.JavaConversions
import scala.concurrent.stm
import scala.concurrent.stm._
import scala.runtime.AbstractFunction1
/**
* Java-friendly API for ScalaSTM.
* These methods can also be statically imported.
*/
object Stm {
/**
* Create a Ref with an initial value. Return a `Ref.View`, which does not
* require implicit transactions.
* @param initialValue the initial value for the newly created `Ref.View`
* @return a new `Ref.View`
*/
def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single
/**
* Create an empty TMap. Return a `TMap.View`, which does not require
* implicit transactions. See newMap for included java conversion.
* @return a new, empty `TMap.View`
*/
def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single
/**
* Create an empty TMap. Return a `java.util.Map` view of this TMap.
* @return a new, empty `TMap.View` wrapped as a `java.util.Map`.
*/
def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B])
/**
* Create an empty TSet. Return a `TSet.View`, which does not require
* implicit transactions. See newSet for included java conversion.
* @return a new, empty `TSet.View`
*/
def newTSet[A](): TSet.View[A] = TSet.empty[A].single
/**
* Create an empty TSet. Return a `java.util.Set` view of this TSet.
* @return a new, empty `TSet.View` wrapped as a `java.util.Set`.
*/
def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A])
/**
* Create a TArray containing `length` elements. Return a `TArray.View`,
* which does not require implicit transactions. See newList for included
* java conversion.
* @param length the length of the `TArray.View` to be created
* @return a new `TArray.View` containing `length` elements (initially null)
*/
def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single
/**
* Create an empty TArray. Return a `java.util.List` view of this Array.
* @param length the length of the `TArray.View` to be created
* @return a new, empty `TArray.View` wrapped as a `java.util.List`.
*/
def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length))
/**
* Atomic block that takes a `Runnable`.
* @param runnable the `Runnable` to run within a transaction
*/
def atomic(runnable: Runnable): Unit = stm.atomic { txn runnable.run }
/**
* Atomic block that takes a `Callable`.
* @param callable the `Callable` to run within a transaction
* @return the value returned by the `Callable`
*/
def atomic[A](callable: Callable[A]): A = stm.atomic { txn callable.call }
/**
* Transform the value stored by `ref` by applying the function `f`.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
*/
def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the old value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the old value of `ref`
*/
def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the new value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the new value of `ref`
*/
def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f)
/**
* Increment the `java.lang.Integer` value of a `Ref.View`.
* @param ref the `Ref.View<Integer>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v v.intValue + delta }
/**
* Increment the `java.lang.Long` value of a `Ref.View`.
* @param ref the `Ref.View<Long>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v v.longValue + delta }
/**
* Add a task to run after the current transaction has committed.
* @param task the `Runnable` task to run after transaction commit
*/
def afterCommit(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has rolled back.
* @param task the `Runnable` task to run after transaction rollback
*/
def afterRollback(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has either rolled back
* or committed.
* @param task the `Runnable` task to run after transaction completion
*/
def afterCompletion(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status task.run)(txn.get)
}
}

View file

@ -8,14 +8,14 @@ import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedCounter extends UntypedActor {
private String name;
private Ref.View<Integer> count = Stm.newRef(0);
private Ref.View<Integer> count = STM.newRef(0);
public UntypedCoordinatedCounter(String name) {
this.name = name;
@ -40,8 +40,8 @@ public class UntypedCoordinatedCounter extends UntypedActor {
}
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
Stm.afterCompletion(countDown);
STM.increment(count, 1);
STM.afterCompletion(countDown);
}
});
}

View file

@ -8,7 +8,7 @@ import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import scala.concurrent.stm.japi.STM;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref.View<Integer> count = Stm.newRef(0);
private Ref.View<Integer> count = STM.newRef(0);
public UntypedCounter(String name) {
this.name = name;
@ -39,14 +39,14 @@ public class UntypedCounter extends UntypedTransactor {
public void atomically(Object message) {
if (message instanceof Increment) {
Stm.increment(count, 1);
STM.increment(count, 1);
final Increment increment = (Increment) message;
Runnable countDown = new Runnable() {
public void run() {
increment.getLatch().countDown();
}
};
Stm.afterCompletion(countDown);
STM.afterCompletion(countDown);
}
}

View file

@ -1,156 +0,0 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
import static org.junit.Assert.*;
import org.junit.Test;
import scala.concurrent.stm.japi.Stm;
import static scala.concurrent.stm.japi.Stm.*;
import scala.runtime.AbstractFunction1;
import java.util.concurrent.Callable;
import java.util.Map;
import java.util.Set;
import java.util.List;
public class JavaAPITests {
@Test
public void createIntegerRef() {
Ref.View<Integer> ref = newRef(0);
int unboxed = ref.get();
assertEquals(0, unboxed);
}
@Test
public void atomicWithRunnable() {
final Ref.View<Integer> ref = newRef(0);
atomic(new Runnable() {
public void run() {
ref.set(10);
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void atomicWithCallable() {
final Ref.View<Integer> ref = newRef(0);
int oldValue = atomic(new Callable<Integer>() {
public Integer call() {
return ref.swap(10);
}
});
assertEquals(0, oldValue);
int newValue = ref.get();
assertEquals(10, newValue);
}
@Test(expected = TestException.class)
public void failingTransaction() {
final Ref.View<Integer> ref = newRef(0);
try {
atomic(new Runnable() {
public void run() {
ref.set(10);
throw new TestException();
}
});
} catch (TestException e) {
int value = ref.get();
assertEquals(0, value);
throw e;
}
}
@Test
public void transformInteger() {
Ref.View<Integer> ref = newRef(0);
transform(ref, new AbstractFunction1<Integer, Integer>() {
public Integer apply(Integer i) {
return i + 10;
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementInteger() {
Ref.View<Integer> ref = newRef(0);
increment(ref, 10);
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementLong() {
Ref.View<Long> ref = newRef(0L);
increment(ref, 10L);
long value = ref.get();
assertEquals(10L, value);
}
@Test
public void createAndUseTMap() {
Map<Integer, String> map = newMap();
map.put(1, "one");
map.put(2, "two");
assertEquals("one", map.get(1));
assertEquals("two", map.get(2));
assertTrue(map.containsKey(2));
map.remove(2);
assertFalse(map.containsKey(2));
}
@Test(expected = TestException.class)
public void failingTMapTransaction() {
final Map<Integer, String> map = newMap();
try {
atomic(new Runnable() {
public void run() {
map.put(1, "one");
map.put(2, "two");
assertTrue(map.containsKey(1));
assertTrue(map.containsKey(2));
throw new TestException();
}
});
} catch (TestException e) {
assertFalse(map.containsKey(1));
assertFalse(map.containsKey(2));
throw e;
}
}
@Test
public void createAndUseTSet() {
Set<String> set = newSet();
set.add("one");
set.add("two");
assertTrue(set.contains("one"));
assertTrue(set.contains("two"));
assertEquals(2, set.size());
set.add("one");
assertEquals(2, set.size());
set.remove("two");
assertFalse(set.contains("two"));
assertEquals(1, set.size());
}
@Test
public void createAndUseTArray() {
List<String> list = newList(3);
assertEquals(null, list.get(0));
assertEquals(null, list.get(1));
assertEquals(null, list.get(2));
list.set(0, "zero");
list.set(1, "one");
list.set(2, "two");
assertEquals("zero", list.get(0));
assertEquals("one", list.get(1));
assertEquals("two", list.get(2));
}
}

View file

@ -1,9 +0,0 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
public class TestException extends RuntimeException {
public TestException() {
super("Expected failure");
}
}

View file

@ -1,7 +0,0 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm
import org.scalatest.junit.JUnitWrapperSuite
class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader)

View file

@ -0,0 +1,7 @@
akka.actor.deployment {
/master/workerRouter {
# Uncomment the following two lines to change the calculation to use 10 workers instead of 4:
#router = round-robin
#nr-of-instances = 10
}
}

View file

@ -491,7 +491,7 @@ object Dependency {
val Netty = "3.3.0.Final"
val Protobuf = "2.4.1"
val Rabbit = "2.3.1"
val ScalaStm = "0.4"
val ScalaStm = "0.5"
val Scalatest = "1.6.1"
val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE"

View file

@ -1,6 +1,15 @@
#!/usr/bin/env bash
#
# Release script for Akka.
#
# To run this script you need a user account on akka.io and contributor access
# to github.com/jboner/akka.
#
# If your username on akka.io is different from your local username then you can
# configure ssh to always associate a particular username with akka.io by adding
# the following to .ssh/config:
# Host akka.io
# User <username on akka.io>
# defaults
declare -r default_server="akka.io"