diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 21597a8c52..af185ba6fc 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -71,26 +71,31 @@ trait Actor extends Logging with TransactionManagement { private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _config: Option[AnyRef] = None private val _remoteFlagLock = new ReadWriteLock - private var _senderFuture: Option[CompletableFutureResult] = None private var _remoteAddress: Option[InetSocketAddress] = None private[akka] val _linkedActors = new HashSet[Actor] private[akka] var _mailbox: MessageQueue = _ private[akka] var _supervisor: Option[Actor] = None + /** + * This field should normally not be touched by user code, which should instead use the 'reply' method. + * But it can be used for advanced use-cases when one might want to store away the future and + * resolve it later and/or somewhere else. + */ + protected var senderFuture: Option[CompletableFutureResult] = None + // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== // ==================================== - /** - * User overridable callback/setting. + * User overridable callback/setting. * * Identifier for actor, does not have to be a unique one. Default is the class name. * * This field is used for logging etc. but also as the identifier for persistence, which means that you can * use a custom name to be able to retrieve the "correct" persisted state upon restart, remote restart etc. */ - protected[this] var id: String = this.getClass.toString + protected[akka] var id: String = this.getClass.toString /** * User overridable callback/setting. @@ -102,15 +107,19 @@ trait Actor extends Logging with TransactionManagement { /** * User overridable callback/setting. * - * User can (and is encouraged to) override the default configuration so it fits the specific use-case that the actor is used for. + * User can (and is encouraged to) override the default configuration (newEventBasedThreadPoolDispatcher) + * so it fits the specific use-case that the actor is used for. *

* It is beneficial to have actors share the same dispatcher, easily +100 actors can share the same. *
- * But if you are running many many actors then it can be a good idea to have split them up in terms of dispatcher sharing. + * But if you are running many many actors then it can be a good idea to have split them up in terms of + * dispatcher sharing. *
- * Default is that all actors that are created and spawned from within this actor is sharing the same dispatcher as its creator. + * Default is that all actors that are created and spawned from within this actor is sharing the same + * dispatcher as its creator. *

-   *   dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+   *   val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+   *   dispatcher
    *     .withNewThreadPoolWithBoundedBlockingQueue(100)
    *     .setCorePoolSize(16)
    *     .setMaxPoolSize(128)
@@ -318,7 +327,7 @@ trait Actor extends Logging with TransactionManagement {
    * Does only work together with the actor !! method and/or active objects not annotated
    * with @oneway.
    */
-  protected[this] def reply(message: AnyRef) = _senderFuture match {
+  protected[this] def reply(message: AnyRef) = senderFuture match {
     case None => throw new IllegalStateException(
       "\n\tNo sender in scope, can't reply. " +
       "\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
@@ -367,8 +376,11 @@ trait Actor extends Logging with TransactionManagement {
   }
 
   /**
-   * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will receive a notification nif the linked actor has crashed.
-   * If the 'trapExit' flag has been set then it will 'trap' the failure and automatically restart the linked actors according to the restart strategy defined by the 'faultHandler'.
+   * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
+   * receive a notification if the linked actor has crashed.
+   * 

+ * If the 'trapExit' member field has been set to 'true' then it will 'trap' the failure and automatically + * restart the linked actors according to the restart strategy defined by the 'faultHandler'. *

* To be invoked from within the actor itself. */ @@ -530,7 +542,7 @@ trait Actor extends Logging with TransactionManagement { val message = messageHandle.message //serializeMessage(messageHandle.message) val future = messageHandle.future try { - _senderFuture = future + senderFuture = future if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) } catch { @@ -561,9 +573,9 @@ trait Actor extends Logging with TransactionManagement { } try { - _senderFuture = future + senderFuture = future if (isTransactionRequiresNew && !isTransactionInScope) { - if (_senderFuture.isEmpty) throw new StmException( + if (senderFuture.isEmpty) throw new StmException( "\n\tCan't continue transaction in a one-way fire-forget message send" + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") diff --git a/akka-actors/src/main/scala/actor/ActorRegistry.scala b/akka-actors/src/main/scala/actor/ActorRegistry.scala index 0fc1860682..9e87562d4a 100755 --- a/akka-actors/src/main/scala/actor/ActorRegistry.scala +++ b/akka-actors/src/main/scala/actor/ActorRegistry.scala @@ -9,27 +9,38 @@ import se.scalablesolutions.akka.util.Logging import scala.collection.mutable.HashMap /** - * Registry holding all actor instances, mapped by class. + * Registry holding all actor instances, mapped by class and the actor's id field (which can be set by user-code). * * @author Jonas Bonér */ object ActorRegistry extends Logging { - private val actors = new HashMap[String, List[Actor]] + private val actorsByClassName = new HashMap[String, List[Actor]] + private val actorsById = new HashMap[String, List[Actor]] - def actorsFor(clazz: Class[_]): List[Actor] = actorsFor(clazz.getName) - - def actorsFor(fqn : String): List[Actor] = synchronized { - actors.get(fqn) match { + def actorsFor(clazz: Class[_ <: Actor]): List[Actor] = synchronized { + actorsByClassName.get(clazz.getName) match { case None => Nil case Some(instances) => instances } } - + + def actorsFor(id : String): List[Actor] = synchronized { + actorsById.get(id) match { + case None => Nil + case Some(instances) => instances + } + } + def register(actor: Actor) = synchronized { - val name = actor.getClass.getName - actors.get(name) match { - case Some(instances) => actors + (name -> (actor :: instances)) - case None => actors + (name -> (actor :: Nil)) + val className = actor.getClass.getName + actorsByClassName.get(className) match { + case Some(instances) => actorsByClassName + (className -> (actor :: instances)) + case None => actorsByClassName + (className -> (actor :: Nil)) + } + val id = actor.getClass.getName + actorsById.get(id) match { + case Some(instances) => actorsById + (id -> (actor :: instances)) + case None => actorsById + (id -> (actor :: Nil)) } } } diff --git a/akka.ipr b/akka.ipr index cae3624287..b1f20c7e6b 100644 --- a/akka.ipr +++ b/akka.ipr @@ -136,6 +136,7 @@ + diff --git a/akka.iws b/akka.iws index 99032215d7..609101c2e2 100644 --- a/akka.iws +++ b/akka.iws @@ -5,19 +5,14 @@ - - - - - - + + + - - - + @@ -1087,11 +1112,11 @@ - - - - - + + + + + localhost @@ -1132,17 +1157,15 @@ - + - - + - @@ -1154,6 +1177,8 @@ + + @@ -1191,116 +1216,92 @@ - - - - - - - - - - - - - - - - - - - - - - - - + - - - + - - - - - - - - - - + - - - + - - - - - + - - - + - - - + - + - - - - - - - - - - - - - - - + - + + + + + + + + + + + + + + + + + + + + + - + - + + + + + + + + + + + + + + + diff --git a/config/storage-conf.xml b/config/storage-conf.xml index 9deee41c42..5894740d89 100644 --- a/config/storage-conf.xml +++ b/config/storage-conf.xml @@ -82,8 +82,8 @@ one logical cluster from joining any other cluster. --> + +