diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 0800f67065..105ef4798d 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -50,9 +50,9 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { @serializable sealed trait LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage -case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage -case class Unlink(child: Actor) extends LifeCycleMessage -case class UnlinkAndStop(child: Actor) extends LifeCycleMessage +case class Exit(dead: ActorID, killer: Throwable) extends LifeCycleMessage +case class Unlink(child: ActorID) extends LifeCycleMessage +case class UnlinkAndStop(child: ActorID) extends LifeCycleMessage case object Kill extends LifeCycleMessage class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -72,72 +72,39 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { def invoke(handle: MessageInvocation) = actor.invoke(handle) } -final class ActorRef private (val actor: Actor) { - - private[akka] var _uuid = UUID.newUuid.toString - @volatile private[this] var _isRunning = false - @volatile private[this] var _isSuspended = true - @volatile private[this] var _isShutDown = false - @volatile private[this] var _isEventBased: Boolean = false - @volatile private[akka] var _isKilled = false - private[akka] var _remoteAddress: Option[InetSocketAddress] = None - private[akka] var _linkedActors: Option[HashSet[Actor]] = None - private[akka] var _supervisor: Option[Actor] = None - private[akka] var _replyToAddress: Option[InetSocketAddress] = None - private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] +final class ActorID private[akka] (private[akka] val actor: Actor) { + if (actor eq null) throw new IllegalArgumentException("Actor instance passed to ActorID can not be 'null'") /** * Starts up the actor and its message queue. */ - def start: Actor = synchronized { - if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'") - if (!_isRunning) { - messageDispatcher.register(this) - messageDispatcher.start - _isRunning = true - init - initTransactionalState - } - Actor.log.debug("[%s] has started", toString) - ActorRegistry.register(this) - this - } + def start = actor.start /** * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - protected def exit = stop + protected def exit = actor.stop /** * Shuts down the actor its dispatcher and message queue. */ - def stop = synchronized { - if (_isRunning) { - messageDispatcher.unregister(this) - _isRunning = false - _isShutDown = true - shutdown - ActorRegistry.unregister(this) - _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) - } - } + def stop = actor.stop /** * Is the actor running? */ - def isRunning: Boolean = _isRunning + def isRunning: Boolean = actor.isRunning /** * Returns the mailbox size. */ - def mailboxSize: Int = _mailbox.size - + def mailboxSize: Int = actor.mailboxSize /** * Returns the supervisor, if there is one. */ - def supervisor: Option[Actor] = _supervisor + def supervisor: Option[ActorID] = actor.supervisor /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. @@ -153,9 +120,9 @@ final class ActorRef private (val actor: Actor) { * *

*/ - def !(message: Any)(implicit sender: Option[Actor] = None) = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) postMessageToMailbox(message, sender) + def !(message: Any)(implicit sender: Option[ActorID] = None) = { + if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (actor.isRunning) actor.postMessageToMailbox(message, sender) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -172,11 +139,11 @@ final class ActorRef private (val actor: Actor) { * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def !![T](message: Any, timeout: Long): Option[T] = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) + if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (actor.isRunning) { + val future = actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val isActiveObject = message.isInstanceOf[Invocation] - if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None) + if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) try { future.await } catch { @@ -184,47 +151,13 @@ final class ActorRef private (val actor: Actor) { if (isActiveObject) throw e else None } - getResultOrThrowException(future) - } else throw new IllegalStateException( + + if (future.exception.isDefined) throw future.exception.get._2 + else future.result + } + else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala -======= -} - -/** - * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': - * http://en.wikipedia.org/wiki/Actor_model - *

- * An actor has a well-defined (non-cyclic) life-cycle. - *

- * => NEW (newly created actor) - can't receive messages (yet)
- *     => STARTED (when 'start' is invoked) - can receive messages
- *         => SHUT DOWN (when 'exit' is invoked) - can't do anything
- * 
- * - * @author Jonas Bonér - */ -trait Actor extends TransactionManagement with Logging { - implicit protected val self: Some[Actor] = Some(this) - // Only mutable for RemoteServer in order to maintain identity across nodes - private[akka] var _uuid = UUID.newUuid.toString - - // ==================================== - // private fields - // ==================================== - - @volatile private[this] var _isRunning = false - @volatile private[this] var _isSuspended = true - @volatile private[this] var _isShutDown = false - @volatile private[akka] var _isKilled = false - private var _hotswap: Option[PartialFunction[Any, Unit]] = None - private[akka] var _remoteAddress: Option[InetSocketAddress] = None - private[akka] var _linkedActors: Option[HashSet[Actor]] = None - private[akka] var _supervisor: Option[Actor] = None - private[akka] var _replyToAddress: Option[InetSocketAddress] = None - private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Sends a message asynchronously and waits on a future for a reply message. @@ -240,122 +173,63 @@ trait Actor extends TransactionManagement with Logging { * If you are sending messages using !! then you have to use reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: Any): Option[T] = !![T](message, timeout) + def !![T](message: Any): Option[T] = !![T](message, actor.timeout) /** -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala * FIXME document !!! */ - def !!!(message: Any): Future = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) + def !!![T](message: Any): Future[T] = { + if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (actor.isRunning) { + actor.postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, actor.timeout, None) } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } -======= - * Holds the reference to the sender of the currently processed message. - * Is None if no sender was specified - * Is Some(Left(Actor)) if sender is an actor - * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result - */ - protected var replyTo: Option[Either[Actor,CompletableFuture[Any]]] = None - - // ==================================== - // ==== USER CALLBACKS TO OVERRIDE ==== - // ==================================== ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * Forwards the message and passes the original sender actor as the sender. *

* Works with both '!' and '!!'. */ - def forward(message: Any)(implicit sender: Option[Actor] = None) = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")) - forwarder.replyTo match { - case Some(Left(actor)) => postMessageToMailbox(message, Some(actor)) - case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) + def forward(message: Any)(implicit sender: Some[ActorID]) = { + if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") + if (actor.isRunning) { + sender.get.actor.replyTo match { + case Some(Left(actorID)) => actor.postMessageToMailbox(message, Some(actorID)) + case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future)) case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") } } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } - /** - * Use reply(..) to reply with a message to the original sender of the message currently - * being processed. - */ - protected[this] def reply(message: Any) = replyTo match { - case Some(Left(actor)) => actor ! message - case Some(Right(future)) => future.completeWithResult(message) - case _ => throw new IllegalStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor which does not have a contact address." + - "\n\t\t2. Send a message from an instance that is *not* an actor" + - "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'." + - "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") - } - /** * Get the dispatcher for this actor. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala - def dispatcher: MessageDispatcher = messageDispatcher -======= - protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala + def dispatcher: MessageDispatcher = actor.messageDispatcher /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala - def dispatcher_=(md: MessageDispatcher): Unit = synchronized { - if (!_isRunning) { - messageDispatcher.unregister(this) - messageDispatcher = md - messageDispatcher.register(this) - _isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher] - } else throw new IllegalArgumentException( - "Can not swap dispatcher for " + toString + " after it has been started") - } -======= - protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala + def dispatcher_=(md: MessageDispatcher): Unit = actor.dispatcher = md /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def makeRemote(hostname: String, port: Int): Unit = - if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else makeRemote(new InetSocketAddress(hostname, port)) -======= - protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala + if (actor.isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + else actor.makeRemote(new InetSocketAddress(hostname, port)) /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ - def makeRemote(address: InetSocketAddress): Unit = - if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") - else { - _remoteAddress = Some(address) - RemoteClient.register(address.getHostName, address.getPort, uuid) - if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) - } - + def makeRemote(address: InetSocketAddress): Unit = actor.makeRemote(address) /** * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. */ - def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) + def setReplyToAddress(hostname: String, port: Int): Unit = actor.setReplyToAddress(new InetSocketAddress(hostname, port)) - def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address) + def setReplyToAddress(address: InetSocketAddress): Unit = actor.setReplyToAddress(address) /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. @@ -365,123 +239,32 @@ trait Actor extends TransactionManagement with Logging { * TransactionManagement.disableTransactions * */ - def makeTransactionRequired = synchronized { - if (_isRunning) throw new IllegalArgumentException( - "Can not make actor transaction required after it has been started") - else isTransactor = true - } + def makeTransactionRequired = actor.makeTransactionRequired /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - *

- * If the 'trapExit' member field has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - *

- * To be invoked from within the actor itself. + * Returns the id for the actor. */ - def link(actor: Actor) = { - if (actor._supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") - getLinkedActors.add(actor) - actor._supervisor = Some(this) - Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this) - } + def getId = actor.getId /** - * Unlink the actor. - *

- * To be invoked from within the actor itself. + * Returns the uuid for the actor. */ - def unlink(actor: Actor) = { - if (!getLinkedActors.contains(actor)) throw new IllegalStateException( - "Actor [" + actor + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actor) - actor._supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actor, this) - } + def uuid = actor.uuid + + override def toString = "ActorID[" + actor.toString + "]" + override def hashCode = actor.hashCode + override def equals(that: AnyRef) = actor.equals(that) - /** - * Atomically start and link an actor. - *

- * To be invoked from within the actor itself. - */ - def startLink(actor: Actor) = { - try { - actor.start - } finally { - link(actor) - } - } + private[akka] def supervisor_=(sup: Option[ActorID]): Unit = actor._supervisor = sup - /** - * Atomically start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def startLinkRemote(actor: Actor, hostname: String, port: Int) = { - try { - actor.makeRemote(hostname, port) - actor.start - } finally { - link(actor) - } - } + private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit + private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits - /** - * Atomically create (from actor class) and start an actor. - *

- * To be invoked from within the actor itself. - */ - def spawn[T <: Actor : Manifest] : T = { - val actor = spawnButDoNotStart[T] - actor.start - actor - } + private[akka] def lifeCycle: Option[LifeCycle] = actor.lifeCycle + private[akka] def lifeCycle_=(cycle: Option[LifeCycle]) = actor.lifeCycle = cycle - /** - * Atomically create (from actor class), start and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnRemote[T <: Actor : Manifest](hostname: String, port: Int): T = { - val actor = spawnButDoNotStart[T] - actor.makeRemote(hostname, port) - actor.start - actor - } - - /** - * Atomically create (from actor class), start and link an actor. - *

- * To be invoked from within the actor itself. - */ - def spawnLink[T <: Actor : Manifest] : T = { - val actor = spawnButDoNotStart[T] - try { - actor.start - } finally { - link(actor) - } - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - *

- * To be invoked from within the actor itself. - */ - def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): T = { - val actor = spawnButDoNotStart[T] - try { - actor.makeRemote(hostname, port) - actor.start - } finally { - link(actor) - } - actor - } + private[akka] def faultHandler: Option[FaultHandlingStrategy] = actor.faultHandler + private[akka] def faultHandler_=(handler: Option[FaultHandlingStrategy]) = actor.faultHandler = handler } /** @@ -500,12 +283,11 @@ object Actor extends Logging { object Self } - def newActor[T <: Actor: Manifest]: ActorRef = { + def newActor[T <: Actor: Manifest]: ActorID = { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance - new ActorRef(actor) + new ActorID(actor) } - /** * Use to create an anonymous event-driven actor. *

@@ -522,11 +304,12 @@ object Actor extends Logging { * } * */ - def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - def receive: PartialFunction[Any, Unit] = body - } + def actor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + def receive: PartialFunction[Any, Unit] = body + }) /** * Use to create an anonymous transactional event-driven actor. @@ -544,33 +327,12 @@ object Actor extends Logging { * } * */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala - def transactor(body: PartialFunction[Any, Unit]): Actor = new Transactor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - def receive: PartialFunction[Any, Unit] = body -======= - def !![T](message: Any, timeout: Long): Option[T] = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) - val isActiveObject = message.isInstanceOf[Invocation] - if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) - try { - future.await - } catch { - case e: FutureTimeoutException => - if (isActiveObject) throw e - else None - } - - if (future.exception.isDefined) throw future.exception.get._2 - else future.result - } - else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala - } + def transactor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Transactor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + def receive: PartialFunction[Any, Unit] = body + }) /** * Use to create an anonymous event-driven actor with a 'temporary' life-cycle configuration, @@ -586,11 +348,12 @@ object Actor extends Logging { * } * */ - def temporaryActor(body: PartialFunction[Any, Unit]): Actor = new Actor() { - lifeCycle = Some(LifeCycle(Temporary)) - start - def receive = body - } + def temporaryActor(body: PartialFunction[Any, Unit]): ActorID = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Temporary)) + start + def receive = body + }) /** * Use to create an anonymous event-driven actor with both an init block and a message loop block. @@ -611,12 +374,13 @@ object Actor extends Logging { */ def init[A](body: => Unit) = { def handler[A](body: => Unit) = new { - def receive(handler: PartialFunction[Any, Unit]) = new Actor() { - lifeCycle = Some(LifeCycle(Permanent)) - start - body - def receive = handler - } + def receive(handler: PartialFunction[Any, Unit]) = + new ActorID(new Actor() { + lifeCycle = Some(LifeCycle(Permanent)) + start + body + def receive = handler + }) } handler(body) } @@ -636,24 +400,15 @@ object Actor extends Logging { * } * */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala def spawn(body: => Unit): Unit = { case object Spawn new Actor() { start - this ! Spawn + selfId ! Spawn def receive = { case Spawn => body; stop } } -======= - def !!![T](message: Any): Future[T] = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) - } else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala } } @@ -671,12 +426,25 @@ object Actor extends Logging { * @author Jonas Bonér */ trait Actor extends TransactionManagement with Logging { - - implicit protected val self: Option[Actor] = Some(this) // Only mutable for RemoteServer in order to maintain identity across nodes + private[akka] var _uuid = UUID.newUuid.toString -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala - private[akka] var ref: Option[ActorRef] = None + implicit private[akka] var _actorID: Option[ActorID] = None + + // ==================================== + // private fields + // ==================================== + + @volatile private[this] var _isRunning = false + @volatile private[this] var _isSuspended = true + @volatile private[this] var _isShutDown = false + @volatile private[akka] var _isKilled = false + private var _hotswap: Option[PartialFunction[Any, Unit]] = None + private[akka] var _remoteAddress: Option[InetSocketAddress] = None + private[akka] var _linkedActors: Option[HashSet[ActorID]] = None + private[akka] var _supervisor: Option[ActorID] = None + private[akka] var _replyToAddress: Option[InetSocketAddress] = None + private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] /** * This lock ensures thread safety in the dispatching: only one message can @@ -688,61 +456,13 @@ trait Actor extends TransactionManagement with Logging { // protected fields // ==================================== -======= /** - * Forwards the message and passes the original sender actor as the sender. - *

- * Works with both '!' and '!!'. + * Holds the reference to the sender of the currently processed message. + * Is None if no sender was specified + * Is Some(Left(Actor)) if sender is an actor + * Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result */ - def forward(message: Any)(implicit sender: Some[Actor]) = { - if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - sender.get.replyTo match { - case Some(Left(actor)) => postMessageToMailbox(message, Some(actor)) - case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) - case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") - } - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") - } - - /** - * Use reply(..) to reply with a message to the original sender of the message currently - * being processed. - * Throws an IllegalStateException if unable to determine what to reply to - */ - protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor which does not have a contact address." + - "\n\t\t2. Send a message from an instance that is *not* an actor" + - "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'." + - "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") - - /** - * Use reply_?(..) to reply with a message to the original sender of the message currently - * being processed. - * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - protected[this] def reply_?(message: Any) : Boolean = replyTo match { - case Some(Left(actor)) => - actor ! message - true - - case Some(Right(future : Future[Any])) => - future completeWithResult message - true - - case _ => - false - } - ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala - /** - * TODO: Document replyTo - */ - protected var replyTo: Option[Either[Actor, CompletableFuture]] = None + private[akka] var replyTo: Option[Either[ActorID, CompletableFuture[Any]]] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -759,18 +479,7 @@ trait Actor extends TransactionManagement with Logging { * use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala protected var id: String = this.getClass.getName -======= - def dispatcher_=(md: MessageDispatcher): Unit = synchronized { - if (!_isRunning) { - messageDispatcher.unregister(this) - messageDispatcher = md - messageDispatcher.register(this) - } else throw new IllegalArgumentException( - "Can not swap dispatcher for " + toString + " after it has been started") - } ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * User overridable callback/setting. @@ -793,11 +502,7 @@ trait Actor extends TransactionManagement with Logging { * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - protected[akka] var messageDispatcher: MessageDispatcher = { - val dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher - _isEventBased = dispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher] - dispatcher - } + protected[akka] var messageDispatcher: MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher /** * User overridable callback/setting. @@ -817,7 +522,7 @@ trait Actor extends TransactionManagement with Logging { * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) * */ - protected var trapExit: List[Class[_ <: Throwable]] = Nil + protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil /** * User overridable callback/setting. @@ -830,7 +535,7 @@ trait Actor extends TransactionManagement with Logging { * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) * */ - protected var faultHandler: Option[FaultHandlingStrategy] = None + protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None /** * User overridable callback/setting. @@ -892,16 +597,7 @@ trait Actor extends TransactionManagement with Logging { * Mandatory callback method that is called during restart and reinitialization after a server crash. * To be implemented by subclassing actor. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala protected def postRestart(reason: Throwable) {} -======= - protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): T = { - val actor = spawnButDoNotStart[T] - actor.makeRemote(hostname, port) - actor.start - actor - } ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * User overridable callback/setting. @@ -909,19 +605,7 @@ trait Actor extends TransactionManagement with Logging { * Optional callback method that is called during termination. * To be implemented by subclassing actor. */ -<<<<<<< HEAD:akka-core/src/main/scala/actor/Actor.scala protected def initTransactionalState {} -======= - protected[this] def spawnLink[T <: Actor: Manifest]: T = { - val actor = spawnButDoNotStart[T] - try { - actor.start - } finally { - link(actor) - } - actor - } ->>>>>>> master:akka-core/src/main/scala/actor/Actor.scala /** * User overridable callback/setting. @@ -931,6 +615,275 @@ trait Actor extends TransactionManagement with Logging { */ protected def shutdown {} + // ============= + // ==== API ==== + // ============= + + /** + * 'selfId' holds the ActorID for this actor. + */ + def selfId: ActorID = + _actorID.getOrElse(throw new IllegalStateException("ActorID for actor " + toString + " is not available")) + + /** + * Starts up the actor and its message queue. + */ + def start: Unit = synchronized { + if (_isShutDown) throw new IllegalStateException("Can't restart an actor that has been shut down with 'stop' or 'exit'") + if (!_isRunning) { + messageDispatcher.register(selfId) + messageDispatcher.start + _isRunning = true + init + initTransactionalState + } + Actor.log.debug("[%s] has started", toString) + ActorRegistry.register(selfId) + } + + /** + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. + */ + protected def exit = stop + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop = synchronized { + if (_isRunning) { + messageDispatcher.unregister(selfId) + _isRunning = false + _isShutDown = true + shutdown + ActorRegistry.unregister(selfId) + _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + } + } + + /** + * Is the actor killed? + */ + def isKilled: Boolean = _isKilled + + /** + * Is the actor running? + */ + def isRunning: Boolean = _isRunning + + /** + * Returns the mailbox size. + */ + def mailboxSize: Int = _mailbox.size + + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorID] = _supervisor + + /** + * Use reply(..) to reply with a message to the original sender of the message currently + * being processed. + * Throws an IllegalStateException if unable to determine what to reply to + */ + protected[this] def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException( + "\n\tNo sender in scope, can't reply. " + + "\n\tYou have probably used the '!' method to either; " + + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + + "\n\t\t2. Send a message from an instance that is *not* an actor" + + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + + "\n\tthat will be bound by the argument passed to 'reply'." + + "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") + + /** + * Use reply_?(..) to reply with a message to the original sender of the message currently + * being processed. + * Returns true if reply was sent, and false if unable to determine what to reply to. + */ + protected[this] def reply_?(message: Any): Boolean = replyTo match { + case Some(Left(actor)) => + actor ! message + true + + case Some(Right(future : Future[Any])) => + future completeWithResult message + true + + case _ => + false + } + + /** + * Get the dispatcher for this actor. + */ + def dispatcher: MessageDispatcher = messageDispatcher + + /** + * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. + */ + def dispatcher_=(md: MessageDispatcher): Unit = synchronized { + if (!_isRunning) { + messageDispatcher.unregister(selfId) + messageDispatcher = md + messageDispatcher.register(selfId) + } else throw new IllegalArgumentException( + "Can not swap dispatcher for " + toString + " after it has been started") + } + + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(hostname: String, port: Int): Unit = + if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + else makeRemote(new InetSocketAddress(hostname, port)) + + /** + * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. + */ + def makeRemote(address: InetSocketAddress): Unit = + if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") + else { + _remoteAddress = Some(address) + RemoteClient.register(address.getHostName, address.getPort, uuid) + if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) + } + + + /** + * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. + */ + def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port)) + + def setReplyToAddress(address: InetSocketAddress): Unit = _replyToAddress = Some(address) + + /** + * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. + * However, it will always participate in an existing transaction. + * If transactionality want to be completely turned off then do it by invoking: + *

+   *  TransactionManagement.disableTransactions
+   * 
+ */ + def makeTransactionRequired = synchronized { + if (_isRunning) throw new IllegalArgumentException( + "Can not make actor transaction required after it has been started") + else isTransactor = true + } + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + *

+ * If the 'trapExit' member field has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def link(actorId: ActorID) = { + if (actorId.supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") + getLinkedActors.add(actorId) + actorId.supervisor = Some(selfId) + Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this) + } + + /** + * Unlink the actor. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def unlink(actorId: ActorID) = { + if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( + "Actor [" + actorId + "] is not a linked actor, can't unlink") + getLinkedActors.remove(actorId) + actorId.supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorId, this) + } + + /** + * Atomically start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def startLink(actorId: ActorID) = { + try { + actorId.start + } finally { + link(actorId) + } + } + + /** + * Atomically start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def startLinkRemote(actorId: ActorID, hostname: String, port: Int) = { + try { + actorId.makeRemote(hostname, port) + actorId.start + } finally { + link(actorId) + } + } + + /** + * Atomically create (from actor class) and start an actor. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def spawn[T <: Actor : Manifest]: ActorID = { + val actorId = spawnButDoNotStart[T] + actorId.start + actorId + } + + /** + * Atomically create (from actor class), start and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorID = { + val actor = spawnButDoNotStart[T] + actor.makeRemote(hostname, port) + actor.start + actor + } + + /** + * Atomically create (from actor class), start and link an actor. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def spawnLink[T <: Actor: Manifest]: ActorID = { + val actor = spawnButDoNotStart[T] + try { + actor.start + } finally { + link(actor) + } + actor + } + + /** + * Atomically create (from actor class), start, link and make an actor remote. + *

+ * To be invoked from within the actor itself. + */ + protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorID = { + val actor = spawnButDoNotStart[T] + try { + actor.makeRemote(hostname, port) + actor.start + } finally { + link(actor) + } + actor + } + /** * Returns the id for the actor. */ @@ -949,15 +902,15 @@ trait Actor extends TransactionManagement with Logging { private[akka] def _resume = _isSuspended = false - private def spawnButDoNotStart[T <: Actor : Manifest] : T = { + private def spawnButDoNotStart[T <: Actor : Manifest]: ActorID = { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { actor.dispatcher = dispatcher } - actor + new ActorID(actor) } - protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { joinTransaction(message) if (_remoteAddress.isDefined) { @@ -976,11 +929,11 @@ trait Actor extends TransactionManagement with Logging { // set the source fields used to reply back to the original sender // (i.e. not the remote proxy actor) if (sender.isDefined) { - val s = sender.get + val s = sender.get.actor requestBuilder.setSourceTarget(s.getClass.getName) requestBuilder.setSourceUuid(s.uuid) - val (host, port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) @@ -996,7 +949,7 @@ trait Actor extends TransactionManagement with Logging { RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(selfId, message, sender.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -1029,8 +982,8 @@ trait Actor extends TransactionManagement with Logging { } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation(this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) - + val invocation = new MessageInvocation(selfId, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) @@ -1074,9 +1027,9 @@ trait Actor extends TransactionManagement with Logging { _isKilled = true Actor.log.error(e, "Could not invoke actor [%s]", this) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e) replyTo match { - case Some(Right(future)) => future.completeWithException(this, e) + case Some(Right(future)) => future.completeWithException(selfId, e) case _ => } } finally { @@ -1125,7 +1078,7 @@ trait Actor extends TransactionManagement with Logging { Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) replyTo match { - case Some(Right(future)) => future.completeWithException(this, e) + case Some(Right(future)) => future.completeWithException(selfId, e) case _ => } @@ -1133,7 +1086,7 @@ trait Actor extends TransactionManagement with Logging { if (topLevelTransaction) clearTransactionSet // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + if (_supervisor.isDefined) _supervisor.get ! Exit(selfId, e) } finally { clearTransaction if (topLevelTransaction) clearTransactionSet @@ -1151,13 +1104,13 @@ trait Actor extends TransactionManagement with Logging { case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { + private[this] def handleTrapExit(dead: ActorID, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { faultHandler.get match { // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason) + case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.actor.restart(reason) } } else throw new IllegalStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + @@ -1166,8 +1119,9 @@ trait Actor extends TransactionManagement with Logging { } private[this] def restartLinkedActors(reason: Throwable) = { - getLinkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { - actor => + getLinkedActors.toArray.toList.asInstanceOf[List[ActorID]].foreach { + actorId => + val actor = actorId.actor if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) actor.lifeCycle.get match { case LifeCycle(scope, _) => { @@ -1177,11 +1131,13 @@ trait Actor extends TransactionManagement with Logging { case Temporary => Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) actor.stop - getLinkedActors.remove(actor) // remove the temporary actor + getLinkedActors.remove(actorId) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (getLinkedActors.isEmpty) { - Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id) - _supervisor.foreach(_ ! UnlinkAndStop(this)) + Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + + "\n\tshutting down and unlinking supervisor actor as well [%s].", + actor.id) + _supervisor.foreach(_ ! UnlinkAndStop(selfId)) } } } @@ -1204,9 +1160,9 @@ trait Actor extends TransactionManagement with Logging { } else None } - protected def getLinkedActors: HashSet[Actor] = { + protected def getLinkedActors: HashSet[ActorID] = { if (_linkedActors.isEmpty) { - val set = new HashSet[Actor] + val set = new HashSet[ActorID] _linkedActors = Some(set) set } else _linkedActors.get diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index b942ef3d49..5026d788d3 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -11,8 +11,8 @@ import scala.reflect.Manifest import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} sealed trait ActorRegistryEvent -case class ActorRegistered(actor: Actor) extends ActorRegistryEvent -case class ActorUnregistered(actor: Actor) extends ActorRegistryEvent +case class ActorRegistered(actor: ActorID) extends ActorRegistryEvent +case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. @@ -27,16 +27,16 @@ case class ActorUnregistered(actor: Actor) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends Logging { - private val actorsByUUID = new ConcurrentHashMap[String, Actor] - private val actorsById = new ConcurrentHashMap[String, List[Actor]] - private val actorsByClassName = new ConcurrentHashMap[String, List[Actor]] - private val registrationListeners = new CopyOnWriteArrayList[Actor] + private val actorsByUUID = new ConcurrentHashMap[String, ActorID] + private val actorsById = new ConcurrentHashMap[String, List[ActorID]] + private val actorsByClassName = new ConcurrentHashMap[String, List[ActorID]] + private val registrationListeners = new CopyOnWriteArrayList[ActorID] /** * Returns all actors in the system. */ - def actors: List[Actor] = { - val all = new ListBuffer[Actor] + def actors: List[ActorID] = { + val all = new ListBuffer[ActorID] val elements = actorsByUUID.elements while (elements.hasMoreElements) all += elements.nextElement all.toList @@ -45,7 +45,7 @@ object ActorRegistry extends Logging { /** * Invokes a function for all actors. */ - def foreach(f: (Actor) => Unit) = { + def foreach(f: (ActorID) => Unit) = { val elements = actorsByUUID.elements while (elements.hasMoreElements) f(elements.nextElement) } @@ -57,9 +57,9 @@ object ActorRegistry extends Logging { val all = new ListBuffer[T] val elements = actorsByUUID.elements while (elements.hasMoreElements) { - val actor = elements.nextElement - if (manifest.erasure.isAssignableFrom(actor.getClass)) { - all += actor.asInstanceOf[T] + val actorId = elements.nextElement + if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { + all += actorId.actor.asInstanceOf[T] } } all.toList @@ -77,15 +77,15 @@ object ActorRegistry extends Logging { /** * Finds all actors that has a specific id. */ - def actorsFor(id: String): List[Actor] = { - if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[Actor]] + def actorsFor(id: String): List[ActorID] = { + if (actorsById.containsKey(id)) actorsById.get(id).asInstanceOf[List[ActorID]] else Nil } /** * Finds the actor that has a specific UUID. */ - def actorFor(uuid: String): Option[Actor] = { + def actorFor(uuid: String): Option[ActorID] = { if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid)) else None } @@ -93,7 +93,7 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actor: Actor) = { + def register(actor: ActorID) = { // UUID actorsByUUID.put(actor.uuid, actor) @@ -116,7 +116,7 @@ object ActorRegistry extends Logging { /** * Unregisters an actor in the ActorRegistry. */ - def unregister(actor: Actor) = { + def unregister(actor: ActorID) = { actorsByUUID remove actor.uuid actorsById remove actor.getId actorsByClassName remove actor.getClass.getName @@ -139,18 +139,18 @@ object ActorRegistry extends Logging { /** * Adds the registration listener this this registry's listener list. */ - def addRegistrationListener(listener: Actor) = { + def addRegistrationListener(listener: ActorID) = { registrationListeners.add(listener) } /** * Removes the registration listener this this registry's listener list. */ - def removeRegistrationListener(listener: Actor) = { + def removeRegistrationListener(listener: ActorID) = { registrationListeners.remove(listener) } - private def foreachListener(f: (Actor) => Unit) { + private def foreachListener(f: (ActorID) => Unit) { val iterator = registrationListeners.iterator while (iterator.hasNext) f(iterator.next) } diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 964fc8d1ac..9f8d230c65 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -84,7 +84,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep faultHandler = Some(handler) dispatcher = Dispatchers.newThreadBasedDispatcher(this) - private val actors = new ConcurrentHashMap[String, List[Actor]] + private val actors = new ConcurrentHashMap[String, List[ActorID]] // Cheating, should really go through the dispatcher rather than direct access to a CHM def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]] @@ -94,7 +94,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName) - override def start: Actor = synchronized { + override def start: Unit = synchronized { ConfiguratorRepository.registerConfigurator(this) super[Actor].start } @@ -117,31 +117,35 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actor, lifeCycle, remoteAddress) => - val className = actor.getClass.getName + case Supervise(actorId, lifeCycle, remoteAddress) => + val className = actorId.actor.getClass.getName val currentActors = { val list = actors.get(className) - if (list eq null) List[Actor]() + if (list eq null) List[ActorID]() else list } - actors.put(className, actor :: currentActors) - actor.lifeCycle = Some(lifeCycle) - startLink(actor) + actors.put(className, actorId :: currentActors) + actorId.actor.lifeCycle = Some(lifeCycle) + startLink(actorId) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actor.getId, actor)) + .actors.put(actorId.getId, actorId)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration - val supervisor = factory.newInstanceFor(supervisorConfig).start + val supervisor = { + val instance = factory.newInstanceFor(supervisorConfig) + instance.start + instance + } supervisor.lifeCycle = Some(LifeCycle(Permanent)) val className = supervisor.getClass.getName val currentSupervisors = { val list = actors.get(className) - if (list eq null) List[Actor]() + if (list eq null) List[ActorID]() else list } - actors.put(className, supervisor :: currentSupervisors) - link(supervisor) + actors.put(className, supervisor.selfId :: currentSupervisors) + link(supervisor.selfId) }) } } diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index 5672ee3695..ffc90f7877 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy @@ -25,13 +25,13 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - class Supervise(val actor: Actor, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + class Supervise(val actorId: ActorID, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { - def apply(actor: Actor, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actor, lifeCycle, remoteAddress) - def apply(actor: Actor, lifeCycle: LifeCycle) = new Supervise(actor, lifeCycle, null) - def unapply(supervise: Supervise) = Some((supervise.actor, supervise.lifeCycle, supervise.remoteAddress)) + def apply(actorId: ActorID, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) + def apply(actorId: ActorID, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) + def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) } case class RestartStrategy( @@ -227,8 +227,8 @@ object JavaConfig { intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - def newSupervised(actor: Actor) = - se.scalablesolutions.akka.config.ScalaConfig.Supervise(actor, lifeCycle.transform) + def newSupervised(actorId: ActorID) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) } } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 3727e8ca92..9a5b6de189 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, Queue, List} import java.util.HashMap -import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor} +import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorID} abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { @volatile protected var active: Boolean = false @@ -15,17 +15,17 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten protected val messageInvokers = new HashMap[AnyRef, MessageInvoker] protected var selectorThread: Thread = _ protected val guard = new Object - + def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actor: Actor) = synchronized { - messageInvokers.put(actor, new ActorMessageInvoker(actor)) - super.register(actor) + override def register(actorId: ActorID) = synchronized { + messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) + super.register(actorId) } - override def unregister(actor: Actor) = synchronized { - messageInvokers.remove(actor) - super.unregister(actor) + override def unregister(actorId: ActorID) = synchronized { + messageInvokers.remove(actorId) + super.unregister(actorId) } def shutdown = if (active) { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 0c624c2e3a..4ea16e47e1 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,21 +65,23 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche var lockAcquiredOnce = false // this do-wile loop is required to prevent missing new messages between the end of the inner while // loop and releasing the lock + val lock = invocation.receiver.actor._dispatcherLock + val mailbox = invocation.receiver.actor._mailbox do { - if (invocation.receiver._dispatcherLock.tryLock) { + if (lock.tryLock) { lockAcquiredOnce = true try { // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - var messageInvocation = invocation.receiver._mailbox.poll + var messageInvocation = mailbox.poll while (messageInvocation != null) { messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + messageInvocation = mailbox.poll } } finally { - invocation.receiver._dispatcherLock.unlock + lock.unlock } } - } while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty)) + } while ((lockAcquiredOnce && !mailbox.isEmpty)) } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 28fe624b86..d166c1956d 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -5,7 +5,8 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.CopyOnWriteArrayList -import se.scalablesolutions.akka.actor.Actor + +import se.scalablesolutions.akka.actor.{Actor, ActorID} /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -30,10 +31,12 @@ import se.scalablesolutions.akka.actor.Actor class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false + implicit def actorId2actor(actorId: ActorID): Actor = actorId.actor + /** Type of the actors registered in this dispatcher. */ private var actorType:Option[Class[_]] = None - private val pooledActors = new CopyOnWriteArrayList[Actor] + private val pooledActors = new CopyOnWriteArrayList[ActorID] /** The index in the pooled actors list which was last used to steal work */ @volatile private var lastThiefIndex = 0 @@ -65,17 +68,18 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * * @return true if the mailbox was processed, false otherwise */ - private def tryProcessMailbox(receiver: Actor): Boolean = { + private def tryProcessMailbox(receiver: ActorID): Boolean = { var lockAcquiredOnce = false + val lock = receiver.actor._dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing // the mailbox and releasing the lock do { - if (receiver._dispatcherLock.tryLock) { + if (lock.tryLock) { lockAcquiredOnce = true try { processMailbox(receiver) } finally { - receiver._dispatcherLock.unlock + lock.unlock } } } while ((lockAcquiredOnce && !receiver._mailbox.isEmpty)) @@ -86,7 +90,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Process the messages in the mailbox of the given actor. */ - private def processMailbox(receiver: Actor) = { + private def processMailbox(receiver: ActorID) = { var messageInvocation = receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke @@ -94,9 +98,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } } - private def findThief(receiver: Actor): Option[Actor] = { + private def findThief(receiver: ActorID): Option[ActorID] = { // copy to prevent concurrent modifications having any impact - val actors = pooledActors.toArray(new Array[Actor](pooledActors.size)) + val actors = pooledActors.toArray(new Array[ActorID](pooledActors.size)) var i = lastThiefIndex if (i > actors.size) i = 0 @@ -104,7 +108,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means // the dispatcher is being shut down... doFindThief(receiver, actors, i) match { - case (thief: Option[Actor], index: Int) => { + case (thief: Option[ActorID], index: Int) => { lastThiefIndex = (index + 1) % actors.size return thief } @@ -119,7 +123,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * @param startIndex first index to start looking in the list (i.e. for round robin) * @return the thief (or None) and the new index to start searching next time */ - private def doFindThief(receiver: Actor, actors: Array[Actor], startIndex: Int): (Option[Actor], Int) = { + private def doFindThief(receiver: ActorID, actors: Array[ActorID], startIndex: Int): (Option[ActorID], Int) = { for (i <- 0 to actors.length) { val index = (i + startIndex) % actors.length val actor = actors(index) @@ -136,7 +140,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ - private def tryDonateAndProcessMessages(receiver: Actor, thief: Actor) = { + private def tryDonateAndProcessMessages(receiver: ActorID, thief: ActorID) = { if (thief._dispatcherLock.tryLock) { try { donateAndProcessMessages(receiver, thief) @@ -149,7 +153,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Donate messages to the thief and process them on the thief as long as the receiver has more messages. */ - private def donateAndProcessMessages(receiver: Actor, thief: Actor): Unit = { + private def donateAndProcessMessages(receiver: ActorID, thief: ActorID): Unit = { donateMessage(receiver, thief) match { case None => { // no more messages to donate @@ -165,10 +169,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Steal a message from the receiver and give it to the thief. */ - private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { + private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { - thief ! donated.message + thief.selfId ! donated.message return Some(donated) } else return None } @@ -189,29 +193,29 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - override def register(actor: Actor) = { - verifyActorsAreOfSameType(actor) - pooledActors.add(actor) - super.register(actor) + override def register(actorId: ActorID) = { + verifyActorsAreOfSameType(actorId) + pooledActors.add(actorId) + super.register(actorId) } - override def unregister(actor: Actor) = { - pooledActors.remove(actor) - super.unregister(actor) + override def unregister(actorId: ActorID) = { + pooledActors.remove(actorId) + super.unregister(actorId) } - + def usesActorMailbox = true - private def verifyActorsAreOfSameType(newActor: Actor) = { + private def verifyActorsAreOfSameType(newActorId: ActorID) = { actorType match { case None => { - actorType = Some(newActor.getClass) + actorType = Some(newActorId.actor.getClass) } case Some(aType) => { - if (aType != newActor.getClass) + if (aType != newActorId.actor.getClass) throw new IllegalStateException( String.format("Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", - newActor, aType)) + newActorId.actor, aType)) } } } diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index 3eecbef0f3..4f2cdab348 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -7,25 +7,25 @@ package se.scalablesolutions.akka.dispatch import java.util.List import se.scalablesolutions.akka.util.{HashCode, Logging} -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Actor, ActorID} import java.util.concurrent.ConcurrentHashMap import org.multiverse.commitbarriers.CountDownCommitBarrier -final class MessageInvocation(val receiver: Actor, +final class MessageInvocation(val receiver: ActorID, val message: Any, - val replyTo : Option[Either[Actor,CompletableFuture[Any]]], + val replyTo : Option[Either[ActorID, CompletableFuture[Any]]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") - def invoke = receiver.invoke(this) + def invoke = receiver.actor.invoke(this) def send = receiver.dispatcher.dispatch(this) override def hashCode(): Int = synchronized { var result = HashCode.SEED - result = HashCode.hash(result, receiver) + result = HashCode.hash(result, receiver.actor) result = HashCode.hash(result, message.asInstanceOf[AnyRef]) result } @@ -33,7 +33,7 @@ final class MessageInvocation(val receiver: Actor, override def equals(that: Any): Boolean = synchronized { that != null && that.isInstanceOf[MessageInvocation] && - that.asInstanceOf[MessageInvocation].receiver == receiver && + that.asInstanceOf[MessageInvocation].receiver.actor == receiver.actor && that.asInstanceOf[MessageInvocation].message == message } @@ -56,14 +56,14 @@ trait MessageInvoker { } trait MessageDispatcher extends Logging { - protected val references = new ConcurrentHashMap[String, Actor] + protected val references = new ConcurrentHashMap[String, ActorID] def dispatch(invocation: MessageInvocation) def start def shutdown - def register(actor: Actor) = references.put(actor.uuid, actor) - def unregister(actor: Actor) = { - references.remove(actor.uuid) - if (canBeShutDown) + def register(actorId: ActorID) = references.put(actorId.uuid, actorId) + def unregister(actorId: ActorID) = { + references.remove(actorId.uuid) + if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } def canBeShutDown: Boolean = references.isEmpty diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index be0e881eb6..f5b0dce52f 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -212,19 +212,19 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Registers a local endpoint */ def registerLocalNode(hostname: String, port: Int): Unit = - this ! RegisterLocalNode(RemoteAddress(hostname, port)) + selfId ! RegisterLocalNode(RemoteAddress(hostname, port)) /** * Deregisters a local endpoint */ def deregisterLocalNode(hostname: String, port: Int): Unit = - this ! DeregisterLocalNode(RemoteAddress(hostname, port)) + selfId ! DeregisterLocalNode(RemoteAddress(hostname, port)) /** * Broadcasts the specified message to all Actors of type Class on all known Nodes */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = - this ! RelayedMessage(to.getName, msg) + selfId ! RelayedMessage(to.getName, msg) } /** @@ -261,7 +261,7 @@ object Cluster extends Cluster with Logging { val sup = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(actor, LifeCycle(Permanent)) :: Nil) + Supervise(actor.selfId, LifeCycle(Permanent)) :: Nil) ).newInstance Some(sup) } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 1fcbbe8551..4db987827e 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -89,7 +89,7 @@ object RemoteServer { } class RemoteActorSet { - val actors = new ConcurrentHashMap[String, Actor] + val actors = new ConcurrentHashMap[String, ActorID] val activeObjects = new ConcurrentHashMap[String, AnyRef] }