From 8c8bf195a8b0fb285094b91ee4a5397544dcd311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sat, 1 May 2010 12:59:24 +0200 Subject: [PATCH] akka-core now compiles --- .../src/main/scala/actor/ActiveObject.scala | 70 ++-- akka-core/src/main/scala/actor/Actor.scala | 391 ++++++++++-------- .../src/main/scala/actor/ActorRegistry.scala | 15 +- akka-core/src/main/scala/actor/Agent.scala | 18 +- .../src/main/scala/actor/Scheduler.scala | 18 +- .../src/main/scala/actor/Supervisor.scala | 6 +- .../ActiveObjectGuiceConfigurator.scala | 6 +- .../src/main/scala/dispatch/Dispatchers.scala | 4 +- ...sedEventDrivenWorkStealingDispatcher.scala | 2 +- .../dispatch/ThreadBasedDispatcher.scala | 4 +- akka-core/src/main/scala/remote/Cluster.scala | 8 +- .../src/main/scala/remote/RemoteClient.scala | 66 +-- .../src/main/scala/remote/RemoteServer.scala | 39 +- .../src/main/scala/stm/DataFlowVariable.scala | 10 +- 14 files changed, 353 insertions(+), 304 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index f80dd2db42..89208fd73b 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -180,7 +180,8 @@ object ActiveObject { } @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, + dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) @@ -194,7 +195,8 @@ object ActiveObject { } @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, + dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, None, timeout) @@ -208,7 +210,8 @@ object ActiveObject { } @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { + def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, + dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, None, timeout) @@ -222,7 +225,8 @@ object ActiveObject { } @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, + hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) @@ -236,35 +240,40 @@ object ActiveObject { } @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, + hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(false, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, + dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, + hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, + dispatcher: MessageDispatcher, hostname: String, port: Int): T = { val actor = new Dispatcher(transactionRequired, None) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) } @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { + def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, + dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { val actor = new Dispatcher(transactionRequired, restartCallbacks) actor.messageDispatcher = dispatcher newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) @@ -274,11 +283,10 @@ object ActiveObject { val proxy = Proxy.newInstance(target, false, false) actor.initialize(target, proxy) actor.timeout = timeout - if (remoteAddress.isDefined) { - actor.makeRemote(remoteAddress.get) - } - AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout)) - actor.start + if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) + val actorId = new ActorID(actor) + AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) + actorId.start proxy.asInstanceOf[T] } @@ -286,20 +294,18 @@ object ActiveObject { val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) actor.initialize(target.getClass, target) actor.timeout = timeout - if (remoteAddress.isDefined) { - actor.makeRemote(remoteAddress.get) - } - AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout)) - actor.start + if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) + val actorId = new ActorID(actor) + AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) + actorId.start proxy.asInstanceOf[T] } /** * Get the underlying dispatcher actor for the given active object. */ - def actorFor(obj: AnyRef): Option[Actor] = { - ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) - } + def actorFor(obj: AnyRef): Option[ActorID] = + ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj)) /** * Links an other active object to this active object. @@ -382,10 +388,10 @@ private[akka] object AspectInitRegistry { private[akka] sealed case class AspectInit( val target: Class[_], - val actor: Dispatcher, + val actorId: ActorID, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { - def this(target: Class[_],actor: Dispatcher, timeout: Long) = this(target, actor, None, timeout) + def this(target: Class[_], actorId: ActorID, timeout: Long) = this(target, actorId, None, timeout) } /** @@ -399,7 +405,7 @@ private[akka] sealed case class AspectInit( private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ - private var actor: Dispatcher = _ + private var actorId: ActorID = _ private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ @@ -408,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actor = init.actor + actorId = init.actorId remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -424,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect { private def localDispatch(joinPoint: JoinPoint): AnyRef = { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] if (isOneWay(rtti)) { - (actor ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] + (actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actor !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) + val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -441,13 +447,13 @@ private[akka] sealed class ActiveObjectAspect { .setId(RemoteRequestIdFactory.nextId) .setMethod(rtti.getMethod.getName) .setTarget(target.getName) - .setUuid(actor.uuid) + .setUuid(actorId.uuid) .setTimeout(timeout) .setIsActor(false) .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actor.registerSupervisorAsRemoteActor + val id = actorId.actor.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) @@ -513,8 +519,8 @@ private[akka] sealed class ActiveObjectAspect { } } -// Jan Kronquist: started work on issue 121 -private[akka] case class Link(val actor: Actor) +// FIXME Jan Kronquist: started work on issue 121 +private[akka] case class Link(val actor: ActorID) object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 105ef4798d..e3ab511793 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -47,6 +47,7 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { makeRemote(hostname, port) } +// Life-cycle messages for the Actors @serializable sealed trait LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage @@ -55,30 +56,199 @@ case class Unlink(child: ActorID) extends LifeCycleMessage case class UnlinkAndStop(child: ActorID) extends LifeCycleMessage case object Kill extends LifeCycleMessage +// Exceptions for Actors class ActorKilledException private[akka](message: String) extends RuntimeException(message) +class ActorInitializationException private[akka](message: String) extends RuntimeException(message) -sealed abstract class DispatcherType -object DispatcherType { - case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType - case object EventBasedSingleThreadDispatcher extends DispatcherType - case object EventBasedThreadPoolDispatcher extends DispatcherType - case object ThreadBasedDispatcher extends DispatcherType +/** + * Utility class with factory methods for creating Actors. + * + * @author Jonas Bonér + */ +object Actor extends Logging { + val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) + + // FIXME remove next release + object Sender { + @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'") + object Self + } + + /** + * Creates a new ActorID out of the Actor with type T. + *
+   *   import Actor._
+   *   val actor = newActor[MyActor]
+   *   actor.start
+   *   actor ! message
+   *   actor.stop
+   * 
+ */ + def newActor[T <: Actor: Manifest]: ActorID = new ActorID(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) + + /** + * Use to create an anonymous event-driven actor. + *

+ * The actor is created with a 'permanent' life-cycle configuration, which means that + * if the actor is supervised and dies it will be restarted. + *

+ * The actor is started when created. + * Example: + *

+   * import Actor._
+   *
+   * val a = actor {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ + def actor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + def receive: PartialFunction[Any, Unit] = body + }) + + /** + * Use to create an anonymous transactional event-driven actor. + *

+ * The actor is created with a 'permanent' life-cycle configuration, which means that + * if the actor is supervised and dies it will be restarted. + *

+ * The actor is started when created. + * Example: + *

+   * import Actor._
+   *
+   * val a = transactor {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ + def transactor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Transactor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + def receive: PartialFunction[Any, Unit] = body + }) + + /** + * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, + * which means that if the actor is supervised and dies it will *not* be restarted. + *

+ * The actor is started when created. + * Example: + *

+   * import Actor._
+   *
+   * val a = temporaryActor {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ + def temporaryActor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Temporary)) + start + def receive = body + }) + + /** + * Use to create an anonymous event-driven actor with both an init block and a message loop block. + *

+ * The actor is created with a 'permanent' life-cycle configuration, which means that + * if the actor is supervised and dies it will be restarted. + *

+ * The actor is started when created. + * Example: + *

+   * val a = Actor.init {
+   *   ... // init stuff
+   * } receive  {
+   *   case msg => ... // handle message
+   * }
+   * 
+ * + */ + def init[A](body: => Unit) = { + def handler[A](body: => Unit) = new { + def receive(handler: PartialFunction[Any, Unit]) = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + body + def receive = handler + }) + } + handler(body) + } + + /** + * Use to spawn out a block of code in an event-driven actor. Will shut actor down when + * the block has been executed. + *

+ * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since + * there is a method 'spawn[ActorType]' in the Actor trait already. + * Example: + *

+   * import Actor._
+   *
+   * spawn {
+   *   ... // do stuff
+   * }
+   * 
+ */ + def spawn(body: => Unit): Unit = { + case object Spawn + new Actor() { + start + self ! Spawn + def receive = { + case Spawn => body; stop + } + } + } } /** + * FIXME document + * * @author Jonas Bonér */ -class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actor.invoke(handle) +class ActorMessageInvoker(val actorId: ActorID) extends MessageInvoker { + def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) } +/** + * ActorID is an immutable and serializable handle to an Actor. + * Create an ActorID for an Actor by using the factory method on the Actor object. + * Here is an example: + *
+ *   import Actor._
+ * 
+ *   val actor = newActor[MyActor]
+ *   actor.start
+ *   actor ! message
+ *   actor.stop
+ * 
+ * + * @author Jonas Bonér + */ final class ActorID private[akka] (private[akka] val actor: Actor) { + actor._actorID = Some(this) + if (actor eq null) throw new IllegalArgumentException("Actor instance passed to ActorID can not be 'null'") /** * Starts up the actor and its message queue. */ - def start = actor.start + def start: ActorID = { + actor.start + this + } /** * Shuts down the actor its dispatcher and message queue. @@ -251,9 +421,9 @@ final class ActorID private[akka] (private[akka] val actor: Actor) { */ def uuid = actor.uuid - override def toString = "ActorID[" + actor.toString + "]" - override def hashCode = actor.hashCode - override def equals(that: AnyRef) = actor.equals(that) + override def toString: String = "ActorID[" + actor.toString + "]" + override def hashCode: Int = actor.hashCode + override def equals(that: Any): Boolean = actor.equals(that) private[akka] def supervisor_=(sup: Option[ActorID]): Unit = actor._supervisor = sup @@ -267,151 +437,6 @@ final class ActorID private[akka] (private[akka] val actor: Actor) { private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler } -/** - * Utility class with factory methods for creating Actors. - * - * @author Jonas Bonér - */ -object Actor extends Logging { - val TIMEOUT = config.getInt("akka.actor.timeout", 5000) - val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) - val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) - - object Sender { - @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'") - object Self - } - - def newActor[T <: Actor: Manifest]: ActorID = { - val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance - new ActorID(actor) - } - - /** - * Use to create an anonymous event-driven actor. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = actor {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def actor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(new Actor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - def receive: PartialFunction[Any, Unit] = body - }) - - /** - * Use to create an anonymous transactional event-driven actor. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = transactor {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def transactor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(new Transactor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - def receive: PartialFunction[Any, Unit] = body - }) - - /** - * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, - * which means that if the actor is supervised and dies it will *not* be restarted. - *

- * The actor is started when created. - * Example: - *

-   * import Actor._
-   *
-   * val a = temporaryActor {
-   *   case msg => ... // handle message
-   * }
-   * 
- */ - def temporaryActor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(new Actor() { - lifeCycle = Some(LifeCycle(Temporary)) - start - def receive = body - }) - - /** - * Use to create an anonymous event-driven actor with both an init block and a message loop block. - *

- * The actor is created with a 'permanent' life-cycle configuration, which means that - * if the actor is supervised and dies it will be restarted. - *

- * The actor is started when created. - * Example: - *

-   * val a = Actor.init {
-   *   ... // init stuff
-   * } receive  {
-   *   case msg => ... // handle message
-   * }
-   * 
- * - */ - def init[A](body: => Unit) = { - def handler[A](body: => Unit) = new { - def receive(handler: PartialFunction[Any, Unit]) = - new ActorID(new Actor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - body - def receive = handler - }) - } - handler(body) - } - - /** - * Use to spawn out a block of code in an event-driven actor. Will shut actor down when - * the block has been executed. - *

- * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since - * there is a method 'spawn[ActorType]' in the Actor trait already. - * Example: - *

-   * import Actor._
-   *
-   * spawn {
-   *   ... // do stuff
-   * }
-   * 
- */ - def spawn(body: => Unit): Unit = { - case object Spawn - new Actor() { - start - selfId ! Spawn - def receive = { - case Spawn => body; stop - } - } - } -} - /** * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': * http://en.wikipedia.org/wiki/Actor_model @@ -620,10 +645,18 @@ trait Actor extends TransactionManagement with Logging { // ============= /** - * 'selfId' holds the ActorID for this actor. + * The 'self' field holds the ActorID for this actor. + * Can be used to send messages to itself: + *
+   * self ! message
+   * 
+ * Note: if you are using the 'self' field in the constructor of the Actor + * then you have to make the fields/operations that are using it 'lazy'. */ - def selfId: ActorID = - _actorID.getOrElse(throw new IllegalStateException("ActorID for actor " + toString + " is not available")) + def self: ActorID = _actorID.getOrElse(throw new ActorInitializationException( + "ActorID for actor " + toString + " is not available." + + "\n\tIf you are using the 'self' field in the constructor of the Actor" + + "\n\tthen you have to make the fields/operations that are using it 'lazy'")) /** * Starts up the actor and its message queue. @@ -631,14 +664,14 @@ trait Actor extends TransactionManagement with Logging { def start: Unit = synchronized { if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'") if (!_isRunning) { - messageDispatcher.register(selfId) + messageDispatcher.register(self) messageDispatcher.start _isRunning = true init initTransactionalState } Actor.log.debug("[%s] has started", toString) - ActorRegistry.register(selfId) + ActorRegistry.register(self) } /** @@ -652,11 +685,11 @@ trait Actor extends TransactionManagement with Logging { */ def stop = synchronized { if (_isRunning) { - messageDispatcher.unregister(selfId) + messageDispatcher.unregister(self) _isRunning = false _isShutDown = true shutdown - ActorRegistry.unregister(selfId) + ActorRegistry.unregister(self) _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) } } @@ -725,9 +758,9 @@ trait Actor extends TransactionManagement with Logging { */ def dispatcher_=(md: MessageDispatcher): Unit = synchronized { if (!_isRunning) { - messageDispatcher.unregister(selfId) + messageDispatcher.unregister(self) messageDispatcher = md - messageDispatcher.register(selfId) + messageDispatcher.register(self) } else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") } @@ -786,7 +819,7 @@ trait Actor extends TransactionManagement with Logging { if (actorId.supervisor.isDefined) throw new IllegalStateException( "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") getLinkedActors.add(actorId) - actorId.supervisor = Some(selfId) + actorId.supervisor = Some(self) Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this) } @@ -949,7 +982,7 @@ trait Actor extends TransactionManagement with Logging { RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(selfId, message, sender.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -982,7 +1015,7 @@ trait Actor extends TransactionManagement with Logging { } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation(selfId, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) @@ -1027,9 +1060,9 @@ trait Actor extends TransactionManagement with Logging { _isKilled = true Actor.log.error(e, "Could not invoke actor [%s]", this) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(self, e) replyTo match { - case Some(Right(future)) => future.completeWithException(selfId, e) + case Some(Right(future)) => future.completeWithException(self, e) case _ => } } finally { @@ -1078,7 +1111,7 @@ trait Actor extends TransactionManagement with Logging { Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) replyTo match { - case Some(Right(future)) => future.completeWithException(selfId, e) + case Some(Right(future)) => future.completeWithException(self, e) case _ => } @@ -1086,7 +1119,7 @@ trait Actor extends TransactionManagement with Logging { if (topLevelTransaction) clearTransactionSet // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(self, e) } finally { clearTransaction if (topLevelTransaction) clearTransactionSet @@ -1137,7 +1170,7 @@ trait Actor extends TransactionManagement with Logging { Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + "\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id) - _supervisor.foreach(_ ! UnlinkAndStop(selfId)) + _supervisor.foreach(_ ! UnlinkAndStop(self)) } } } @@ -1155,7 +1188,7 @@ trait Actor extends TransactionManagement with Logging { private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized { if (_supervisor.isDefined) { - RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) + RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(self) Some(_supervisor.get.uuid) } else None } @@ -1203,3 +1236,11 @@ trait Actor extends TransactionManagement with Logging { override def toString = "Actor[" + id + ":" + uuid + "]" } + +sealed abstract class DispatcherType +object DispatcherType { + case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType + case object EventBasedSingleThreadDispatcher extends DispatcherType + case object EventBasedThreadPoolDispatcher extends DispatcherType + case object ThreadBasedDispatcher extends DispatcherType +} diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 5026d788d3..d59d2630d3 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -53,13 +53,13 @@ object ActorRegistry extends Logging { /** * Finds all actors that are subtypes of the class passed in as the Manifest argument. */ - def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[T] = { - val all = new ListBuffer[T] + def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorID] = { + val all = new ListBuffer[ActorID] val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorId = elements.nextElement if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { - all += actorId.actor.asInstanceOf[T] + all += actorId } } all.toList @@ -68,17 +68,16 @@ object ActorRegistry extends Logging { /** * Finds all actors of the exact type specified by the class passed in as the Class argument. */ - def actorsFor[T <: Actor](clazz: Class[T]): List[T] = { - if (actorsByClassName.containsKey(clazz.getName)) { - actorsByClassName.get(clazz.getName).asInstanceOf[List[T]] - } else Nil + def actorsFor[T <: Actor](clazz: Class[T]): List[ActorID] = { + if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName) + else Nil } /** * Finds all actors that has a specific id. */ def actorsFor(id: String): List[ActorID] = { - if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[ActorID]] + if (actorsById.containsKey(id)) actorsById.get(id) else Nil } diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index b6a65423a3..0785288d10 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -105,17 +105,17 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { private lazy val value = Ref[T]() - this ! Value(initialValue) + self ! Value(initialValue) /** * Periodically handles incoming messages. */ def receive = { - case Value(v: T) => + case Value(v: T) => swap(v) - case Function(fun: (T => T)) => + case Function(fun: (T => T)) => swap(fun(value.getOrWait)) - case Procedure(proc: (T => Unit)) => + case Procedure(proc: (T => Unit)) => proc(copyStrategy(value.getOrElse(throw new AgentException("Could not read Agent's value; value is null")))) } @@ -157,22 +157,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { /** * Submits the provided function for execution against the internal agent's state. */ - final def apply(message: (T => T)): Unit = this ! Function(message) + final def apply(message: (T => T)): Unit = self ! Function(message) /** * Submits a new value to be set as the new agent's internal state. */ - final def apply(message: T): Unit = this ! Value(message) + final def apply(message: T): Unit = self ! Value(message) /** * Submits the provided function of type 'T => T' for execution against the internal agent's state. */ - final def send(message: (T) => T): Unit = this ! Function(message) + final def send(message: (T) => T): Unit = self ! Function(message) /** * Submits a new value to be set as the new agent's internal state. */ - final def send(message: T): Unit = this ! Value(message) + final def send(message: T): Unit = self ! Value(message) /** * Asynchronously submits a procedure of type 'T => Unit' to read the internal state. @@ -180,7 +180,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * of the internal state will be used, depending on the underlying effective copyStrategy. * Does not change the value of the agent (this). */ - final def sendProc(f: (T) => Unit): Unit = this ! Procedure(f) + final def sendProc(f: (T) => Unit): Unit = self ! Procedure(f) /** * Applies function with type 'T => B' to the agent's internal state and then returns a new agent with the result. diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index f85511bd28..1a96e1d0c9 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -26,12 +26,12 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * Rework of David Pollak's ActorPing class in the Lift Project * which is licensed under the Apache 2 License. */ -class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { +class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { lifeCycle = Some(LifeCycle(Permanent)) def receive = { case UnSchedule => - Scheduler.stopSupervising(this) + Scheduler.stopSupervising(self) future.cancel(true) exit } @@ -39,18 +39,18 @@ class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) ex object Scheduler extends Actor { private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - private val schedulers = new ConcurrentHashMap[Actor, Actor] + private val schedulers = new ConcurrentHashMap[ActorID, ActorID] faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) start - def schedule(receiver: Actor, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { + def schedule(receiver: ActorID, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { try { - startLink(new ScheduleActor( + startLink(new ActorID(new ScheduleActor( receiver, service.scheduleAtFixedRate(new java.lang.Runnable { def run = receiver ! message; - }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]])) + }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]))) } catch { case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) } @@ -58,9 +58,9 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - def stopSupervising(actor: Actor) = { - unlink(actor) - schedulers.remove(actor) + def stopSupervising(actorId: ActorID) = { + unlink(actorId) + schedulers.remove(actorId) } override def shutdown = { diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 9f8d230c65..ae3332b681 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -82,6 +82,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep trapExit = trapExceptions faultHandler = Some(handler) + + // FIXME should Supervisor really havea newThreadBasedDispatcher?? dispatcher = Dispatchers.newThreadBasedDispatcher(this) private val actors = new ConcurrentHashMap[String, List[ActorID]] @@ -144,8 +146,8 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep if (list eq null) List[ActorID]() else list } - actors.put(className, supervisor.selfId :: currentSupervisors) - link(supervisor.selfId) + actors.put(className, supervisor.self :: currentSupervisors) + link(supervisor.self) }) } } diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index e01f91f92a..2c7dafc3fc 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config import com.google.inject._ import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher} +import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorID} import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.util.Logging @@ -94,7 +94,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(actor, component.lifeCycle) + supervised ::= Supervise(new ActorID(actor), component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) } @@ -116,7 +116,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(actor, component.lifeCycle) + supervised ::= Supervise(new ActorID(actor), component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) } diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 2030d2026e..b3ce9567f3 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.dispatch -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Actor, ActorID} /** * Scala API. Dispatcher factory. @@ -40,7 +40,7 @@ import se.scalablesolutions.akka.actor.Actor */ object Dispatchers { object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { - override def register(actor: Actor) = { + override def register(actor: ActorID) = { if (isShutdown) init super.register(actor) } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index d166c1956d..2360945f5e 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -172,7 +172,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { - thief.selfId ! donated.message + thief.self ! donated.message return Some(donated) } else return None } diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index fbfffc999e..459964bf57 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.LinkedBlockingQueue import java.util.Queue -import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker} +import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorMessageInvoker} class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher { - def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor)) + def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorID(actor))) private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index f5b0dce52f..93c28029e5 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -212,19 +212,19 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Registers a local endpoint */ def registerLocalNode(hostname: String, port: Int): Unit = - selfId ! RegisterLocalNode(RemoteAddress(hostname, port)) + self ! RegisterLocalNode(RemoteAddress(hostname, port)) /** * Deregisters a local endpoint */ def deregisterLocalNode(hostname: String, port: Int): Unit = - selfId ! DeregisterLocalNode(RemoteAddress(hostname, port)) + self ! DeregisterLocalNode(RemoteAddress(hostname, port)) /** * Broadcasts the specified message to all Actors of type Class on all known Nodes */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = - selfId ! RelayedMessage(to.getName, msg) + self ! RelayedMessage(to.getName, msg) } /** @@ -261,7 +261,7 @@ object Cluster extends Cluster with Logging { val sup = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(actor.selfId, LifeCycle(Permanent)) :: Nil) + Supervise(actor.self, LifeCycle(Permanent)) :: Nil) ).newInstance Some(sup) } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 2557c33d02..425855f866 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} -import se.scalablesolutions.akka.actor.{Exit, Actor} +import se.scalablesolutions.akka.actor.{Exit, Actor, ActorID} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.config.Config.config @@ -53,21 +53,21 @@ object RemoteClient extends Logging { // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(className: String, hostname: String, port: Int): Actor = + def actorFor(className: String, hostname: String, port: Int): ActorID = actorFor(className, className, 5000L, hostname, port) - def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor = + def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorID = actorFor(actorId, className, 5000L, hostname, port) - def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor = + def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = { + def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID( new Actor { start val remoteClient = RemoteClient.clientFor(hostname, port) - override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { + override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(className) @@ -77,10 +77,10 @@ object RemoteClient extends Logging { .setIsOneWay(true) .setIsEscaped(false) if (sender.isDefined) { - val s = sender.get - requestBuilder.setSourceTarget(s.getClass.getName) - requestBuilder.setSourceUuid(s.uuid) - val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + val sndr = sender.get.actor + requestBuilder.setSourceTarget(sndr.getClass.getName) + requestBuilder.setSourceUuid(sndr.uuid) + val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) requestBuilder.setSourceHostname(host) requestBuilder.setSourcePort(port) } @@ -108,7 +108,7 @@ object RemoteClient extends Logging { def receive = {case _ => {}} } - } + ) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) @@ -174,8 +174,8 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] - private val supervisors = new ConcurrentHashMap[String, Actor] - private[remote] val listeners = new ConcurrentSkipListSet[Actor] + private val supervisors = new ConcurrentHashMap[String, ActorID] + private[remote] val listeners = new ConcurrentSkipListSet[ActorID] private val channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool, @@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(connection.getCause)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -232,21 +232,21 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) throw exception } - def registerSupervisorForActor(actor: Actor) = - if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision") - else supervisors.putIfAbsent(actor._supervisor.get.uuid, actor) + def registerSupervisorForActor(actorId: ActorID) = + if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") + else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) - def deregisterSupervisorForActor(actor: Actor) = - if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") - else supervisors.remove(actor._supervisor.get.uuid) + def deregisterSupervisorForActor(actorId: ActorID) = + if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") + else supervisors.remove(actorId.supervisor.get.uuid) - def registerListener(actor: Actor) = listeners.add(actor) + def registerListener(actorId: ActorID) = listeners.add(actorId) - def deregisterListener(actor: Actor) = listeners.remove(actor) + def deregisterListener(actorId: ActorID) = listeners.remove(actorId) } /** @@ -254,7 +254,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { */ class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFuture[_]], - supervisors: ConcurrentMap[String, Actor], + supervisors: ConcurrentMap[String, ActorID], bootstrap: ClientBootstrap, remoteAddress: SocketAddress, timer: HashedWheelTimer, @@ -285,7 +285,7 @@ class RemoteClientPipelineFactory(name: String, @ChannelHandler.Sharable class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFuture[_]], - val supervisors: ConcurrentMap[String, Actor], + val supervisors: ConcurrentMap[String, ActorID], val bootstrap: ClientBootstrap, val remoteAddress: SocketAddress, val timer: HashedWheelTimer, @@ -316,21 +316,21 @@ class RemoteClientHandler(val name: String, if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor._supervisor.isDefined) + if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply)) + else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply)) } future.completeWithException(null, parseException(reply)) } futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(e)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String, // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly if (!client.connection.isSuccess) { - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(client.connection.getCause)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String, } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(event.getCause)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 4db987827e..24cc83da60 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -198,7 +198,7 @@ class RemoteServer extends Logging { /** * Register Remote Actor by the Actor's 'id' field. */ - def register(actor: Actor) = synchronized { + def register(actor: ActorID) = synchronized { if (_isRunning) { log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.getId) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.getId, actor) @@ -208,7 +208,7 @@ class RemoteServer extends Logging { /** * Register Remote Actor by a specific 'id' passed as argument. */ - def register(id: String, actor: Actor) = synchronized { + def register(id: String, actor: ActorID) = synchronized { if (_isRunning) { log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) @@ -225,7 +225,7 @@ class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, val loader: Option[ClassLoader], - val actors: JMap[String, Actor], + val actors: JMap[String, ActorID], val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory { import RemoteServer._ @@ -256,7 +256,7 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val actors: JMap[String, Actor], + val actors: JMap[String, ActorID], val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern @@ -300,8 +300,8 @@ class RemoteServerHandler( private def dispatchToActor(request: RemoteRequest, channel: Channel) = { log.debug("Dispatching to remote actor [%s]", request.getTarget) - val actor = createActor(request.getTarget, request.getUuid, request.getTimeout) - actor.start + val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout) + actorId.start val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { @@ -310,19 +310,19 @@ class RemoteServerHandler( val targetClass = if (request.hasSourceTarget) request.getSourceTarget else request.getTarget - val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout) - if (!remoteActor.isRunning) { - remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort) - remoteActor.start + val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout) + if (!remoteActorId.isRunning) { + remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort) + remoteActorId.start } - actor.!(message)(Some(remoteActor)) + actorId.!(message)(Some(remoteActorId)) } else { // couldn't find a way to reply, send the message without a source/sender - actor ! message + actorId ! message } } else { try { - val resultOrNone = actor !! message + val resultOrNone = actorId !! message val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null log.debug("Returning result from actor invocation [%s]", result) val replyBuilder = RemoteReply.newBuilder @@ -440,9 +440,9 @@ class RemoteServerHandler( * If actor already created then just return it from the registry. * Does not start the actor. */ - private def createActor(name: String, uuid: String, timeout: Long): Actor = { - val actorOrNull = actors.get(uuid) - if (actorOrNull eq null) { + private def createActor(name: String, uuid: String, timeout: Long): ActorID = { + val actorIdOrNull = actors.get(uuid) + if (actorIdOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) @@ -451,13 +451,14 @@ class RemoteServerHandler( newInstance._uuid = uuid newInstance.timeout = timeout newInstance._remoteAddress = None - actors.put(uuid, newInstance) - newInstance + val actorId = new ActorID(newInstance) + actors.put(uuid, actorId) + actorId } catch { case e => log.error(e, "Could not create remote actor instance") throw e } - } else actorOrNull + } else actorIdOrNull } } diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index 332ae5c14e..c040ea0cee 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.dispatch.CompletableFuture /** @@ -26,7 +26,7 @@ import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.dispatch.CompletableFuture def thread(body: => Unit) = { - val thread = new IsolatedEventBasedThread(body).start + val thread = new ActorID(new IsolatedEventBasedThread(body)).start thread ! Start thread } @@ -60,7 +60,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private case object Get extends DataFlowVariableMessage private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[Actor] + private val blockedReaders = new ConcurrentLinkedQueue[ActorID] private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT @@ -97,7 +97,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture } } - private[this] val in = new In(this) + private[this] val in = new ActorID(new In(this)) def <<(ref: DataFlowVariable[T]) = in ! Set(ref()) @@ -107,7 +107,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture val ref = value.get if (ref.isDefined) ref.get else { - val out = new Out(this) + val out = new ActorID(new Out(this)) blockedReaders.offer(out) val result = out !! Get out ! Exit