diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 32f2ede8bf..df2170905e 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -15,24 +15,6 @@ import scala.reflect.BeanInfo import com.google.protobuf.Message import akka.pattern.ask -class ProtobufSerializer extends Serializer { - val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - def includeManifest: Boolean = true - def identifier = 2 - - def toBinary(obj: AnyRef): Array[Byte] = { - if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( - "Can't serialize a non-protobuf message using protobuf [" + obj + "]") - obj.asInstanceOf[Message].toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException( - "Need a protobuf message class to be able to serialize bytes using protobuf") - clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message] - } -} - object SerializeSpec { val serializationConf = ConfigFactory.parseString(""" @@ -40,11 +22,12 @@ object SerializeSpec { actor { serializers { java = "akka.serialization.JavaSerializer" + test = "akka.serialization.TestSerializer" } - + serialization-bindings { java = ["akka.serialization.SerializeSpec$Person", "akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] + test = ["akka.serialization.TestSerializble", "akka.serialization.SerializeSpec$PlainMessage"] } } } @@ -56,6 +39,21 @@ object SerializeSpec { case class Person(name: String, age: Int, address: Address) { def this() = this("", 0, null) } case class Record(id: Int, person: Person) + + class SimpleMessage(s: String) extends TestSerializble + + class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s) + + trait AnotherInterface extends TestSerializble + + class AnotherMessage extends AnotherInterface + + class ExtendedAnotherMessage extends AnotherMessage + + class PlainMessage + + class ExtendedPlainMessage extends PlainMessage + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -72,7 +70,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { "have correct bindings" in { ser.bindings(addr.getClass.getName) must be("java") - ser.bindings("akka.actor.ProtobufProtocol$MyMessage") must be("proto") + ser.bindings(classOf[PlainMessage].getName) must be("test") } "serialize Address" in { @@ -145,6 +143,37 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { a.shutdown() } } + + "resove serializer by direct interface" in { + val msg = new SimpleMessage("foo") + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by interface implemented by super class" in { + val msg = new ExtendedSimpleMessage("foo", 17) + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by indirect interface" in { + val msg = new AnotherMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer by indirect interface implemented by super class" in { + val msg = new ExtendedAnotherMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer for message with binding" in { + val msg = new PlainMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + + "resove serializer for message extending class with with binding" in { + val msg = new ExtendedPlainMessage + ser.serializerFor(msg.getClass).getClass must be(classOf[TestSerializer]) + } + } } @@ -158,13 +187,11 @@ object VerifySerializabilitySpec { serializers { java = "akka.serialization.JavaSerializer" - proto = "akka.serialization.ProtobufSerializer" default = "akka.serialization.JavaSerializer" } serialization-bindings { java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] - proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] } } } @@ -209,3 +236,20 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) system stop a } } + +trait TestSerializble + +class TestSerializer extends Serializer { + def includeManifest: Boolean = false + + def identifier = 9999 + + def toBinary(o: AnyRef): Array[Byte] = { + Array.empty[Byte] + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, + classLoader: Option[ClassLoader] = None): AnyRef = { + null + } +} diff --git a/akka-actor/src/main/java/akka/jsr166y/ForkJoinTask.java b/akka-actor/src/main/java/akka/jsr166y/ForkJoinTask.java index 996d05e647..9fe2005698 100644 --- a/akka-actor/src/main/java/akka/jsr166y/ForkJoinTask.java +++ b/akka-actor/src/main/java/akka/jsr166y/ForkJoinTask.java @@ -5,6 +5,8 @@ */ package akka.jsr166y; +import akka.util.Unsafe; + import java.io.Serializable; import java.util.Collection; import java.util.List; @@ -1521,23 +1523,6 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return a sun.misc.Unsafe */ private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } + return Unsafe.instance; } } diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index eb4a1cea8e..47f7465535 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -7,6 +7,24 @@ package akka import akka.actor.newUuid import java.net.{ InetAddress, UnknownHostException } +object AkkaException { + val hostname = try InetAddress.getLocalHost.getHostAddress catch { case e: UnknownHostException ⇒ "unknown host" } + + def toStringWithStackTrace(throwable: Throwable): String = throwable match { + case null ⇒ "Unknown Throwable: was 'null'" + case ae: AkkaException ⇒ ae.toLongString + case e ⇒ "%s:%s\n%s" format (e.getClass.getName, e.getMessage, stackTraceToString(e)) + } + + def stackTraceToString(throwable: Throwable): String = { + val trace = throwable.getStackTrace + val sb = new StringBuilder + for (i ← 0 until trace.length) + sb.append("\tat %s\n" format trace(i)) + sb.toString + } +} + /** * Akka base Exception. Each Exception gets: *
    @@ -19,26 +37,12 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti val uuid = "%s_%s".format(AkkaException.hostname, newUuid) override lazy val toString = - "%s: %s\n[%s]".format(getClass.getName, message, uuid) + "%s:%s\n[%s]".format(getClass.getName, message, uuid) lazy val toLongString = - "%s: %s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString) + "%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString) def this(msg: String) = this(msg, null); - def stackTraceToString = { - val trace = getStackTrace - val sb = new StringBuilder - for (i ← 0 until trace.length) - sb.append("\tat %s\n" format trace(i)) - sb.toString - } -} - -object AkkaException { - val hostname = try { - InetAddress.getLocalHost.getHostAddress - } catch { - case e: UnknownHostException ⇒ "unknown" - } + def stackTraceToString = AkkaException.stackTraceToString(this) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index a6ed7259a5..f2e4e9827c 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,15 +4,21 @@ package akka.routing import akka.actor._ -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } -import java.util.concurrent.TimeUnit import akka.util.Duration import akka.util.duration._ -import com.typesafe.config.Config import akka.config.ConfigurationException import akka.pattern.pipe +import akka.pattern.AskSupport + +import com.typesafe.config.Config + import scala.collection.JavaConversions.iterableAsScalaIterable +import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } +import java.util.concurrent.TimeUnit + +import akka.jsr166y.ThreadLocalRandom + /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to * send a message to on (or more) of these actors. @@ -447,23 +453,16 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, } trait RandomLike { this: RouterConfig ⇒ - - import java.security.SecureRandom - def nrOfInstances: Int def routees: Iterable[String] - private val random = new ThreadLocal[SecureRandom] { - override def initialValue = SecureRandom.getInstance("SHA1PRNG") - } - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { routeeProvider.createAndRegisterRoutees(props, nrOfInstances, routees) def getNext(): ActorRef = { val _routees = routeeProvider.routees - _routees(random.get.nextInt(_routees.size)) + _routees(ThreadLocalRandom.current.nextInt(_routees.size)) } { @@ -711,11 +710,13 @@ object ScatterGatherFirstCompletedRouter { } /** * Simple router that broadcasts the message to all routees, and replies with the first response. - *
    + *
    + * You have to defin the 'within: Duration' parameter (f.e: within = 10 seconds). + *
    * Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means * that the router should both create new actors and use the 'routees' actor(s). * In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. - *
    + *
    * The configuration parameter trumps the constructor arguments. This means that * if you provide either 'nrOfInstances' or 'routees' during instantiation they will * be ignored if the router is defined in the configuration file for the actor being used. @@ -727,6 +728,9 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It override val resizer: Option[Resizer] = None) extends RouterConfig with ScatterGatherFirstCompletedLike { + if (within <= Duration.Zero) throw new IllegalArgumentException( + "[within: Duration] can not be zero or negative, was [" + within + "]") + /** * Constructor that sets nrOfInstances to be created. * Java API diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 750b2e5c35..e89adde8fb 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -10,6 +10,8 @@ import scala.util.DynamicVariable import com.typesafe.config.Config import akka.config.ConfigurationException import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } +import java.util.concurrent.ConcurrentHashMap +import akka.event.Logging case class NoSerializerFoundException(m: String) extends AkkaException(m) @@ -65,6 +67,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { import Serialization._ val settings = new Settings(system.settings.config) + val log = Logging(system, getClass.getName) /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration @@ -111,10 +114,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * Returns the configured Serializer for the given Class, falls back to the Serializer named "default" + * Returns the configured Serializer for the given Class, falls back to the Serializer named "default". + * It traverses interfaces and super classes to find any configured Serializer that match + * the class name. */ - def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups - serializerMap.get(clazz.getName).getOrElse(serializers("default")) + def serializerFor(clazz: Class[_]): Serializer = + if (bindings.isEmpty) { + // quick path to default when no bindings are registered + serializers("default") + } else { + + def resolve(c: Class[_]): Option[Serializer] = + serializerMap.get(c.getName) match { + case null ⇒ + val classes = c.getInterfaces ++ Option(c.getSuperclass) + classes.view map resolve collectFirst { case Some(x) ⇒ x } + case x ⇒ Some(x) + } + + serializerMap.get(clazz.getName) match { + case null ⇒ + val ser = resolve(clazz).getOrElse(serializers("default")) + // memorize the lookups for performance + serializerMap.putIfAbsent(clazz.getName, ser) match { + case null ⇒ + log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) + ser + case some ⇒ some + } + case ser ⇒ ser + } + } /** * Tries to load the specified Serializer by the FQN @@ -146,9 +176,15 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { } /** - * serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class + * serializerMap is a Map whose keys = FQN of class that is serializable and values is the serializer to be used for that class */ - lazy val serializerMap: Map[String, Serializer] = bindings mapValues serializers + private lazy val serializerMap: ConcurrentHashMap[String, Serializer] = { + val serializerMap = new ConcurrentHashMap[String, Serializer] + for ((k, v) ← bindings) { + serializerMap.put(k, serializers(v)) + } + serializerMap + } /** * Maps from a Serializer Identity (Int) to a Serializer instance (optimization) diff --git a/akka-docs/intro/getting-started-first-java.rst b/akka-docs/intro/getting-started-first-java.rst index 19ede63b18..5f45a51528 100644 --- a/akka-docs/intro/getting-started-first-java.rst +++ b/akka-docs/intro/getting-started-first-java.rst @@ -397,6 +397,18 @@ When this in done we can run our application directly inside Maven:: Yippee! It is working. +Overriding Configuration Externally +----------------------------------- + +The sample project includes an ``application.conf`` file in the resources directory: + +.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf + +If you uncomment the two lines, you should see a change in performance, +hopefully for the better. It should be noted that overriding only works if a +router type is given, so just uncommenting ``nr-of-instances`` does not work; +see :ref:`routing-java` for more details. + Conclusion ---------- diff --git a/akka-docs/intro/getting-started-first-scala-eclipse.rst b/akka-docs/intro/getting-started-first-scala-eclipse.rst index 66a9e6323d..9d45270ebd 100644 --- a/akka-docs/intro/getting-started-first-scala-eclipse.rst +++ b/akka-docs/intro/getting-started-first-scala-eclipse.rst @@ -421,6 +421,18 @@ arguments to the JVM on the ``Arguments`` page, for instance to define where :re Once you finished your run configuration, click ``Run``. You should see the same output in the ``Console`` window. You can use the same configuration for debugging the application, by choosing ``Run/Debug History`` or just ``Debug As``. +Overriding Configuration Externally +----------------------------------- + +The sample project includes an ``application.conf`` file in the resources directory: + +.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf + +If you uncomment the two lines, you should see a change in performance, +hopefully for the better. It should be noted that overriding only works if a +router type is given, so just uncommenting ``nr-of-instances`` does not work; +see :ref:`routing-java` for more details. + Conclusion ---------- diff --git a/akka-docs/intro/getting-started-first-scala.rst b/akka-docs/intro/getting-started-first-scala.rst index 8f5fb40243..10386ca334 100644 --- a/akka-docs/intro/getting-started-first-scala.rst +++ b/akka-docs/intro/getting-started-first-scala.rst @@ -442,6 +442,18 @@ When this in done we can run our application directly inside SBT:: Yippee! It is working. +Overriding Configuration Externally +=================================== + +The sample project includes an ``application.conf`` file in the resources directory: + +.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/resources/application.conf + +If you uncomment the two lines, you should see a change in performance, +hopefully for the better. It should be noted that overriding only works if a +router type is given, so just uncommenting ``nr-of-instances`` does not work; +see :ref:`routing-scala` for more details. + Conclusion ========== diff --git a/akka-docs/java/code/akka/docs/jrouting/RouterViaProgramExample.java b/akka-docs/java/code/akka/docs/jrouting/RouterViaProgramExample.java index cc3c45169e..27e90b63fa 100644 --- a/akka-docs/java/code/akka/docs/jrouting/RouterViaProgramExample.java +++ b/akka-docs/java/code/akka/docs/jrouting/RouterViaProgramExample.java @@ -68,4 +68,18 @@ public class RouterViaProgramExample { router3.tell(new ExampleActor.Message(i)); } } + + private class CompileCheckJavaDocsForRouting extends UntypedActor { + + @Override + public void onReceive(Object o) { + //#reply-with-parent + getSender().tell("reply", getContext().parent()); // replies go to router + //#reply-with-parent + //#reply-with-self + getSender().tell("reply", getSelf()); // replies go to this actor + //#reply-with-self + } + + } } \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java index 1b917c8b5b..b68f8f2e79 100644 --- a/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/java/code/akka/docs/serialization/SerializationDocTestBase.java @@ -91,14 +91,17 @@ public class SerializationDocTestBase { serializers { default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { java = ["java.lang.String", "app.my.Customer"] + proto = ["com.google.protobuf.Message"] myown = ["my.own.BusinessObject", "something.equally.Awesome", + "akka.docs.serialization.MyOwnSerializable" "java.lang.Boolean"] } } diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java index cf93a048c7..dd7f119005 100644 --- a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -8,10 +8,10 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.Stm; +import scala.concurrent.stm.japi.STM; public class CoordinatedCounter extends UntypedActor { - private Ref.View count = Stm.newRef(0); + private Ref.View count = STM.newRef(0); public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { @@ -24,7 +24,7 @@ public class CoordinatedCounter extends UntypedActor { } coordinated.atomic(new Runnable() { public void run() { - Stm.increment(count, 1); + STM.increment(count, 1); } }); } diff --git a/akka-docs/java/code/akka/docs/transactor/Counter.java b/akka-docs/java/code/akka/docs/transactor/Counter.java index 0160a34048..ea2291afeb 100644 --- a/akka-docs/java/code/akka/docs/transactor/Counter.java +++ b/akka-docs/java/code/akka/docs/transactor/Counter.java @@ -7,14 +7,14 @@ package akka.docs.transactor; //#class import akka.transactor.*; import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.Stm; +import scala.concurrent.stm.japi.STM; public class Counter extends UntypedTransactor { - Ref.View count = Stm.newRef(0); + Ref.View count = STM.newRef(0); public void atomically(Object message) { if (message instanceof Increment) { - Stm.increment(count, 1); + STM.increment(count, 1); } } diff --git a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java index 14a01f859e..18f2137ea4 100644 --- a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java @@ -9,10 +9,10 @@ import akka.actor.*; import akka.transactor.*; import java.util.Set; import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.Stm; +import scala.concurrent.stm.japi.STM; public class FriendlyCounter extends UntypedTransactor { - Ref.View count = Stm.newRef(0); + Ref.View count = STM.newRef(0); @Override public Set coordinate(Object message) { if (message instanceof Increment) { @@ -25,7 +25,7 @@ public class FriendlyCounter extends UntypedTransactor { public void atomically(Object message) { if (message instanceof Increment) { - Stm.increment(count, 1); + STM.increment(count, 1); } } diff --git a/akka-docs/java/futures.rst b/akka-docs/java/futures.rst index bbebb0f080..c738167a50 100644 --- a/akka-docs/java/futures.rst +++ b/akka-docs/java/futures.rst @@ -18,7 +18,7 @@ Execution Contexts ------------------ In order to execute callbacks and operations, Futures need something called an ``ExecutionContext``, -which is very similar to a `java.util.concurrent.Executor``. if you have an ``ActorSystem`` in scope, +which is very similar to a ``java.util.concurrent.Executor``. if you have an ``ActorSystem`` in scope, it will use its default dispatcher as the ``ExecutionContext``, or you can use the factory methods provided by the ``ExecutionContexts`` class to wrap ``Executors`` and ``ExecutorServices``, or even create your own. @@ -199,7 +199,7 @@ which allows for ordering like in the following sample: Auxiliary methods ----------------- -``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future` +``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`` if the first ``Future`` fails. .. includecode:: code/akka/docs/future/FutureDocTestBase.java @@ -230,7 +230,7 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will behave as if we hadn't used the ``recover`` method. -You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap`` has to ``map``, and is use like this: .. includecode:: code/akka/docs/future/FutureDocTestBase.java diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 42ad1108ea..a422900440 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -25,7 +25,7 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` -Routers Explained +Routers In Action ^^^^^^^^^^^^^^^^^ This is an example of how to create a router that is defined in configuration: @@ -45,8 +45,11 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used -instead of any programmatically sent parameters.* +*It is also worth pointing out that if you define the ``router`` in the +configuration file then this value will be used instead of any programmatically +sent parameters. The decision whether to create a router at all, on the other +hand, must be taken within the code, i.e. you cannot make something a router by +external configuration alone (see below for details).* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -56,6 +59,44 @@ Once you have the router actor it is just to send messages to it as you would to The router will apply its behavior to the message it receives and forward it to the routees. +How Routing is Designed within Akka +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Routers behave like single actors, but they should also not hinder scalability. +This apparent contradiction is solved by making routers be represented by a +special :class:`RoutedActorRef`, which dispatches incoming messages destined +for the routees without actually invoking the router actor’s behavior (and thus +avoiding its mailbox; the single router actor’s task is to manage all aspects +related to the lifecycle of the routees). This means that the code which decides +which route to take is invoked concurrently from all possible senders and hence +must be thread-safe, it cannot live the simple and happy life of code within an +actor. + +There is one part in the above paragraph which warrants some more background +explanation: Why does a router need a “head” which is actual parent to all the +routees? The initial design tried to side-step this issue, but location +transparency as well as mandatory parental supervision required a redesign. +Each of the actors which the router spawns must have its unique identity, which +translates into a unique actor path. Since the router has only one given name +in its parent’s context, another level in the name space is needed, which +according to the addressing semantics implies the existence of an actor with +the router’s name. This is not only necessary for the internal messaging +involved in creating, restarting and terminating actors, it is also needed when +the pooled actors need to converse with other actors and receive replies in a +deterministic fashion. Since each actor knows its own external representation +as well as that of its parent, the routees decide where replies should be sent +when reacting to a message: + +.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#reply-with-parent + +.. includecode:: code/akka/docs/jrouting/RouterViaProgramExample.java#reply-with-self + +It is apparent now why routing needs to be enabled in code rather than being +possible to “bolt on” later: whether or not an actor is routed means a change +to the actor hierarchy, changing the actor paths of all children of the router. +The routees especially do need to know that they are routed to in order to +choose the sender reference for any messages they dispatch as shown above. + Router usage ^^^^^^^^^^^^ diff --git a/akka-docs/java/serialization.rst b/akka-docs/java/serialization.rst index 7140b42aac..2920538ded 100644 --- a/akka-docs/java/serialization.rst +++ b/akka-docs/java/serialization.rst @@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. note:: - Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``, - this means that you'll need to list the specific classes. + You only need to specify the name of an interface or abstract base class if the messages implements + that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. + +Protobuf +-------- + +Akka provides a ``Serializer`` for `protobuf `_ messages. +To use that you need to add the following to the configuration:: + + akka { + actor { + serializers { + proto = "akka.serialization.ProtobufSerializer" + } + + serialization-bindings { + proto = ["com.google.protobuf.Message"] + } + } + } Verification ------------ @@ -74,6 +92,7 @@ here's some examples: For more information, have a look at the ``ScalaDoc`` for ``akka.serialization._`` + Customization ============= diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index d17070aeff..8a46d5f654 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -609,11 +609,11 @@ Java For Java there is a special helper object with Java-friendly methods:: - import scala.concurrent.stm.japi.Stm; + import scala.concurrent.stm.japi.STM; These methods can also be statically imported:: - import static scala.concurrent.stm.japi.Stm.*; + import static scala.concurrent.stm.japi.STM.*; Other imports that are needed are in the stm package, particularly ``Ref``:: @@ -671,7 +671,7 @@ v1.3:: v2.0:: - import static scala.concurrent.stm.japi.Stm.atomic; + import static scala.concurrent.stm.japi.STM.atomic; import java.util.concurrent.Callable; atomic(new Runnable() { @@ -756,7 +756,7 @@ Java As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more easily used from Java. ``Ref`` could be used, but requires explicit threading of -transactions. There are helper methods in ``japi.Stm`` for creating ``Ref.View`` +transactions. There are helper methods in ``japi.STM`` for creating ``Ref.View`` references. v1.3:: @@ -765,7 +765,7 @@ v1.3:: v2.0:: - Ref.View ref = Stm.newRef(0); + Ref.View ref = STM.newRef(0); The ``set`` and ``get`` methods work the same way for both versions. @@ -780,7 +780,7 @@ v2.0:: ref.set(1); // set new value There are also ``transform``, ``getAndTransform``, and ``transformAndGet`` -methods in ``japi.Stm`` which accept ``scala.runtime.AbstractFunction1``. +methods in ``japi.STM`` which accept ``japi.STM.Transformer`` objects. There are ``increment`` helper methods for ``Ref.View`` and ``Ref.View`` references. @@ -821,7 +821,7 @@ Java Rather than using the ``deferred`` and ``compensating`` methods in ``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in -``scala.concurrent.stm.japi.Stm``, which behave in the same way and accept +``scala.concurrent.stm.japi.STM``, which behave in the same way and accept ``Runnable``. Transactional Datastructures @@ -830,12 +830,12 @@ Transactional Datastructures In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional datastructures. -There are helper methods for creating these from Java in ``japi.Stm``: +There are helper methods for creating these from Java in ``japi.STM``: ``newTMap``, ``newTSet``, and ``newTArray``. These datastructures implement the ``scala.collection`` interfaces and can also be used from Java with Scala's ``JavaConversions``. There are helper methods that apply the conversions, returning ``java.util`` ``Map``, ``Set``, and ``List``: ``newMap``, ``newSet``, -and ``newList``. +and ``newArrayAsList``. More to be written diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 55a205746f..8747060554 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -11,7 +11,7 @@ import akka.event.Logging //#imports1 import akka.dispatch.Future -import akka.actor.ActorSystem +import akka.actor.{ ActorRef, ActorSystem } import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit._ @@ -356,4 +356,29 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#ask-pipeTo } + "replying with own or other sender" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case ref: ActorRef ⇒ + //#reply-with-sender + sender.tell("reply", context.parent) // replies will go back to parent + sender.!("reply")(context.parent) // alternative syntax (beware of the parens!) + //#reply-with-sender + case x ⇒ + //#reply-without-sender + sender ! x // replies will go to this actor + //#reply-without-sender + } + })) + implicit val me = testActor + actor ! 42 + expectMsg(42) + lastSender must be === actor + actor ! me + expectMsg("reply") + lastSender must be === system.actorFor("/user") + expectMsg("reply") + lastSender must be === system.actorFor("/user") + } + } diff --git a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala index 6baba425bc..7f1553f75c 100644 --- a/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/serialization/SerializationDocSpec.scala @@ -104,14 +104,17 @@ class SerializationDocSpec extends AkkaSpec { serializers { default = "akka.serialization.JavaSerializer" java = "akka.serialization.JavaSerializer" + proto = "akka.serialization.ProtobufSerializer" myown = "akka.docs.serialization.MyOwnSerializer" } serialization-bindings { java = ["java.lang.String", "app.my.Customer"] + proto = ["com.google.protobuf.Message"] myown = ["my.own.BusinessObject", "something.equally.Awesome", + "akka.docs.serialization.MyOwnSerializable" "java.lang.Boolean"] } } diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 584274004d..5255e91f91 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -22,7 +22,7 @@ which is very similar to a `java.util.concurrent.Executor``. if you have an ``Ac it will use its default dispatcher as the ``ExecutionContext``, or you can use the factory methods provided by the ``ExecutionContext`` companion object to wrap ``Executors`` and ``ExecutorServices``, or even create your own. -Use with Actors +Use With Actors --------------- There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``), @@ -105,6 +105,9 @@ but if 2 or more ``Future``\s are involved ``map`` will not allow you to combine .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: flat-map +Composing futures using nested combinators it can sometimes become quite complicated and hard read, in these cases using Scala's +'for comprehensions' usually yields more readable code. See next section for examples. + If you need to do conditional propagation, you can use ``filter``: .. includecode:: code/akka/docs/future/FutureDocSpec.scala @@ -113,7 +116,7 @@ If you need to do conditional propagation, you can use ``filter``: For Comprehensions ^^^^^^^^^^^^^^^^^^ -Since ``Future`` has a ``map``, ``filter` and ``flatMap`` method it can be easily used in a 'for comprehension': +Since ``Future`` has a ``map``, ``filter`` and ``flatMap`` method it can be easily used in a 'for comprehension': .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: for-comprehension @@ -188,7 +191,7 @@ as the start-value, you can use ``reduce``, it works like this: .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: reduce -Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed,` +Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed, you can also parallelize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again. Callbacks @@ -206,22 +209,22 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: onComplete -Ordering --------- +Define Ordering +--------------- Since callbacks are executed in any order and potentially in parallel, it can be tricky at the times when you need sequential ordering of operations. -But there's a solution! And it's name is ``andThen``, and it creates a new Future with -the specified callback, a Future that will have the same result as the Future it's called on, +But there's a solution and it's name is ``andThen``. It creates a new ``Future`` with +the specified callback, a ``Future`` that will have the same result as the ``Future`` it's called on, which allows for ordering like in the following sample: .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: and-then -Auxiliary methods +Auxiliary Methods ----------------- -``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future` +``Future`` ``fallbackTo`` combines 2 Futures into a new ``Future``, and will hold the successful value of the second ``Future`` if the first ``Future`` fails. .. includecode:: code/akka/docs/future/FutureDocSpec.scala @@ -252,7 +255,7 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will behave as if we hadn't used the ``recover`` method. -You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +You can also use the ``recoverWith`` method, which has the same relationship to ``recover`` as ``flatMap`` has to ``map``, and is use like this: .. includecode:: code/akka/docs/future/FutureDocSpec.scala diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 5b2ed24d28..f67841df2c 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -25,7 +25,7 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` -Routers Explained +Routers In Action ^^^^^^^^^^^^^^^^^ This is an example of how to create a router that is defined in configuration: @@ -45,8 +45,11 @@ You can also give the router already created routees as in: When you create a router programatically you define the number of routees *or* you pass already created routees to it. If you send both parameters to the router *only* the latter will be used, i.e. ``nrOfInstances`` is disregarded. -*It is also worth pointing out that if you define the ``router`` in the configuration file then this value will be used -instead of any programmatically sent parameters.* +*It is also worth pointing out that if you define the ``router`` in the +configuration file then this value will be used instead of any programmatically +sent parameters. The decision whether to create a router at all, on the other +hand, must be taken within the code, i.e. you cannot make something a router by +external configuration alone (see below for details).* Once you have the router actor it is just to send messages to it as you would to any actor: @@ -56,6 +59,44 @@ Once you have the router actor it is just to send messages to it as you would to The router will apply its behavior to the message it receives and forward it to the routees. +How Routing is Designed within Akka +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Routers behave like single actors, but they should also not hinder scalability. +This apparent contradiction is solved by making routers be represented by a +special :class:`RoutedActorRef`, which dispatches incoming messages destined +for the routees without actually invoking the router actor’s behavior (and thus +avoiding its mailbox; the single router actor’s task is to manage all aspects +related to the lifecycle of the routees). This means that the code which decides +which route to take is invoked concurrently from all possible senders and hence +must be thread-safe, it cannot live the simple and happy life of code within an +actor. + +There is one part in the above paragraph which warrants some more background +explanation: Why does a router need a “head” which is actual parent to all the +routees? The initial design tried to side-step this issue, but location +transparency as well as mandatory parental supervision required a redesign. +Each of the actors which the router spawns must have its unique identity, which +translates into a unique actor path. Since the router has only one given name +in its parent’s context, another level in the name space is needed, which +according to the addressing semantics implies the existence of an actor with +the router’s name. This is not only necessary for the internal messaging +involved in creating, restarting and terminating actors, it is also needed when +the pooled actors need to converse with other actors and receive replies in a +deterministic fashion. Since each actor knows its own external representation +as well as that of its parent, the routees decide where replies should be sent +when reacting to a message: + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-with-sender + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#reply-without-sender + +It is apparent now why routing needs to be enabled in code rather than being +possible to “bolt on” later: whether or not an actor is routed means a change +to the actor hierarchy, changing the actor paths of all children of the router. +The routees especially do need to know that they are routed to in order to +choose the sender reference for any messages they dispatch as shown above. + Router usage ^^^^^^^^^^^^ diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index 15879b2ce4..6a0867dea2 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -39,8 +39,26 @@ should be serialized using which ``Serializer``, this is done in the "akka.actor .. note:: - Akka currently only checks for absolute equality of Classes, i.e. it does not yet check ``isAssignableFrom``, - this means that you'll need to list the specific classes. + You only need to specify the name of an interface or abstract base class if the messages implements + that. E.g. ``com.google.protobuf.Message`` for protobuf serialization. + +Protobuf +-------- + +Akka provides a ``Serializer`` for `protobuf `_ messages. +To use that you need to add the following to the configuration:: + + akka { + actor { + serializers { + proto = "akka.serialization.ProtobufSerializer" + } + + serialization-bindings { + proto = ["com.google.protobuf.Message"] + } + } + } Verification ------------ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index a9f1199546..b40992d12a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -32,11 +32,7 @@ case class RemoteClientError( @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { override def logLevel = Logging.ErrorLevel override def toString = - "RemoteClientError@" + - remoteAddress + - ": Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" + "RemoteClientError@" + remoteAddress + ": Error[" + AkkaException.toStringWithStackTrace(cause) + "]" } case class RemoteClientDisconnected( @@ -78,13 +74,9 @@ case class RemoteClientWriteFailed( @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { override def logLevel = Logging.WarningLevel override def toString = - "RemoteClientWriteFailed@" + - remoteAddress + - ": MessageClass[" + - (if (request ne null) request.getClass.getName else "no message") + - "] Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" + "RemoteClientWriteFailed@" + remoteAddress + + ": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") + + "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]" } /** @@ -111,11 +103,7 @@ case class RemoteServerError( @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { override def logLevel = Logging.ErrorLevel override def toString = - "RemoteServerError@" + - remote + - ": Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" + "RemoteServerError@" + remote + "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]" } case class RemoteServerClientConnected( @@ -123,11 +111,8 @@ case class RemoteServerClientConnected( @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { override def logLevel = Logging.DebugLevel override def toString = - "RemoteServerClientConnected@" + - remote + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" + "RemoteServerClientConnected@" + remote + + ": Client[" + clientAddress.getOrElse("no address") + "]" } case class RemoteServerClientDisconnected( @@ -135,11 +120,8 @@ case class RemoteServerClientDisconnected( @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { override def logLevel = Logging.DebugLevel override def toString = - "RemoteServerClientDisconnected@" + - remote + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" + "RemoteServerClientDisconnected@" + remote + + ": Client[" + clientAddress.getOrElse("no address") + "]" } case class RemoteServerClientClosed( @@ -147,11 +129,8 @@ case class RemoteServerClientClosed( @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { override def logLevel = Logging.DebugLevel override def toString = - "RemoteServerClientClosed@" + - remote + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" + "RemoteServerClientClosed@" + remote + + ": Client[" + clientAddress.getOrElse("no address") + "]" } case class RemoteServerWriteFailed( @@ -161,15 +140,10 @@ case class RemoteServerWriteFailed( @BeanProperty remoteAddress: Option[Address]) extends RemoteServerLifeCycleEvent { override def logLevel = Logging.WarningLevel override def toString = - "RemoteServerWriteFailed@" + - remote + - ": ClientAddress[" + - remoteAddress + - "] MessageClass[" + - (if (request ne null) request.getClass.getName else "no message") + - "] Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" + "RemoteServerWriteFailed@" + remote + + ": ClientAddress[" + remoteAddress + + "] MessageClass[" + (if (request ne null) request.getClass.getName else "no message") + + "] Error[" + AkkaException.toStringWithStackTrace(cause) + "]" } /** diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala index dd15817374..ab8bdadae6 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala @@ -6,23 +6,24 @@ trait AbstractRemoteActorMultiJvmSpec { def NrOfNodes: Int def commonConfig: Config + def PortRangeStart = 1990 + def NodeRange = 1 to NrOfNodes + def PortRange = PortRangeStart to NrOfNodes + private[this] val remotes: IndexedSeq[String] = { val nodesOpt = Option(AkkaRemoteSpec.testNodes).map(_.split(",").toIndexedSeq) nodesOpt getOrElse IndexedSeq.fill(NrOfNodes)("localhost") } - def akkaSpec(idx: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(idx), 9991+idx) - - def akkaURIs(count: Int): String = { - 0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString "," - } - - val nodeConfigs = ((1 to NrOfNodes).toList zip remotes) map { - case (idx, host) => + val nodeConfigs = (NodeRange.toList zip remotes) map { + case (port, host) => ConfigFactory.parseString(""" akka { remote.netty.hostname="%s" remote.netty.port = "%d" - }""".format(host, 9990+idx, idx)) withFallback commonConfig + }""".format(host, PortRangeStart + port, port)) withFallback commonConfig } + + def akkaSpec(port: Int) = "AkkaRemoteSpec@%s:%d".format(remotes(port), PortRangeStart + 1 + port) + def akkaURIs(count: Int): String = 0 until count map {idx => "\"akka://" + akkaSpec(idx) + "\""} mkString "," } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala index 4ed4c16450..9a6dd08226 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala @@ -30,6 +30,4 @@ object AkkaRemoteSpec { abstract class AkkaRemoteSpec(config: Config) extends AkkaSpec(config.withFallback(AkkaRemoteSpec.testConf)) - with MultiJvmSync { - -} + with MultiJvmSync diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala index f895708294..bc69b2dd76 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -76,7 +76,7 @@ class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRe "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) val connectionCount = NrOfNodes - 1 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 8a6bad79fa..b618300ff2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -76,7 +76,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(Scatter "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { barrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello") actor.isInstanceOf[RoutedActorRef] must be(true) val connectionCount = NrOfNodes - 1 diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala index 733883228e..611478babb 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ZKClient.scala @@ -25,7 +25,8 @@ object ZkClient extends Watcher { private def waitForServer() { // SI-1672 val r = try { - zk.exists("/", false); true + zk.exists("/", false) + true } catch { case _: KeeperException.ConnectionLossException => Thread.sleep(10000) @@ -35,9 +36,7 @@ object ZkClient extends Watcher { } waitForServer() - try { - zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - } catch { + try zk.create(root, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) catch { case _: KeeperException.NodeExistsException => } @@ -46,9 +45,7 @@ object ZkClient extends Watcher { private def block(num: Int) { val start = System.currentTimeMillis while (true) { - if (System.currentTimeMillis - start > timeoutMs) - throw new InterruptedException("Timed out blocking in zk") - + if (System.currentTimeMillis - start > timeoutMs) throw new InterruptedException("Timed out blocking in zk") ZkClient.this.synchronized { val children = zk.getChildren(root, true) if (children.size < num) { @@ -60,16 +57,12 @@ object ZkClient extends Watcher { } def enter() { - zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL) - + zk.create(root + "/" + name, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) block(count) } final def leave() { - zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL) - + zk.create(root + "/" + name + ".leave", Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) block(2*count) } } diff --git a/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala b/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala deleted file mode 100644 index d9ed5a8330..0000000000 --- a/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* scala-stm - (c) 2009-2011, Stanford University, PPL */ - -package scala.concurrent.stm.japi - -import java.util.concurrent.Callable -import java.util.{ List ⇒ JList, Map ⇒ JMap, Set ⇒ JSet } -import scala.collection.JavaConversions -import scala.concurrent.stm -import scala.concurrent.stm._ -import scala.runtime.AbstractFunction1 - -/** - * Java-friendly API for ScalaSTM. - * These methods can also be statically imported. - */ -object Stm { - - /** - * Create a Ref with an initial value. Return a `Ref.View`, which does not - * require implicit transactions. - * @param initialValue the initial value for the newly created `Ref.View` - * @return a new `Ref.View` - */ - def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single - - /** - * Create an empty TMap. Return a `TMap.View`, which does not require - * implicit transactions. See newMap for included java conversion. - * @return a new, empty `TMap.View` - */ - def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single - - /** - * Create an empty TMap. Return a `java.util.Map` view of this TMap. - * @return a new, empty `TMap.View` wrapped as a `java.util.Map`. - */ - def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B]) - - /** - * Create an empty TSet. Return a `TSet.View`, which does not require - * implicit transactions. See newSet for included java conversion. - * @return a new, empty `TSet.View` - */ - def newTSet[A](): TSet.View[A] = TSet.empty[A].single - - /** - * Create an empty TSet. Return a `java.util.Set` view of this TSet. - * @return a new, empty `TSet.View` wrapped as a `java.util.Set`. - */ - def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A]) - - /** - * Create a TArray containing `length` elements. Return a `TArray.View`, - * which does not require implicit transactions. See newList for included - * java conversion. - * @param length the length of the `TArray.View` to be created - * @return a new `TArray.View` containing `length` elements (initially null) - */ - def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single - - /** - * Create an empty TArray. Return a `java.util.List` view of this Array. - * @param length the length of the `TArray.View` to be created - * @return a new, empty `TArray.View` wrapped as a `java.util.List`. - */ - def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length)) - - /** - * Atomic block that takes a `Runnable`. - * @param runnable the `Runnable` to run within a transaction - */ - def atomic(runnable: Runnable): Unit = stm.atomic { txn ⇒ runnable.run } - - /** - * Atomic block that takes a `Callable`. - * @param callable the `Callable` to run within a transaction - * @return the value returned by the `Callable` - */ - def atomic[A](callable: Callable[A]): A = stm.atomic { txn ⇒ callable.call } - - /** - * Transform the value stored by `ref` by applying the function `f`. - * @param ref the `Ref.View` to be transformed - * @param f the function to be applied - */ - def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f) - - /** - * Transform the value stored by `ref` by applying the function `f` and - * return the old value. - * @param ref the `Ref.View` to be transformed - * @param f the function to be applied - * @return the old value of `ref` - */ - def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f) - - /** - * Transform the value stored by `ref` by applying the function `f` and - * return the new value. - * @param ref the `Ref.View` to be transformed - * @param f the function to be applied - * @return the new value of `ref` - */ - def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f) - - /** - * Increment the `java.lang.Integer` value of a `Ref.View`. - * @param ref the `Ref.View` to be incremented - * @param delta the amount to increment - */ - def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v ⇒ v.intValue + delta } - - /** - * Increment the `java.lang.Long` value of a `Ref.View`. - * @param ref the `Ref.View` to be incremented - * @param delta the amount to increment - */ - def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v ⇒ v.longValue + delta } - - /** - * Add a task to run after the current transaction has committed. - * @param task the `Runnable` task to run after transaction commit - */ - def afterCommit(task: Runnable): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCommit(status ⇒ task.run)(txn.get) - } - - /** - * Add a task to run after the current transaction has rolled back. - * @param task the `Runnable` task to run after transaction rollback - */ - def afterRollback(task: Runnable): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterRollback(status ⇒ task.run)(txn.get) - } - - /** - * Add a task to run after the current transaction has either rolled back - * or committed. - * @param task the `Runnable` task to run after transaction completion - */ - def afterCompletion(task: Runnable): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCompletion(status ⇒ task.run)(txn.get) - } -} diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index 7d169b1548..61905a775c 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -8,14 +8,14 @@ import akka.actor.ActorRef; import akka.actor.Actors; import akka.actor.UntypedActor; import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.Stm; +import scala.concurrent.stm.japi.STM; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class UntypedCoordinatedCounter extends UntypedActor { private String name; - private Ref.View count = Stm.newRef(0); + private Ref.View count = STM.newRef(0); public UntypedCoordinatedCounter(String name) { this.name = name; @@ -40,8 +40,8 @@ public class UntypedCoordinatedCounter extends UntypedActor { } coordinated.atomic(new Runnable() { public void run() { - Stm.increment(count, 1); - Stm.afterCompletion(countDown); + STM.increment(count, 1); + STM.afterCompletion(countDown); } }); } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java index 446b79a747..452e528a5c 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java @@ -8,7 +8,7 @@ import akka.actor.ActorRef; import akka.transactor.UntypedTransactor; import akka.transactor.SendTo; import scala.concurrent.stm.Ref; -import scala.concurrent.stm.japi.Stm; +import scala.concurrent.stm.japi.STM; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit; public class UntypedCounter extends UntypedTransactor { private String name; - private Ref.View count = Stm.newRef(0); + private Ref.View count = STM.newRef(0); public UntypedCounter(String name) { this.name = name; @@ -39,14 +39,14 @@ public class UntypedCounter extends UntypedTransactor { public void atomically(Object message) { if (message instanceof Increment) { - Stm.increment(count, 1); + STM.increment(count, 1); final Increment increment = (Increment) message; Runnable countDown = new Runnable() { public void run() { increment.getLatch().countDown(); } }; - Stm.afterCompletion(countDown); + STM.afterCompletion(countDown); } } diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java deleted file mode 100644 index 09ed90af7a..0000000000 --- a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java +++ /dev/null @@ -1,156 +0,0 @@ -/* scala-stm - (c) 2009-2011, Stanford University, PPL */ - -package scala.concurrent.stm; - -import static org.junit.Assert.*; -import org.junit.Test; - -import scala.concurrent.stm.japi.Stm; -import static scala.concurrent.stm.japi.Stm.*; - -import scala.runtime.AbstractFunction1; -import java.util.concurrent.Callable; - -import java.util.Map; -import java.util.Set; -import java.util.List; - -public class JavaAPITests { - @Test - public void createIntegerRef() { - Ref.View ref = newRef(0); - int unboxed = ref.get(); - assertEquals(0, unboxed); - } - - @Test - public void atomicWithRunnable() { - final Ref.View ref = newRef(0); - atomic(new Runnable() { - public void run() { - ref.set(10); - } - }); - int value = ref.get(); - assertEquals(10, value); - } - - @Test - public void atomicWithCallable() { - final Ref.View ref = newRef(0); - int oldValue = atomic(new Callable() { - public Integer call() { - return ref.swap(10); - } - }); - assertEquals(0, oldValue); - int newValue = ref.get(); - assertEquals(10, newValue); - } - - @Test(expected = TestException.class) - public void failingTransaction() { - final Ref.View ref = newRef(0); - try { - atomic(new Runnable() { - public void run() { - ref.set(10); - throw new TestException(); - } - }); - } catch (TestException e) { - int value = ref.get(); - assertEquals(0, value); - throw e; - } - } - - @Test - public void transformInteger() { - Ref.View ref = newRef(0); - transform(ref, new AbstractFunction1() { - public Integer apply(Integer i) { - return i + 10; - } - }); - int value = ref.get(); - assertEquals(10, value); - } - - @Test - public void incrementInteger() { - Ref.View ref = newRef(0); - increment(ref, 10); - int value = ref.get(); - assertEquals(10, value); - } - - @Test - public void incrementLong() { - Ref.View ref = newRef(0L); - increment(ref, 10L); - long value = ref.get(); - assertEquals(10L, value); - } - - @Test - public void createAndUseTMap() { - Map map = newMap(); - map.put(1, "one"); - map.put(2, "two"); - assertEquals("one", map.get(1)); - assertEquals("two", map.get(2)); - assertTrue(map.containsKey(2)); - map.remove(2); - assertFalse(map.containsKey(2)); - } - - @Test(expected = TestException.class) - public void failingTMapTransaction() { - final Map map = newMap(); - try { - atomic(new Runnable() { - public void run() { - map.put(1, "one"); - map.put(2, "two"); - assertTrue(map.containsKey(1)); - assertTrue(map.containsKey(2)); - throw new TestException(); - } - }); - } catch (TestException e) { - assertFalse(map.containsKey(1)); - assertFalse(map.containsKey(2)); - throw e; - } - } - - @Test - public void createAndUseTSet() { - Set set = newSet(); - set.add("one"); - set.add("two"); - assertTrue(set.contains("one")); - assertTrue(set.contains("two")); - assertEquals(2, set.size()); - set.add("one"); - assertEquals(2, set.size()); - set.remove("two"); - assertFalse(set.contains("two")); - assertEquals(1, set.size()); - } - - @Test - public void createAndUseTArray() { - List list = newList(3); - assertEquals(null, list.get(0)); - assertEquals(null, list.get(1)); - assertEquals(null, list.get(2)); - list.set(0, "zero"); - list.set(1, "one"); - list.set(2, "two"); - assertEquals("zero", list.get(0)); - assertEquals("one", list.get(1)); - assertEquals("two", list.get(2)); - } -} diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java b/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java deleted file mode 100644 index cc810761d4..0000000000 --- a/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java +++ /dev/null @@ -1,9 +0,0 @@ -/* scala-stm - (c) 2009-2011, Stanford University, PPL */ - -package scala.concurrent.stm; - -public class TestException extends RuntimeException { - public TestException() { - super("Expected failure"); - } -} diff --git a/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala b/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala deleted file mode 100644 index 3d0c48e90f..0000000000 --- a/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala +++ /dev/null @@ -1,7 +0,0 @@ -/* scala-stm - (c) 2009-2011, Stanford University, PPL */ - -package scala.concurrent.stm - -import org.scalatest.junit.JUnitWrapperSuite - -class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader) diff --git a/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf b/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf new file mode 100644 index 0000000000..0a2509357e --- /dev/null +++ b/akka-tutorials/akka-tutorial-first/src/main/resources/application.conf @@ -0,0 +1,7 @@ +akka.actor.deployment { + /master/workerRouter { + # Uncomment the following two lines to change the calculation to use 10 workers instead of 4: + #router = round-robin + #nr-of-instances = 10 + } +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a5c257ca84..f810da86a6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -491,7 +491,7 @@ object Dependency { val Netty = "3.3.0.Final" val Protobuf = "2.4.1" val Rabbit = "2.3.1" - val ScalaStm = "0.4" + val ScalaStm = "0.5" val Scalatest = "1.6.1" val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" diff --git a/project/scripts/release b/project/scripts/release index e768fd675b..b716a484bd 100755 --- a/project/scripts/release +++ b/project/scripts/release @@ -1,6 +1,15 @@ #!/usr/bin/env bash # # Release script for Akka. +# +# To run this script you need a user account on akka.io and contributor access +# to github.com/jboner/akka. +# +# If your username on akka.io is different from your local username then you can +# configure ssh to always associate a particular username with akka.io by adding +# the following to .ssh/config: +# Host akka.io +# User # defaults declare -r default_server="akka.io"