diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 7a81953403..22c175075d 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -77,25 +77,25 @@ object ActiveObject { private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern def newInstance[T](target: Class[T], timeout: Long): T = - newInstance(target, new Dispatcher(false, None), None, timeout) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), None, timeout) def newInstance[T](target: Class[T]): T = - newInstance(target, new Dispatcher(false, None), None, Actor.TIMEOUT) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), None, Actor.TIMEOUT) def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = - newInstance(intf, target, new Dispatcher(false, None), None, timeout) + newInstance(intf, target, new ActorRef(() => new Dispatcher(false, None)), None, timeout) def newInstance[T](intf: Class[T], target: AnyRef): T = - newInstance(intf, target, new Dispatcher(false, None), None, Actor.TIMEOUT) + newInstance(intf, target, new ActorRef(() => new Dispatcher(false, None)), None, Actor.TIMEOUT) def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout) def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T = - newInstance(target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) + newInstance(target, new ActorRef(() => new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT) def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = { - val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + val actor = new ActorRef(() => new Dispatcher(config._transactionRequired, config._restartCallbacks)) if (config._messageDispatcher.isDefined) { actor.messageDispatcher = config._messageDispatcher.get } @@ -103,199 +103,28 @@ object ActiveObject { } def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = { - val actor = new Dispatcher(config._transactionRequired, config._restartCallbacks) + val actor = new ActorRef(() => new Dispatcher(config._transactionRequired, config._restartCallbacks)) if (config._messageDispatcher.isDefined) { actor.messageDispatcher = config._messageDispatcher.get } newInstance(intf, target, actor, config._host, config._timeout) } - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(false, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(false, restartCallbacks), None, timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean): T = - newInstance(target, new Dispatcher(transactionRequired, None), None, timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean): T = - newInstance(intf, target, new Dispatcher(transactionRequired, None), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), None, timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher(false, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(false, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = - newInstance(target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int): T = - newInstance(intf, target, new Dispatcher(transactionRequired, None), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = - newInstance(intf, target, new Dispatcher(transactionRequired, restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout) - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, None, timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(false, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(false, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](target: Class[T], timeout: Long, transactionRequired: Boolean, dispatcher: MessageDispatcher, - hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int): T = { - val actor = new Dispatcher(transactionRequired, None) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - @deprecated("use newInstance(intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration) instead") - def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, transactionRequired: Boolean, - dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = { - val actor = new Dispatcher(transactionRequired, restartCallbacks) - actor.messageDispatcher = dispatcher - newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout) - } - - private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(target, false, false) - actor.initialize(target, proxy) - actor.timeout = timeout - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorRef = new ActorRef(() => actor) + actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy) + actorRef.actor.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) actorRef.start proxy.asInstanceOf[T] } - private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { + private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val proxy = Proxy.newInstance(Array(intf), Array(target), false, false) - actor.initialize(target.getClass, target) - actor.timeout = timeout - if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorRef = new ActorRef(() => actor) + actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target) + actorRef.actor.timeout = timeout + if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) actorRef.start proxy.asInstanceOf[T] @@ -313,8 +142,10 @@ object ActiveObject { * @param supervised the active object to link */ def link(supervisor: AnyRef, supervised: AnyRef) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't link when the supervised is not an active object")) supervisorActor !! Link(supervisedActor) } @@ -326,8 +157,10 @@ object ActiveObject { * @param trapExceptions array of exceptions that should be handled by the supervisor */ def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't link when the supervised is not an active object")) supervisorActor.trapExit = trapExceptions.toList supervisorActor.faultHandler = Some(handler) supervisorActor !! Link(supervisedActor) @@ -339,8 +172,10 @@ object ActiveObject { * @param supervised the active object to unlink */ def unlink(supervisor: AnyRef, supervised: AnyRef) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) - val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse( + throw new IllegalStateException("Can't unlink when the supervised is not an active object")) supervisorActor !! Unlink(supervisedActor) } @@ -350,7 +185,8 @@ object ActiveObject { * @param trapExceptions array of exceptions that should be handled by the supervisor */ def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) supervisorActor.trapExit = trapExceptions.toList this } @@ -361,17 +197,14 @@ object ActiveObject { * @param handler fault handling strategy */ def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { - val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + val supervisorActor = actorFor(supervisor).getOrElse( + throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) supervisorActor.faultHandler = Some(handler) this } - private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = { - val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components)) - val supervisor = factory.newInstance - supervisor.start - supervisor - } + private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): ActorRef = + SupervisorFactory(SupervisorConfig(restartStrategy, components)).newInstance.start } private[akka] object AspectInitRegistry { @@ -453,7 +286,7 @@ private[akka] sealed class ActiveObjectAspect { .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actorRef.actor.registerSupervisorAsRemoteActor + val id = actorRef.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) @@ -555,11 +388,13 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op preRestart = Some(try { targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*) } catch { case e => throw new IllegalStateException( - "Could not find pre restart method [" + pre + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + "Could not find pre restart method [" + pre + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) postRestart = Some(try { targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*) } catch { case e => throw new IllegalStateException( - "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + "Could not find post restart method [" + post + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } // See if we have any annotation defined restart callbacks @@ -568,17 +403,20 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( - "Method annotated with @prerestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + "Method annotated with @prerestart or defined as a restart callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( - "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + "Method annotated with @postrestart or defined as a restart callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) - if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") + if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) + throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition") if (initTxState.isDefined) initTxState.get.setAccessible(true) } @@ -594,19 +432,19 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } - override protected def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable) { try { if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - override protected def postRestart(reason: Throwable) { + override def postRestart(reason: Throwable) { try { if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } } - override protected def initTransactionalState = { + override def initTransactionalState = { try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) } catch { case e: InvocationTargetException => throw e.getCause } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 76fcfa0f15..df3b237029 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,19 +11,19 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol} +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier import jsr166x.{Deque, ConcurrentLinkedDeque} -import java.util.HashSet import java.net.InetSocketAddress import java.util.concurrent.locks.{Lock, ReentrantLock} +import java.util.{HashSet => JHashSet} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -122,11 +122,10 @@ object Actor extends Logging { * */ def actor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) - start def receive: PartialFunction[Any, Unit] = body - }) + }).start /** * Use to create an anonymous transactional event-driven actor. @@ -145,11 +144,10 @@ object Actor extends Logging { * */ def transactor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Transactor() { + newActor(() => new Transactor() { lifeCycle = Some(LifeCycle(Permanent)) - start def receive: PartialFunction[Any, Unit] = body - }) + }).start /** * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, @@ -166,11 +164,10 @@ object Actor extends Logging { * */ def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Temporary)) - start def receive = body - }) + }).start /** * Use to create an anonymous event-driven actor with both an init block and a message loop block. @@ -192,12 +189,11 @@ object Actor extends Logging { def init[A](body: => Unit) = { def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = - new ActorRef(() => new Actor() { + newActor(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) - start body def receive = handler - }) + }).start } handler(body) } @@ -219,50 +215,45 @@ object Actor extends Logging { */ def spawn(body: => Unit): Unit = { case object Spawn - new Actor() { - start + newActor(() => new Actor() { self ! Spawn def receive = { case Spawn => body; stop } - } - } - - /** - * Starts the specified actor and returns it, useful for simplifying code such as: - *
- * val actor = new FooActor - * actor.start - *- * can be replaced with: - *
- * import Actor._ - * - * val actor = start(new FooActor) - *- */ - def start[T <: Actor](actor : T) : T = { - actor.start - actor + }).start } } /** - * The ActorRef object can be used to create ActorRef instances out of its binary - * protobuf based representation. + * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation + * or its Protocol Buffers (protobuf) Message representation to a new ActorRef instance. + * + * Binary -> ActorRef: *
* val actorRef = ActorRef.fromBinary(bytes) * actorRef ! message // send message to remote actor through its reference *- * + * + * Protobuf Message -> ActorRef: + *
+ * val actorRef = ActorRef.fromProtocol(protobufMessage) + * actorRef ! message // send message to remote actor through its reference + ** @author Jonas Bonér */ object ActorRef { - def fromBinary(bytes: Array[Byte]): ActorRef = - fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build) - def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef = - RemoteActorProxy( + /** + * Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance. + */ + def fromBinary(bytes: Array[Byte]): ActorRef = + fromProtocol(ActorRefProtocol.newBuilder.mergeFrom(bytes).build) + + /** + * Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance. + */ + def fromProtocol(protocol: ActorRefProtocol): ActorRef = + RemoteActorRef( protocol.getUuid, protocol.getActorClassName, protocol.getSourceHostname, @@ -296,29 +287,66 @@ object ActorRef { * * @author Jonas Bonér */ -final class ActorRef private[akka] () { - private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) - - private[akka] lazy val actor: Actor = { - val actor = actorFactory match { - case Left(Some(clazz)) => - try { - clazz.newInstance - } catch { - case e: InstantiationException => throw new ActorInitializationException( - "Could not instantiate Actor due to:\n" + e + - "\nMake sure Actor is defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object.") - } - case Right(Some(factory)) => - factory() - case _ => - throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope") - } - if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - actor - } +sealed class ActorRef private[akka] () extends TransactionManagement { + import java.util.concurrent.atomic.AtomicReference + // Only mutable for RemoteServer in order to maintain identity across nodes + private[akka] var _uuid = UUID.newUuid.toString + + @volatile private[this] var _isRunning = false + @volatile private[this] var _isSuspended = true + @volatile private[this] var _isShutDown = false + @volatile private[akka] var _isKilled = false + @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false + private[akka] var _remoteAddress: Option[InetSocketAddress] = None + private[akka] var _linkedActors: Option[JHashSet[ActorRef]] = None + private[akka] var _supervisor: Option[ActorRef] = None + private[akka] var _replyToAddress: Option[InetSocketAddress] = None + private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] + + /** + * User overridable callback/setting. + * + * Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should + * start if there is no one running, else it joins the existing transaction. + */ + @volatile private[akka] var isTransactor = false + + /** + * This lock ensures thread safety in the dispatching: only one message can + * be dispatched at once on the actor. + */ + private[akka] val _dispatcherLock: Lock = new ReentrantLock + + /** + * User overridable callback/setting. + * + * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. + * This means that all actors will share the same event-driven executor based dispatcher. + * + * You can override it so it fits the specific use-case that the actor is used for. + * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different + * dispatchers available. + * + * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + private[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + + /** + * Holds the reference to the sender of the currently processed message. + * - Is None if no sender was specified + * - Is Some(Left(Actor)) if sender is an actor + * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result + */ + private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None + + private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) + + private[this] lazy val actorInstance: AtomicReference[Actor] = new AtomicReference[Actor](newActor) + + private[akka] def actor: Actor = actorInstance.get + private[akka] def this(clazz: Class[_ <: Actor]) = { this() actorFactory = Left(Some(clazz)) @@ -327,69 +355,7 @@ final class ActorRef private[akka] () { private[akka] def this(factory: () => Actor) = { this() actorFactory = Right(Some(factory)) - } - - def toProtocol: RemoteProtocol.ActorRefProtocol = { - val (host, port) = actor._replyToAddress.map(address => - (address.getHostName, address.getPort)) - .getOrElse((Actor.HOSTNAME, Actor.PORT)) - - if (!actor._registeredInRemoteNodeDuringSerialization) { - Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) - if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) - actor._registeredInRemoteNodeDuringSerialization = true - } - - RemoteProtocol.ActorRefProtocol.newBuilder - .setUuid(uuid) - .setActorClassName(actorClass.getName) - .setSourceHostname(host) - .setSourcePort(port) - .setTimeout(timeout) - .build - } - - def toBinary: Array[Byte] = toProtocol.toByteArray - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] - - /** - * Starts up the actor and its message queue. - */ - def start: ActorRef = { - actor.start - this - } - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - protected def exit: Unit = actor.stop - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop: Unit = actor.stop - - /** - * Is the actor running? - */ - def isRunning: Boolean = actor.isRunning - - /** - * Returns the mailbox size. - */ - def mailboxSize: Int = actor.mailboxSize - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = actor.supervisor + } /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. @@ -406,8 +372,8 @@ final class ActorRef private[akka] () { * */ def !(message: Any)(implicit sender: Option[ActorRef] = None) = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) actor.postMessageToMailbox(message, sender) + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) postMessageToMailbox(message, sender) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -424,9 +390,9 @@ final class ActorRef private[akka] () { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !: Option[T] = { - if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (actor.isRunning) { - val future = actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (isRunning) { + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) @@ -459,7 +425,7 @@ final class ActorRef private[akka] () { * If you are sending messages using
!! then you have to use reply(..)
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
- def !(implicit sender: Option[ActorRef] = None): Option[T] = !
+ def !(implicit sender: Option[ActorRef] = None): Option[T] = !
/**
* Sends a message asynchronously returns a future holding the eventual reply message.
@@ -471,10 +437,9 @@ final class ActorRef private[akka] () {
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!: Future[T] = {
- if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
- if (actor.isRunning) {
- actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, actor.timeout, None)
- } else throw new IllegalStateException(
+ if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
+ if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None)
+ else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
}
@@ -484,44 +449,84 @@ final class ActorRef private[akka] () {
* Works with '!', '!!' and '!!!'.
*/
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
- if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
- if (actor.isRunning) {
- sender.get.actor.replyTo match {
- case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef))
- case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future))
- case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
+ if (isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
+ if (isRunning) {
+ sender.get.replyTo match {
+ case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef))
+ case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future))
+ case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
/**
- * Get the dispatcher for this actor.
+ * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
- def dispatcher: MessageDispatcher = actor.messageDispatcher
+ def toProtocol: ActorRefProtocol = {
+ val (host, port) = _replyToAddress.map(address =>
+ (address.getHostName, address.getPort))
+ .getOrElse((Actor.HOSTNAME, Actor.PORT))
+ if (!_registeredInRemoteNodeDuringSerialization) {
+ Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
+ if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
+ RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
+ _registeredInRemoteNodeDuringSerialization = true
+ }
+
+ ActorRefProtocol.newBuilder
+ .setUuid(uuid)
+ .setActorClassName(actorClass.getName)
+ .setSourceHostname(host)
+ .setSourcePort(port)
+ .setTimeout(timeout)
+ .build
+ }
+
+ /**
+ * Serializes the ActorRef instance into a byte array (Array[Byte]).
+ */
+ def toBinary: Array[Byte] = toProtocol.toByteArray
+
+ /**
+ * Returns the class for the Actor instance that is managed by the ActorRef.
+ */
+ def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
+
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
- def dispatcher_=(md: MessageDispatcher): Unit = actor.dispatcher = md
+ def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
+ if (!isRunning) {
+ messageDispatcher.unregister(this)
+ messageDispatcher = md
+ messageDispatcher.register(this)
+ } else throw new IllegalArgumentException(
+ "Can not swap dispatcher for " + toString + " after it has been started")
+ }
+
+ /**
+ * Get the dispatcher for this actor.
+ */
+ def dispatcher: MessageDispatcher = messageDispatcher
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit =
- if (actor.isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
- else actor.makeRemote(new InetSocketAddress(hostname, port))
+ if (isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
+ else makeRemote(new InetSocketAddress(hostname, port))
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
- def makeRemote(address: InetSocketAddress): Unit = actor.makeRemote(address)
-
- /**
- * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
- */
- def setReplyToAddress(hostname: String, port: Int): Unit = actor.setReplyToAddress(new InetSocketAddress(hostname, port))
-
- def setReplyToAddress(address: InetSocketAddress): Unit = actor.setReplyToAddress(address)
+ def makeRemote(address: InetSocketAddress): Unit =
+ if (isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
+ else {
+ _remoteAddress = Some(address)
+ RemoteClient.register(address.getHostName, address.getPort, uuid)
+ if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
+ }
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@@ -531,7 +536,21 @@ final class ActorRef private[akka] () {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = actor.makeTransactionRequired
+ def makeTransactionRequired = actor.synchronized {
+ if (isRunning) throw new IllegalArgumentException(
+ "Can not make actor transaction required after it has been started")
+ else isTransactor = true
+ }
+
+ /**
+ * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+ */
+ def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
+
+ /**
+ * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+ */
+ def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address)
/**
* Returns the id for the actor.
@@ -541,23 +560,209 @@ final class ActorRef private[akka] () {
/**
* Returns the uuid for the actor.
*/
- def uuid = actor.uuid
+ def uuid = _uuid
/**
* Returns the remote address for the actor, if any, else None.
*/
- def remoteAddress: Option[InetSocketAddress] = actor._remoteAddress
+ def remoteAddress: Option[InetSocketAddress] = _remoteAddress
/**
- * Returns the default timeout for the actor.
+ * User overridable callback/setting.
+ *
+ * Defines the default timeout for '!!' and '!!!' invocations,
+ * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
*/
def timeout: Long = actor.timeout
- override def toString: String = "ActorRef[" + actor.toString + "]"
+ /**
+ * Sets the default timeout for '!!' and '!!!' invocations,
+ * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
+ */
+ def timeout_=(t: Long) = actor.timeout = t
+
+ /**
+ * Starts up the actor and its message queue.
+ */
+ def start: ActorRef = synchronized {
+ if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'")
+ if (!isRunning) {
+ messageDispatcher.register(this)
+ messageDispatcher.start
+ _isRunning = true
+ actor.init
+ actor.initTransactionalState
+ }
+ Actor.log.debug("[%s] has started", toString)
+ ActorRegistry.register(this)
+ this
+ }
+
+ /**
+ * Shuts down the actor its dispatcher and message queue.
+ * Alias for 'stop'.
+ */
+ def exit = stop
+
+ /**
+ * Shuts down the actor its dispatcher and message queue.
+ */
+ def stop = synchronized {
+ if (isRunning) {
+ messageDispatcher.unregister(this)
+ _isRunning = false
+ _isShutDown = true
+ actor.shutdown
+ ActorRegistry.unregister(this)
+ _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
+ RemoteNode.unregister(this)
+ }
+ }
+
+ /**
+ * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
+ * receive a notification if the linked actor has crashed.
+ *
+ * If the 'trapExit' member field has been set to at contain at least one exception class then it will
+ * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
+ * defined by the 'faultHandler'.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def link(actorRef: ActorRef) = {
+ if (actorRef.supervisor.isDefined) throw new IllegalStateException(
+ "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
+ linkedActors.add(actorRef)
+ actorRef.supervisor = Some(this)
+ Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this)
+ }
+
+ /**
+ * Unlink the actor.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def unlink(actorRef: ActorRef) = {
+ if (!linkedActors.contains(actorRef)) throw new IllegalStateException(
+ "Actor [" + actorRef + "] is not a linked actor, can't unlink")
+ linkedActors.remove(actorRef)
+ actorRef.supervisor = None
+ Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this)
+ }
+
+ /**
+ * Atomically start and link an actor.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def startLink(actorRef: ActorRef) = {
+ try {
+ actorRef.start
+ } finally {
+ link(actorRef)
+ }
+ }
+
+ /**
+ * Atomically start, link and make an actor remote.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = {
+ try {
+ actorRef.makeRemote(hostname, port)
+ actorRef.start
+ } finally {
+ link(actorRef)
+ }
+ }
+
+ /**
+ * Atomically create (from actor class) and start an actor.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def spawn[T <: Actor : Manifest]: ActorRef = {
+ val actorRef = spawnButDoNotStart[T]
+ actorRef.start
+ actorRef
+ }
+
+ /**
+ * Atomically create (from actor class), start and make an actor remote.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
+ val actor = spawnButDoNotStart[T]
+ actor.makeRemote(hostname, port)
+ actor.start
+ actor
+ }
+
+ /**
+ * Atomically create (from actor class), start and link an actor.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def spawnLink[T <: Actor: Manifest]: ActorRef = {
+ val actor = spawnButDoNotStart[T]
+ try {
+ actor.start
+ } finally {
+ link(actor)
+ }
+ actor
+ }
+
+ /**
+ * Atomically create (from actor class), start, link and make an actor remote.
+ *
+ * To be invoked from within the actor itself.
+ */
+ def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = {
+ val actor = spawnButDoNotStart[T]
+ try {
+ actor.makeRemote(hostname, port)
+ actor.start
+ } finally {
+ link(actor)
+ }
+ actor
+ }
+
+ /**
+ * Is the actor killed?
+ */
+ def isKilled: Boolean = _isKilled
+
+ /**
+ * Is the actor running?
+ */
+ def isRunning: Boolean = _isRunning
+
+ /**
+ * Is the actor shut down?
+ */
+ def isShutdown: Boolean = !_isRunning
+
+ /**
+ * Returns the mailbox size.
+ */
+ def mailboxSize: Int = _mailbox.size
+
+ /**
+ * Returns the supervisor, if there is one.
+ */
+ def supervisor: Option[ActorRef] = _supervisor
+
+ override def toString: String = actor.toString
+
override def hashCode: Int = actor.hashCode
+
override def equals(that: Any): Boolean = actor.equals(that)
- private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actor._supervisor = sup
+ private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit
private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
@@ -567,6 +772,303 @@ final class ActorRef private[akka] () {
private[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler
private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler
+
+ private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = {
+ val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
+ val actorRef = new ActorRef(() => actor)
+ if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
+ actorRef.dispatcher = dispatcher
+ }
+ actorRef
+ }
+
+ private[this] def newActor: Actor = {
+ val actor = actorFactory match {
+ case Left(Some(clazz)) =>
+ try {
+ clazz.newInstance
+ } catch {
+ case e: InstantiationException => throw new ActorInitializationException(
+ "Could not instantiate Actor due to:\n" + e +
+ "\nMake sure Actor is defined inside a class/trait," +
+ "\nif so put it outside the class/trait, f.e. in a companion object.")
+ }
+ case Right(Some(factory)) =>
+ factory()
+ case _ =>
+ throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope")
+ }
+ actor._selfOption = Some(this)
+ if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
+ actor
+ }
+
+ private[akka] def restart(reason: Throwable): Unit = {
+ Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
+ val failedActor = actorInstance.get
+ failedActor.synchronized {
+ Actor.log.debug("Restarting linked actors for actor [%s].", id)
+ restartLinkedActors(reason)
+ Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
+ failedActor.preRestart(reason)
+ stop
+ val freshActor = newActor
+ freshActor.synchronized {
+ actorInstance.set(freshActor)
+ start
+ Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
+ freshActor.postRestart(reason)
+ }
+ _isKilled = false
+ }
+ }
+
+
+ protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
+ joinTransaction(message)
+
+ if (_remoteAddress.isDefined) {
+ val requestBuilder = RemoteRequestProtocol.newBuilder
+ .setId(RemoteRequestProtocolIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(this.timeout)
+ .setUuid(this.uuid)
+ .setIsActor(true)
+ .setIsOneWay(true)
+ .setIsEscaped(false)
+
+ val id = registerSupervisorAsRemoteActor
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+
+ senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
+ } else {
+ val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get)
+ if (messageDispatcher.usesActorMailbox) {
+ _mailbox.add(invocation)
+ if (_isSuspended) invocation.send
+ }
+ else invocation.send
+ }
+ }
+
+ protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
+ message: Any,
+ timeout: Long,
+ senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
+ joinTransaction(message)
+
+ if (_remoteAddress.isDefined) {
+ val requestBuilder = RemoteRequestProtocol.newBuilder
+ .setId(RemoteRequestProtocolIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(this.timeout)
+ .setUuid(this.uuid)
+ .setIsActor(true)
+ .setIsOneWay(false)
+ .setIsEscaped(false)
+
+ //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+
+ val id = registerSupervisorAsRemoteActor
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+
+ val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
+ if (future.isDefined) future.get
+ else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
+ } else {
+ val future = if (senderFuture.isDefined) senderFuture.get
+ else new DefaultCompletableFuture[T](timeout)
+ val invocation = new MessageInvocation(
+ this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
+ if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation)
+ invocation.send
+ future
+ }
+ }
+
+ private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
+ // FIXME test to run bench without this trace call
+ Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
+ getTransactionSetInScope, toString, message)
+ getTransactionSetInScope.incParties
+ }
+
+ /**
+ * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
+ */
+ private[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized {
+ try {
+ if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
+ else dispatch(messageHandle)
+ } catch {
+ case e =>
+ Actor.log.error(e, "Could not invoke actor [%s]", this)
+ throw e
+ }
+ }
+
+ private def dispatch[T](messageHandle: MessageInvocation) = {
+ setTransactionSet(messageHandle.transactionSet)
+
+ val message = messageHandle.message //serializeMessage(messageHandle.message)
+ replyTo = messageHandle.replyTo
+
+ try {
+ if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
+ } catch {
+ case e =>
+ _isKilled = true
+ Actor.log.error(e, "Could not invoke actor [%s]", toString)
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
+ replyTo match {
+ case Some(Right(future)) => future.completeWithException(this, e)
+ case _ =>
+ }
+ } finally {
+ clearTransaction
+ }
+ }
+
+ private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
+ var topLevelTransaction = false
+ val txSet: Option[CountDownCommitBarrier] =
+ if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
+ else {
+ topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
+ if (isTransactor) {
+ Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
+ toString, messageHandle)
+ Some(createNewTransactionSet)
+ } else None
+ }
+ setTransactionSet(txSet)
+
+ val message = messageHandle.message //serializeMessage(messageHandle.message)
+ replyTo = messageHandle.replyTo
+
+ def proceed = {
+ if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException(
+ toString + " could not process message [" + message + "]" +
+ "\n\tsince no matching 'case' clause in its 'receive' method could be found")
+ setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
+ }
+
+ try {
+ if (isTransactor) {
+ atomic {
+ proceed
+ }
+ } else proceed
+ } catch {
+ case e: IllegalStateException => {}
+ case e =>
+ // abort transaction set
+ if (isTransactionSetInScope) try {
+ getTransactionSetInScope.abort
+ } catch { case e: IllegalStateException => {} }
+ Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
+
+ replyTo match {
+ case Some(Right(future)) => future.completeWithException(this, e)
+ case _ =>
+ }
+
+ clearTransaction
+ if (topLevelTransaction) clearTransactionSet
+
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
+ } finally {
+ clearTransaction
+ if (topLevelTransaction) clearTransactionSet
+ }
+ }
+
+ private[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
+ if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
+ if (faultHandler.isDefined) {
+ faultHandler.get match {
+ // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
+ case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
+ case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
+ }
+ } else throw new IllegalStateException(
+ "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
+ "\n\tto non-empty list of exception classes - can't proceed " + toString)
+ } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
+ }
+
+ private[akka] def restartLinkedActors(reason: Throwable) = {
+ linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef =>
+ if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
+ actorRef.lifeCycle.get match {
+ case LifeCycle(scope, _) => {
+ scope match {
+ case Permanent =>
+ actorRef.restart(reason)
+ case Temporary =>
+ Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actorRef.id)
+ actorRef.stop
+ linkedActors.remove(actorRef) // remove the temporary actor
+ // if last temporary actor is gone, then unlink me from supervisor
+ if (linkedActors.isEmpty) {
+ Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" +
+ "\n\tshutting down and unlinking supervisor actor as well [%s].",
+ actorRef.id)
+ _supervisor.foreach(_ ! UnlinkAndStop(this))
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
+ if (_supervisor.isDefined) {
+ RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(this)
+ Some(_supervisor.get.uuid)
+ } else None
+ }
+
+ private[akka] def linkedActors: JHashSet[ActorRef] = {
+ if (_linkedActors.isEmpty) {
+ val set = new JHashSet[ActorRef]
+ _linkedActors = Some(set)
+ set
+ } else _linkedActors.get
+ }
+
+ private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
+ if (!message.isInstanceOf[String] &&
+ !message.isInstanceOf[Byte] &&
+ !message.isInstanceOf[Int] &&
+ !message.isInstanceOf[Long] &&
+ !message.isInstanceOf[Float] &&
+ !message.isInstanceOf[Double] &&
+ !message.isInstanceOf[Boolean] &&
+ !message.isInstanceOf[Char] &&
+ !message.isInstanceOf[Tuple2[_, _]] &&
+ !message.isInstanceOf[Tuple3[_, _, _]] &&
+ !message.isInstanceOf[Tuple4[_, _, _, _]] &&
+ !message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
+ !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
+ !message.getClass.isArray &&
+ !message.isInstanceOf[List[_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
+ !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
+ !message.getClass.isAnnotationPresent(Annotations.immutable)) {
+ Serializer.Java.deepClone(message)
+ } else message
+ } else message
}
/**
@@ -582,12 +1084,11 @@ final class ActorRef private[akka] () {
*
* @author Jonas Bonér
*/
-trait Actor extends TransactionManagement with Logging {
- // Only mutable for RemoteServer in order to maintain identity across nodes
- private[akka] var _uuid = UUID.newUuid.toString
+trait Actor extends Logging {
/**
* The 'self' field holds the ActorRef for this actor.
+ *
* Can be used to send messages to itself:
*
* self ! message
@@ -595,44 +1096,30 @@ trait Actor extends TransactionManagement with Logging {
* Note: if you are using the 'self' field in the constructor of the Actor
* then you have to make the fields/operations that are using it 'lazy'.
*/
- implicit val self = new ActorRef(() => this)
-
- /** For internal use only */
- implicit val _selfOption = Some(self)
-
- // ====================================
- // private fields
- // ====================================
-
- @volatile private[this] var _isRunning = false
- @volatile private[this] var _isSuspended = true
- @volatile private[this] var _isShutDown = false
- @volatile private[akka] var _isKilled = false
- @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
- private var _hotswap: Option[PartialFunction[Any, Unit]] = None
- private[akka] var _remoteAddress: Option[InetSocketAddress] = None
- private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
- private[akka] var _supervisor: Option[ActorRef] = None
- private[akka] var _replyToAddress: Option[InetSocketAddress] = None
- private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
+ protected[akka] def self: ActorRef = _selfOption.getOrElse(throw new IllegalStateException(
+ "ActorRef for instance of Actor [" + getClass.getName + "] is not in scope." +
+ "\n\tAre you using 'self' within the constructor (the class body) of the Actor?" +
+ "\n\tIf so you have to refactor and make all fields that uses the 'self' reference lazy," +
+ "\n\tand move all operations that uses 'self' out of the constructor."))
+
+ /**
+ * For internal use only.
+ */
+ protected[akka] implicit var _selfOption: Option[ActorRef] = None
/**
- * This lock ensures thread safety in the dispatching: only one message can
- * be dispatched at once on the actor.
+ * Holds the hot swapped partial function.
*/
- private[akka] val _dispatcherLock: Lock = new ReentrantLock
+ private var _hotswap: Option[PartialFunction[Any, Unit]] = None // FIXME: _hotswap should be a stack
+
+ // ==================
+ // ==== USER API ====
+ // ==================
/**
- * Holds the reference to the sender of the currently processed message.
- * - Is None if no sender was specified
- * - Is Some(Left(Actor)) if sender is an actor
- * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
+ * Use to override the default dispatcher.
*/
- private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
-
- // ====================================
- // ==== USER CALLBACKS TO OVERRIDE ====
- // ====================================
+ def dispatcher_=(dispatcher: MessageDispatcher) = self.dispatcher = dispatcher
/**
* User overridable callback/setting.
@@ -655,21 +1142,6 @@ trait Actor extends TransactionManagement with Logging {
*/
@volatile var timeout: Long = Actor.TIMEOUT
- /**
- * User overridable callback/setting.
- *
- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher.
- * This means that all actors will share the same event-driven executor based dispatcher.
- *
- * You can override it so it fits the specific use-case that the actor is used for.
- * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different
- * dispatchers available.
- *
- * The default is also that all actors that are created and spawned from within this actor
- * is sharing the same dispatcher as its creator.
- */
- protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
-
/**
* User overridable callback/setting.
*
@@ -710,13 +1182,53 @@ trait Actor extends TransactionManagement with Logging {
*/
@volatile var lifeCycle: Option[LifeCycle] = None
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during initialization.
+ * To be implemented by subclassing actor.
+ */
+ def init {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ def preRestart(reason: Throwable) {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ def postRestart(reason: Throwable) {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during termination.
+ * To be implemented by subclassing actor.
+ */
+ def initTransactionalState {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during termination.
+ * To be implemented by subclassing actor.
+ */
+ def shutdown {}
+
/**
* User overridable callback/setting.
*
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
* start if there is no one running, else it joins the existing transaction.
*/
- @volatile protected var isTransactor = false
+ protected def isTransactor_=(flag: Boolean) = self.isTransactor = flag
/**
* User overridable callback/setting.
@@ -741,108 +1253,6 @@ trait Actor extends TransactionManagement with Logging {
*/
protected def receive: PartialFunction[Any, Unit]
- /**
- * User overridable callback/setting.
- *
- * Optional callback method that is called during initialization.
- * To be implemented by subclassing actor.
- */
- protected def init {}
-
- /**
- * User overridable callback/setting.
- *
- * Mandatory callback method that is called during restart and reinitialization after a server crash.
- * To be implemented by subclassing actor.
- */
- protected def preRestart(reason: Throwable) {}
-
- /**
- * User overridable callback/setting.
- *
- * Mandatory callback method that is called during restart and reinitialization after a server crash.
- * To be implemented by subclassing actor.
- */
- protected def postRestart(reason: Throwable) {}
-
- /**
- * User overridable callback/setting.
- *
- * Optional callback method that is called during termination.
- * To be implemented by subclassing actor.
- */
- protected def initTransactionalState {}
-
- /**
- * User overridable callback/setting.
- *
- * Optional callback method that is called during termination.
- * To be implemented by subclassing actor.
- */
- protected def shutdown {}
-
- // =============
- // ==== API ====
- // =============
-
- /**
- * Starts up the actor and its message queue.
- */
- def start: Unit = synchronized {
- if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'")
- if (!_isRunning) {
- messageDispatcher.register(self)
- messageDispatcher.start
- _isRunning = true
- init
- initTransactionalState
- }
- Actor.log.debug("[%s] has started", toString)
- ActorRegistry.register(self)
- }
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- * Alias for 'stop'.
- */
- protected def exit = stop
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- */
- def stop = synchronized {
- if (_isRunning) {
- messageDispatcher.unregister(self)
- _isRunning = false
- _isShutDown = true
- shutdown
- ActorRegistry.unregister(self)
- _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
- RemoteNode.unregister(self)
- }
- }
-
- /**
- * Is the actor killed?
- */
- def isKilled: Boolean = _isKilled
-
- /**
- * Is the actor running?
- */
- def isRunning: Boolean = _isRunning
-
- /**
- * Returns the mailbox size.
- */
- def mailboxSize: Int = _mailbox.size
-
-
- /**
- * Returns the supervisor, if there is one.
- */
- def supervisor: Option[ActorRef] = _supervisor
-
/**
* Use reply(..) to reply with a message to the original sender of the message currently
* being processed.
@@ -865,7 +1275,7 @@ trait Actor extends TransactionManagement with Logging {
*
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
- protected[this] def reply_?(message: Any): Boolean = replyTo match {
+ protected[this] def reply_?(message: Any): Boolean = self.replyTo match {
case Some(Left(actor)) =>
actor ! message
true
@@ -875,50 +1285,22 @@ trait Actor extends TransactionManagement with Logging {
case _ =>
false
}
+
+ /**
+ * Starts the actor.
+ */
+ def start = self.start
/**
- * Get the dispatcher for this actor.
+ * Shuts down the actor its dispatcher and message queue.
+ * Alias for 'stop'.
*/
- def dispatcher: MessageDispatcher = messageDispatcher
+ def exit = self.stop
/**
- * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
+ * Shuts down the actor its dispatcher and message queue.
*/
- def dispatcher_=(md: MessageDispatcher): Unit = synchronized {
- if (!_isRunning) {
- messageDispatcher.unregister(self)
- messageDispatcher = md
- messageDispatcher.register(self)
- } else throw new IllegalArgumentException(
- "Can not swap dispatcher for " + toString + " after it has been started")
- }
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(hostname: String, port: Int): Unit =
- if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
- else makeRemote(new InetSocketAddress(hostname, port))
-
- /**
- * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
- */
- def makeRemote(address: InetSocketAddress): Unit =
- if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
- else {
- _remoteAddress = Some(address)
- RemoteClient.register(address.getHostName, address.getPort, uuid)
- if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
- }
-
-
- /**
- * Set the contact address for this actor. This is used for replying to messages sent
- * asynchronously when no reply channel exists.
- */
- def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
-
- def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address)
+ def stop = self.stop
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
@@ -928,12 +1310,30 @@ trait Actor extends TransactionManagement with Logging {
* TransactionManagement.disableTransactions
*
*/
- def makeTransactionRequired = synchronized {
- if (_isRunning) throw new IllegalArgumentException(
- "Can not make actor transaction required after it has been started")
- else isTransactor = true
- }
+ def makeTransactionRequired = self.makeTransactionRequired
+ /**
+ * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+ */
+ def makeRemote(hostname: String, port: Int): Unit = makeRemote(new InetSocketAddress(hostname, port))
+
+ /**
+ * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+ */
+ def makeRemote(address: InetSocketAddress): Unit = self.makeRemote(address)
+
+ /**
+ * Set the contact address for this actor. This is used for replying to messages sent
+ * asynchronously when no reply channel exists.
+ */
+ def setReplyToAddress(hostname: String, port: Int): Unit = self.setReplyToAddress(new InetSocketAddress(hostname, port))
+
+ /**
+ * Set the contact address for this actor. This is used for replying to messages sent
+ * asynchronously when no reply channel exists.
+ */
+ def setReplyToAddress(address: InetSocketAddress): Unit = self.setReplyToAddress(address)
+
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
* receive a notification if the linked actor has crashed.
@@ -944,107 +1344,51 @@ trait Actor extends TransactionManagement with Logging {
*
* To be invoked from within the actor itself.
*/
- protected[this] def link(actorRef: ActorRef) = {
- if (actorRef.supervisor.isDefined) throw new IllegalStateException(
- "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
- getLinkedActors.add(actorRef)
- actorRef.supervisor = Some(self)
- Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this)
- }
+ protected[this] def link(actorRef: ActorRef) = self.link(actorRef)
/**
* Unlink the actor.
*
* To be invoked from within the actor itself.
*/
- protected[this] def unlink(actorRef: ActorRef) = {
- if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException(
- "Actor [" + actorRef + "] is not a linked actor, can't unlink")
- getLinkedActors.remove(actorRef)
- actorRef.supervisor = None
- Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this)
- }
+ protected[this] def unlink(actorRef: ActorRef) = self.unlink(actorRef)
/**
* Atomically start and link an actor.
*
* To be invoked from within the actor itself.
*/
- protected[this] def startLink(actorRef: ActorRef) = {
- try {
- actorRef.start
- } finally {
- link(actorRef)
- }
- }
+ protected[this] def startLink(actorRef: ActorRef) = self.startLink(actorRef)
/**
* Atomically start, link and make an actor remote.
*
* To be invoked from within the actor itself.
*/
- protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = {
- try {
- actorRef.makeRemote(hostname, port)
- actorRef.start
- } finally {
- link(actorRef)
- }
- }
+ protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) =
+ self.startLinkRemote(actorRef, hostname, port)
/**
* Atomically create (from actor class) and start an actor.
*
* To be invoked from within the actor itself.
*/
- protected[this] def spawn[T <: Actor : Manifest]: ActorRef = {
- val actorRef = spawnButDoNotStart[T]
- actorRef.start
- actorRef
- }
+ protected[this] def spawn[T <: Actor : Manifest]: ActorRef = self.spawn[T]
/**
* Atomically create (from actor class), start and make an actor remote.
*
* To be invoked from within the actor itself.
*/
- protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
- val actor = spawnButDoNotStart[T]
- actor.makeRemote(hostname, port)
- actor.start
- actor
- }
+ protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef =
+ self.spawnRemote[T](hostname, port)
/**
* Atomically create (from actor class), start and link an actor.
*
* To be invoked from within the actor itself.
*/
- protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = {
- val actor = spawnButDoNotStart[T]
- try {
- actor.start
- } finally {
- link(actor)
- }
- actor
- }
-
- /**
- * Atomically create (from actor class), start, link and make an actor remote.
- *
- * To be invoked from within the actor itself.
- */
- protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = {
- val actor = spawnButDoNotStart[T]
- try {
- actor.makeRemote(hostname, port)
- actor.start
- } finally {
- link(actor)
- }
- actor
- }
+ protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = self.spawnLink[T]
/**
* Returns the id for the actor.
@@ -1054,297 +1398,29 @@ trait Actor extends TransactionManagement with Logging {
/**
* Returns the uuid for the actor.
*/
- def uuid = _uuid
+ def uuid = self.uuid
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
- private[akka] def _suspend = _isSuspended = true
-
- private[akka] def _resume = _isSuspended = false
-
- private def spawnButDoNotStart[T <: Actor : Manifest]: ActorRef = {
- val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
- if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
- actor.dispatcher = dispatcher
- }
- new ActorRef(() => actor)
- }
-
- protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- joinTransaction(message)
-
- if (_remoteAddress.isDefined) {
- val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder
- .setId(RemoteRequestProtocolIdFactory.nextId)
- .setTarget(this.getClass.getName)
- .setTimeout(this.timeout)
- .setUuid(this.uuid)
- .setIsActor(true)
- .setIsOneWay(true)
- .setIsEscaped(false)
-
- val id = registerSupervisorAsRemoteActor
- if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
-
- senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
-
- RemoteProtocolBuilder.setMessage(message, requestBuilder)
- RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None)
- } else {
- val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get)
- if (messageDispatcher.usesActorMailbox) {
- _mailbox.add(invocation)
- if (_isSuspended) invocation.send
- }
- else invocation.send
- }
- }
-
- protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
- message: Any,
- timeout: Long,
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- joinTransaction(message)
-
- if (_remoteAddress.isDefined) {
- val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder
- .setId(RemoteRequestProtocolIdFactory.nextId)
- .setTarget(this.getClass.getName)
- .setTimeout(this.timeout)
- .setUuid(this.uuid)
- .setIsActor(true)
- .setIsOneWay(false)
- .setIsEscaped(false)
-
- //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
- RemoteProtocolBuilder.setMessage(message, requestBuilder)
-
- val id = registerSupervisorAsRemoteActor
- if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
-
- val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
- if (future.isDefined) future.get
- else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
- } else {
- val future = if (senderFuture.isDefined) senderFuture.get
- else new DefaultCompletableFuture[T](timeout)
- val invocation = new MessageInvocation(
- self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get)
- if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation)
- invocation.send
- future
- }
- }
-
- private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
- // FIXME test to run bench without this trace call
- Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
- getTransactionSetInScope, toString, message)
- getTransactionSetInScope.incParties
- }
-
- /**
- * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
- */
- private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
- try {
- if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
- else dispatch(messageHandle)
- } catch {
- case e =>
- Actor.log.error(e, "Could not invoke actor [%s]", this)
- throw e
- }
- }
-
- private def dispatch[T](messageHandle: MessageInvocation) = {
- setTransactionSet(messageHandle.transactionSet)
-
- val message = messageHandle.message //serializeMessage(messageHandle.message)
- replyTo = messageHandle.replyTo
-
- try {
- if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
- } catch {
- case e =>
- _isKilled = true
- Actor.log.error(e, "Could not invoke actor [%s]", this)
- // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- if (_supervisor.isDefined) _supervisor.get ! Exit(self, e)
- replyTo match {
- case Some(Right(future)) => future.completeWithException(self, e)
- case _ =>
- }
- } finally {
- clearTransaction
- }
- }
-
- private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
- var topLevelTransaction = false
- val txSet: Option[CountDownCommitBarrier] =
- if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
- else {
- topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
- if (isTransactor) {
- Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
- toString, messageHandle)
- Some(createNewTransactionSet)
- } else None
- }
- setTransactionSet(txSet)
-
- val message = messageHandle.message //serializeMessage(messageHandle.message)
- replyTo = messageHandle.replyTo
-
- def proceed = {
- if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException(
- toString + " could not process message [" + message + "]" +
- "\n\tsince no matching 'case' clause in its 'receive' method could be found")
- setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
- }
-
- try {
- if (isTransactor) {
- atomic {
- proceed
- }
- } else proceed
- } catch {
- case e: IllegalStateException => {}
- case e =>
- // abort transaction set
- if (isTransactionSetInScope) try {
- getTransactionSetInScope.abort
- } catch { case e: IllegalStateException => {} }
- Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
-
- replyTo match {
- case Some(Right(future)) => future.completeWithException(self, e)
- case _ =>
- }
-
- clearTransaction
- if (topLevelTransaction) clearTransactionSet
-
- // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- if (_supervisor.isDefined) _supervisor.get ! Exit(self, e)
- } finally {
- clearTransaction
- if (topLevelTransaction) clearTransactionSet
- }
- }
-
- private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
+ private[akka] def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
private val lifeCycles: PartialFunction[Any, Unit] = {
case HotSwap(code) => _hotswap = code
- case Restart(reason) => restart(reason)
- case Exit(dead, reason) => handleTrapExit(dead, reason)
- case Unlink(child) => unlink(child)
- case UnlinkAndStop(child) => unlink(child); child.stop
+ case Restart(reason) => self.restart(reason)
+ case Exit(dead, reason) => self.handleTrapExit(dead, reason)
+ case Unlink(child) => self.unlink(child)
+ case UnlinkAndStop(child) => self.unlink(child); child.stop
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
}
-
- private[this] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
- if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
- if (faultHandler.isDefined) {
- faultHandler.get match {
- // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
- case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
- case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.actor.restart(reason)
- }
- } else throw new IllegalStateException(
- "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
- "\n\tto non-empty list of exception classes - can't proceed " + toString)
- } else _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on
- }
-
- private[this] def restartLinkedActors(reason: Throwable) = {
- getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach {
- actorRef =>
- val actor = actorRef.actor
- if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent))
- actor.lifeCycle.get match {
- case LifeCycle(scope, _) => {
- scope match {
- case Permanent =>
- actor.restart(reason)
- case Temporary =>
- Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id)
- actor.stop
- getLinkedActors.remove(actorRef) // remove the temporary actor
- // if last temporary actor is gone, then unlink me from supervisor
- if (getLinkedActors.isEmpty) {
- Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" +
- "\n\tshutting down and unlinking supervisor actor as well [%s].",
- actor.id)
- _supervisor.foreach(_ ! UnlinkAndStop(self))
- }
- }
- }
- }
- }
- }
-
- private[Actor] def restart(reason: Throwable): Unit = synchronized {
- restartLinkedActors(reason)
- preRestart(reason)
- Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
- postRestart(reason)
- _isKilled = false
- }
-
- private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
- if (_supervisor.isDefined) {
- RemoteClient.clientFor(_remoteAddress.get).registerSupervisorForActor(self)
- Some(_supervisor.get.uuid)
- } else None
- }
-
- protected def getLinkedActors: HashSet[ActorRef] = {
- if (_linkedActors.isEmpty) {
- val set = new HashSet[ActorRef]
- _linkedActors = Some(set)
- set
- } else _linkedActors.get
- }
-
- private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
- if (!message.isInstanceOf[String] &&
- !message.isInstanceOf[Byte] &&
- !message.isInstanceOf[Int] &&
- !message.isInstanceOf[Long] &&
- !message.isInstanceOf[Float] &&
- !message.isInstanceOf[Double] &&
- !message.isInstanceOf[Boolean] &&
- !message.isInstanceOf[Char] &&
- !message.isInstanceOf[Tuple2[_, _]] &&
- !message.isInstanceOf[Tuple3[_, _, _]] &&
- !message.isInstanceOf[Tuple4[_, _, _, _]] &&
- !message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
- !message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
- !message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
- !message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
- !message.getClass.isArray &&
- !message.isInstanceOf[List[_]] &&
- !message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
- !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
- !message.getClass.isAnnotationPresent(Annotations.immutable)) {
- Serializer.Java.deepClone(message)
- } else message
- } else message
-
- override def hashCode(): Int = HashCode.hash(HashCode.SEED, _uuid)
+
+ override def hashCode(): Int = HashCode.hash(HashCode.SEED, uuid)
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Actor] &&
- that.asInstanceOf[Actor]._uuid == _uuid
+ that.asInstanceOf[Actor].uuid == uuid
}
override def toString = "Actor[" + id + ":" + uuid + "]"
@@ -1364,6 +1440,59 @@ object DispatcherType {
* @author Jonas Bonér
*/
class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker {
- def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle)
+ def invoke(handle: MessageInvocation) = actorRef.invoke(handle)
+}
+
+/**
+ * Remote Actor proxy.
+ *
+ * @author Jonas Bonér
+ */
+private[akka] class RemoteActorRef private (
+ uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends ActorRef {
+ val remoteClient = RemoteClient.clientFor(hostname, port)
+
+ override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
+ val requestBuilder = RemoteRequestProtocol.newBuilder
+ .setId(RemoteRequestProtocolIdFactory.nextId)
+ .setTarget(className)
+ .setTimeout(timeOut)
+ .setUuid(uuid)
+ .setIsActor(true)
+ .setIsOneWay(true)
+ .setIsEscaped(false)
+ senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ remoteClient.send[Any](requestBuilder.build, None)
+ }
+
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
+ message: Any,
+ timeout: Long,
+ senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
+ val requestBuilder = RemoteRequestProtocol.newBuilder
+ .setId(RemoteRequestProtocolIdFactory.nextId)
+ .setTarget(className)
+ .setTimeout(timeout)
+ .setUuid(uuid)
+ .setIsActor(true)
+ .setIsOneWay(false)
+ .setIsEscaped(false)
+ //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ val future = remoteClient.send(requestBuilder.build, senderFuture)
+ if (future.isDefined) future.get
+ else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
+ }
+}
+
+/**
+ * Remote Actor proxy factory.
+ *
+ * @author Jonas Bonér
+ */
+private[akka] object RemoteActorRef {
+ def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
+ (new RemoteActorRef(uuid, className, hostname, port, timeout)).start
}
diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala
index c52c59b2ae..c72c588937 100644
--- a/akka-core/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-core/src/main/scala/actor/ActorRegistry.scala
@@ -57,9 +57,9 @@ object ActorRegistry extends Logging {
val all = new ListBuffer[ActorRef]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
- val actorRef = elements.nextElement
- if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) {
- all += actorRef
+ val actorId = elements.nextElement
+ if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) {
+ all += actorId
}
}
all.toList
@@ -92,24 +92,24 @@ object ActorRegistry extends Logging {
/**
* Registers an actor in the ActorRegistry.
*/
- def register(actorRef: ActorRef) = {
+ def register(actorId: ActorRef) = {
// UUID
- actorsByUUID.put(actorRef.uuid, actorRef)
+ actorsByUUID.put(actorId.uuid, actorId)
// ID
- val id = actorRef.id
- if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef)
- if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id))
- else actorsById.put(id, actorRef :: Nil)
+ val id = actorId.id
+ if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId)
+ if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id))
+ else actorsById.put(id, actorId :: Nil)
// Class name
- val className = actorRef.actor.getClass.getName
+ val className = actorId.actor.getClass.getName
if (actorsByClassName.containsKey(className)) {
- actorsByClassName.put(className, actorRef :: actorsByClassName.get(className))
- } else actorsByClassName.put(className, actorRef :: Nil)
+ actorsByClassName.put(className, actorId :: actorsByClassName.get(className))
+ } else actorsByClassName.put(className, actorId :: Nil)
// notify listeners
- foreachListener(_ ! ActorRegistered(actorRef))
+ foreachListener(_ ! ActorRegistered(actorId))
}
/**
diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala
index 0785288d10..2949c41706 100644
--- a/akka-core/src/main/scala/actor/Agent.scala
+++ b/akka-core/src/main/scala/actor/Agent.scala
@@ -99,9 +99,11 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author Jonas Bonér
*/
sealed class Agent[T] private (initialValue: T) extends Transactor {
- start
import Agent._
- log.debug("Starting up Agent [%s]", _uuid)
+ import Actor._
+ _selfOption = Some(newActor(() => this).start)
+
+ log.debug("Starting up Agent [%s]", uuid)
private lazy val value = Ref[T]()
@@ -138,7 +140,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
* method and then waits for its result on a CountDownLatch.
*/
final def get: T = {
- if (isTransactionInScope) throw new AgentException(
+ if (self.isTransactionInScope) throw new AgentException(
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala
index 2b31bea60c..8bb6bbcd19 100644
--- a/akka-core/src/main/scala/actor/Scheduler.scala
+++ b/akka-core/src/main/scala/actor/Scheduler.scala
@@ -38,15 +38,16 @@ class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef])
}
object Scheduler extends Actor {
+ import Actor._
+
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
- start
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
- startLink(new ActorRef(() => new ScheduleActor(
+ startLink(newActor(() => new ScheduleActor(
receiver,
service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message;
diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala
index c0023ae44b..fc98a57da0 100644
--- a/akka-core/src/main/scala/actor/Supervisor.scala
+++ b/akka-core/src/main/scala/actor/Supervisor.scala
@@ -42,14 +42,13 @@ import java.util.concurrent.ConcurrentHashMap
class SupervisorFactory(val config: SupervisorConfig) extends Logging {
type ExceptionList = List[Class[_ <: Throwable]]
- def newInstance: Supervisor = newInstanceFor(config)
+ def newInstance: ActorRef = newInstanceFor(config)
- def newInstanceFor(config: SupervisorConfig): Supervisor = config match {
+ def newInstanceFor(config: SupervisorConfig): ActorRef = config match {
case SupervisorConfig(restartStrategy, _) =>
val supervisor = create(restartStrategy)
- supervisor.start
supervisor.configure(config, this)
- supervisor
+ Actor.newActor(() => supervisor).start
}
protected def create(strategy: RestartStrategy): Supervisor = strategy match {
@@ -84,7 +83,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
faultHandler = Some(handler)
// FIXME should Supervisor really havea newThreadBasedDispatcher??
- dispatcher = Dispatchers.newThreadBasedDispatcher(this)
+ self.dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, List[ActorRef]]
@@ -96,14 +95,12 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
- override def start: Unit = synchronized {
+ override def init: Unit = synchronized {
ConfiguratorRepository.registerConfigurator(this)
- super[Actor].start
}
- override def stop = synchronized {
- super[Actor].stop
- getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef =>
+ override def shutdown: Unit = synchronized {
+ self.linkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef =>
actorRef.stop
log.info("Shutting actor down: %s", actorRef)
}
@@ -131,7 +128,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
startLink(actorRef)
remoteAddress.foreach(address => RemoteServer.actorsFor(
RemoteServer.Address(address.hostname, address.port))
- .actors.put(actorRef.id, actorRef))
+ .actors.put(actorRef.id, actorRef))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val supervisor = {
@@ -140,14 +137,14 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
instance
}
supervisor.lifeCycle = Some(LifeCycle(Permanent))
- val className = supervisor.getClass.getName
+ val className = supervisor.actorClass.getName
val currentSupervisors = {
val list = actors.get(className)
if (list eq null) List[ActorRef]()
else list
}
- actors.put(className, supervisor.self :: currentSupervisors)
- link(supervisor.self)
+ actors.put(className, supervisor :: currentSupervisors)
+ link(supervisor)
})
}
}
diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 7c8da6efc9..c659751a4e 100644
--- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -24,7 +24,7 @@ import java.lang.reflect.Method
*/
private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging {
private var injector: Injector = _
- private var supervisor: Option[Supervisor] = None
+ private var supervisor: Option[ActorRef] = None
private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _
private var supervised: List[Supervise] = Nil
@@ -82,19 +82,19 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
- val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
- if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
+ val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
+ if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(targetClass, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
- supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle)
+ supervised ::= Supervise(actorRef, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
}
@@ -103,20 +103,20 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
- val actor = new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)
- if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
+ val actorRef = new ActorRef(() => new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
+ if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = ActiveObject.newInstance(
- targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ targetClass, targetInstance, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef]
if (remoteAddress.isDefined) {
RemoteServer
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy)
}
- supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle)
+ supervised ::= Supervise(actorRef, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
index 4ea16e47e1..eb3a647710 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -65,8 +65,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
var lockAcquiredOnce = false
// this do-wile loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
- val lock = invocation.receiver.actor._dispatcherLock
- val mailbox = invocation.receiver.actor._mailbox
+ val lock = invocation.receiver._dispatcherLock
+ val mailbox = invocation.receiver._mailbox
do {
if (lock.tryLock) {
lockAcquiredOnce = true
diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
index 4edf6651c0..f8bf772ccb 100644
--- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -70,7 +70,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
*/
private def tryProcessMailbox(receiver: ActorRef): Boolean = {
var lockAcquiredOnce = false
- val lock = receiver.actor._dispatcherLock
+ val lock = receiver._dispatcherLock
// this do-wile loop is required to prevent missing new messages between the end of processing
// the mailbox and releasing the lock
do {
diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala
index 6c9fc0f842..3ba6e319df 100644
--- a/akka-core/src/main/scala/dispatch/MessageHandling.scala
+++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala
@@ -19,7 +19,7 @@ final class MessageInvocation(val receiver: ActorRef,
val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
- def invoke = receiver.actor.invoke(this)
+ def invoke = receiver.invoke(this)
def send = receiver.dispatcher.dispatch(this)
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index 93c28029e5..a1abe9b6b9 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
-import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRegistry}
+import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRef, ActorRegistry}
import se.scalablesolutions.akka.util.Logging
import scala.collection.immutable.{Map, HashMap}
@@ -76,7 +76,6 @@ trait ClusterActor extends Actor with Cluster {
*/
private[akka] object ClusterActor {
sealed trait ClusterMessage
-
private[akka] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
private[akka] case class Message[ADDR_T](sender: ADDR_T, msg: Array[Byte])
private[akka] case object PapersPlease extends ClusterMessage
@@ -236,32 +235,37 @@ object Cluster extends Cluster with Logging {
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
+ @volatile private[remote] var clusterActorRef: Option[ActorRef] = None
- private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = {
+ private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = {
val name = config.getString("akka.remote.cluster.actor")
if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
- val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer]
+ val serializer = Class.forName(config.getString(
+ "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
+ .newInstance.asInstanceOf[Serializer]
serializer.classLoader = Some(loader)
try {
name map {
fqn =>
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
a setSerializer serializer
- a
+ new ActorRef(() => a)
}
}
catch {
- case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None
+ case e =>
+ log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified"))
+ None
}
}
- private[akka] def createSupervisor(actor: ClusterActor): Option[Supervisor] = {
+ private[akka] def createSupervisor(actor: ActorRef): Option[ActorRef] = {
val sup = SupervisorFactory(
SupervisorConfig(
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
- Supervise(actor.self, LifeCycle(Permanent)) :: Nil)
+ Supervise(actor, LifeCycle(Permanent)) :: Nil)
).newInstance
Some(sup)
}
@@ -281,12 +285,15 @@ object Cluster extends Cluster with Logging {
def start: Unit = start(None)
- def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized {
+ def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized {
log.info("Starting up Cluster Service...")
if (clusterActor.isEmpty) {
- for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
- sup <- createSupervisor(actor) } {
- clusterActor = Some(actor)
+ for {
+ actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
+ sup <- createSupervisor(actorRef)
+ } {
+ clusterActorRef = Some(actorRef)
+ clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor])
sup.start
}
}
@@ -294,8 +301,8 @@ object Cluster extends Cluster with Logging {
def shutdown: Unit = synchronized {
log.info("Shutting down Cluster Service...")
- for{
- c <- clusterActor
+ for {
+ c <- clusterActorRef
s <- c._supervisor
} s.stop
clusterActor = None
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index 38e068b9a1..49238c1ff6 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -5,7 +5,7 @@
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
-import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef}
+import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.config.Config.config
@@ -43,62 +43,6 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
-/**
- * Remote Actor proxy factory.
- *
- * @author Jonas Bonér
- */
-private[akka] object RemoteActorProxy {
- def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
- new ActorRef(() => new RemoteActorProxy(uuid, className, hostname, port, timeout))
-}
-
-/**
- * Remote Actor proxy.
- *
- * @author Jonas Bonér
- */
-private[akka] class RemoteActorProxy private (
- uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor {
- start
- val remoteClient = RemoteClient.clientFor(hostname, port)
-
- override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
- val requestBuilder = RemoteRequestProtocol.newBuilder
- .setId(RemoteRequestProtocolIdFactory.nextId)
- .setTarget(className)
- .setTimeout(timeOut)
- .setUuid(uuid)
- .setIsActor(true)
- .setIsOneWay(true)
- .setIsEscaped(false)
- senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
- RemoteProtocolBuilder.setMessage(message, requestBuilder)
- remoteClient.send[Any](requestBuilder.build, None)
- }
-
- override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
- message: Any,
- timeout: Long,
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- val requestBuilder = RemoteRequestProtocol.newBuilder
- .setId(RemoteRequestProtocolIdFactory.nextId)
- .setTarget(className)
- .setTimeout(timeout)
- .setUuid(uuid)
- .setIsActor(true)
- .setIsOneWay(false)
- .setIsEscaped(false)
- //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol))
- RemoteProtocolBuilder.setMessage(message, requestBuilder)
- val future = remoteClient.send(requestBuilder.build, senderFuture)
- if (future.isDefined) future.get
- else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
- }
-
- def receive = {case _ => {}}
-}
-
/**
* @author Jonas Bonér
*/
@@ -121,7 +65,7 @@ object RemoteClient extends Logging {
actorFor(className, className, timeout, hostname, port)
def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
- RemoteActorProxy(actorRef, className, hostname, port, timeout)
+ RemoteActorRef(actorRef, className, hostname, port, timeout)
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index 6703d677c0..7fa21fccf6 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -226,7 +226,7 @@ class RemoteServer extends Logging {
log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id)
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
server.actors.remove(actorRef.id)
- if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
+ if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
}
}
@@ -241,7 +241,7 @@ class RemoteServer extends Logging {
val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val actorRef = server.actors.get(id)
server.actors.remove(id)
- if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
+ if (actorRef._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid)
}
}
}
@@ -459,17 +459,15 @@ class RemoteServerHandler(
*/
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
val actorRefOrNull = actors.get(uuid)
- println("----------- ACTOR " + actorRefOrNull + " " + uuid)
if (actorRefOrNull eq null) {
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
- val newInstance = clazz.newInstance.asInstanceOf[Actor]
- newInstance._uuid = uuid
- newInstance.timeout = timeout
- newInstance._remoteAddress = None
- val actorRef = new ActorRef(() => newInstance)
+ val actorRef = new ActorRef(() => clazz.newInstance.asInstanceOf[Actor])
+ actorRef._uuid = uuid
+ actorRef.timeout = timeout
+ actorRef._remoteAddress = None
actors.put(uuid, actorRef)
actorRef
} catch {
diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala
index 258847b3fc..a18d82886a 100644
--- a/akka-core/src/main/scala/routing/Patterns.scala
+++ b/akka-core/src/main/scala/routing/Patterns.scala
@@ -28,25 +28,22 @@ object Patterns {
*/
def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
newActor(() => new Actor with LoadBalancer {
- start
val seq = actors
- })
+ }).start
/** Creates a Dispatcher given a routing and a message-transforming function
*/
def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
newActor(() => new Actor with Dispatcher {
- start
override def transform(msg: Any) = msgTransformer(msg)
def routes = routing
- })
+ }).start
/** Creates a Dispatcher given a routing
*/
def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher {
- start
def routes = routing
- })
+ }).start
/** Creates an actor that pipes all incoming messages to
* both another actor and through the supplied function
diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala
index dcfa3add15..991820d314 100644
--- a/akka-core/src/main/scala/routing/Routers.scala
+++ b/akka-core/src/main/scala/routing/Routers.scala
@@ -16,7 +16,7 @@ trait Dispatcher { self: Actor =>
protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) =>
- if (self.replyTo.isDefined) routes(a) forward transform(a)
+ if (self.self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self.self))
else routes(a).!(transform(a))(None)
}
diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala
index 5b4826550a..d5e3901fa7 100644
--- a/akka-core/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala
@@ -40,7 +40,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
}
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
- new ReactiveEventBasedThread(body).start
+ newActor(() => new ReactiveEventBasedThread(body)).start
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
@@ -65,7 +65,6 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
- start
def receive = {
case Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
@@ -80,7 +79,6 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT
- start
private var readerFuture: Option[CompletableFuture[T]] = None
def receive = {
case Get =>
@@ -88,7 +86,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
if (ref.isDefined)
reply(ref.get)
else {
- readerFuture = replyTo match {
+ readerFuture = self.replyTo match {
case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
case _ => None
}
@@ -98,7 +96,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
}
}
- private[this] val in = newActor(() => new In(this))
+ private[this] val in = newActor(() => new In(this)).start
def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
@@ -108,7 +106,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
val ref = value.get
if (ref.isDefined) ref.get
else {
- val out = newActor(() => new Out(this))
+ val out = newActor(() => new Out(this)).start
blockedReaders.offer(out)
val result = out !! Get
out ! Exit