diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 3dabce5023..84f52088e7 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.amqp import com.rabbitmq.client.{AMQP => RabbitMQ, _} import com.rabbitmq.client.ConnectionFactory -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import se.scalablesolutions.akka.config.ScalaConfig._ @@ -51,7 +51,7 @@ object AMQP { exchangeName: String, returnListener: Option[ReturnListener], shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long): ActorID = + initReconnectDelay: Long): ActorRef = supervisor.newProducer( config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay) @@ -66,13 +66,13 @@ object AMQP { passive: Boolean, durable: Boolean, autoDelete: Boolean, - configurationArguments: Map[String, AnyRef]): ActorID = + configurationArguments: Map[String, AnyRef]): ActorRef = supervisor.newConsumer( config, hostname, port, exchangeName, exchangeType, shutdownListener, initReconnectDelay, passive, durable, autoDelete, configurationArguments) - def stopConnection(connection: ActorID) = supervisor.stopConnection(connection) + def stopConnection(connection: ActorRef) = supervisor.stopConnection(connection) /** * @author Jonas Bonér @@ -80,7 +80,7 @@ object AMQP { class AMQPSupervisor extends Actor with Logging { import scala.collection.JavaConversions._ - private val connections = new ConcurrentHashMap[ActorID, ActorID] + private val connections = new ConcurrentHashMap[ActorRef, ActorRef] faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) @@ -93,7 +93,7 @@ object AMQP { exchangeName: String, returnListener: Option[ReturnListener], shutdownListener: Option[ShutdownListener], - initReconnectDelay: Long): ActorID = { + initReconnectDelay: Long): ActorRef = { val producer = newActor(() => new Producer( new ConnectionFactory(config), hostname, port, @@ -116,7 +116,7 @@ object AMQP { passive: Boolean, durable: Boolean, autoDelete: Boolean, - configurationArguments: Map[String, AnyRef]): ActorID = { + configurationArguments: Map[String, AnyRef]): ActorRef = { val consumer = newActor(() => new Consumer( new ConnectionFactory(config), hostname, port, @@ -132,7 +132,7 @@ object AMQP { consumer } - def stopConnection(connection: ActorID) = { + def stopConnection(connection: ActorRef) = { connection ! Stop unlink(connection) connections.remove(connection) @@ -189,11 +189,11 @@ object AMQP { val exclusive: Boolean, val autoDelete: Boolean, val isUsingExistingQueue: Boolean, - val actor: ActorID) extends AMQPMessage { + val actor: ActorRef) extends AMQPMessage { /** * Creates a non-exclusive, non-autodelete message listener. */ - def this(queueName: String, routingKey: String, actor: ActorID) = this (queueName, routingKey, false, false, false, actor) + def this(queueName: String, routingKey: String, actor: ActorRef) = this (queueName, routingKey, false, false, false, actor) private[akka] var tag: Option[String] = None @@ -242,12 +242,12 @@ object AMQP { exclusive: Boolean, autoDelete: Boolean, isUsingExistingQueue: Boolean, - actor: ActorID) = + actor: ActorRef) = new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor) def apply(queueName: String, routingKey: String, - actor: ActorID) = + actor: ActorRef) = new MessageConsumerListener(queueName, routingKey, false, false, false, actor) } @@ -591,10 +591,10 @@ object AMQP { } catch { case e: Exception => val waitInMillis = delay * 2 - val outerActorID = self + val outerActorRef = self log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this) reconnectionTimer.schedule(new TimerTask() { - override def run = outerActorID ! Reconnect(waitInMillis) + override def run = outerActorRef ! Reconnect(waitInMillis) }, delay) } } diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 09b2bc98da..743b3dfe06 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -10,7 +10,7 @@ import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate} import org.apache.camel.impl.DefaultExchange import org.apache.camel.spi.Synchronization -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.dispatch.CompletableFuture import se.scalablesolutions.akka.util.Logging @@ -162,7 +162,7 @@ trait Producer { self: Actor => */ class ProducerResponseSender( headers: Map[String, Any], - replyTo : Option[Either[ActorID, CompletableFuture[Any]]], + replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], producer: Actor) extends Synchronization with Logging { implicit val producerActor = Some(producer) // the response sender diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index dc7866d329..9bd543e830 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -11,7 +11,7 @@ import java.util.concurrent.TimeoutException import org.apache.camel.{Exchange, Consumer, Processor} import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} -import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorID} +import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef} import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message} /** @@ -106,7 +106,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { * Send the exchange in-message to the given actor using the ! operator. The message * send to the actor is of type se.scalablesolutions.akka.camel.Message. */ - protected def processInOnly(exchange: Exchange, actor: ActorID): Unit = + protected def processInOnly(exchange: Exchange, actor: ActorRef): Unit = actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) /** @@ -114,7 +114,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { * out-message is populated from the actor's reply message. The message sent to the * actor is of type se.scalablesolutions.akka.camel.Message. */ - protected def processInOut(exchange: Exchange, actor: ActorID) { + protected def processInOut(exchange: Exchange, actor: ActorRef) { val header = Map(Message.MessageExchangeId -> exchange.getExchangeId) val result: Any = actor !! exchange.toRequestMessage(header) @@ -128,7 +128,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) { } } - private def target: Option[ActorID] = + private def target: Option[ActorRef] = if (ep.id.isDefined) targetById(ep.id.get) else targetByUuid(ep.uuid.get) diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 95b60f26c4..45aa0514f6 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch import org.apache.camel.builder.RouteBuilder -import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorID} +import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef} import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} import se.scalablesolutions.akka.util.Logging @@ -81,7 +81,7 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends * * @author Martin Krasser */ -class PublishRequestor(consumerPublisher: ActorID) extends Actor { +class PublishRequestor(consumerPublisher: ActorRef) extends Actor { protected def receive = { case ActorUnregistered(actor) => { /* ignore */ } case ActorRegistered(actor) => Publish.forConsumer(actor) match { @@ -112,23 +112,23 @@ object Publish { * Creates a list of Publish request messages for all consumer actors in the actors * list. */ - def forConsumers(actors: List[ActorID]): List[Publish] = + def forConsumers(actors: List[ActorRef]): List[Publish] = for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get /** * Creates a Publish request message if actor is a consumer actor. */ - def forConsumer(actor: ActorID): Option[Publish] = + def forConsumer(actor: ActorRef): Option[Publish] = forConsumeAnnotated(actor) orElse forConsumerType(actor) - private def forConsumeAnnotated(actorId: ActorID): Option[Publish] = { + private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = { val annotation = actorId.actorClass.getAnnotation(classOf[consume]) if (annotation eq null) None else if (actorId.remoteAddress.isDefined) None // do not publish proxies else Some(Publish(annotation.value, actorId.id, false)) } - private def forConsumerType(actorId: ActorID): Option[Publish] = + private def forConsumerType(actorId: ActorRef): Option[Publish] = if (!actorId.actor.isInstanceOf[Consumer]) None else if (actorId.remoteAddress.isDefined) None else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index a67a1c5b72..187d770a07 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -284,7 +284,7 @@ object ActiveObject { actor.initialize(target, proxy) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorID(() => actor) + val actorId = new ActorRef(() => actor) AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) actorId.start proxy.asInstanceOf[T] @@ -295,7 +295,7 @@ object ActiveObject { actor.initialize(target.getClass, target) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorID(() => actor) + val actorId = new ActorRef(() => actor) AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) actorId.start proxy.asInstanceOf[T] @@ -304,7 +304,7 @@ object ActiveObject { /** * Get the underlying dispatcher actor for the given active object. */ - def actorFor(obj: AnyRef): Option[ActorID] = + def actorFor(obj: AnyRef): Option[ActorRef] = ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj)) /** @@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry { private[akka] sealed case class AspectInit( val target: Class[_], - val actorId: ActorID, + val actorId: ActorRef, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { - def this(target: Class[_], actorId: ActorID, timeout: Long) = this(target, actorId, None, timeout) + def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout) } /** @@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit( private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ - private var actorId: ActorID = _ + private var actorId: ActorRef = _ private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ @@ -520,7 +520,7 @@ private[akka] sealed class ActiveObjectAspect { } // FIXME Jan Kronquist: started work on issue 121 -private[akka] case class Link(val actor: ActorID) +private[akka] case class Link(val actor: ActorRef) object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index ec4cd938ad..381df9d535 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -51,9 +51,9 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor { @serializable sealed trait LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage -case class Exit(dead: ActorID, killer: Throwable) extends LifeCycleMessage -case class Unlink(child: ActorID) extends LifeCycleMessage -case class UnlinkAndStop(child: ActorID) extends LifeCycleMessage +case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage +case class Unlink(child: ActorRef) extends LifeCycleMessage +case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage case object Kill extends LifeCycleMessage // Exceptions for Actors @@ -78,7 +78,7 @@ object Actor extends Logging { } /** - * Creates a new ActorID out of the Actor with type T. + * Creates a new ActorRef out of the Actor with type T. *
    *   import Actor._
    *   val actor = newActor[MyActor]
@@ -87,10 +87,10 @@ object Actor extends Logging {
    *   actor.stop
    * 
*/ - def newActor[T <: Actor: Manifest]: ActorID = new ActorID(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def newActor[T <: Actor: Manifest]: ActorRef = new ActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) /** - * Creates a new ActorID out of the Actor. Allows you to pass in a factory function + * Creates a new ActorRef out of the Actor. Allows you to pass in a factory function * that creates the Actor. Please note that this function can be invoked multiple * times if for example the Actor is supervised and needs to be restarted. *

@@ -103,7 +103,7 @@ object Actor extends Logging { * actor.stop * */ - def newActor(factory: () => Actor): ActorID = new ActorID(factory) + def newActor(factory: () => Actor): ActorRef = new ActorRef(factory) /** * Use to create an anonymous event-driven actor. @@ -121,8 +121,8 @@ object Actor extends Logging { * } * */ - def actor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(() => new Actor() { + def actor(body: PartialFunction[Any, Unit]): ActorRef = + new ActorRef(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) start def receive: PartialFunction[Any, Unit] = body @@ -144,8 +144,8 @@ object Actor extends Logging { * } * */ - def transactor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(() => new Transactor() { + def transactor(body: PartialFunction[Any, Unit]): ActorRef = + new ActorRef(() => new Transactor() { lifeCycle = Some(LifeCycle(Permanent)) start def receive: PartialFunction[Any, Unit] = body @@ -165,8 +165,8 @@ object Actor extends Logging { * } * */ - def temporaryActor(body: PartialFunction[Any, Unit]): ActorID = - new ActorID(() => new Actor() { + def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef = + new ActorRef(() => new Actor() { lifeCycle = Some(LifeCycle(Temporary)) start def receive = body @@ -192,7 +192,7 @@ object Actor extends Logging { def init[A](body: => Unit) = { def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = - new ActorID(() => new Actor() { + new ActorRef(() => new Actor() { lifeCycle = Some(LifeCycle(Permanent)) start body @@ -243,8 +243,8 @@ object Actor extends Logging { } /** - * ActorID is an immutable and serializable handle to an Actor. - * Create an ActorID for an Actor by using the factory method on the Actor object. + * ActorRef is an immutable and serializable handle to an Actor. + * Create an ActorRef for an Actor by using the factory method on the Actor object. * Here is an example: *

  *   import Actor._
@@ -257,7 +257,7 @@ object Actor extends Logging {
  * 
  * @author Jonas Bonér
  */
-final class ActorID private[akka] () {
+final class ActorRef private[akka] () {
   private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
 
   private[akka] def this(clazz: Class[_ <: Actor]) = { 
@@ -286,19 +286,19 @@ final class ActorID private[akka] () {
       case _ => 
         throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope")
     }
-    if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'")
+    if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
     actor
   }
   
   /**
-   * Returns the class for the Actor instance that is managed by the ActorID.
+   * Returns the class for the Actor instance that is managed by the ActorRef.
    */
   def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
   
   /**
    * Starts up the actor and its message queue.
    */
-  def start: ActorID = {
+  def start: ActorRef = {
     actor.start
     this
   }
@@ -327,7 +327,7 @@ final class ActorID private[akka] () {
   /**
    * Returns the supervisor, if there is one.
    */
-  def supervisor: Option[ActorID] = actor.supervisor
+  def supervisor: Option[ActorRef] = actor.supervisor
 
   /**
    * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
@@ -343,7 +343,7 @@ final class ActorID private[akka] () {
    * 
*

*/ - def !(message: Any)(implicit sender: Option[ActorID] = None) = { + def !(message: Any)(implicit sender: Option[ActorRef] = None) = { if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (actor.isRunning) actor.postMessageToMailbox(message, sender) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") @@ -421,7 +421,7 @@ final class ActorID private[akka] () { *

* Works with '!', '!!' and '!!!'. */ - def forward(message: Any)(implicit sender: Some[ActorID]) = { + def forward(message: Any)(implicit sender: Some[ActorRef]) = { if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (actor.isRunning) { sender.get.actor.replyTo match { @@ -491,11 +491,11 @@ final class ActorID private[akka] () { */ def timeout: Long = actor.timeout - override def toString: String = "ActorID[" + actor.toString + "]" + override def toString: String = "ActorRef[" + actor.toString + "]" override def hashCode: Int = actor.hashCode override def equals(that: Any): Boolean = actor.equals(that) - private[akka] def supervisor_=(sup: Option[ActorID]): Unit = actor._supervisor = sup + private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actor._supervisor = sup private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits @@ -525,7 +525,7 @@ trait Actor extends TransactionManagement with Logging { private[akka] var _uuid = UUID.newUuid.toString /** - * The 'self' field holds the ActorID for this actor. + * The 'self' field holds the ActorRef for this actor. * Can be used to send messages to itself: *

    * self ! message
@@ -533,7 +533,7 @@ trait Actor extends TransactionManagement with Logging {
    * Note: if you are using the 'self' field in the constructor of the Actor
    *       then you have to make the fields/operations that are using it 'lazy'.
    */
-   implicit val self = new ActorID(() => this)
+   implicit val self = new ActorRef(() => this)
 
    /** For internal use only */
    implicit val _selfOption = Some(self)
@@ -548,8 +548,8 @@ trait Actor extends TransactionManagement with Logging {
   @volatile private[akka] var _isKilled = false
   private var _hotswap: Option[PartialFunction[Any, Unit]] = None
   private[akka] var _remoteAddress: Option[InetSocketAddress] = None
-  private[akka] var _linkedActors: Option[HashSet[ActorID]] = None
-  private[akka] var _supervisor: Option[ActorID] = None
+  private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
+  private[akka] var _supervisor: Option[ActorRef] = None
   private[akka] var _replyToAddress: Option[InetSocketAddress] = None
   private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
 
@@ -565,7 +565,7 @@ trait Actor extends TransactionManagement with Logging {
    * - Is Some(Left(Actor)) if sender is an actor
    * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
    */
-  private[akka] var replyTo: Option[Either[ActorID, CompletableFuture[Any]]] = None
+  private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
 
   // ====================================
   // ==== USER CALLBACKS TO OVERRIDE ====
@@ -777,7 +777,7 @@ trait Actor extends TransactionManagement with Logging {
   /**
    * Returns the supervisor, if there is one.
    */
-  def supervisor: Option[ActorID] = _supervisor
+  def supervisor: Option[ActorRef] = _supervisor
 
   /**
    * Use reply(..) to reply with a message to the original sender of the message currently
@@ -879,7 +879,7 @@ trait Actor extends TransactionManagement with Logging {
    * 

* To be invoked from within the actor itself. */ - protected[this] def link(actorId: ActorID) = { + protected[this] def link(actorId: ActorRef) = { if (actorId.supervisor.isDefined) throw new IllegalStateException( "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") getLinkedActors.add(actorId) @@ -892,7 +892,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def unlink(actorId: ActorID) = { + protected[this] def unlink(actorId: ActorRef) = { if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( "Actor [" + actorId + "] is not a linked actor, can't unlink") getLinkedActors.remove(actorId) @@ -905,7 +905,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLink(actorId: ActorID) = { + protected[this] def startLink(actorId: ActorRef) = { try { actorId.start } finally { @@ -918,7 +918,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actorId: ActorID, hostname: String, port: Int) = { + protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = { try { actorId.makeRemote(hostname, port) actorId.start @@ -932,7 +932,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def spawn[T <: Actor : Manifest]: ActorID = { + protected[this] def spawn[T <: Actor : Manifest]: ActorRef = { val actorId = spawnButDoNotStart[T] actorId.start actorId @@ -943,7 +943,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorID = { + protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = { val actor = spawnButDoNotStart[T] actor.makeRemote(hostname, port) actor.start @@ -955,7 +955,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def spawnLink[T <: Actor: Manifest]: ActorID = { + protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = { val actor = spawnButDoNotStart[T] try { actor.start @@ -970,7 +970,7 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorID = { + protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = { val actor = spawnButDoNotStart[T] try { actor.makeRemote(hostname, port) @@ -999,15 +999,15 @@ trait Actor extends TransactionManagement with Logging { private[akka] def _resume = _isSuspended = false - private def spawnButDoNotStart[T <: Actor : Manifest]: ActorID = { + private def spawnButDoNotStart[T <: Actor : Manifest]: ActorRef = { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { actor.dispatcher = dispatcher } - new ActorID(() => actor) + new ActorRef(() => actor) } - protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = { joinTransaction(message) if (_remoteAddress.isDefined) { @@ -1201,7 +1201,7 @@ trait Actor extends TransactionManagement with Logging { case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } - private[this] def handleTrapExit(dead: ActorID, reason: Throwable): Unit = { + private[this] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (faultHandler.isDefined) { faultHandler.get match { @@ -1216,7 +1216,7 @@ trait Actor extends TransactionManagement with Logging { } private[this] def restartLinkedActors(reason: Throwable) = { - getLinkedActors.toArray.toList.asInstanceOf[List[ActorID]].foreach { + getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId => val actor = actorId.actor if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) @@ -1257,9 +1257,9 @@ trait Actor extends TransactionManagement with Logging { } else None } - protected def getLinkedActors: HashSet[ActorID] = { + protected def getLinkedActors: HashSet[ActorRef] = { if (_linkedActors.isEmpty) { - val set = new HashSet[ActorID] + val set = new HashSet[ActorRef] _linkedActors = Some(set) set } else _linkedActors.get @@ -1314,7 +1314,7 @@ object DispatcherType { * * @author Jonas Bonér */ -class ActorMessageInvoker private[akka] (val actorId: ActorID) extends MessageInvoker { +class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker { def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 14ab413037..c72c588937 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -11,8 +11,8 @@ import scala.reflect.Manifest import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} sealed trait ActorRegistryEvent -case class ActorRegistered(actor: ActorID) extends ActorRegistryEvent -case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent +case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent +case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. @@ -27,16 +27,16 @@ case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends Logging { - private val actorsByUUID = new ConcurrentHashMap[String, ActorID] - private val actorsById = new ConcurrentHashMap[String, List[ActorID]] - private val actorsByClassName = new ConcurrentHashMap[String, List[ActorID]] - private val registrationListeners = new CopyOnWriteArrayList[ActorID] + private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] + private val actorsById = new ConcurrentHashMap[String, List[ActorRef]] + private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]] + private val registrationListeners = new CopyOnWriteArrayList[ActorRef] /** * Returns all actors in the system. */ - def actors: List[ActorID] = { - val all = new ListBuffer[ActorID] + def actors: List[ActorRef] = { + val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) all += elements.nextElement all.toList @@ -45,7 +45,7 @@ object ActorRegistry extends Logging { /** * Invokes a function for all actors. */ - def foreach(f: (ActorID) => Unit) = { + def foreach(f: (ActorRef) => Unit) = { val elements = actorsByUUID.elements while (elements.hasMoreElements) f(elements.nextElement) } @@ -53,8 +53,8 @@ object ActorRegistry extends Logging { /** * Finds all actors that are subtypes of the class passed in as the Manifest argument. */ - def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorID] = { - val all = new ListBuffer[ActorID] + def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = { + val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { val actorId = elements.nextElement @@ -68,7 +68,7 @@ object ActorRegistry extends Logging { /** * Finds all actors of the exact type specified by the class passed in as the Class argument. */ - def actorsFor[T <: Actor](clazz: Class[T]): List[ActorID] = { + def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = { if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName) else Nil } @@ -76,7 +76,7 @@ object ActorRegistry extends Logging { /** * Finds all actors that has a specific id. */ - def actorsFor(id: String): List[ActorID] = { + def actorsFor(id: String): List[ActorRef] = { if (actorsById.containsKey(id)) actorsById.get(id) else Nil } @@ -84,7 +84,7 @@ object ActorRegistry extends Logging { /** * Finds the actor that has a specific UUID. */ - def actorFor(uuid: String): Option[ActorID] = { + def actorFor(uuid: String): Option[ActorRef] = { if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid)) else None } @@ -92,7 +92,7 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorId: ActorID) = { + def register(actorId: ActorRef) = { // UUID actorsByUUID.put(actorId.uuid, actorId) @@ -115,7 +115,7 @@ object ActorRegistry extends Logging { /** * Unregisters an actor in the ActorRegistry. */ - def unregister(actor: ActorID) = { + def unregister(actor: ActorRef) = { actorsByUUID remove actor.uuid actorsById remove actor.id actorsByClassName remove actor.getClass.getName @@ -138,7 +138,7 @@ object ActorRegistry extends Logging { /** * Adds the registration listener this this registry's listener list. */ - def addRegistrationListener(listener: ActorID) = { + def addRegistrationListener(listener: ActorRef) = { listener.start registrationListeners.add(listener) } @@ -146,12 +146,12 @@ object ActorRegistry extends Logging { /** * Removes the registration listener this this registry's listener list. */ - def removeRegistrationListener(listener: ActorID) = { + def removeRegistrationListener(listener: ActorRef) = { listener.stop registrationListeners.remove(listener) } - private def foreachListener(f: (ActorID) => Unit) { + private def foreachListener(f: (ActorRef) => Unit) { val iterator = registrationListeners.iterator while (iterator.hasNext) { val listener = iterator.next diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index b88bfd33c2..905cd6c8fe 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -26,7 +26,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * Rework of David Pollak's ActorPing class in the Lift Project * which is licensed under the Apache 2 License. */ -class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { +class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { lifeCycle = Some(LifeCycle(Permanent)) def receive = { @@ -39,14 +39,14 @@ class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef]) object Scheduler extends Actor { private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - private val schedulers = new ConcurrentHashMap[ActorID, ActorID] + private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) start - def schedule(receiver: ActorID, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { + def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { try { - startLink(new ActorID(() => new ScheduleActor( + startLink(new ActorRef(() => new ScheduleActor( receiver, service.scheduleAtFixedRate(new java.lang.Runnable { def run = receiver ! message; @@ -58,7 +58,7 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - def stopSupervising(actorId: ActorID) = { + def stopSupervising(actorId: ActorRef) = { unlink(actorId) schedulers.remove(actorId) } diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 4de358f7ce..f4b7b2b012 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -86,7 +86,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep // FIXME should Supervisor really havea newThreadBasedDispatcher?? dispatcher = Dispatchers.newThreadBasedDispatcher(this) - private val actors = new ConcurrentHashMap[String, List[ActorID]] + private val actors = new ConcurrentHashMap[String, List[ActorRef]] // Cheating, should really go through the dispatcher rather than direct access to a CHM def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]] @@ -103,7 +103,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep override def stop = synchronized { super[Actor].stop - getLinkedActors.toArray.toList.asInstanceOf[List[ActorID]].foreach { actorId => + getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId => actorId.stop log.info("Shutting actor down: %s", actorId) } @@ -123,7 +123,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep val className = actorId.actor.getClass.getName val currentActors = { val list = actors.get(className) - if (list eq null) List[ActorID]() + if (list eq null) List[ActorRef]() else list } actors.put(className, actorId :: currentActors) @@ -143,7 +143,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep val className = supervisor.getClass.getName val currentSupervisors = { val list = actors.get(className) - if (list eq null) List[ActorID]() + if (list eq null) List[ActorRef]() else list } actors.put(className, supervisor.self :: currentSupervisors) diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 520487ebd4..7c8da6efc9 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config import com.google.inject._ import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorID} +import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef} import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.util.Logging @@ -94,7 +94,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle) + supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) } @@ -116,7 +116,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .activeObjects.put(targetClass.getName, proxy) } - supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle) + supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) } diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index ffc90f7877..7e1daa5935 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy @@ -25,12 +25,12 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - class Supervise(val actorId: ActorID, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { - def apply(actorId: ActorID, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) - def apply(actorId: ActorID, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) + def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) + def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) } @@ -227,7 +227,7 @@ object JavaConfig { intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - def newSupervised(actorId: ActorID) = + def newSupervised(actorId: ActorRef) = se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) } diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 9a5b6de189..f5fd490df9 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, Queue, List} import java.util.HashMap -import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorID} +import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef} abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { @volatile protected var active: Boolean = false @@ -18,12 +18,12 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actorId: ActorID) = synchronized { + override def register(actorId: ActorRef) = synchronized { messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) super.register(actorId) } - override def unregister(actorId: ActorID) = synchronized { + override def unregister(actorId: ActorRef) = synchronized { messageInvokers.remove(actorId) super.unregister(actorId) } diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index b3ce9567f3..411ab297ea 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.dispatch -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} /** * Scala API. Dispatcher factory. @@ -40,7 +40,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID} */ object Dispatchers { object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { - override def register(actor: ActorID) = { + override def register(actor: ActorRef) = { if (isShutdown) init super.register(actor) } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 2360945f5e..23adfed05f 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.CopyOnWriteArrayList -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -31,12 +31,12 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID} class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false - implicit def actorId2actor(actorId: ActorID): Actor = actorId.actor + implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor /** Type of the actors registered in this dispatcher. */ private var actorType:Option[Class[_]] = None - private val pooledActors = new CopyOnWriteArrayList[ActorID] + private val pooledActors = new CopyOnWriteArrayList[ActorRef] /** The index in the pooled actors list which was last used to steal work */ @volatile private var lastThiefIndex = 0 @@ -68,7 +68,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * * @return true if the mailbox was processed, false otherwise */ - private def tryProcessMailbox(receiver: ActorID): Boolean = { + private def tryProcessMailbox(receiver: ActorRef): Boolean = { var lockAcquiredOnce = false val lock = receiver.actor._dispatcherLock // this do-wile loop is required to prevent missing new messages between the end of processing @@ -90,7 +90,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Process the messages in the mailbox of the given actor. */ - private def processMailbox(receiver: ActorID) = { + private def processMailbox(receiver: ActorRef) = { var messageInvocation = receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke @@ -98,9 +98,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } } - private def findThief(receiver: ActorID): Option[ActorID] = { + private def findThief(receiver: ActorRef): Option[ActorRef] = { // copy to prevent concurrent modifications having any impact - val actors = pooledActors.toArray(new Array[ActorID](pooledActors.size)) + val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size)) var i = lastThiefIndex if (i > actors.size) i = 0 @@ -108,7 +108,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means // the dispatcher is being shut down... doFindThief(receiver, actors, i) match { - case (thief: Option[ActorID], index: Int) => { + case (thief: Option[ActorRef], index: Int) => { lastThiefIndex = (index + 1) % actors.size return thief } @@ -123,7 +123,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * @param startIndex first index to start looking in the list (i.e. for round robin) * @return the thief (or None) and the new index to start searching next time */ - private def doFindThief(receiver: ActorID, actors: Array[ActorID], startIndex: Int): (Option[ActorID], Int) = { + private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = { for (i <- 0 to actors.length) { val index = (i + startIndex) % actors.length val actor = actors(index) @@ -140,7 +140,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. */ - private def tryDonateAndProcessMessages(receiver: ActorID, thief: ActorID) = { + private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { if (thief._dispatcherLock.tryLock) { try { donateAndProcessMessages(receiver, thief) @@ -153,7 +153,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Donate messages to the thief and process them on the thief as long as the receiver has more messages. */ - private def donateAndProcessMessages(receiver: ActorID, thief: ActorID): Unit = { + private def donateAndProcessMessages(receiver: ActorRef, thief: ActorRef): Unit = { donateMessage(receiver, thief) match { case None => { // no more messages to donate @@ -169,7 +169,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * Steal a message from the receiver and give it to the thief. */ - private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = { + private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = { val donated = receiver._mailbox.pollLast if (donated != null) { thief.self ! donated.message @@ -193,20 +193,20 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - override def register(actorId: ActorID) = { + override def register(actorId: ActorRef) = { verifyActorsAreOfSameType(actorId) pooledActors.add(actorId) super.register(actorId) } - override def unregister(actorId: ActorID) = { + override def unregister(actorId: ActorRef) = { pooledActors.remove(actorId) super.unregister(actorId) } def usesActorMailbox = true - private def verifyActorsAreOfSameType(newActorId: ActorID) = { + private def verifyActorsAreOfSameType(newActorId: ActorRef) = { actorType match { case None => { actorType = Some(newActorId.actor.getClass) diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 4f2cdab348..9d5c049495 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -7,15 +7,15 @@ package se.scalablesolutions.akka.dispatch import java.util.List import se.scalablesolutions.akka.util.{HashCode, Logging} -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import java.util.concurrent.ConcurrentHashMap import org.multiverse.commitbarriers.CountDownCommitBarrier -final class MessageInvocation(val receiver: ActorID, +final class MessageInvocation(val receiver: ActorRef, val message: Any, - val replyTo : Option[Either[ActorID, CompletableFuture[Any]]], + val replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") @@ -56,12 +56,12 @@ trait MessageInvoker { } trait MessageDispatcher extends Logging { - protected val references = new ConcurrentHashMap[String, ActorID] + protected val references = new ConcurrentHashMap[String, ActorRef] def dispatch(invocation: MessageInvocation) def start def shutdown - def register(actorId: ActorID) = references.put(actorId.uuid, actorId) - def unregister(actorId: ActorID) = { + def register(actorId: ActorRef) = references.put(actorId.uuid, actorId) + def unregister(actorId: ActorRef) = { references.remove(actorId.uuid) if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 2d71dc359e..db78f41c1e 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.LinkedBlockingQueue import java.util.Queue -import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker} /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker} class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher { - def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorID(() => actor))) + def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorRef(() => actor))) private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 9c03683771..f977260cf2 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} -import se.scalablesolutions.akka.actor.{Exit, Actor, ActorID} +import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.config.Config.config @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} /** + * Atomic remote request/reply message id generator. + * * @author Jonas Bonér */ object RemoteRequestIdFactory { @@ -41,6 +43,69 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent +/** + * Remote Actor proxy factory. + * + * @author Jonas Bonér + */ +private[akka] object RemoteActorProxy { + def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = + new ActorRef(() => new RemoteActorProxy(uuid, className, hostname, port, timeout)) +} + +/** + * Remote Actor proxy. + * + * @author Jonas Bonér + */ +private[akka] class RemoteActorProxy private ( + uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor { + start + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeOut) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + if (senderOption.isDefined) { + val sender = senderOption.get.actor + requestBuilder.setSourceTarget(sender.getClass.getName) + requestBuilder.setSourceUuid(sender.uuid) + val (host, port) = sender._replyToAddress.map(address => + (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send[Any](requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } + + def receive = {case _ => {}} +} + /** * @author Jonas Bonér */ @@ -53,62 +118,17 @@ object RemoteClient extends Logging { // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(className: String, hostname: String, port: Int): ActorID = + def actorFor(className: String, hostname: String, port: Int): ActorRef = actorFor(className, className, 5000L, hostname, port) - def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorID = + def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef = actorFor(actorId, className, 5000L, hostname, port) - def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID = + def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID(() => - new Actor { - start - val remoteClient = RemoteClient.clientFor(hostname, port) - - override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - if (sender.isDefined) { - val sndr = sender.get.actor - requestBuilder.setSourceTarget(sndr.getClass.getName) - requestBuilder.setSourceUuid(sndr.uuid) - val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } - RemoteProtocolBuilder.setMessage(message, requestBuilder) - remoteClient.send[Any](requestBuilder.build, None) - } - - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - val future = remoteClient.send(requestBuilder.build, senderFuture) - if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) - } - - def receive = {case _ => {}} - } - ) + def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorProxy(actorId, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) @@ -174,8 +194,8 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] - private val supervisors = new ConcurrentHashMap[String, ActorID] - private[remote] val listeners = new ConcurrentSkipListSet[ActorID] + private val supervisors = new ConcurrentHashMap[String, ActorRef] + private[remote] val listeners = new ConcurrentSkipListSet[ActorRef] private val channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool, @@ -200,7 +220,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -232,21 +252,21 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) throw exception } - def registerSupervisorForActor(actorId: ActorID) = + def registerSupervisorForActor(actorId: ActorRef) = if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) - def deregisterSupervisorForActor(actorId: ActorID) = + def deregisterSupervisorForActor(actorId: ActorRef) = if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") else supervisors.remove(actorId.supervisor.get.uuid) - def registerListener(actorId: ActorID) = listeners.add(actorId) + def registerListener(actorId: ActorRef) = listeners.add(actorId) - def deregisterListener(actorId: ActorID) = listeners.remove(actorId) + def deregisterListener(actorId: ActorRef) = listeners.remove(actorId) } /** @@ -254,7 +274,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { */ class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFuture[_]], - supervisors: ConcurrentMap[String, ActorID], + supervisors: ConcurrentMap[String, ActorRef], bootstrap: ClientBootstrap, remoteAddress: SocketAddress, timer: HashedWheelTimer, @@ -285,7 +305,7 @@ class RemoteClientPipelineFactory(name: String, @ChannelHandler.Sharable class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFuture[_]], - val supervisors: ConcurrentMap[String, ActorID], + val supervisors: ConcurrentMap[String, ActorRef], val bootstrap: ClientBootstrap, val remoteAddress: SocketAddress, val timer: HashedWheelTimer, @@ -325,12 +345,12 @@ class RemoteClientHandler(val name: String, futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -345,7 +365,7 @@ class RemoteClientHandler(val name: String, // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly if (!client.connection.isSuccess) { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(client.connection.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -353,17 +373,17 @@ class RemoteClientHandler(val name: String, } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientConnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientDisconnected(client.hostname, client.port)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 64e72f3fe8..7b3ad3ca03 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -89,7 +89,7 @@ object RemoteServer { } class RemoteActorSet { - val actors = new ConcurrentHashMap[String, ActorID] + val actors = new ConcurrentHashMap[String, ActorRef] val activeObjects = new ConcurrentHashMap[String, AnyRef] } @@ -168,7 +168,8 @@ class RemoteServer extends Logging { log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) + val pipelineFactory = new RemoteServerPipelineFactory( + name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -198,9 +199,9 @@ class RemoteServer extends Logging { /** * Register Remote Actor by the Actor's 'id' field. */ - def register(actor: ActorID) = synchronized { + def register(actor: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) } } @@ -208,9 +209,9 @@ class RemoteServer extends Logging { /** * Register Remote Actor by a specific 'id' passed as argument. */ - def register(id: String, actor: ActorID) = synchronized { + def register(id: String, actor: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) } } @@ -225,7 +226,7 @@ class RemoteServerPipelineFactory( val name: String, val openChannels: ChannelGroup, val loader: Option[ClassLoader], - val actors: JMap[String, ActorID], + val actors: JMap[String, ActorRef], val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory { import RemoteServer._ @@ -256,7 +257,7 @@ class RemoteServerHandler( val name: String, val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], - val actors: JMap[String, ActorID], + val actors: JMap[String, ActorRef], val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern @@ -440,7 +441,7 @@ class RemoteServerHandler( * If actor already created then just return it from the registry. * Does not start the actor. */ - private def createActor(name: String, uuid: String, timeout: Long): ActorID = { + private def createActor(name: String, uuid: String, timeout: Long): ActorRef = { val actorIdOrNull = actors.get(uuid) if (actorIdOrNull eq null) { try { @@ -451,7 +452,7 @@ class RemoteServerHandler( newInstance._uuid = uuid newInstance.timeout = timeout newInstance._remoteAddress = None - val actorId = new ActorID(() => newInstance) + val actorId = new ActorRef(() => newInstance) actors.put(uuid, actorId) actorId } catch { diff --git a/akka-core/src/main/scala/routing/Iterators.scala b/akka-core/src/main/scala/routing/Iterators.scala index 77159767af..96140010c8 100644 --- a/akka-core/src/main/scala/routing/Iterators.scala +++ b/akka-core/src/main/scala/routing/Iterators.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.patterns -import se.scalablesolutions.akka.actor.ActorID +import se.scalablesolutions.akka.actor.ActorRef trait InfiniteIterator[T] extends Iterator[T] @@ -20,7 +20,7 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { } } -class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { +class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] { def hasNext = items != Nil def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala index d2fcc1cc73..0d4b96cecf 100644 --- a/akka-core/src/main/scala/routing/Listeners.scala +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -4,16 +4,16 @@ package se.scalablesolutions.akka.patterns -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} sealed trait ListenerMessage -case class Listen(listener: ActorID) extends ListenerMessage -case class Deafen(listener: ActorID) extends ListenerMessage -case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage +case class Listen(listener: ActorRef) extends ListenerMessage +case class Deafen(listener: ActorRef) extends ListenerMessage +case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage trait Listeners { self : Actor => import se.scalablesolutions.akka.actor.Agent - private lazy val listeners = Agent(Set[ActorID]()) + private lazy val listeners = Agent(Set[ActorRef]()) protected def listenerManagement : PartialFunction[Any,Unit] = { case Listen(l) => listeners( _ + l) diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index c8ac39fc72..5ad78261f8 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.patterns -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ object Patterns { @@ -27,24 +27,24 @@ object Patterns { filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) //FIXME 2.8, use default params with CyclicIterator - def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = + def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = newActor(() => new Actor with LoadBalancer { start val seq = actors }) - def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = + def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = newActor(() => new Actor with Dispatcher { start override def transform(msg: Any) = msgTransformer(msg) def routes = routing }) - def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { + def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher { start def routes = routing }) - def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = + def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef = dispatcherActor({case _ => actorToLog}, logger) } \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala index ce3c7c311c..7515197f74 100644 --- a/akka-core/src/main/scala/routing/Routers.scala +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -4,13 +4,13 @@ package se.scalablesolutions.akka.patterns -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} trait Dispatcher { self: Actor => protected def transform(msg: Any): Any = msg - protected def routes: PartialFunction[Any, ActorID] + protected def routes: PartialFunction[Any, ActorRef] protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => @@ -22,7 +22,7 @@ trait Dispatcher { self: Actor => } trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[ActorID] + protected def seq: InfiniteIterator[ActorRef] protected def routes = { case x if seq.hasNext => seq.next } } \ No newline at end of file diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index 4b42c76d82..5b4826550a 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.dispatch.CompletableFuture @@ -61,7 +61,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private case object Get extends DataFlowVariableMessage private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[ActorID] + private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { timeout = TIME_OUT diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala index e63d560183..50e655d97d 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala @@ -17,7 +17,7 @@ object ActorFireForgetRequestReplySpec { } } - class SenderActor(replyActor: ActorID) extends Actor { + class SenderActor(replyActor: ActorRef) extends Actor { dispatcher = Dispatchers.newThreadBasedDispatcher(this) def receive = { diff --git a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala index 9a9117497d..9b8b5e03e2 100644 --- a/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ClientInitiatedRemoteActorSpec.scala @@ -45,7 +45,7 @@ object SendOneWayAndReplySenderActor { } class SendOneWayAndReplySenderActor extends Actor { var state: Option[AnyRef] = None - var sendTo: ActorID = _ + var sendTo: ActorRef = _ var latch: CountDownLatch = _ def sendOff = sendTo ! "Hello" diff --git a/akka-core/src/test/scala/ForwardActorSpec.scala b/akka-core/src/test/scala/ForwardActorSpec.scala index 38de039e58..05d06df32b 100644 --- a/akka-core/src/test/scala/ForwardActorSpec.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -8,7 +8,7 @@ import Actor._ object ForwardActorSpec { object ForwardState { - var sender: Option[ActorID] = None + var sender: Option[ActorRef] = None } class ReceiverActor extends Actor { diff --git a/akka-core/src/test/scala/InMemoryActorSpec.scala b/akka-core/src/test/scala/InMemoryActorSpec.scala index 66320cd757..c9380eb34f 100644 --- a/akka-core/src/test/scala/InMemoryActorSpec.scala +++ b/akka-core/src/test/scala/InMemoryActorSpec.scala @@ -16,13 +16,13 @@ case class SetMapState(key: String, value: String) case class SetVectorState(key: String) case class SetRefState(key: String) case class Success(key: String, value: String) -case class Failure(key: String, value: String, failer: ActorID) +case class Failure(key: String, value: String, failer: ActorRef) case class SetMapStateOneWay(key: String, value: String) case class SetVectorStateOneWay(key: String) case class SetRefStateOneWay(key: String) case class SuccessOneWay(key: String, value: String) -case class FailureOneWay(key: String, value: String, failer: ActorID) +case class FailureOneWay(key: String, value: String, failer: ActorRef) case object GetNotifier diff --git a/akka-core/src/test/scala/RemoteSupervisorSpec.scala b/akka-core/src/test/scala/RemoteSupervisorSpec.scala index 2bb988465f..e3f06bd555 100644 --- a/akka-core/src/test/scala/RemoteSupervisorSpec.scala +++ b/akka-core/src/test/scala/RemoteSupervisorSpec.scala @@ -83,9 +83,9 @@ class RemoteSupervisorSpec extends JUnitSuite { }).start Thread.sleep(1000) - var pingpong1: ActorID = _ - var pingpong2: ActorID = _ - var pingpong3: ActorID = _ + var pingpong1: ActorRef = _ + var pingpong2: ActorRef = _ + var pingpong3: ActorRef = _ @Test def shouldStartServer = { Log.messageLog.clear diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 1e07a3cc9c..6ecefca1e3 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -13,7 +13,7 @@ object ServerInitiatedRemoteActorSpec { val PORT = 9990 var server: RemoteServer = null - case class Send(actor: ActorID) + case class Send(actor: ActorRef) object RemoteActorSpecActorUnidirectional { val latch = new CountDownLatch(1) @@ -43,13 +43,13 @@ object ServerInitiatedRemoteActorSpec { class RemoteActorSpecActorAsyncSender extends Actor { start def receive = { - case Send(actor: ActorID) => + case Send(actor: ActorRef) => actor ! "Hello" case "World" => RemoteActorSpecActorAsyncSender.latch.countDown } - def send(actor: ActorID) { + def send(actor: ActorRef) { self ! Send(actor) } } diff --git a/akka-core/src/test/scala/SupervisorSpec.scala b/akka-core/src/test/scala/SupervisorSpec.scala index 92f2fffe04..b2c634cb85 100644 --- a/akka-core/src/test/scala/SupervisorSpec.scala +++ b/akka-core/src/test/scala/SupervisorSpec.scala @@ -69,9 +69,9 @@ object SupervisorSpec { class SupervisorSpec extends JUnitSuite { import SupervisorSpec._ - var pingpong1: ActorID = _ - var pingpong2: ActorID = _ - var pingpong3: ActorID = _ + var pingpong1: ActorRef = _ + var pingpong2: ActorRef = _ + var pingpong3: ActorRef = _ @Test def shouldStartServer = { messageLog.clear diff --git a/akka-http/src/main/scala/Security.scala b/akka-http/src/main/scala/Security.scala index a3ba2a2cdf..9ea966505e 100644 --- a/akka-http/src/main/scala/Security.scala +++ b/akka-http/src/main/scala/Security.scala @@ -22,7 +22,7 @@ package se.scalablesolutions.akka.security -import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorID, ActorRegistry} +import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry} import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config @@ -73,7 +73,7 @@ case class SpnegoCredentials(token: Array[Byte]) extends Credentials * Jersey Filter for invocation intercept and authorization/authentication */ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { - class Filter(actor: ActorID, rolesAllowed: Option[List[String]]) + class Filter(actor: ActorRef, rolesAllowed: Option[List[String]]) extends ResourceFilter with ContainerRequestFilter with Logging { override def getRequestFilter: ContainerRequestFilter = this @@ -111,7 +111,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { * Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin * strategy could be implemented in the future */ - def authenticator: ActorID = ActorRegistry.actorsFor(authenticatorFQN).head + def authenticator: ActorRef = ActorRegistry.actorsFor(authenticatorFQN).head def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] = java.util.Collections.singletonList(new Filter(authenticator, roles)) diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index 30d22c5d5e..b47e11c982 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -1,6 +1,6 @@ package se.scalablesolutions.akka.persistence.cassandra -import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor} import Actor._ import org.junit.Test @@ -17,13 +17,13 @@ case class SetMapState(key: String, value: String) case class SetVectorState(key: String) case class SetRefState(key: String) case class Success(key: String, value: String) -case class Failure(key: String, value: String, failer: ActorID) +case class Failure(key: String, value: String, failer: ActorRef) case class SetMapStateOneWay(key: String, value: String) case class SetVectorStateOneWay(key: String) case class SetRefStateOneWay(key: String) case class SuccessOneWay(key: String, value: String) -case class FailureOneWay(key: String, value: String, failer: ActorID) +case class FailureOneWay(key: String, value: String, failer: ActorRef) class CassandraPersistentActor extends Transactor { timeout = 100000 diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala index 2398f3d460..d2fbc959be 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.junit.JUnitSuite import _root_.dispatch.json.{JsNumber, JsValue} import _root_.dispatch.json.Js._ -import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorID} +import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef} import Actor._ /** @@ -24,8 +24,8 @@ import Actor._ */ case class Balance(accountNo: String) -case class Debit(accountNo: String, amount: BigInt, failer: ActorID) -case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorID) +case class Debit(accountNo: String, amount: BigInt, failer: ActorRef) +case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef) case class Credit(accountNo: String, amount: BigInt) case class Log(start: Int, finish: Int) case object LogSize diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index e46eb74d52..889078a544 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.persistence.redis import org.junit.{Test, Before} import org.junit.Assert._ -import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor} import Actor._ /** @@ -20,8 +20,8 @@ import Actor._ */ case class Balance(accountNo: String) -case class Debit(accountNo: String, amount: BigInt, failer: ActorID) -case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorID) +case class Debit(accountNo: String, amount: BigInt, failer: ActorRef) +case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef) case class Credit(accountNo: String, amount: BigInt) case class Log(start: Int, finish: Int) case object LogSize diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala index af1049cf95..6a4b42e3b9 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala @@ -3,7 +3,7 @@ package se.scalablesolutions.akka.persistence.redis import org.junit.{Test, Before} import org.junit.Assert._ -import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor} import Actor._ /** @@ -15,7 +15,7 @@ import Actor._ case class NQ(accountNo: String) case object DQ -case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: ActorID) +case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: ActorRef) case object SZ class QueueActor extends Transactor { diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala index a56e916937..f970e3fd27 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor} import Actor._ /** @@ -43,7 +43,7 @@ case class SCORE(h: Hacker) case class RANGE(start: Int, end: Int) // add and remove subject to the condition that there will be at least 3 hackers -case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorID) +case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorRef) class SortedSetActor extends Transactor { timeout = 100000 diff --git a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala index f23a8cddfe..d92f742e6f 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Actors.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Actors.scala @@ -1,7 +1,7 @@ package sample.camel import se.scalablesolutions.akka.actor.annotation.consume -import se.scalablesolutions.akka.actor.{Actor, ActorID, RemoteActor} +import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor} import se.scalablesolutions.akka.camel.{Producer, Message, Consumer} import se.scalablesolutions.akka.util.Logging @@ -51,7 +51,7 @@ class Consumer2 extends Actor { } } -class Consumer3(transformer: ActorID) extends Actor with Consumer { +class Consumer3(transformer: ActorRef) extends Actor with Consumer { def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" def receive = { @@ -59,7 +59,7 @@ class Consumer3(transformer: ActorID) extends Actor with Consumer { } } -class Transformer(producer: ActorID) extends Actor { +class Transformer(producer: ActorRef) extends Actor { protected def receive = { case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) } @@ -80,7 +80,7 @@ class Publisher(name: String, uri: String) extends Actor with Producer { protected def receive = produce } -class PublisherBridge(uri: String, publisher: ActorID) extends Actor with Consumer { +class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer { def endpointUri = uri protected def receive = { diff --git a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala index eda30a78c4..11f1bb8657 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Application1.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Application1.scala @@ -1,6 +1,6 @@ package sample.camel -import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.camel.Message import se.scalablesolutions.akka.remote.RemoteClient @@ -15,7 +15,7 @@ object Application1 { // def main(args: Array[String]) { - implicit val sender: Option[ActorID] = None + implicit val sender: Option[ActorRef] = None val actor1 = newActor[RemoteActor1] val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index 22a2c36bb2..c2ce76e1fa 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -6,7 +6,7 @@ package sample.chat import scala.collection.mutable.HashMap -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorID, RemoteActor} +import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient} import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.persistence.redis.RedisStorage @@ -72,7 +72,7 @@ class ChatClient(val name: String) { /** * Internal chat client session. */ -class Session(user: String, storage: ActorID) extends Actor { +class Session(user: String, storage: ActorRef) extends Actor { private val loginTime = System.currentTimeMillis private var userLog: List[String] = Nil @@ -124,8 +124,8 @@ class RedisChatStorage extends ChatStorage { */ trait SessionManagement { this: Actor => - val storage: ActorID // needs someone to provide the ChatStorage - val sessions = new HashMap[String, ActorID] + val storage: ActorRef // needs someone to provide the ChatStorage + val sessions = new HashMap[String, ActorRef] protected def sessionManagement: PartialFunction[Any, Unit] = { case Login(username) => @@ -151,7 +151,7 @@ trait SessionManagement { this: Actor => * Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. */ trait ChatManagement { this: Actor => - val sessions: HashMap[String, ActorID] // needs someone to provide the Session map + val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map protected def chatManagement: PartialFunction[Any, Unit] = { case msg @ ChatMessage(from, _) => sessions(from) ! msg @@ -173,7 +173,7 @@ trait ChatServer extends Actor { faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Exception]) - val storage: ActorID + val storage: ActorRef log.info("Chat service is starting up...")