From dfe7e4ef63c40fdc2f63e925568941b7bae415f6 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Fri, 23 Dec 2011 10:25:05 +0100 Subject: [PATCH 01/28] Updated and corrected documentation for remoting. See #1546 --- akka-docs/scala/remoting.rst | 88 +++++++++++++++------- akka-samples/akka-sample-remote/README.rst | 8 +- 2 files changed, 63 insertions(+), 33 deletions(-) diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 8201e37815..189beb1862 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -17,67 +17,97 @@ In you SBT project you should add the following as a dependency:: "com.typesafe.akka" % "akka-remote" % "2.0-SNAPSHOT" -First of all you have to change the actor provider from ``LocalActorRefProvider`` to ``RemoteActorRefProvider``:: +To enable remote capabilities in your Akka project you should, at a minimum, add the following changes +to your ``application.conf`` file:: akka { actor { - provider = "akka.remote.RemoteActorRefProvider" + provider = "akka.remote.RemoteActorRefProvider" } - } - -After that you must also add the following settings:: - - akka { remote { + transport = "akka.remote.netty.NettyRemoteSupport" server { - # The hostname or ip to bind the remoting to, - # InetAddress.getLocalHost.getHostAddress is used if empty - hostname = "" - - # The default remote server port clients should connect to. - # Default is 2552 (AKKA) + hostname = "127.0.0.1" port = 2552 } - } + } + cluster.nodename = "someUniqueNameInTheCluster1" } -These are the bare minimal settings that must exist in order to get started with remoting. -There are, of course, more properties that can be tweaked. We refer to the following +As you can see in the example above there are four things you need to add to get started: + +* Change provider from ``LocalActorRefProvider`` to ``RemoteActorRefProvider`` +* Add host name - the machine you want to run the actor system on +* Add port number - the port the actor system should listen on +* Add cluster node name - must be a unique name in the cluster + +The example above only illustrates the bare minimum of properties you have to add to enable remoting. +There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information: * `reference.conf of akka-remote `_ +Types of Remote Interaction +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Akka has two ways of using remoting: + +* Lookup : used to look up an actor on a remote node +* Creation : used to create an actor on a remote node + +In the next sections these ways are described in detail. + Looking up Remote Actors ^^^^^^^^^^^^^^^^^^^^^^^^ -``actorFor(path)`` will obtain an ``ActorRef`` to an Actor on a remote node:: +``actorFor(path)`` will obtain an ``ActorRef`` to an Actor on a remote node, e.g.:: val actor = context.actorFor("akka://app@10.0.0.1:2552/user/serviceA/retrieval") As you can see from the example above the following pattern is used to find an ``ActorRef`` on a remote node:: - akka://@:/ + akka://@:/ + +Once you a reference to the actor you can interact with it they same way you would with a local actor, e.g.:: + + actor ! "Pretty awesome feature" Creating Actors Remotely ^^^^^^^^^^^^^^^^^^^^^^^^ -The configuration below instructs the system to deploy the actor "retrieval” on the specific host "app@10.0.0.1". -The "app" in this case refers to the name of the ``ActorSystem``:: +If you want to use the creation functionality in Akka remoting you have to further amend the +``application.conf`` file in the following way:: akka { actor { - deployment { - /serviceA/retrieval { - remote = “akka://app@10.0.0.1:2552” - } - } + provider = "akka.remote.RemoteActorRefProvider" + deployment { /sampleActor { + remote = "akka://sampleActorSystem@127.0.0.1:2553" + }} } - } + ... -Logical path lookup is supported on the node you are on, i.e. to use the -actor created above you would do the following:: +The configuration above instructs Akka to react specially once when an actor at path /actorName is created, i.e. +using system.actorOf(Props(...), "sampleActor"). This specific actor will not only be instantiated, +but instead the remote daemon of the remote system will be asked to create the actor instead, +which is at sampleActorSystem@127.0.0.1:2553 in this sample. - val actor = context.actorFor("/serviceA/retrieval") +Once you have configured the properties above you would do the following in code:: + + class SampleActor extends Actor { def receive = { case _ => println("Got something") } } + + val actor = context.actorOf(Props[SampleActor], "sampleActor") + actor ! "Pretty slick" + +``SampleActor`` has to be available to the runtimes using it, i.e. the classloader of the +actor systems has to have a JAR containing the class. + +Remote Sample Code +^^^^^^^^^^^^^^^^^^ + +There is a more extensive remote example that comes with the Akka distribution. +Please have a look here for more information: +`Remote Sample `_ Serialization ^^^^^^^^^^^^^ diff --git a/akka-samples/akka-sample-remote/README.rst b/akka-samples/akka-sample-remote/README.rst index 24f63ccc86..0732c33327 100644 --- a/akka-samples/akka-sample-remote/README.rst +++ b/akka-samples/akka-sample-remote/README.rst @@ -70,7 +70,7 @@ Open up a new terminal window and run SBT once more: > run -Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1: +Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1:: Multiple main classes detected, select one to run: @@ -80,7 +80,7 @@ Select to run "sample.remote.calculator.LookupApp" which in the case below is nu Enter number: 1 -Now you should see something like this: +Now you should see something like this:: [info] Running sample.remote.calculator.LookupApp [INFO] [12/22/2011 14:54:38.630] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://LookupApplication@127.0.0.1:2553 @@ -102,7 +102,7 @@ Once more you should open a new terminal window and run SBT: > run -Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2: +Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2:: Multiple main classes detected, select one to run: @@ -112,7 +112,7 @@ Select to run "sample.remote.calculator.CreationApp" which in the case below is Enter number: 2 -Now you should see something like this: +Now you should see something like this:: [info] Running sample.remote.calculator.CreationApp [INFO] [12/22/2011 14:57:02.150] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://RemoteCreation@127.0.0.1:2554 From e000bb533936bf2b3297dbd0e6df0ed9e657697f Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Fri, 23 Dec 2011 11:29:51 +0100 Subject: [PATCH 02/28] Fixed typos and wordings.See #1546 --- akka-docs/scala/remoting.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 189beb1862..48ee1b5efa 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -55,18 +55,18 @@ Akka has two ways of using remoting: * Lookup : used to look up an actor on a remote node * Creation : used to create an actor on a remote node -In the next sections these ways are described in detail. +In the next sections the two alternatives are described in detail. Looking up Remote Actors ^^^^^^^^^^^^^^^^^^^^^^^^ ``actorFor(path)`` will obtain an ``ActorRef`` to an Actor on a remote node, e.g.:: - val actor = context.actorFor("akka://app@10.0.0.1:2552/user/serviceA/retrieval") + val actor = context.actorFor("akka://actorSystemName@10.0.0.1:2552/user/actorName") As you can see from the example above the following pattern is used to find an ``ActorRef`` on a remote node:: - akka://@:/ + akka://@:/ Once you a reference to the actor you can interact with it they same way you would with a local actor, e.g.:: @@ -87,10 +87,10 @@ If you want to use the creation functionality in Akka remoting you have to furth } ... -The configuration above instructs Akka to react specially once when an actor at path /actorName is created, i.e. -using system.actorOf(Props(...), "sampleActor"). This specific actor will not only be instantiated, -but instead the remote daemon of the remote system will be asked to create the actor instead, -which is at sampleActorSystem@127.0.0.1:2553 in this sample. +The configuration above instructs Akka to react when an actor with path /sampleActor is created, i.e. +using ``system.actorOf(Props(...)`, sampleActor)``. This specific actor will not be directly instantiated, +but instead the remote daemon of the remote system will be asked to create the actor, +which in this sample corresponds to ``sampleActorSystem@127.0.0.1:2553``. Once you have configured the properties above you would do the following in code:: From 0bcdc85bbc993bf2420d7c765399c05905fa6e43 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Fri, 23 Dec 2011 12:11:55 +0100 Subject: [PATCH 03/28] Updated after feedback. See #1546 --- akka-docs/scala/remoting.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 48ee1b5efa..4e8fb2692d 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -36,7 +36,7 @@ to your ``application.conf`` file:: As you can see in the example above there are four things you need to add to get started: -* Change provider from ``LocalActorRefProvider`` to ``RemoteActorRefProvider`` +* Change provider from ``akka.actor.LocalActorRefProvider`` to ``akka.remote.RemoteActorRefProvider`` * Add host name - the machine you want to run the actor system on * Add port number - the port the actor system should listen on * Add cluster node name - must be a unique name in the cluster @@ -52,8 +52,8 @@ Types of Remote Interaction Akka has two ways of using remoting: -* Lookup : used to look up an actor on a remote node -* Creation : used to create an actor on a remote node +* Lookup : used to look up an actor on a remote node with ``actorFor(path)`` +* Creation : used to create an actor on a remote node with ``actorOf(Props(...), actorName)`` In the next sections the two alternatives are described in detail. From fa031bce9938eda77d46152515ba6a6a827fd897 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 25 Dec 2011 17:45:55 +0100 Subject: [PATCH 04/28] Defaulting the nodename to 'default' --- akka-remote/src/main/scala/akka/remote/Remote.scala | 6 +++--- .../src/main/scala/akka/remote/RemoteExtension.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index b1425a2754..276431e808 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -142,9 +142,9 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa val full = Vector() ++ names rec(full.mkString("/"), 0) match { - case (Nobody, _) ⇒ Nobody - case (ref, n) if n == 0 ⇒ ref - case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator) + case (Nobody, _) ⇒ Nobody + case (ref, 0) ⇒ ref + case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 5c5d41b0d4..9e30a72f81 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -31,7 +31,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi } val NodeName: String = config.getString("akka.cluster.nodename") match { - case "" ⇒ throw new ConfigurationException("akka.cluster.nodename configuration property must be defined") + case "" ⇒ "undefined" case value ⇒ value } From 2a45291b7dba8cc8b5f35068e47e9a085216309f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 25 Dec 2011 17:51:07 +0100 Subject: [PATCH 05/28] Outputting the node name on startuo --- akka-remote/src/main/scala/akka/remote/Remote.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 276431e808..d9de67c473 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -111,7 +111,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti } } - log.info("Starting remote server on [{}]", remoteAddress) + log.info("Starting remote server on [{}] with node name [{}]", remoteAddress, provider.nodename) } } From 193618e195849d6854a9d3fd66d110fe48b9304a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 25 Dec 2011 18:30:44 +0100 Subject: [PATCH 06/28] Changing boxing of TimeoutException to boxing of InterruptedException and adding support for completing Ask Futures with an AskTimeoutException on timeout. --- .../src/test/scala/akka/dispatch/FutureSpec.scala | 6 +++--- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 10 +++++++++- .../src/main/scala/akka/actor/ActorRefProvider.scala | 12 +++++------- akka-actor/src/main/scala/akka/dispatch/Future.scala | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 15bf70b2e6..4fbb67fbb4 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -64,9 +64,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = Promise[String]().complete(Left(new RuntimeException(message))) behave like futureWithException[RuntimeException](_(future, message)) } - "completed with a j.u.c.TimeoutException" must { - val message = "Boxed TimeoutException" - val future = Promise[String]().complete(Left(new TimeoutException(message))) + "completed with an InterruptedException" must { + val message = "Boxed InterruptedException" + val future = Promise[String]().complete(Left(new InterruptedException(message))) behave like futureWithException[RuntimeException](_(future, message)) } "completed with a NonLocalReturnControl" must { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c9da4d5ae7..5e72ba14bc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -13,7 +13,7 @@ import java.util.concurrent.TimeUnit import akka.event.EventStream import akka.event.DeathWatch import scala.annotation.tailrec -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ ConcurrentHashMap, TimeoutException } import akka.event.LoggingAdapter import java.util.concurrent.atomic.AtomicBoolean @@ -489,6 +489,14 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal } } +/** + * This is what is used to complete a Future that is returned from an ask/? call, + * when it times out. + */ +class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException { + def this(message: String) = this(message, null: Throwable) +} + class AskActorRef( val path: ActorPath, override val getParent: InternalActorRef, diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 115fca87d5..57ef9f108c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -6,14 +6,12 @@ package akka.actor import java.util.concurrent.atomic.AtomicLong import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } -import akka.util.Timeout import akka.util.Timeout.intToTimeout import akka.config.ConfigurationException import akka.dispatch._ import akka.routing._ -import akka.util.Timeout import akka.AkkaException -import akka.util.{ Duration, Switch, Helpers } +import akka.util.{ Duration, Switch, Helpers, Timeout } import akka.event._ import java.io.Closeable @@ -485,15 +483,15 @@ class LocalActorRefProvider( def ask(within: Timeout): Option[AskActorRef] = { (if (within == null) settings.ActorTimeout else within) match { - case t if t.duration.length <= 0 ⇒ - None + case t if t.duration.length <= 0 ⇒ None case t ⇒ val path = tempPath() val name = path.name val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch) tempContainer.addChild(name, a) - val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.removeChild(name); a.stop() } - a.result onComplete { _ ⇒ + val result = a.result + val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) } + result onComplete { _ ⇒ try { a.stop(); f.cancel() } finally { tempContainer.removeChild(name) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2906cd7b08..681b7edb2f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -337,7 +337,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) - case Left(t: TimeoutException) ⇒ Left(new RuntimeException("Boxed TimeoutException", t)) + case Left(t: InterruptedException) ⇒ Left(new RuntimeException("Boxed InterruptedException", t)) case _ ⇒ source } From a0885f13621357d70cefba4bc3b9811059140594 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 26 Dec 2011 00:22:27 +0100 Subject: [PATCH 07/28] Switching to a default value for the nodename --- akka-remote/src/main/resources/reference.conf | 2 +- akka-remote/src/main/scala/akka/remote/RemoteExtension.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index b1900a237e..fb5b843e17 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -128,7 +128,7 @@ akka { cluster { name = "default-cluster" - nodename = "" + nodename = "default" seed-nodes = [] } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 9e30a72f81..80e870c076 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -31,7 +31,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi } val NodeName: String = config.getString("akka.cluster.nodename") match { - case "" ⇒ "undefined" + case "" ⇒ throw new ConfigurationException("Configuration option 'akka.cluster.nodename' must be non-empty.") case value ⇒ value } From 6ca343eb6ed5db453bd252fa98ad695cdfbc35dc Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 26 Dec 2011 00:37:52 +0100 Subject: [PATCH 08/28] Adding documentation for ask timeout expiry --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 5 +++++ akka-docs/java/untyped-actors.rst | 5 +++-- akka-docs/scala/actors.rst | 3 ++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5e72ba14bc..ee53fec688 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -106,6 +106,8 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * Akka Java API. * * Sends a message asynchronously returns a future holding the eventual reply message. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. * * NOTE: * Use this method with care. In most cases it is better to use 'tell' together with the sender @@ -177,6 +179,9 @@ trait ScalaActorRef { ref: ActorRef ⇒ /** * Sends a message asynchronously, returning a future which may eventually hold the reply. + * The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given + * timeout has expired. + * * NOTE: * Use this method with care. In most cases it is better to use '!' together with implicit or explicit * sender parameter to implement non-blocking request/response message exchanges. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 23b9c18235..2b9a797b57 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -280,8 +280,9 @@ If invoked without the sender parameter the sender will be Ask: Send-And-Receive-Future ---------------------------- -Using ``ask`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future`: +Using ``?`` will send a message to the receiving Actor asynchronously and +will immediately return a :class:`Future` which will be completed with +an ``akka.actor.AskTimeoutException`` after the specified timeout: .. code-block:: java diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 13fe9a7d30..12d368d148 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -318,7 +318,8 @@ Ask: Send-And-Receive-Future ---------------------------- Using ``?`` will send a message to the receiving Actor asynchronously and -will immediately return a :class:`Future`: +will immediately return a :class:`Future` which will be completed with +an ``akka.actor.AskTimeoutException`` after the specified timeout: .. code-block:: scala From 0265bc35ff8ee49238083f27ce7eccad550fbc25 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 12:02:55 +0100 Subject: [PATCH 09/28] Fixing ScalaDoc errors in Future --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2906cd7b08..2ce0721f25 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -89,7 +89,7 @@ object Futures { /** * Java API - * A non-blocking fold over the specified futures. + * A non-blocking fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. @@ -201,13 +201,13 @@ object Future { } /** - * A non-blocking fold over the specified futures. + * A non-blocking fold over the specified futures, with the start value of the given zero. * The fold is performed on the thread where the last future is completed, * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. * Example: *
-   *   val result = Await.result(Futures.fold(0)(futures)(_ + _), 5 seconds)
+   *   val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
    * 
*/ def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { @@ -231,7 +231,7 @@ object Future { * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel: *
-   * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
+   * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x)))
    * 
*/ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = From a9ed565ebf8af8d2b355dc72c960cf21135ba7a8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 13:20:15 +0100 Subject: [PATCH 10/28] Adding validation for ActorPath, to disallow slashes. --- akka-actor/src/main/scala/akka/actor/ActorPath.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 399011fd7a..a36c5e8973 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -110,6 +110,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act } final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { + if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name) def address: Address = root.address From da708901a1518362f62177d12b7ab20b3d1babf9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 13:35:50 +0100 Subject: [PATCH 11/28] Switching withCreator + class to Props + class --- .../java/code/akka/docs/actor/SchedulerDocTestBase.java | 2 +- .../java/code/akka/docs/actor/UntypedActorDocTestBase.java | 2 +- .../code/akka/docs/dispatcher/DispatcherDocTestBase.java | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java index 5067b7a51b..1998c45a76 100644 --- a/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/SchedulerDocTestBase.java @@ -34,7 +34,7 @@ public class SchedulerDocTestBase { @Before public void setUp() { system = ActorSystem.create("MySystem", AkkaSpec.testConf()); - testActor = system.actorOf(new Props().withCreator(MyUntypedActor.class)); + testActor = system.actorOf(new Props(MyUntypedActor.class)); } @After diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index bdd359892f..27a7232df6 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -96,7 +96,7 @@ public class UntypedActorDocTestBase { public void propsActorOf() { ActorSystem system = ActorSystem.create("MySystem"); //#creating-props - ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"), + ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor"); //#creating-props myActor.tell("test"); diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index fc76c36a14..8b2006edd8 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -57,9 +57,9 @@ public class DispatcherDocTestBase { @Test public void defineDispatcher() { //#defining-dispatcher - ActorRef myActor1 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"), + ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor1"); - ActorRef myActor2 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"), + ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor2"); //#defining-dispatcher } @@ -68,7 +68,7 @@ public class DispatcherDocTestBase { public void definePinnedDispatcher() { //#defining-pinned-dispatcher String name = "myactor"; - ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class) + ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class) .withDispatcher("myactor-dispatcher"), name); //#defining-pinned-dispatcher } From 59333c0de681d5f2eeaaa63596a4a4382d673607 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 13:53:36 +0100 Subject: [PATCH 12/28] Adding ScalaDoc for Resume, Restart, Stop and Escalate --- .../main/scala/akka/actor/FaultHandling.scala | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index e4e2ee856a..db41a0fd25 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -46,15 +46,51 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = object FaultHandlingStrategy { sealed trait Action + + /** + * Resumes message processing for the failed Actor + */ case object Resume extends Action + + /** + * Discards the old Actor instance and replaces it with a new, + * then resumes message processing. + */ case object Restart extends Action + + /** + * Stops the Actor + */ case object Stop extends Action + + /** + * Escalates the failure to the supervisor of the supervisor + */ case object Escalate extends Action - // Java API + /** + * Resumes message processing for the failed Actor + * Java API + */ def resume = Resume + + /** + * Discards the old Actor instance and replaces it with a new, + * then resumes message processing. + * Java API + */ def restart = Restart + + /** + * Stops the Actor + * Java API + */ def stop = Stop + + /** + * Escalates the failure to the supervisor of the supervisor + * Java API + */ def escalate = Escalate type Decider = PartialFunction[Throwable, Action] From 8c345525f5b22780e7fe0a869abc9fb06641b09b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 13:57:37 +0100 Subject: [PATCH 13/28] Adding rethrowing explanation for Escalate --- akka-actor/src/main/scala/akka/actor/FaultHandling.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index db41a0fd25..afdd683419 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -64,7 +64,8 @@ object FaultHandlingStrategy { case object Stop extends Action /** - * Escalates the failure to the supervisor of the supervisor + * Escalates the failure to the supervisor of the supervisor, + * by rethrowing the cause of the failure. */ case object Escalate extends Action @@ -88,7 +89,8 @@ object FaultHandlingStrategy { def stop = Stop /** - * Escalates the failure to the supervisor of the supervisor + * Escalates the failure to the supervisor of the supervisor, + * by rethrowing the cause of the failure. * Java API */ def escalate = Escalate From 8db3f6aa0a2eed4fd9080f8a8a9ee33200a2b0b0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 14:43:37 +0100 Subject: [PATCH 14/28] Removing mailboxIsEmpty and mailboxSize from MessageDispatcher --- .../src/test/scala/akka/actor/FSMTimingSpec.scala | 4 ++-- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 2 +- .../main/scala/akka/dispatch/AbstractDispatcher.scala | 10 ---------- akka-actor/src/main/scala/akka/routing/Pool.scala | 4 ++-- .../scala/akka/testkit/CallingThreadDispatcher.scala | 8 ++------ 5 files changed, 7 insertions(+), 21 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 2d7534c755..7168daa265 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -182,7 +182,7 @@ object FSMTimingSpec { when(TestCancelTimer) { case Ev(Tick) ⇒ setTimer("hallo", Tock, 1 milli, false) - TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) @@ -209,7 +209,7 @@ object FSMTimingSpec { case Ev(Tick) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) - TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) stay forMax (1 millis) replying Tick case Ev(Tock) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6a532136b4..1d929c1de0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -566,7 +566,7 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { + if (recvtimeout._1 > 0 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 905d2d6498..8832b71afc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -257,16 +257,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext * Must be idempotent */ protected[akka] def shutdown(): Unit - - /** - * Returns the size of the mailbox for the specified actor - */ - def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages - - /** - * Returns the "current" emptiness status of the mailbox for the specified actor - */ - def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages } /** diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 138be5e902..988820cf18 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -165,7 +165,7 @@ trait SmallestMailboxSelector { var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount def mailboxSize(a: ActorRef): Int = a match { - case l: LocalActorRef ⇒ l.underlying.dispatcher.mailboxSize(l.underlying) + case l: LocalActorRef ⇒ l.underlying.mailbox.numberOfMessages case _ ⇒ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority } @@ -282,7 +282,7 @@ trait MailboxPressureCapacitor { def pressureThreshold: Int def pressure(delegates: Seq[ActorRef]): Int = delegates count { - case a: LocalActorRef ⇒ a.underlying.dispatcher.mailboxSize(a.underlying) > pressureThreshold + case a: LocalActorRef ⇒ a.underlying.mailbox.numberOfMessages > pressureThreshold case _ ⇒ false } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 784bb6f184..947ae4e262 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -165,10 +165,6 @@ class CallingThreadDispatcher( } } - override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0 - - override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true - protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { receiver.mailbox match { case mbox: CallingThreadMailbox ⇒ @@ -304,6 +300,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with override def enqueue(receiver: ActorRef, msg: Envelope) {} override def dequeue() = null - override def hasMessages = true - override def numberOfMessages = 0 + override def hasMessages = queue.isEmpty + override def numberOfMessages = queue.size } From 2044faa72af4654506e7fa0b037211931d8dc458 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 14:51:57 +0100 Subject: [PATCH 15/28] Removing unchecked pattern --- .../src/test/scala/akka/testkit/TestActorRefSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 6fbe81a0cc..747a9c90e9 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -56,9 +56,9 @@ object TestActorRefSpec { class WorkerActor() extends TActor { def receiveT = { - case "work" ⇒ sender ! "workDone"; context.stop(self) - case replyTo: Promise[Any] ⇒ replyTo.success("complexReply") - case replyTo: ActorRef ⇒ replyTo ! "complexReply" + case "work" ⇒ sender ! "workDone"; context.stop(self) + case replyTo: Promise[_] ⇒ replyTo.asInstanceOf[Promise[Any]].success("complexReply") + case replyTo: ActorRef ⇒ replyTo ! "complexReply" } } From 88d6b89163541b8e1d01ea9a941c0554da05a29b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 15:07:36 +0100 Subject: [PATCH 16/28] Automatically changing min/max so that min is always <= max and max is always >= min --- .../main/scala/akka/dispatch/ThreadPoolBuilder.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 22dfd15ffa..aba58df3a4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -111,10 +111,16 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair))) def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(corePoolSize = size)) + if (config.maxPoolSize < size) + this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) + else + this.copy(config = config.copy(corePoolSize = size)) def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(maxPoolSize = size)) + if (config.corePoolSize > size) + this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) + else + this.copy(config = config.copy(maxPoolSize = size)) def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder = setCorePoolSize(scaledPoolSize(min, multiplier, max)) From 5ff8f4e2a414fe68deadd24501bda14d15ac1c90 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 16:22:24 +0100 Subject: [PATCH 17/28] Verifying serializability of messages when serialize is on --- .../akka/dispatch/MailboxConfigSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 85 +++++-------------- .../akka/dispatch/AbstractDispatcher.scala | 16 ++-- .../akka/actor/mailbox/DurableMailbox.scala | 2 +- .../actor/mailbox/MongoBasedMailbox.scala | 2 +- .../actor/mailbox/MongoDurableMessage.scala | 5 +- 7 files changed, 35 insertions(+), 79 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 534c4a757d..b797827680 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -75,7 +75,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn result } - def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters) + def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { q must not be null diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1d929c1de0..55a388683b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -308,7 +308,7 @@ private[akka] class ActorCell( } final def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system)) final def sender: ActorRef = currentMessage match { case null ⇒ system.deadLetters diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8c9a3bd5b9..7ee4ad80d1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -58,9 +58,9 @@ object ActorSystem { def create(): ActorSystem = apply() def apply(): ActorSystem = apply("default") - class Settings(cfg: Config, val name: String) { + class Settings(cfg: Config, final val name: String) { - val config: Config = { + final val config: Config = { val config = cfg.withFallback(ConfigFactory.defaultReference) config.checkValid(ConfigFactory.defaultReference, "akka") config @@ -69,81 +69,38 @@ object ActorSystem { import scala.collection.JavaConverters._ import config._ - val ConfigVersion = getString("akka.version") + final val ConfigVersion = getString("akka.version") - val ProviderClass = getString("akka.actor.provider") + final val ProviderClass = getString("akka.actor.provider") - val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) - val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) - val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) - val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") + final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) + final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) + final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) + final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") - val LogLevel = getString("akka.loglevel") - val StdoutLogLevel = getString("akka.stdout-loglevel") - val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala - val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart") - val AddLoggingReceive = getBoolean("akka.actor.debug.receive") - val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") - val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") - val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") - val DebugEventStream = getBoolean("akka.actor.debug.event-stream") + final val LogLevel = getString("akka.loglevel") + final val StdoutLogLevel = getString("akka.stdout-loglevel") + final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart") + final val AddLoggingReceive = getBoolean("akka.actor.debug.receive") + final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") + final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") + final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") + final val DebugEventStream = getBoolean("akka.actor.debug.event-stream") - val Home = config.getString("akka.home") match { + final val Home = config.getString("akka.home") match { case "" ⇒ None case x ⇒ Some(x) } - val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) - val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") + final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) + final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") override def toString: String = config.root.render } - - // TODO move to migration kit - object OldConfigurationLoader { - - val defaultConfig: Config = { - val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig - cfg.withFallback(ConfigFactory.defaultReference).resolve(ConfigResolveOptions.defaults) - } - - // file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax - val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka") - - private def envMode = System.getenv("AKKA_MODE") match { - case null | "" ⇒ None - case value ⇒ Some(value) - } - - private def systemMode = System.getProperty("akka.mode") match { - case null | "" ⇒ None - case value ⇒ Some(value) - } - - private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false) - - private def fromProperties = try { - val property = Option(System.getProperty("akka.config")) - property.map(p ⇒ - ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions))) - } catch { case _ ⇒ None } - - private def fromClasspath = try { - Option(ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions))) - } catch { case _ ⇒ None } - - private def fromHome = try { - Option(ConfigFactory.systemProperties.withFallback( - ConfigFactory.parseFileAnySyntax(new File(GlobalHome.get + "/config/" + defaultLocation), configParseOptions))) - } catch { case _ ⇒ None } - - private def emptyConfig = ConfigFactory.systemProperties - } } /** @@ -323,7 +280,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor import ActorSystem._ - val settings = new Settings(applicationConfig, name) + final val settings = new Settings(applicationConfig, name) def logConfiguration(): Unit = log.info(settings.toString) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8832b71afc..d4f74ac14b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -6,21 +6,21 @@ package akka.dispatch import java.util.concurrent._ import akka.event.Logging.Error -import akka.util.{ Duration, Switch, ReentrantGuard } -import atomic.{ AtomicInteger, AtomicLong } -import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } +import akka.util.Duration import akka.actor._ import akka.actor.ActorSystem -import locks.ReentrantLock import scala.annotation.tailrec import akka.event.EventStream -import akka.actor.ActorSystem.Settings import com.typesafe.config.Config -import java.util.concurrent.atomic.AtomicReference import akka.util.ReflectiveAccess +import akka.serialization.SerializationExtension -final case class Envelope(val message: Any, val sender: ActorRef) { +final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") + else if (system.settings.SerializeAllMessages) SerializationExtension(system).serialize(message.asInstanceOf[AnyRef]) match { + case Left(t) ⇒ throw t + case Right(_) ⇒ //Just verify that it works to serialize it + } } object SystemMessage { @@ -103,7 +103,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def name: String /** - * Identfier of this dispatcher, corresponds to the full key + * Identifier of this dispatcher, corresponds to the full key * of the dispatcher configuration. */ def id: String diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 6a474c8ab7..1d684b8f58 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -67,7 +67,7 @@ trait DurableMessageSerialization { val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage) val sender = deserializeActorRef(durableMessage.getSender) - new Envelope(message, sender) + new Envelope(message, sender)(owner.system) } } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index dfb8e3a481..f6507c41d2 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -72,7 +72,7 @@ class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) { doc match { case Some(msg) ⇒ { log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg) - envelopePromise.success(msg.envelope()) + envelopePromise.success(msg.envelope(system)) log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise) } case None ⇒ diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala index af82322276..8a714147f0 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala @@ -10,9 +10,8 @@ import org.bson.io.OutputBuffer import org.bson.types.ObjectId import java.io.InputStream import org.bson.collection._ -import akka.actor.LocalActorRef -import akka.actor.ActorRef import akka.dispatch.Envelope +import akka.actor.{ ActorSystem, LocalActorRef, ActorRef } /** * A container message for durable mailbox messages, which can be easily stuffed into @@ -32,7 +31,7 @@ case class MongoDurableMessage( val sender: ActorRef, val _id: ObjectId = new ObjectId) { - def envelope() = Envelope(message, sender) + def envelope(system: ActorSystem) = Envelope(message, sender)(system) } // vim: set ts=2 sw=2 sts=2 et: From 642b3aae2b2cb1bd6c9a86a69fd15fd06fab63ae Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 16:36:30 +0100 Subject: [PATCH 18/28] Making sure it deserializes too --- .../akka/dispatch/AbstractDispatcher.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index d4f74ac14b..90c68a4996 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -16,10 +16,19 @@ import akka.util.ReflectiveAccess import akka.serialization.SerializationExtension final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { - if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") - else if (system.settings.SerializeAllMessages) SerializationExtension(system).serialize(message.asInstanceOf[AnyRef]) match { - case Left(t) ⇒ throw t - case Right(_) ⇒ //Just verify that it works to serialize it + if (message.isInstanceOf[AnyRef]) { + val msg = message.asInstanceOf[AnyRef] + if (msg eq null) throw new InvalidMessageException("Message is null") + if (system.settings.SerializeAllMessages) { + val ser = SerializationExtension(system) + ser.serialize(msg) match { //Verify serializability + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } } } From f6f56889f8d09942cef8d73866b1134d40dfcd69 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 16:56:32 +0100 Subject: [PATCH 19/28] Adding support for checking the serializability of the Props.creator as per #1462 --- akka-actor/src/main/resources/reference.conf | 8 ++++++-- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 11 +++++++++++ .../src/main/scala/akka/actor/ActorSystem.scala | 1 + 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 42993515e8..6dd2b2a452 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -50,10 +50,14 @@ akka { # - TypedActor: methods with non-void return type timeout = 5s - # Does a deep clone of (non-primitive) messages to ensure immutability - # FIXME: not used, make use of it or remove the option + # Serializes and deserializes (non-primitive) messages to ensure immutability, + # this is only intended for testing. serialize-messages = off + # Serializes and deserializes creators (in Props) to ensure that they can be sent over the network, + # this is only intended for testing. + serialize-creators = off + deployment { # deployment id pattern - on the format: /parent/child etc. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 55a388683b..a214e234d0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,6 +13,7 @@ import akka.event.Logging.{ Debug, Warning, Error } import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } +import akka.serialization.SerializationExtension //TODO: everything here for current compatibility - could be limited more @@ -214,6 +215,16 @@ private[akka] class ActorCell( final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs private def _actorOf(props: Props, name: String): ActorRef = { + if (system.settings.SerializeAllCreators) { + val ser = SerializationExtension(system) + ser.serialize(props.creator) match { + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass, None) match { + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None) childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) actor diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 7ee4ad80d1..dbe5630789 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -77,6 +77,7 @@ object ActorSystem { final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") + final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators") final val LogLevel = getString("akka.loglevel") final val StdoutLogLevel = getString("akka.stdout-loglevel") From 9701685897951e097c2a29f10ced25881bdcf4e0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 17:30:05 +0100 Subject: [PATCH 20/28] Adding support for verifying serializability of Props.creator as well --- .../akka/serialization/SerializeSpec.scala | 59 +++++++++++++++++++ .../src/main/scala/akka/actor/Actor.scala | 9 +++ .../src/main/scala/akka/actor/ActorCell.scala | 2 +- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../src/main/scala/akka/event/Logging.scala | 2 +- 5 files changed, 71 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index cceb608452..12fe90bbec 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -10,6 +10,9 @@ import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory import akka.actor._ import java.io._ +import akka.dispatch.Await +import akka.util.Timeout +import akka.util.duration._ object SerializeSpec { @@ -129,3 +132,59 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { } } } + +object VerifySerializabilitySpec { + val conf = ConfigFactory.parseString(""" + akka { + actor { + serialize-messages = on + + serialize-creators = on + + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.testing.ProtobufSerializer" + sjson = "akka.testing.SJSONSerializer" + default = "akka.serialization.JavaSerializer" + } + + serialization-bindings { + java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] + sjson = ["akka.serialization.SerializeSpec$Person"] + proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] + } + } + } + """) + + class FooActor extends Actor { + def receive = { + case s: String ⇒ sender ! s + } + } + + class NonSerializableActor(system: ActorSystem) extends Actor { + def receive = { + case s: String ⇒ sender ! s + } + } +} + +class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { + import VerifySerializabilitySpec._ + + "verify creators and messages" in { + implicit val timeout = Timeout(5 seconds) + system.settings.SerializeAllCreators must be(true) + system.settings.SerializeAllMessages must be(true) + + val a = system.actorOf(Props[FooActor]) + Await.result(a ? "pigdog", timeout.duration) must be("pigdog") + intercept[NotSerializableException] { + Await.result(a ? new AnyRef, timeout.duration) + } + intercept[java.io.NotSerializableException] { + val b = system.actorOf(Props(new NonSerializableActor(system))) + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2c44ec70c6..8713df95b4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -25,8 +25,17 @@ import java.util.regex.Pattern */ trait AutoReceivedMessage extends Serializable +/** + * Marker trait to indicate that a message might be potentially harmful, + * this is used to block messages coming in over remoting. + */ trait PossiblyHarmful +/** + * Marker trait to signal that this class should not be verified for serializability. + */ +trait NoSerializationVerificationNeeded + case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index a214e234d0..69b8c0b6c9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -215,7 +215,7 @@ private[akka] class ActorCell( final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs private def _actorOf(props: Props, name: String): ActorRef = { - if (system.settings.SerializeAllCreators) { + if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(system) ser.serialize(props.creator) match { case Left(t) ⇒ throw t diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 90c68a4996..24b9da4447 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -19,7 +19,7 @@ final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorS if (message.isInstanceOf[AnyRef]) { val msg = message.asInstanceOf[AnyRef] if (msg eq null) throw new InvalidMessageException("Message is null") - if (system.settings.SerializeAllMessages) { + if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(system) ser.serialize(msg) match { //Verify serializability case Left(t) ⇒ throw t diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 1797eb9d18..fbceb6973d 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -363,7 +363,7 @@ object Logging { * message. This is necessary to ensure that additional subscriptions are in * effect when the logging system finished starting. */ - case class InitializeLogger(bus: LoggingBus) + case class InitializeLogger(bus: LoggingBus) extends NoSerializationVerificationNeeded /** * Response message each logger must send within 1 second after receiving the From 4e4d95621138fa16561574ebb9233304188cb312 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 18:17:09 +0100 Subject: [PATCH 21/28] Removing nodename from the necessities in the remoting docs --- akka-docs/scala/remoting.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 6e69202505..241ae31510 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -31,7 +31,6 @@ to your ``application.conf`` file:: port = 2552 } } - cluster.nodename = "someUniqueNameInTheCluster1" } As you can see in the example above there are four things you need to add to get started: @@ -39,7 +38,6 @@ As you can see in the example above there are four things you need to add to get * Change provider from ``akka.actor.LocalActorRefProvider`` to ``akka.remote.RemoteActorRefProvider`` * Add host name - the machine you want to run the actor system on * Add port number - the port the actor system should listen on -* Add cluster node name - must be a unique name in the cluster The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following From d50a07c74ba13812ea5fd48a2a31569557c755f3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 20:07:21 +0100 Subject: [PATCH 22/28] Breaking the verification test up in 3 pats --- .../scala/akka/serialization/SerializeSpec.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 12fe90bbec..f90f651065 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -172,19 +172,27 @@ object VerifySerializabilitySpec { class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { import VerifySerializabilitySpec._ + implicit val timeout = Timeout(5 seconds) - "verify creators and messages" in { - implicit val timeout = Timeout(5 seconds) + "verify config" in { system.settings.SerializeAllCreators must be(true) system.settings.SerializeAllMessages must be(true) + } + "verify creators" in { val a = system.actorOf(Props[FooActor]) - Await.result(a ? "pigdog", timeout.duration) must be("pigdog") intercept[NotSerializableException] { Await.result(a ? new AnyRef, timeout.duration) } + system stop a + } + + "verify messages" in { + val a = system.actorOf(Props[FooActor]) + Await.result(a ? "pigdog", timeout.duration) must be("pigdog") intercept[java.io.NotSerializableException] { val b = system.actorOf(Props(new NonSerializableActor(system))) } + system stop a } } From 2dffb2b659bbbd0bc9336e6c616f7026bc9c41d9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 27 Dec 2011 20:17:48 +0100 Subject: [PATCH 23/28] Removing wasteful DNS resolve --- .../src/main/scala/akka/remote/netty/NettyRemoteSupport.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 6e53c88866..11c59ea40d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -669,7 +669,7 @@ class RemoteServerHandler( private def getClientAddress(c: Channel): Option[RemoteNettyAddress] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(RemoteNettyAddress(inet.getHostName, inet.getPort)) + case inet: InetSocketAddress ⇒ Some(RemoteNettyAddress(inet.getHostName, Some(inet.getAddress), inet.getPort)) case _ ⇒ None } } From a764b2bbdd7414f2393093a91f430af97593d012 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Tue, 27 Dec 2011 22:13:02 +0100 Subject: [PATCH 24/28] Added remote Java sample. See #1571 --- .../remote/calculator/java/InternalMsg.java | 26 +++ .../java/JAdvancedCalculatorActor.java | 21 ++ .../remote/calculator/java/JCalcApp.java | 13 ++ .../java/JCalculatorApplication.java | 28 +++ .../calculator/java/JCreationActor.java | 31 +++ .../remote/calculator/java/JCreationApp.java | 26 +++ .../calculator/java/JCreationApplication.java | 35 ++++ .../remote/calculator/java/JLookupActor.java | 27 +++ .../remote/calculator/java/JLookupApp.java | 26 +++ .../calculator/java/JLookupApplication.java | 35 ++++ .../java/JSimpleCalculatorActor.java | 21 ++ .../sample/remote/calculator/java/Op.java | 181 ++++++++++++++++++ .../sample/remote/calculator/MathOp.scala | 4 +- 13 files changed, 472 insertions(+), 2 deletions(-) create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalcApp.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApp.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApp.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java create mode 100644 akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/Op.java diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java new file mode 100644 index 0000000000..d918866921 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.ActorRef; + +public class InternalMsg { + static class MathOpMsg { + private ActorRef actor; + private Op.MathOp mathOp; + + MathOpMsg(ActorRef actor, Op.MathOp mathOp) { + this.actor = actor; + this.mathOp = mathOp; + } + + public ActorRef getActor() { + return actor; + } + + public Op.MathOp getMathOp() { + return mathOp; + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java new file mode 100644 index 0000000000..36da1486c6 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JAdvancedCalculatorActor.java @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.UntypedActor; + +public class JAdvancedCalculatorActor extends UntypedActor { + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Op.Multiply) { + Op.Multiply multiply = (Op.Multiply) message; + System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2()); + getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2())); + } else if (message instanceof Op.Divide) { + Op.Divide divide = (Op.Divide) message; + System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2()); + getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2())); + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalcApp.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalcApp.java new file mode 100644 index 0000000000..470c4d123a --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalcApp.java @@ -0,0 +1,13 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +public class JCalcApp { + + public static void main(String[] args) { + JCalculatorApplication app = new JCalculatorApplication(); + System.out.println("Started Calculator Application - waiting for messages"); + } + +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java new file mode 100644 index 0000000000..21d8e7883f --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.kernel.Bootable; +import com.typesafe.config.ConfigFactory; + +public class JCalculatorApplication implements Bootable { + private ActorSystem system; + + public JCalculatorApplication() { + system = ActorSystem.create("CalculatorApplication", ConfigFactory.load().getConfig("calculator")); + ActorRef actor = system.actorOf(new Props().withCreator(JSimpleCalculatorActor.class), "simpleCalculator"); + } + + @Override + public void startup() { + } + + @Override + public void shutdown() { + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java new file mode 100644 index 0000000000..dc303e55ca --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationActor.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.UntypedActor; + +import java.text.DecimalFormat; +import java.text.NumberFormat; + +public class JCreationActor extends UntypedActor { + private static final NumberFormat formatter = new DecimalFormat("#0.00"); + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof InternalMsg.MathOpMsg) { + InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message; + msg.getActor().tell(msg.getMathOp(), getSelf()); + } else if (message instanceof Op.MathResult) { + if (message instanceof Op.MultiplicationResult) { + Op.MultiplicationResult result = (Op.MultiplicationResult) message; + System.out.println("Mul result: " + result.getN1() + " * " + + result.getN2() + " = " + result.getResult()); + } else if (message instanceof Op.DivisionResult) { + Op.DivisionResult result = (Op.DivisionResult) message; + System.out.println("Div result: " + result.getN1() + " / " + + result.getN2() + " = " + formatter.format(result.getResult())); + } + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApp.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApp.java new file mode 100644 index 0000000000..67ac7f490a --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApp.java @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import java.util.Random; + +public class JCreationApp { + public static void main(String[] args) { + JCreationApplication app = new JCreationApplication(); + System.out.println("Started Creation Application"); + Random r = new Random(); + while (true) { + if (r.nextInt(100) % 2 == 0) { + app.doSomething(new Op.Multiply(r.nextInt(100), r.nextInt(100))); + } else { + app.doSomething(new Op.Divide(r.nextInt(10000), r.nextInt(99) + 1)); + } + + try { + Thread.sleep(200); + } catch (InterruptedException e) { + } + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java new file mode 100644 index 0000000000..583e8d5a96 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.kernel.Bootable; +import com.typesafe.config.ConfigFactory; + +public class JCreationApplication implements Bootable { + private ActorSystem system; + private ActorRef actor; + private ActorRef remoteActor; + + public JCreationApplication() { + system = ActorSystem.create("CreationApplication", ConfigFactory.load().getConfig("remotecreation")); + actor = system.actorOf(new Props().withCreator(JCreationActor.class)); + remoteActor = system.actorOf(new Props().withCreator(JAdvancedCalculatorActor.class), "advancedCalculator"); + } + + public void doSomething(Op.MathOp mathOp) { + actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp)); + } + + @Override + public void startup() { + } + + @Override + public void shutdown() { + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java new file mode 100644 index 0000000000..52a2f1d67c --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupActor.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.UntypedActor; + +public class JLookupActor extends UntypedActor { + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof InternalMsg.MathOpMsg) { + InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message; + msg.getActor().tell(msg.getMathOp(), getSelf()); + } else if (message instanceof Op.MathResult) { + if (message instanceof Op.AddResult) { + Op.AddResult result = (Op.AddResult) message; + System.out.println("Add result: " + result.getN1() + " + " + + result.getN2() + " = " + result.getResult()); + } else if (message instanceof Op.SubtractResult) { + Op.SubtractResult result = (Op.SubtractResult) message; + System.out.println("Sub result: " + result.getN1() + " - " + + result.getN2() + " = " + result.getResult()); + } + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApp.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApp.java new file mode 100644 index 0000000000..a854a1916d --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApp.java @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import java.util.Random; + +public class JLookupApp { + public static void main(String[] args) { + JLookupApplication app = new JLookupApplication(); + System.out.println("Started Lookup Application"); + Random r = new Random(); + while (true) { + if (r.nextInt(100) % 2 == 0) { + app.doSomething(new Op.Add(r.nextInt(100), r.nextInt(100))); + } else { + app.doSomething(new Op.Subtract(r.nextInt(100), r.nextInt(100))); + } + + try { + Thread.sleep(200); + } catch (InterruptedException e) { + } + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java new file mode 100644 index 0000000000..1ee426fe97 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.kernel.Bootable; +import com.typesafe.config.ConfigFactory; + +public class JLookupApplication implements Bootable { + private ActorSystem system; + private ActorRef actor; + private ActorRef remoteActor; + + public JLookupApplication() { + system = ActorSystem.create("LookupApplication", ConfigFactory.load().getConfig("remotelookup")); + actor = system.actorOf(new Props().withCreator(JLookupActor.class)); + remoteActor = system.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator"); + } + + public void doSomething(Op.MathOp mathOp) { + actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp)); + } + + @Override + public void startup() { + } + + @Override + public void shutdown() { + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java new file mode 100644 index 0000000000..738e4c7ae5 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JSimpleCalculatorActor.java @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import akka.actor.UntypedActor; + +public class JSimpleCalculatorActor extends UntypedActor { + @Override + public void onReceive(Object message) { + if (message instanceof Op.Add) { + Op.Add add = (Op.Add) message; + System.out.println("Calculating " + add.getN1() + " + " + add.getN2()); + getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2())); + } else if (message instanceof Op.Subtract) { + Op.Subtract subtract = (Op.Subtract) message; + System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2()); + getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2())); + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/Op.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/Op.java new file mode 100644 index 0000000000..1c461b0405 --- /dev/null +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/Op.java @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package sample.remote.calculator.java; + +import java.io.Serializable; + +public class Op { + + public interface MathOp extends Serializable {} + + public interface MathResult extends Serializable {} + + static class Add implements MathOp { + private final int n1; + private final int n2; + + public Add(int n1, int n2) { + this.n1 = n1; + this.n2 = n2; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + } + + static class AddResult implements MathResult { + private final int n1; + private final int n2; + private final int result; + + public AddResult(int n1, int n2, int result) { + this.n1 = n1; + this.n2 = n2; + this.result = result; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + + public int getResult() { + return result; + } + } + + static class Subtract implements MathOp { + private final int n1; + private final int n2; + + public Subtract(int n1, int n2) { + this.n1 = n1; + this.n2 = n2; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + } + + static class SubtractResult implements MathResult { + private final int n1; + private final int n2; + private final int result; + + public SubtractResult(int n1, int n2, int result) { + this.n1 = n1; + this.n2 = n2; + this.result = result; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + + public int getResult() { + return result; + } + } + + static class Multiply implements MathOp { + private final int n1; + private final int n2; + + public Multiply(int n1, int n2) { + this.n1 = n1; + this.n2 = n2; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + } + + static class MultiplicationResult implements MathResult { + private final int n1; + private final int n2; + private final int result; + + public MultiplicationResult(int n1, int n2, int result) { + this.n1 = n1; + this.n2 = n2; + this.result = result; + } + + public int getN1() { + return n1; + } + + public int getN2() { + return n2; + } + + public int getResult() { + return result; + } + } + + static class Divide implements MathOp { + private final double n1; + private final int n2; + + public Divide(double n1, int n2) { + this.n1 = n1; + this.n2 = n2; + } + + public double getN1() { + return n1; + } + + public int getN2() { + return n2; + } + } + + static class DivisionResult implements MathResult { + private final double n1; + private final int n2; + private final double result; + + public DivisionResult(double n1, int n2, double result) { + this.n1 = n1; + this.n2 = n2; + this.result = result; + } + + public double getN1() { + return n1; + } + + public int getN2() { + return n2; + } + + public double getResult() { + return result; + } + } +} diff --git a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/MathOp.scala b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/MathOp.scala index 5dde9625a9..f2a718cbdc 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/MathOp.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/MathOp.scala @@ -13,7 +13,7 @@ case class Subtract(nbr1: Int, nbr2: Int) extends MathOp case class Multiply(nbr1: Int, nbr2: Int) extends MathOp -case class Divide(nbr1: Int, nbr2: Int) extends MathOp +case class Divide(nbr1: Double, nbr2: Int) extends MathOp trait MathResult @@ -31,7 +31,7 @@ class AdvancedCalculatorActor extends Actor { println("Calculating %d * %d".format(n1, n2)) sender ! MultiplicationResult(n1, n2, n1 * n2) case Divide(n1, n2) ⇒ - println("Calculating %d / %d".format(n1, n2)) + println("Calculating %.0f / %d".format(n1, n2)) sender ! DivisionResult(n1, n2, n1 / n2) } } \ No newline at end of file From f21651d7479c5572b0339a02c0b1ce24cdc167da Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Wed, 28 Dec 2011 01:40:58 +0100 Subject: [PATCH 25/28] Minor changes - updated after feedback. See #1571 --- .../main/java/sample/remote/calculator/java/InternalMsg.java | 4 ++-- .../sample/remote/calculator/java/JCalculatorApplication.java | 2 +- .../sample/remote/calculator/java/JCreationApplication.java | 4 ++-- .../sample/remote/calculator/java/JLookupApplication.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java index d918866921..37cca0dd53 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/InternalMsg.java @@ -7,8 +7,8 @@ import akka.actor.ActorRef; public class InternalMsg { static class MathOpMsg { - private ActorRef actor; - private Op.MathOp mathOp; + private final ActorRef actor; + private final Op.MathOp mathOp; MathOpMsg(ActorRef actor, Op.MathOp mathOp) { this.actor = actor; diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java index 21d8e7883f..5144c90e0f 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCalculatorApplication.java @@ -14,7 +14,7 @@ public class JCalculatorApplication implements Bootable { public JCalculatorApplication() { system = ActorSystem.create("CalculatorApplication", ConfigFactory.load().getConfig("calculator")); - ActorRef actor = system.actorOf(new Props().withCreator(JSimpleCalculatorActor.class), "simpleCalculator"); + ActorRef actor = system.actorOf(new Props(JSimpleCalculatorActor.class), "simpleCalculator"); } @Override diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java index 583e8d5a96..82d5ed373f 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JCreationApplication.java @@ -16,8 +16,8 @@ public class JCreationApplication implements Bootable { public JCreationApplication() { system = ActorSystem.create("CreationApplication", ConfigFactory.load().getConfig("remotecreation")); - actor = system.actorOf(new Props().withCreator(JCreationActor.class)); - remoteActor = system.actorOf(new Props().withCreator(JAdvancedCalculatorActor.class), "advancedCalculator"); + actor = system.actorOf(new Props(JCreationActor.class)); + remoteActor = system.actorOf(new Props(JAdvancedCalculatorActor.class), "advancedCalculator"); } public void doSomething(Op.MathOp mathOp) { diff --git a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java index 1ee426fe97..f70fda5baa 100644 --- a/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java +++ b/akka-samples/akka-sample-remote/src/main/java/sample/remote/calculator/java/JLookupApplication.java @@ -16,7 +16,7 @@ public class JLookupApplication implements Bootable { public JLookupApplication() { system = ActorSystem.create("LookupApplication", ConfigFactory.load().getConfig("remotelookup")); - actor = system.actorOf(new Props().withCreator(JLookupActor.class)); + actor = system.actorOf(new Props(JLookupActor.class)); remoteActor = system.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator"); } From f8aca9eee5e20337252ed7ecbcd89f75b37a26f2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 28 Dec 2011 12:43:28 +0100 Subject: [PATCH 26/28] Adding ScalaDoc to ThreadPoolBuilder.scala --- .../src/main/scala/akka/dispatch/ThreadPoolBuilder.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index aba58df3a4..3a5733f9ef 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -212,6 +212,11 @@ trait ExecutorServiceDelegate extends ExecutorService { def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } +/** + * The RejectedExecutionHandler used by Akka, it improves on CallerRunsPolicy + * by throwing a RejectedExecutionException if the executor isShutdown. + * (CallerRunsPolicy silently discards the runnable in this case, which is arguably broken) + */ class SaneRejectedExecutionHandler extends RejectedExecutionHandler { def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = { if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown") From 519c8fb830769d2e06f2dea21e545671e75210f4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 28 Dec 2011 12:43:48 +0100 Subject: [PATCH 27/28] Removing unused import and DurableMailboxException from the DurableMailbox.scala --- .../akka/actor/mailbox/DurableMailbox.scala | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 1d684b8f58..6d6bf10eb0 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -3,35 +3,19 @@ */ package akka.actor.mailbox -import akka.util.ReflectiveAccess -import akka.AkkaException import akka.actor.ActorContext import akka.actor.ActorRef -import akka.actor.SerializedActorRef import akka.dispatch.Envelope import akka.dispatch.DefaultSystemMessageQueue -import akka.dispatch.Dispatcher import akka.dispatch.CustomMailbox -import akka.dispatch.MailboxType -import akka.dispatch.MessageDispatcher -import akka.dispatch.MessageQueue import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.ActorRefProtocol -import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.RemoteProtocol.RemoteMessageProtocol -import akka.remote.RemoteActorRefProvider -import akka.remote.netty.NettyRemoteServer -import akka.serialization.Serialization -import com.typesafe.config.Config private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } -class DurableMailboxException private[akka] (message: String, cause: Throwable) extends AkkaException(message, cause) { - def this(message: String) = this(message, null) -} - abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ From 23aa095964fde6a171dc315ebd5bfade7505e4f5 Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Wed, 28 Dec 2011 14:33:43 +0100 Subject: [PATCH 28/28] Updated README to explain the Java sample. See #1571 --- akka-samples/akka-sample-remote/README.rst | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/akka-samples/akka-sample-remote/README.rst b/akka-samples/akka-sample-remote/README.rst index 0732c33327..5f9a9fb6c6 100644 --- a/akka-samples/akka-sample-remote/README.rst +++ b/akka-samples/akka-sample-remote/README.rst @@ -11,7 +11,12 @@ The Sample Explained In order to showcase the remote capabilities of Akka 2.0 we thought a remote calculator could do the trick. +There are two implementations of the sample; one in Scala and one in Java. +The explanation below is for Scala, but everything is similar in Java except that the class names begin with a ``J``, +e.g. ``JCalcApp`` instead of ``CalcApp``, and that the Java classes reside in another package structure. + There are three actor systems used in the sample: + * CalculatorApplication : the actor system performing the number crunching * LookupApplication : illustrates how to look up an actor on a remote node and and how communicate with that actor * CreationApplication : illustrates how to create an actor on a remote node and how to communicate with that actor @@ -134,5 +139,5 @@ The sample application is just that, i.e. a sample. Parts of it are not the way Some improvements are to remove all hard coded addresses from the code as they reduce the flexibility of how and where the application can be run. We leave this to the astute reader to refine the sample into a real-world app. -[akka]: http://akka.io -[sbt]: http://code.google.com/p/simple-build-tool/ +* `Akka `_ +* `SBT `_