diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 7a81953403..22c175075d 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -77,25 +77,25 @@ object ActiveObject { private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern def newInstance[T](target: Class[T], timeout: Long): T = - newInstance(target, new Dispatcher(false, None), None, timeout) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), None, timeout) def newInstance[T](target: Class[T]): T = - newInstance(target, new Dispatcher(false, None), None, Actor.TIMEOUT) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), None, Actor.TIMEOUT) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, new Dispatcher(false, None), None, timeout) + newInstance(intf, target, new ActorRef(() => new Dispatcher(false, None)), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef): T = - newInstance(intf, target, new Dispatcher(false, None), None, Actor.TIMEOUT) + newInstance(intf, target, new ActorRef(() => new Dispatcher(false, None)), None, Actor.TIMEOUT) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T = - newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = { - val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + val actor = new ActorRef(() => new Dispatcher(config._transactionRequired, config._restartCallbacks)) if (config._messageDispatcher.isDefined) { actor.messageDispatcher = config._messageDispatcher.get } @@ -103,199 +103,28 @@ object ActiveObject { } def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = { - val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + val actor = new ActorRef(() => new Dispatcher(config._transactionRequired, config._restartCallbacks)) if (config._messageDispatcher.isDefined) { actor.messageDispatcher = config._messageDispatcher.get } newInstance(intf, target, actor, config._host, config._timeout) } - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T = - newInstance(target, new Dispatcher(transactionRequired, None), None, timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T = - newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(target, false, false) - actor.initialize(target, proxy) - actor.timeout = timeout - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorRef = new ActorRef(() => actor) + actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy) + actorRef.actor.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) actorRef.start proxy.asInstanceOf[T] } - private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) - actor.initialize(target.getClass, target) - actor.timeout = timeout - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorRef = new ActorRef(() => actor) + actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target) + actorRef.actor.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) actorRef.start proxy.asInstanceOf[T] @@ -313,8 +142,10 @@ object ActiveObject { * @param supervised the active object to link */ def link(supervisor: AnyRef, supervised: AnyRef) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't link when the supervised is not an active object")) supervisorActor !! Link(supervisedActor) } @@ -326,8 +157,10 @@ object ActiveObject { * @param trapExceptions array of exceptions that should be handled by the supervisor */ def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't link when the supervised is not an active object")) supervisorActor.trapExit = trapExceptions.toList supervisorActor.faultHandler = Some(handler) supervisorActor !! Link(supervisedActor) @@ -339,8 +172,10 @@ object ActiveObject { * @param supervised the active object to unlink */ def unlink(supervisor: AnyRef, supervised: AnyRef) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't unlink when the supervised is not an active object")) supervisorActor !! Unlink(supervisedActor) } @@ -350,7 +185,8 @@ object ActiveObject { * @param trapExceptions array of exceptions that should be handled by the supervisor */ def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) supervisorActor.trapExit = trapExceptions.toList this } @@ -361,17 +197,14 @@ object ActiveObject { * @param handler fault handling strategy */ def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) supervisorActor.faultHandler = Some(handler) this } - private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = { - val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components)) - val supervisor = factory.newInstance - supervisor.start - supervisor - } + private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): ActorRef = + SupervisorFactory(SupervisorConfig(restartStrategy, components)).newInstance.start } private[akka] object AspectInitRegistry { @@ -453,7 +286,7 @@ private[akka] sealed class ActiveObjectAspect { .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actorRef.actor.registerSupervisorAsRemoteActor + val id = actorRef.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) @@ -555,11 +388,13 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op preRestart = Some(try { targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*) } catch { case e => throw new IllegalStateException( - "Could not find pre restart method [" + pre + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + "Could not find pre restart method [" + pre + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) postRestart = Some(try { targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*) } catch { case e => throw new IllegalStateException( - "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + "Could not find post restart method [" + post + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } // See if we have any annotation defined restart callbacks @@ -568,17 +403,20 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( - "Method annotated with @prerestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + "Method annotated with @prerestart or defined as a restart callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( - "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + "Method annotated with @postrestart or defined as a restart callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) - if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") + if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) + throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") if (initTxState.isDefined) initTxState.get.setAccessible(true) } @@ -594,19 +432,19 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } - override protected def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable) { try { if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { try { if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - override protected def initTransactionalState = { + override def initTransactionalState = { try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 76fcfa0f15..df3b237029 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,19 +11,19 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol} +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier import jsr166x.{Deque, ConcurrentLinkedDeque} -import java.util.HashSet import java.net.InetSocketAddress import java.util.concurrent.locks.{Lock, ReentrantLock} +import java.util.{HashSet => JHashSet} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -122,11 +122,10 @@ object Actor extends Logging { * */ def actor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) - start def receive: PartialFunction[Any, Unit] = body - }) + }).start /** * Use to create an anonymous transactional event-driven actor. @@ -145,11 +144,10 @@ object Actor extends Logging { * */ def transactor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Transactor() { + newActor(() => new Transactor() { lifeCycle = Some(LifeCycle(Permanent)) - start def receive: PartialFunction[Any, Unit] = body - }) + }).start /** * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, @@ -166,11 +164,10 @@ object Actor extends Logging { * */ def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Temporary)) - start def receive = body - }) + }).start /** * Use to create an anonymous event-driven actor with both an init block and a message loop block. @@ -192,12 +189,11 @@ object Actor extends Logging { def init[A](body: => Unit) = { def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) - start body def receive = handler - }) + }).start } handler(body) } @@ -219,50 +215,45 @@ object Actor extends Logging { */ def spawn(body: => Unit): Unit = { case object Spawn - new Actor() { - start + newActor(() => new Actor() { self ! Spawn def receive = { case Spawn => body; stop } - } - } - - /** - * Starts the specified actor and returns it, useful for simplifying code such as: - *
-   *  val actor = new FooActor
-   *  actor.start
-   * 
- * can be replaced with: - *
-   *  import Actor._
-   *
-   *  val actor = start(new FooActor)
-   * 
- */ - def start[T <: Actor](actor : T) : T = { - actor.start - actor + }).start } } /** - * The ActorRef object can be used to create ActorRef instances out of its binary - * protobuf based representation. + * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation + * or its Protocol Buffers (protobuf) Message representation to a new ActorRef instance. + *

+ * Binary -> ActorRef: *

  *   val actorRef = ActorRef.fromBinary(bytes)
  *   actorRef ! message // send message to remote actor through its reference
  * 
- * + *

+ * Protobuf Message -> ActorRef: + *

+ *   val actorRef = ActorRef.fromProtocol(protobufMessage)
+ *   actorRef ! message // send message to remote actor through its reference
+ * 
* @author Jonas Bonér */ object ActorRef { - def fromBinary(bytes: Array[Byte]): ActorRef = - fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build) - def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef = - RemoteActorProxy( + /** + * Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance. + */ + def fromBinary(bytes: Array[Byte]): ActorRef = + fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build) + + /** + * Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance. + */ + def fromProtocol(protocol: ActorRefProtocol): ActorRef = + RemoteActorRef( protocol.getUuid, protocol.getActorClassName, protocol.getSourceHostname, @@ -296,29 +287,66 @@ object ActorRef { * * @author Jonas Bonér */ -final class ActorRef private[akka] () { - private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) - - private[akka] lazy val actor: Actor = { - val actor = actorFactory match { - case Left(Some(clazz)) => - try { - clazz.newInstance - } catch { - case e: InstantiationException => throw new ActorInitializationException( - "Could not instantiate Actor due to:\n" + e + - "\nMake sure Actor is defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object.") - } - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope") - } - if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - actor - } +sealed class ActorRef private[akka] () extends TransactionManagement { + import java.util.concurrent.atomic.AtomicReference + // Only mutable for RemoteServer in order to maintain identity across nodes + private[akka] var _uuid = UUID.newUuid.toString + + @volatile private[this] var _isRunning = false + @volatile private[this] var _isSuspended = true + @volatile private[this] var _isShutDown = false + @volatile private[akka] var _isKilled = false + @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false + private[akka] var _remoteAddress: Option[InetSocketAddress] = None + private[akka] var _linkedActors: Option[JHashSet[ActorRef]] = None + private[akka] var _supervisor: Option[ActorRef] = None + private[akka] var _replyToAddress: Option[InetSocketAddress] = None + private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + + /** + * User overridable callback/setting. + *

+ * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should + * start if there is no one running, else it joins the existing transaction. + */ + @volatile private[akka] var isTransactor = false + + /** + * This lock ensures thread safety in the dispatching: only one message can + * be dispatched at once on the actor. + */ + private[akka] val _dispatcherLock: Lock = new ReentrantLock + + /** + * User overridable callback/setting. + *

+ * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. + * This means that all actors will share the same event-driven executor based dispatcher. + *

+ * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. + *

+ * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + private[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + + /** + * Holds the reference to the sender of the currently processed message. + * - Is None if no sender was specified + * - Is Some(Left(Actor)) if sender is an actor + * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result + */ + private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None + + private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) + + private[this] lazy val actorInstance: AtomicReference[Actor] = new AtomicReference[Actor](newActor) + + private[akka] def actor: Actor = actorInstance.get + private[akka] def this(clazz: Class[_ <: Actor]) = { this() actorFactory = Left(Some(clazz)) @@ -327,69 +355,7 @@ final class ActorRef private[akka] () { private[akka] def this(factory: () => Actor) = { this() actorFactory = Right(Some(factory)) - } - - def toProtocol: RemoteProtocol.ActorRefProtocol = { - val (host, port) = actor._replyToAddress.map(address => - (address.getHostName, address.getPort)) - .getOrElse((Actor.HOSTNAME, Actor.PORT)) - - if (!actor._registeredInRemoteNodeDuringSerialization) { - Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) - if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) - actor._registeredInRemoteNodeDuringSerialization = true - } - - RemoteProtocol.ActorRefProtocol.newBuilder - .setUuid(uuid) - .setActorClassName(actorClass.getName) - .setSourceHostname(host) - .setSourcePort(port) - .setTimeout(timeout) - .build - } - - def toBinary: Array[Byte] = toProtocol.toByteArray - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] - - /** - * Starts up the actor and its message queue. - */ - def start: ActorRef = { - actor.start - this - } - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - protected def exit: Unit = actor.stop - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop: Unit = actor.stop - - /** - * Is the actor running? - */ - def isRunning: Boolean = actor.isRunning - - /** - * Returns the mailbox size. - */ - def mailboxSize: Int = actor.mailboxSize - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = actor.supervisor + } /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. @@ -406,8 +372,8 @@ final class ActorRef private[akka] () { *

*/ def !(message: Any)(implicit sender: Option[ActorRef] = None) = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) actor.postMessageToMailbox(message, sender) + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) postMessageToMailbox(message, sender) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -424,9 +390,9 @@ final class ActorRef private[akka] () { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !![T](message: Any, timeout: Long): Option[T] = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) { - val future = actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) { + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) @@ -459,7 +425,7 @@ final class ActorRef private[akka] () { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, actor.timeout) + def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout) /** * Sends a message asynchronously returns a future holding the eventual reply message. @@ -471,10 +437,9 @@ final class ActorRef private[akka] () { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !!![T](message: Any): Future[T] = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) { - actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, actor.timeout, None) - } else throw new IllegalStateException( + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -484,44 +449,84 @@ final class ActorRef private[akka] () { * Works with '!', '!!' and '!!!'. */ def forward(message: Any)(implicit sender: Some[ActorRef]) = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) { - sender.get.actor.replyTo match { - case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef)) - case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future)) - case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) { + sender.get.replyTo match { + case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef)) + case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) + case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") } } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } /** - * Get the dispatcher for this actor. + * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ - def dispatcher: MessageDispatcher = actor.messageDispatcher + def toProtocol: ActorRefProtocol = { + val (host, port) = _replyToAddress.map(address => + (address.getHostName, address.getPort)) + .getOrElse((Actor.HOSTNAME, Actor.PORT)) + if (!_registeredInRemoteNodeDuringSerialization) { + Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) + if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) + RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) + _registeredInRemoteNodeDuringSerialization = true + } + + ActorRefProtocol.newBuilder + .setUuid(uuid) + .setActorClassName(actorClass.getName) + .setSourceHostname(host) + .setSourcePort(port) + .setTimeout(timeout) + .build + } + + /** + * Serializes the ActorRef instance into a byte array (Array[Byte]). + */ + def toBinary: Array[Byte] = toProtocol.toByteArray + + /** + * Returns the class for the Actor instance that is managed by the ActorRef. + */ + def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] + /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = actor.dispatcher = md + def dispatcher_=(md: MessageDispatcher): Unit = synchronized { + if (!isRunning) { + messageDispatcher.unregister(this) + messageDispatcher = md + messageDispatcher.register(this) + } else throw new IllegalArgumentException( + "Can not swap dispatcher for " + toString + " after it has been started") + } + + /** + * Get the dispatcher for this actor. + */ + def dispatcher: MessageDispatcher = messageDispatcher /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ def makeRemote(hostname: String, port: Int): Unit = - if (actor.isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else actor.makeRemote(new InetSocketAddress(hostname, port)) + if (isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + else makeRemote(new InetSocketAddress(hostname, port)) /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = actor.makeRemote(address) - - /** - * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. - */ - def setReplyToAddress(hostname: String, port: Int): Unit = actor.setReplyToAddress(new InetSocketAddress(hostname, port)) - - def setReplyToAddress(address: InetSocketAddress): Unit = actor.setReplyToAddress(address) + def makeRemote(address: InetSocketAddress): Unit = + if (isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + else { + _remoteAddress = Some(address) + RemoteClient.register(address.getHostName, address.getPort, uuid) + if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) + } /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. @@ -531,7 +536,21 @@ final class ActorRef private[akka] () { * TransactionManagement.disableTransactions * */ - def makeTransactionRequired = actor.makeTransactionRequired + def makeTransactionRequired = actor.synchronized { + if (isRunning) throw new IllegalArgumentException( + "Can not make actor transaction required after it has been started") + else isTransactor = true + } + + /** + * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. + */ + def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) + + /** + * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. + */ + def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address) /** * Returns the id for the actor. @@ -541,23 +560,209 @@ final class ActorRef private[akka] () { /** * Returns the uuid for the actor. */ - def uuid = actor.uuid + def uuid = _uuid /** * Returns the remote address for the actor, if any, else None. */ - def remoteAddress: Option[InetSocketAddress] = actor._remoteAddress + def remoteAddress: Option[InetSocketAddress] = _remoteAddress /** - * Returns the default timeout for the actor. + * User overridable callback/setting. + *

+ * Defines the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. */ def timeout: Long = actor.timeout - override def toString: String = "ActorRef[" + actor.toString + "]" + /** + * Sets the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + def timeout_=(t: Long) = actor.timeout = t + + /** + * Starts up the actor and its message queue. + */ + def start: ActorRef = synchronized { + if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'") + if (!isRunning) { + messageDispatcher.register(this) + messageDispatcher.start + _isRunning = true + actor.init + actor.initTransactionalState + } + Actor.log.debug("[%s] has started", toString) + ActorRegistry.register(this) + this + } + + /** + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. + */ + def exit = stop + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop = synchronized { + if (isRunning) { + messageDispatcher.unregister(this) + _isRunning = false + _isShutDown = true + actor.shutdown + ActorRegistry.unregister(this) + _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + RemoteNode.unregister(this) + } + } + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + *

+ * If the 'trapExit' member field has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + *

+ * To be invoked from within the actor itself. + */ + def link(actorRef: ActorRef) = { + if (actorRef.supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + linkedActors.add(actorRef) + actorRef.supervisor = Some(this) + Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) + } + + /** + * Unlink the actor. + *

+ * To be invoked from within the actor itself. + */ + def unlink(actorRef: ActorRef) = { + if (!linkedActors.contains(actorRef)) throw new IllegalStateException( + "Actor [" + actorRef + "] is not a linked actor, can't unlink") + linkedActors.remove(actorRef) + actorRef.supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) + } + + /** + * Atomically start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + def startLink(actorRef: ActorRef) = { + try { + actorRef.start + } finally { + link(actorRef) + } + } + + /** + * Atomically start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = { + try { + actorRef.makeRemote(hostname, port) + actorRef.start + } finally { + link(actorRef) + } + } + + /** + * Atomically create (from actor class) and start an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawn[T <: Actor : Manifest]: ActorRef = { + val actorRef = spawnButDoNotStart[T] + actorRef.start + actorRef + } + + /** + * Atomically create (from actor class), start and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { + val actor = spawnButDoNotStart[T] + actor.makeRemote(hostname, port) + actor.start + actor + } + + /** + * Atomically create (from actor class), start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + def spawnLink[T <: Actor: Manifest]: ActorRef = { + val actor = spawnButDoNotStart[T] + try { + actor.start + } finally { + link(actor) + } + actor + } + + /** + * Atomically create (from actor class), start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = { + val actor = spawnButDoNotStart[T] + try { + actor.makeRemote(hostname, port) + actor.start + } finally { + link(actor) + } + actor + } + + /** + * Is the actor killed? + */ + def isKilled: Boolean = _isKilled + + /** + * Is the actor running? + */ + def isRunning: Boolean = _isRunning + + /** + * Is the actor shut down? + */ + def isShutdown: Boolean = !_isRunning + + /** + * Returns the mailbox size. + */ + def mailboxSize: Int = _mailbox.size + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorRef] = _supervisor + + override def toString: String = actor.toString + override def hashCode: Int = actor.hashCode + override def equals(that: Any): Boolean = actor.equals(that) - private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actor._supervisor = sup + private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits @@ -567,6 +772,303 @@ final class ActorRef private[akka] () { private[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler + + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = { + val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance + val actorRef = new ActorRef(() => actor) + if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { + actorRef.dispatcher = dispatcher + } + actorRef + } + + private[this] def newActor: Actor = { + val actor = actorFactory match { + case Left(Some(clazz)) => + try { + clazz.newInstance + } catch { + case e: InstantiationException => throw new ActorInitializationException( + "Could not instantiate Actor due to:\n" + e + + "\nMake sure Actor is defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object.") + } + case Right(Some(factory)) => + factory() + case _ => + throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope") + } + actor._selfOption = Some(this) + if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + actor + } + + private[akka] def restart(reason: Throwable): Unit = { + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + val failedActor = actorInstance.get + failedActor.synchronized { + Actor.log.debug("Restarting linked actors for actor [%s].", id) + restartLinkedActors(reason) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + stop + val freshActor = newActor + freshActor.synchronized { + actorInstance.set(freshActor) + start + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + } + _isKilled = false + } + } + + + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { + joinTransaction(message) + + if (_remoteAddress.isDefined) { + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) + .setTarget(this.getClass.getName) + .setTimeout(this.timeout) + .setUuid(this.uuid) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + + val id = registerSupervisorAsRemoteActor + if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + + RemoteProtocolBuilder.setMessage(message, requestBuilder) + RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) + } else { + val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) + if (messageDispatcher.usesActorMailbox) { + _mailbox.add(invocation) + if (_isSuspended) invocation.send + } + else invocation.send + } + } + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + joinTransaction(message) + + if (_remoteAddress.isDefined) { + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) + .setTarget(this.getClass.getName) + .setTimeout(this.timeout) + .setUuid(this.uuid) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + + val id = registerSupervisorAsRemoteActor + if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + + val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } else { + val future = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](timeout) + val invocation = new MessageInvocation( + this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) + invocation.send + future + } + } + + private def joinTransaction(message: Any) = if (isTransactionSetInScope) { + // FIXME test to run bench without this trace call + Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", + getTransactionSetInScope, toString, message) + getTransactionSetInScope.incParties + } + + /** + * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. + */ + private[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { + try { + if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) + else dispatch(messageHandle) + } catch { + case e => + Actor.log.error(e, "Could not invoke actor [%s]", this) + throw e + } + } + + private def dispatch[T](messageHandle: MessageInvocation) = { + setTransactionSet(messageHandle.transactionSet) + + val message = messageHandle.message //serializeMessage(messageHandle.message) + replyTo = messageHandle.replyTo + + try { + if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function + else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) + } catch { + case e => + _isKilled = true + Actor.log.error(e, "Could not invoke actor [%s]", toString) + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + replyTo match { + case Some(Right(future)) => future.completeWithException(this, e) + case _ => + } + } finally { + clearTransaction + } + } + + private def transactionalDispatch[T](messageHandle: MessageInvocation) = { + var topLevelTransaction = false + val txSet: Option[CountDownCommitBarrier] = + if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet + else { + topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx + if (isTransactor) { + Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s", + toString, messageHandle) + Some(createNewTransactionSet) + } else None + } + setTransactionSet(txSet) + + val message = messageHandle.message //serializeMessage(messageHandle.message) + replyTo = messageHandle.replyTo + + def proceed = { + if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function + else throw new IllegalArgumentException( + toString + " could not process message [" + message + "]" + + "\n\tsince no matching 'case' clause in its 'receive' method could be found") + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } + + try { + if (isTransactor) { + atomic { + proceed + } + } else proceed + } catch { + case e: IllegalStateException => {} + case e => + // abort transaction set + if (isTransactionSetInScope) try { + getTransactionSetInScope.abort + } catch { case e: IllegalStateException => {} } + Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) + + replyTo match { + case Some(Right(future)) => future.completeWithException(this, e) + case _ => + } + + clearTransaction + if (topLevelTransaction) clearTransactionSet + + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + } finally { + clearTransaction + if (topLevelTransaction) clearTransactionSet + } + } + + private[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { + if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { + if (faultHandler.isDefined) { + faultHandler.get match { + // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy + case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) + case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) + } + } else throw new IllegalStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) + } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + } + + private[akka] def restartLinkedActors(reason: Throwable) = { + linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) + actorRef.lifeCycle.get match { + case LifeCycle(scope, _) => { + scope match { + case Permanent => + actorRef.restart(reason) + case Temporary => + Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id) + actorRef.stop + linkedActors.remove(actorRef) // remove the temporary actor + // if last temporary actor is gone, then unlink me from supervisor + if (linkedActors.isEmpty) { + Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + + "\n\tshutting down and unlinking supervisor actor as well [%s].", + actorRef.id) + _supervisor.foreach(_ ! UnlinkAndStop(this)) + } + } + } + } + } + } + + private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized { + if (_supervisor.isDefined) { + RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this) + Some(_supervisor.get.uuid) + } else None + } + + private[akka] def linkedActors: JHashSet[ActorRef] = { + if (_linkedActors.isEmpty) { + val set = new JHashSet[ActorRef] + _linkedActors = Some(set) + set + } else _linkedActors.get + } + + private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { + if (!message.isInstanceOf[String] && + !message.isInstanceOf[Byte] && + !message.isInstanceOf[Int] && + !message.isInstanceOf[Long] && + !message.isInstanceOf[Float] && + !message.isInstanceOf[Double] && + !message.isInstanceOf[Boolean] && + !message.isInstanceOf[Char] && + !message.isInstanceOf[Tuple2[_, _]] && + !message.isInstanceOf[Tuple3[_, _, _]] && + !message.isInstanceOf[Tuple4[_, _, _, _]] && + !message.isInstanceOf[Tuple5[_, _, _, _, _]] && + !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] && + !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] && + !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] && + !message.getClass.isArray && + !message.isInstanceOf[List[_]] && + !message.isInstanceOf[scala.collection.immutable.Map[_, _]] && + !message.isInstanceOf[scala.collection.immutable.Set[_]] && + !message.getClass.isAnnotationPresent(Annotations.immutable)) { + Serializer.Java.deepClone(message) + } else message + } else message } /** @@ -582,12 +1084,11 @@ final class ActorRef private[akka] () { * * @author Jonas Bonér */ -trait Actor extends TransactionManagement with Logging { - // Only mutable for RemoteServer in order to maintain identity across nodes - private[akka] var _uuid = UUID.newUuid.toString +trait Actor extends Logging { /** * The 'self' field holds the ActorRef for this actor. + *

* Can be used to send messages to itself: *

    * self ! message
@@ -595,44 +1096,30 @@ trait Actor extends TransactionManagement with Logging {
    * Note: if you are using the 'self' field in the constructor of the Actor
    *       then you have to make the fields/operations that are using it 'lazy'.
    */
-   implicit val self = new ActorRef(() => this)
-
-   /** For internal use only */
-   implicit val _selfOption = Some(self)
-
-  // ====================================
-  // private fields
-  // ====================================
-
-  @volatile private[this] var _isRunning = false
-  @volatile private[this] var _isSuspended = true
-  @volatile private[this] var _isShutDown = false
-  @volatile private[akka] var _isKilled = false
-  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
-  private var _hotswap: Option[PartialFunction[Any, Unit]] = None
-  private[akka] var _remoteAddress: Option[InetSocketAddress] = None
-  private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
-  private[akka] var _supervisor: Option[ActorRef] = None
-  private[akka] var _replyToAddress: Option[InetSocketAddress] = None
-  private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
+   protected[akka] def self: ActorRef = _selfOption.getOrElse(throw new IllegalStateException(
+     "ActorRef for instance of Actor [" + getClass.getName + "] is not in scope." + 
+     "\n\tAre you using 'self' within the constructor (the class body) of the Actor?" + 
+     "\n\tIf so you have to refactor and make all fields that uses the 'self' reference lazy," + 
+     "\n\tand move all operations that uses 'self' out of the constructor."))
+   
+   /** 
+    * For internal use only.
+    */
+  protected[akka] implicit var _selfOption: Option[ActorRef] = None
 
   /**
-   * This lock ensures thread safety in the dispatching: only one message can
-   * be dispatched at once on the actor.
+   * Holds the hot swapped partial function.
    */
-  private[akka] val _dispatcherLock: Lock = new ReentrantLock
+  private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
+
+  // ==================
+  // ==== USER API ====
+  // ==================
 
   /**
-   * Holds the reference to the sender of the currently processed message.
-   * - Is None if no sender was specified
-   * - Is Some(Left(Actor)) if sender is an actor
-   * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
+   * Use to override the default dispatcher.
    */
-  private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
-
-  // ====================================
-  // ==== USER CALLBACKS TO OVERRIDE ====
-  // ====================================
+  def dispatcher_=(dispatcher: MessageDispatcher) = self.dispatcher = dispatcher
 
   /**
    * User overridable callback/setting.
@@ -655,21 +1142,6 @@ trait Actor extends TransactionManagement with Logging {
    */
   @volatile var timeout: Long = Actor.TIMEOUT
 
-  /**
-   * User overridable callback/setting.
-   * 

- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. - * This means that all actors will share the same event-driven executor based dispatcher. - *

- * You can override it so it fits the specific use-case that the actor is used for. - * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. - *

- * The default is also that all actors that are created and spawned from within this actor - * is sharing the same dispatcher as its creator. - */ - protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - /** * User overridable callback/setting. *

@@ -710,13 +1182,53 @@ trait Actor extends TransactionManagement with Logging { */ @volatile var lifeCycle: Option[LifeCycle] = None + /** + * User overridable callback/setting. + *

+ * Optional callback method that is called during initialization. + * To be implemented by subclassing actor. + */ + def init {} + + /** + * User overridable callback/setting. + *

+ * Mandatory callback method that is called during restart and reinitialization after a server crash. + * To be implemented by subclassing actor. + */ + def preRestart(reason: Throwable) {} + + /** + * User overridable callback/setting. + *

+ * Mandatory callback method that is called during restart and reinitialization after a server crash. + * To be implemented by subclassing actor. + */ + def postRestart(reason: Throwable) {} + + /** + * User overridable callback/setting. + *

+ * Optional callback method that is called during termination. + * To be implemented by subclassing actor. + */ + def initTransactionalState {} + + /** + * User overridable callback/setting. + *

+ * Optional callback method that is called during termination. + * To be implemented by subclassing actor. + */ + def shutdown {} + /** * User overridable callback/setting. *

* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should * start if there is no one running, else it joins the existing transaction. */ - @volatile protected var isTransactor = false + protected def isTransactor_=(flag: Boolean) = self.isTransactor = flag /** * User overridable callback/setting. @@ -741,108 +1253,6 @@ trait Actor extends TransactionManagement with Logging { */ protected def receive: PartialFunction[Any, Unit] - /** - * User overridable callback/setting. - *

- * Optional callback method that is called during initialization. - * To be implemented by subclassing actor. - */ - protected def init {} - - /** - * User overridable callback/setting. - *

- * Mandatory callback method that is called during restart and reinitialization after a server crash. - * To be implemented by subclassing actor. - */ - protected def preRestart(reason: Throwable) {} - - /** - * User overridable callback/setting. - *

- * Mandatory callback method that is called during restart and reinitialization after a server crash. - * To be implemented by subclassing actor. - */ - protected def postRestart(reason: Throwable) {} - - /** - * User overridable callback/setting. - *

- * Optional callback method that is called during termination. - * To be implemented by subclassing actor. - */ - protected def initTransactionalState {} - - /** - * User overridable callback/setting. - *

- * Optional callback method that is called during termination. - * To be implemented by subclassing actor. - */ - protected def shutdown {} - - // ============= - // ==== API ==== - // ============= - - /** - * Starts up the actor and its message queue. - */ - def start: Unit = synchronized { - if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'") - if (!_isRunning) { - messageDispatcher.register(self) - messageDispatcher.start - _isRunning = true - init - initTransactionalState - } - Actor.log.debug("[%s] has started", toString) - ActorRegistry.register(self) - } - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - protected def exit = stop - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop = synchronized { - if (_isRunning) { - messageDispatcher.unregister(self) - _isRunning = false - _isShutDown = true - shutdown - ActorRegistry.unregister(self) - _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) - RemoteNode.unregister(self) - } - } - - /** - * Is the actor killed? - */ - def isKilled: Boolean = _isKilled - - /** - * Is the actor running? - */ - def isRunning: Boolean = _isRunning - - /** - * Returns the mailbox size. - */ - def mailboxSize: Int = _mailbox.size - - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = _supervisor - /** * Use reply(..) to reply with a message to the original sender of the message currently * being processed. @@ -865,7 +1275,7 @@ trait Actor extends TransactionManagement with Logging { *

* Returns true if reply was sent, and false if unable to determine what to reply to. */ - protected[this] def reply_?(message: Any): Boolean = replyTo match { + protected[this] def reply_?(message: Any): Boolean = self.replyTo match { case Some(Left(actor)) => actor ! message true @@ -875,50 +1285,22 @@ trait Actor extends TransactionManagement with Logging { case _ => false } + + /** + * Starts the actor. + */ + def start = self.start /** - * Get the dispatcher for this actor. + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. */ - def dispatcher: MessageDispatcher = messageDispatcher + def exit = self.stop /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. + * Shuts down the actor its dispatcher and message queue. */ - def dispatcher_=(md: MessageDispatcher): Unit = synchronized { - if (!_isRunning) { - messageDispatcher.unregister(self) - messageDispatcher = md - messageDispatcher.register(self) - } else throw new IllegalArgumentException( - "Can not swap dispatcher for " + toString + " after it has been started") - } - - /** - * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. - */ - def makeRemote(hostname: String, port: Int): Unit = - if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else makeRemote(new InetSocketAddress(hostname, port)) - - /** - * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. - */ - def makeRemote(address: InetSocketAddress): Unit = - if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else { - _remoteAddress = Some(address) - RemoteClient.register(address.getHostName, address.getPort, uuid) - if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) - } - - - /** - * Set the contact address for this actor. This is used for replying to messages sent - * asynchronously when no reply channel exists. - */ - def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) - - def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address) + def stop = self.stop /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. @@ -928,12 +1310,30 @@ trait Actor extends TransactionManagement with Logging { * TransactionManagement.disableTransactions *

*/ - def makeTransactionRequired = synchronized { - if (_isRunning) throw new IllegalArgumentException( - "Can not make actor transaction required after it has been started") - else isTransactor = true - } + def makeTransactionRequired = self.makeTransactionRequired + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(hostname: String, port: Int): Unit = makeRemote(new InetSocketAddress(hostname, port)) + + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(address: InetSocketAddress): Unit = self.makeRemote(address) + + /** + * Set the contact address for this actor. This is used for replying to messages sent + * asynchronously when no reply channel exists. + */ + def setReplyToAddress(hostname: String, port: Int): Unit = self.setReplyToAddress(new InetSocketAddress(hostname, port)) + + /** + * Set the contact address for this actor. This is used for replying to messages sent + * asynchronously when no reply channel exists. + */ + def setReplyToAddress(address: InetSocketAddress): Unit = self.setReplyToAddress(address) + /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will * receive a notification if the linked actor has crashed. @@ -944,107 +1344,51 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def link(actorRef: ActorRef) = { - if (actorRef.supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - getLinkedActors.add(actorRef) - actorRef.supervisor = Some(self) - Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) - } + protected[this] def link(actorRef: ActorRef) = self.link(actorRef) /** * Unlink the actor. *

* To be invoked from within the actor itself. */ - protected[this] def unlink(actorRef: ActorRef) = { - if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException( - "Actor [" + actorRef + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actorRef) - actorRef.supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) - } + protected[this] def unlink(actorRef: ActorRef) = self.unlink(actorRef) /** * Atomically start and link an actor. *

* To be invoked from within the actor itself. */ - protected[this] def startLink(actorRef: ActorRef) = { - try { - actorRef.start - } finally { - link(actorRef) - } - } + protected[this] def startLink(actorRef: ActorRef) = self.startLink(actorRef) /** * Atomically start, link and make an actor remote. *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = { - try { - actorRef.makeRemote(hostname, port) - actorRef.start - } finally { - link(actorRef) - } - } + protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = + self.startLinkRemote(actorRef, hostname, port) /** * Atomically create (from actor class) and start an actor. *

* To be invoked from within the actor itself. */ - protected[this] def spawn[T <: Actor : Manifest]: ActorRef = { - val actorRef = spawnButDoNotStart[T] - actorRef.start - actorRef - } + protected[this] def spawn[T <: Actor : Manifest]: ActorRef = self.spawn[T] /** * Atomically create (from actor class), start and make an actor remote. *

* To be invoked from within the actor itself. */ - protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { - val actor = spawnButDoNotStart[T] - actor.makeRemote(hostname, port) - actor.start - actor - } + protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = + self.spawnRemote[T](hostname, port) /** * Atomically create (from actor class), start and link an actor. *

* To be invoked from within the actor itself. */ - protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = { - val actor = spawnButDoNotStart[T] - try { - actor.start - } finally { - link(actor) - } - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = { - val actor = spawnButDoNotStart[T] - try { - actor.makeRemote(hostname, port) - actor.start - } finally { - link(actor) - } - actor - } + protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = self.spawnLink[T] /** * Returns the id for the actor. @@ -1054,297 +1398,29 @@ trait Actor extends TransactionManagement with Logging { /** * Returns the uuid for the actor. */ - def uuid = _uuid + def uuid = self.uuid // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= - private[akka] def _suspend = _isSuspended = true - - private[akka] def _resume = _isSuspended = false - - private def spawnButDoNotStart[T <: Actor : Manifest]: ActorRef = { - val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance - if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { - actor.dispatcher = dispatcher - } - new ActorRef(() => actor) - } - - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - joinTransaction(message) - - if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.uuid) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - - val id = registerSupervisorAsRemoteActor - if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - - senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - - RemoteProtocolBuilder.setMessage(message, requestBuilder) - RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) - } else { - val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get) - if (messageDispatcher.usesActorMailbox) { - _mailbox.add(invocation) - if (_isSuspended) invocation.send - } - else invocation.send - } - } - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - joinTransaction(message) - - if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(this.getClass.getName) - .setTimeout(this.timeout) - .setUuid(this.uuid) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) - - //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - - val id = registerSupervisorAsRemoteActor - if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - - val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) - if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation( - self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) - if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) - invocation.send - future - } - } - - private def joinTransaction(message: Any) = if (isTransactionSetInScope) { - // FIXME test to run bench without this trace call - Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]", - getTransactionSetInScope, toString, message) - getTransactionSetInScope.incParties - } - - /** - * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. - */ - private[akka] def invoke(messageHandle: MessageInvocation) = synchronized { - try { - if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) - else dispatch(messageHandle) - } catch { - case e => - Actor.log.error(e, "Could not invoke actor [%s]", this) - throw e - } - } - - private def dispatch[T](messageHandle: MessageInvocation) = { - setTransactionSet(messageHandle.transactionSet) - - val message = messageHandle.message //serializeMessage(messageHandle.message) - replyTo = messageHandle.replyTo - - try { - if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) - } catch { - case e => - _isKilled = true - Actor.log.error(e, "Could not invoke actor [%s]", this) - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(self, e) - replyTo match { - case Some(Right(future)) => future.completeWithException(self, e) - case _ => - } - } finally { - clearTransaction - } - } - - private def transactionalDispatch[T](messageHandle: MessageInvocation) = { - var topLevelTransaction = false - val txSet: Option[CountDownCommitBarrier] = - if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet - else { - topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx - if (isTransactor) { - Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s", - toString, messageHandle) - Some(createNewTransactionSet) - } else None - } - setTransactionSet(txSet) - - val message = messageHandle.message //serializeMessage(messageHandle.message) - replyTo = messageHandle.replyTo - - def proceed = { - if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - - try { - if (isTransactor) { - atomic { - proceed - } - } else proceed - } catch { - case e: IllegalStateException => {} - case e => - // abort transaction set - if (isTransactionSetInScope) try { - getTransactionSetInScope.abort - } catch { case e: IllegalStateException => {} } - Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - - replyTo match { - case Some(Right(future)) => future.completeWithException(self, e) - case _ => - } - - clearTransaction - if (topLevelTransaction) clearTransactionSet - - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(self, e) - } finally { - clearTransaction - if (topLevelTransaction) clearTransactionSet - } - } - - private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) + private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { case HotSwap(code) => _hotswap = code - case Restart(reason) => restart(reason) - case Exit(dead, reason) => handleTrapExit(dead, reason) - case Unlink(child) => unlink(child) - case UnlinkAndStop(child) => unlink(child); child.stop + case Restart(reason) => self.restart(reason) + case Exit(dead, reason) => self.handleTrapExit(dead, reason) + case Unlink(child) => self.unlink(child) + case UnlinkAndStop(child) => self.unlink(child); child.stop case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - - private[this] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { - if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { - if (faultHandler.isDefined) { - faultHandler.get match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.actor.restart(reason) - } - } else throw new IllegalStateException( - "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) - } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on - } - - private[this] def restartLinkedActors(reason: Throwable) = { - getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { - actorRef => - val actor = actorRef.actor - if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) - actor.lifeCycle.get match { - case LifeCycle(scope, _) => { - scope match { - case Permanent => - actor.restart(reason) - case Temporary => - Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) - actor.stop - getLinkedActors.remove(actorRef) // remove the temporary actor - // if last temporary actor is gone, then unlink me from supervisor - if (getLinkedActors.isEmpty) { - Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + - "\n\tshutting down and unlinking supervisor actor as well [%s].", - actor.id) - _supervisor.foreach(_ ! UnlinkAndStop(self)) - } - } - } - } - } - } - - private[Actor] def restart(reason: Throwable): Unit = synchronized { - restartLinkedActors(reason) - preRestart(reason) - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - postRestart(reason) - _isKilled = false - } - - private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized { - if (_supervisor.isDefined) { - RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(self) - Some(_supervisor.get.uuid) - } else None - } - - protected def getLinkedActors: HashSet[ActorRef] = { - if (_linkedActors.isEmpty) { - val set = new HashSet[ActorRef] - _linkedActors = Some(set) - set - } else _linkedActors.get - } - - private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) { - if (!message.isInstanceOf[String] && - !message.isInstanceOf[Byte] && - !message.isInstanceOf[Int] && - !message.isInstanceOf[Long] && - !message.isInstanceOf[Float] && - !message.isInstanceOf[Double] && - !message.isInstanceOf[Boolean] && - !message.isInstanceOf[Char] && - !message.isInstanceOf[Tuple2[_, _]] && - !message.isInstanceOf[Tuple3[_, _, _]] && - !message.isInstanceOf[Tuple4[_, _, _, _]] && - !message.isInstanceOf[Tuple5[_, _, _, _, _]] && - !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] && - !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] && - !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] && - !message.getClass.isArray && - !message.isInstanceOf[List[_]] && - !message.isInstanceOf[scala.collection.immutable.Map[_, _]] && - !message.isInstanceOf[scala.collection.immutable.Set[_]] && - !message.getClass.isAnnotationPresent(Annotations.immutable)) { - Serializer.Java.deepClone(message) - } else message - } else message - - override def hashCode(): Int = HashCode.hash(HashCode.SEED, _uuid) + + override def hashCode(): Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { that != null && that.isInstanceOf[Actor] && - that.asInstanceOf[Actor]._uuid == _uuid + that.asInstanceOf[Actor].uuid == uuid } override def toString = "Actor[" + id + ":" + uuid + "]" @@ -1364,6 +1440,59 @@ object DispatcherType { * @author Jonas Bonér */ class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle) + def invoke(handle: MessageInvocation) = actorRef.invoke(handle) +} + +/** + * Remote Actor proxy. + * + * @author Jonas Bonér + */ +private[akka] class RemoteActorRef private ( + uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends ActorRef { + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) + .setTarget(className) + .setTimeout(timeOut) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send[Any](requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } +} + +/** + * Remote Actor proxy factory. + * + * @author Jonas Bonér + */ +private[akka] object RemoteActorRef { + def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = + (new RemoteActorRef(uuid, className, hostname, port, timeout)).start } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index c52c59b2ae..c72c588937 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -57,9 +57,9 @@ object ActorRegistry extends Logging { val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { - val actorRef = elements.nextElement - if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) { - all += actorRef + val actorId = elements.nextElement + if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { + all += actorId } } all.toList @@ -92,24 +92,24 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorRef: ActorRef) = { + def register(actorId: ActorRef) = { // UUID - actorsByUUID.put(actorRef.uuid, actorRef) + actorsByUUID.put(actorId.uuid, actorId) // ID - val id = actorRef.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef) - if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id)) - else actorsById.put(id, actorRef :: Nil) + val id = actorId.id + if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) + if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) + else actorsById.put(id, actorId :: Nil) // Class name - val className = actorRef.actor.getClass.getName + val className = actorId.actor.getClass.getName if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actorRef :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actorRef :: Nil) + actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) + } else actorsByClassName.put(className, actorId :: Nil) // notify listeners - foreachListener(_ ! ActorRegistered(actorRef)) + foreachListener(_ ! ActorRegistered(actorId)) } /** diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index 0785288d10..2949c41706 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -99,9 +99,11 @@ class AgentException private[akka](message: String) extends RuntimeException(mes * @author Jonas Bonér */ sealed class Agent[T] private (initialValue: T) extends Transactor { - start import Agent._ - log.debug("Starting up Agent [%s]", _uuid) + import Actor._ + _selfOption = Some(newActor(() => this).start) + + log.debug("Starting up Agent [%s]", uuid) private lazy val value = Ref[T]() @@ -138,7 +140,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor { * method and then waits for its result on a CountDownLatch. */ final def get: T = { - if (isTransactionInScope) throw new AgentException( + if (self.isTransactionInScope) throw new AgentException( "Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.") val ref = new AtomicReference[T] val latch = new CountDownLatch(1) diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 2b31bea60c..8bb6bbcd19 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -38,15 +38,16 @@ class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) } object Scheduler extends Actor { + import Actor._ + private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) - start def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { try { - startLink(new ActorRef(() => new ScheduleActor( + startLink(newActor(() => new ScheduleActor( receiver, service.scheduleAtFixedRate(new java.lang.Runnable { def run = receiver ! message; diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index c0023ae44b..fc98a57da0 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -42,14 +42,13 @@ import java.util.concurrent.ConcurrentHashMap class SupervisorFactory(val config: SupervisorConfig) extends Logging { type ExceptionList = List[Class[_ <: Throwable]] - def newInstance: Supervisor = newInstanceFor(config) + def newInstance: ActorRef = newInstanceFor(config) - def newInstanceFor(config: SupervisorConfig): Supervisor = config match { + def newInstanceFor(config: SupervisorConfig): ActorRef = config match { case SupervisorConfig(restartStrategy, _) => val supervisor = create(restartStrategy) - supervisor.start supervisor.configure(config, this) - supervisor + Actor.newActor(() => supervisor).start } protected def create(strategy: RestartStrategy): Supervisor = strategy match { @@ -84,7 +83,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep faultHandler = Some(handler) // FIXME should Supervisor really havea newThreadBasedDispatcher?? - dispatcher = Dispatchers.newThreadBasedDispatcher(this) + self.dispatcher = Dispatchers.newThreadBasedDispatcher(this) private val actors = new ConcurrentHashMap[String, List[ActorRef]] @@ -96,14 +95,12 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) - override def start: Unit = synchronized { + override def init: Unit = synchronized { ConfiguratorRepository.registerConfigurator(this) - super[Actor].start } - override def stop = synchronized { - super[Actor].stop - getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + override def shutdown: Unit = synchronized { + self.linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => actorRef.stop log.info("Shutting actor down: %s", actorRef) } @@ -131,7 +128,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep startLink(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorRef.id, actorRef)) + .actors.put(actorRef.id, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = { @@ -140,14 +137,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep instance } supervisor.lifeCycle = Some(LifeCycle(Permanent)) - val className = supervisor.getClass.getName + val className = supervisor.actorClass.getName val currentSupervisors = { val list = actors.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, supervisor.self :: currentSupervisors) - link(supervisor.self) + actors.put(className, supervisor :: currentSupervisors) + link(supervisor) }) } } diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 7c8da6efc9..c659751a4e 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -24,7 +24,7 @@ import java.lang.reflect.Method */ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { private var injector: Injector = _ - private var supervisor: Option[Supervisor] = None + private var supervisor: Option[ActorRef] = None private var restartStrategy: RestartStrategy = _ private var components: List[Component] = _ private var supervised: List[Supervise] = Nil @@ -82,19 +82,19 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks) - if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get + val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) else None - val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] + val proxy = ActiveObject.newInstance(targetClass, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef] if (remoteAddress.isDefined) { RemoteServer .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle) + supervised ::= Supervise(actorRef, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) } @@ -103,20 +103,20 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val targetClass = component.intf.get val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) - val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks) - if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get + val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) else None val proxy = ActiveObject.newInstance( - targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] + targetClass, targetInstance, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef] if (remoteAddress.isDefined) { RemoteServer .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle) + supervised ::= Supervise(actorRef, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4ea16e47e1..eb3a647710 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,8 +65,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche var lockAcquiredOnce = false // this do-wile loop is required to prevent missing new messages between the end of the inner while // loop and releasing the lock - val lock = invocation.receiver.actor._dispatcherLock - val mailbox = invocation.receiver.actor._mailbox + val lock = invocation.receiver._dispatcherLock + val mailbox = invocation.receiver._mailbox do { if (lock.tryLock) { lockAcquiredOnce = true diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 4edf6651c0..f8bf772ccb 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -70,7 +70,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess */ private def tryProcessMailbox(receiver: ActorRef): Boolean = { var lockAcquiredOnce = false - val lock = receiver.actor._dispatcherLock + val lock = receiver._dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 6c9fc0f842..3ba6e319df 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -19,7 +19,7 @@ final class MessageInvocation(val receiver: ActorRef, val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") - def invoke = receiver.actor.invoke(this) + def invoke = receiver.invoke(this) def send = receiver.dispatcher.dispatch(this) diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 93c28029e5..a1abe9b6b9 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.config.Config.config import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer -import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRegistry} +import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRef, ActorRegistry} import se.scalablesolutions.akka.util.Logging import scala.collection.immutable.{Map, HashMap} @@ -76,7 +76,6 @@ trait ClusterActor extends Actor with Cluster { */ private[akka] object ClusterActor { sealed trait ClusterMessage - private[akka] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage private[akka] case class Message[ADDR_T](sender: ADDR_T, msg: Array[Byte]) private[akka] case object PapersPlease extends ClusterMessage @@ -236,32 +235,37 @@ object Cluster extends Cluster with Logging { lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName @volatile private[remote] var clusterActor: Option[ClusterActor] = None + @volatile private[remote] var clusterActorRef: Option[ActorRef] = None - private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = { + private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = { val name = config.getString("akka.remote.cluster.actor") if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") - val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] + val serializer = Class.forName(config.getString( + "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] serializer.classLoader = Some(loader) try { name map { fqn => val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] a setSerializer serializer - a + new ActorRef(() => a) } } catch { - case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None + case e => + log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")) + None } } - private[akka] def createSupervisor(actor: ClusterActor): Option[Supervisor] = { + private[akka] def createSupervisor(actor: ActorRef): Option[ActorRef] = { val sup = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(actor.self, LifeCycle(Permanent)) :: Nil) + Supervise(actor, LifeCycle(Permanent)) :: Nil) ).newInstance Some(sup) } @@ -281,12 +285,15 @@ object Cluster extends Cluster with Logging { def start: Unit = start(None) - def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized { + def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") if (clusterActor.isEmpty) { - for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) - sup <- createSupervisor(actor) } { - clusterActor = Some(actor) + for { + actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + sup <- createSupervisor(actorRef) + } { + clusterActorRef = Some(actorRef) + clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor]) sup.start } } @@ -294,8 +301,8 @@ object Cluster extends Cluster with Logging { def shutdown: Unit = synchronized { log.info("Shutting down Cluster Service...") - for{ - c <- clusterActor + for { + c <- clusterActorRef s <- c._supervisor } s.stop clusterActor = None diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 38e068b9a1..49238c1ff6 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} -import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef} +import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.config.Config.config @@ -43,62 +43,6 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent -/** - * Remote Actor proxy factory. - * - * @author Jonas Bonér - */ -private[akka] object RemoteActorProxy { - def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = - new ActorRef(() => new RemoteActorProxy(uuid, className, hostname, port, timeout)) -} - -/** - * Remote Actor proxy. - * - * @author Jonas Bonér - */ -private[akka] class RemoteActorProxy private ( - uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor { - start - val remoteClient = RemoteClient.clientFor(hostname, port) - - override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - val requestBuilder = RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(className) - .setTimeout(timeOut) - .setUuid(uuid) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - remoteClient.send[Any](requestBuilder.build, None) - } - - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequestProtocol.newBuilder - .setId(RemoteRequestProtocolIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(uuid) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) - //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - val future = remoteClient.send(requestBuilder.build, senderFuture) - if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) - } - - def receive = {case _ => {}} -} - /** * @author Jonas Bonér */ @@ -121,7 +65,7 @@ object RemoteClient extends Logging { actorFor(className, className, timeout, hostname, port) def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorProxy(actorRef, className, hostname, port, timeout) + RemoteActorRef(actorRef, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 6703d677c0..7fa21fccf6 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -226,7 +226,7 @@ class RemoteServer extends Logging { log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) server.actors.remove(actorRef.id) - if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } @@ -241,7 +241,7 @@ class RemoteServer extends Logging { val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val actorRef = server.actors.get(id) server.actors.remove(id) - if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } } @@ -459,17 +459,15 @@ class RemoteServerHandler( */ private def createActor(name: String, uuid: String, timeout: Long): ActorRef = { val actorRefOrNull = actors.get(uuid) - println("----------- ACTOR " + actorRefOrNull + " " + uuid) if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) - val newInstance = clazz.newInstance.asInstanceOf[Actor] - newInstance._uuid = uuid - newInstance.timeout = timeout - newInstance._remoteAddress = None - val actorRef = new ActorRef(() => newInstance) + val actorRef = new ActorRef(() => clazz.newInstance.asInstanceOf[Actor]) + actorRef._uuid = uuid + actorRef.timeout = timeout + actorRef._remoteAddress = None actors.put(uuid, actorRef) actorRef } catch { diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index 258847b3fc..a18d82886a 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -28,25 +28,22 @@ object Patterns { */ def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = newActor(() => new Actor with LoadBalancer { - start val seq = actors - }) + }).start /** Creates a Dispatcher given a routing and a message-transforming function */ def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = newActor(() => new Actor with Dispatcher { - start override def transform(msg: Any) = msgTransformer(msg) def routes = routing - }) + }).start /** Creates a Dispatcher given a routing */ def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher { - start def routes = routing - }) + }).start /** Creates an actor that pipes all incoming messages to * both another actor and through the supplied function diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala index dcfa3add15..991820d314 100644 --- a/akka-core/src/main/scala/routing/Routers.scala +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -16,7 +16,7 @@ trait Dispatcher { self: Actor => protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => - if (self.replyTo.isDefined) routes(a) forward transform(a) + if (self.self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self.self)) else routes(a).!(transform(a))(None) } diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index 5b4826550a..d5e3901fa7 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -40,7 +40,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture } def thread[A <: AnyRef, R <: AnyRef](body: A => R) = - new ReactiveEventBasedThread(body).start + newActor(() => new ReactiveEventBasedThread(body)).start private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) extends Actor { @@ -65,7 +65,6 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT - start def receive = { case Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { @@ -80,7 +79,6 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT - start private var readerFuture: Option[CompletableFuture[T]] = None def receive = { case Get => @@ -88,7 +86,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture if (ref.isDefined) reply(ref.get) else { - readerFuture = replyTo match { + readerFuture = self.replyTo match { case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]]) case _ => None } @@ -98,7 +96,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture } } - private[this] val in = newActor(() => new In(this)) + private[this] val in = newActor(() => new In(this)).start def <<(ref: DataFlowVariable[T]) = in ! Set(ref()) @@ -108,7 +106,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture val ref = value.get if (ref.isDefined) ref.get else { - val out = newActor(() => new Out(this)) + val out = newActor(() => new Out(this)).start blockedReaders.offer(out) val result = out !! Get out ! Exit