Renamed ActorID to ActorRef

This commit is contained in:
Jonas Bonér 2010-05-06 08:13:12 +02:00
parent c469c8644e
commit 84b8e64582
39 changed files with 301 additions and 280 deletions

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.amqp
import com.rabbitmq.client.{AMQP => RabbitMQ, _} import com.rabbitmq.client.{AMQP => RabbitMQ, _}
import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.ConnectionFactory
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.OneForOneStrategy import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
@ -51,7 +51,7 @@ object AMQP {
exchangeName: String, exchangeName: String,
returnListener: Option[ReturnListener], returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener], shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): ActorID = initReconnectDelay: Long): ActorRef =
supervisor.newProducer( supervisor.newProducer(
config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay) config, hostname, port, exchangeName, returnListener, shutdownListener, initReconnectDelay)
@ -66,13 +66,13 @@ object AMQP {
passive: Boolean, passive: Boolean,
durable: Boolean, durable: Boolean,
autoDelete: Boolean, autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): ActorID = configurationArguments: Map[String, AnyRef]): ActorRef =
supervisor.newConsumer( supervisor.newConsumer(
config, hostname, port, exchangeName, exchangeType, config, hostname, port, exchangeName, exchangeType,
shutdownListener, initReconnectDelay, shutdownListener, initReconnectDelay,
passive, durable, autoDelete, configurationArguments) passive, durable, autoDelete, configurationArguments)
def stopConnection(connection: ActorID) = supervisor.stopConnection(connection) def stopConnection(connection: ActorRef) = supervisor.stopConnection(connection)
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -80,7 +80,7 @@ object AMQP {
class AMQPSupervisor extends Actor with Logging { class AMQPSupervisor extends Actor with Logging {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
private val connections = new ConcurrentHashMap[ActorID, ActorID] private val connections = new ConcurrentHashMap[ActorRef, ActorRef]
faultHandler = Some(OneForOneStrategy(5, 5000)) faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable]) trapExit = List(classOf[Throwable])
@ -93,7 +93,7 @@ object AMQP {
exchangeName: String, exchangeName: String,
returnListener: Option[ReturnListener], returnListener: Option[ReturnListener],
shutdownListener: Option[ShutdownListener], shutdownListener: Option[ShutdownListener],
initReconnectDelay: Long): ActorID = { initReconnectDelay: Long): ActorRef = {
val producer = newActor(() => new Producer( val producer = newActor(() => new Producer(
new ConnectionFactory(config), new ConnectionFactory(config),
hostname, port, hostname, port,
@ -116,7 +116,7 @@ object AMQP {
passive: Boolean, passive: Boolean,
durable: Boolean, durable: Boolean,
autoDelete: Boolean, autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): ActorID = { configurationArguments: Map[String, AnyRef]): ActorRef = {
val consumer = newActor(() => new Consumer( val consumer = newActor(() => new Consumer(
new ConnectionFactory(config), new ConnectionFactory(config),
hostname, port, hostname, port,
@ -132,7 +132,7 @@ object AMQP {
consumer consumer
} }
def stopConnection(connection: ActorID) = { def stopConnection(connection: ActorRef) = {
connection ! Stop connection ! Stop
unlink(connection) unlink(connection)
connections.remove(connection) connections.remove(connection)
@ -189,11 +189,11 @@ object AMQP {
val exclusive: Boolean, val exclusive: Boolean,
val autoDelete: Boolean, val autoDelete: Boolean,
val isUsingExistingQueue: Boolean, val isUsingExistingQueue: Boolean,
val actor: ActorID) extends AMQPMessage { val actor: ActorRef) extends AMQPMessage {
/** /**
* Creates a non-exclusive, non-autodelete message listener. * Creates a non-exclusive, non-autodelete message listener.
*/ */
def this(queueName: String, routingKey: String, actor: ActorID) = this (queueName, routingKey, false, false, false, actor) def this(queueName: String, routingKey: String, actor: ActorRef) = this (queueName, routingKey, false, false, false, actor)
private[akka] var tag: Option[String] = None private[akka] var tag: Option[String] = None
@ -242,12 +242,12 @@ object AMQP {
exclusive: Boolean, exclusive: Boolean,
autoDelete: Boolean, autoDelete: Boolean,
isUsingExistingQueue: Boolean, isUsingExistingQueue: Boolean,
actor: ActorID) = actor: ActorRef) =
new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor) new MessageConsumerListener(queueName, routingKey, exclusive, autoDelete, isUsingExistingQueue, actor)
def apply(queueName: String, def apply(queueName: String,
routingKey: String, routingKey: String,
actor: ActorID) = actor: ActorRef) =
new MessageConsumerListener(queueName, routingKey, false, false, false, actor) new MessageConsumerListener(queueName, routingKey, false, false, false, actor)
} }
@ -591,10 +591,10 @@ object AMQP {
} catch { } catch {
case e: Exception => case e: Exception =>
val waitInMillis = delay * 2 val waitInMillis = delay * 2
val outerActorID = self val outerActorRef = self
log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this) log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this)
reconnectionTimer.schedule(new TimerTask() { reconnectionTimer.schedule(new TimerTask() {
override def run = outerActorID ! Reconnect(waitInMillis) override def run = outerActorRef ! Reconnect(waitInMillis)
}, delay) }, delay)
} }
} }

View file

@ -10,7 +10,7 @@ import org.apache.camel.{Processor, ExchangePattern, Exchange, ProducerTemplate}
import org.apache.camel.impl.DefaultExchange import org.apache.camel.impl.DefaultExchange
import org.apache.camel.spi.Synchronization import org.apache.camel.spi.Synchronization
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.dispatch.CompletableFuture import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -162,7 +162,7 @@ trait Producer { self: Actor =>
*/ */
class ProducerResponseSender( class ProducerResponseSender(
headers: Map[String, Any], headers: Map[String, Any],
replyTo : Option[Either[ActorID, CompletableFuture[Any]]], replyTo : Option[Either[ActorRef, CompletableFuture[Any]]],
producer: Actor) extends Synchronization with Logging { producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender implicit val producerActor = Some(producer) // the response sender

View file

@ -11,7 +11,7 @@ import java.util.concurrent.TimeoutException
import org.apache.camel.{Exchange, Consumer, Processor} import org.apache.camel.{Exchange, Consumer, Processor}
import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent} import org.apache.camel.impl.{DefaultProducer, DefaultEndpoint, DefaultComponent}
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorID} import se.scalablesolutions.akka.actor.{ActorRegistry, Actor, ActorRef}
import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message} import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message}
/** /**
@ -106,7 +106,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
* Send the exchange in-message to the given actor using the ! operator. The message * Send the exchange in-message to the given actor using the ! operator. The message
* send to the actor is of type se.scalablesolutions.akka.camel.Message. * send to the actor is of type se.scalablesolutions.akka.camel.Message.
*/ */
protected def processInOnly(exchange: Exchange, actor: ActorID): Unit = protected def processInOnly(exchange: Exchange, actor: ActorRef): Unit =
actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId)) actor ! exchange.toRequestMessage(Map(Message.MessageExchangeId -> exchange.getExchangeId))
/** /**
@ -114,7 +114,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
* out-message is populated from the actor's reply message. The message sent to the * out-message is populated from the actor's reply message. The message sent to the
* actor is of type se.scalablesolutions.akka.camel.Message. * actor is of type se.scalablesolutions.akka.camel.Message.
*/ */
protected def processInOut(exchange: Exchange, actor: ActorID) { protected def processInOut(exchange: Exchange, actor: ActorRef) {
val header = Map(Message.MessageExchangeId -> exchange.getExchangeId) val header = Map(Message.MessageExchangeId -> exchange.getExchangeId)
val result: Any = actor !! exchange.toRequestMessage(header) val result: Any = actor !! exchange.toRequestMessage(header)
@ -128,7 +128,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) {
} }
} }
private def target: Option[ActorID] = private def target: Option[ActorRef] =
if (ep.id.isDefined) targetById(ep.id.get) if (ep.id.isDefined) targetById(ep.id.get)
else targetByUuid(ep.uuid.get) else targetByUuid(ep.uuid.get)

View file

@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.camel.builder.RouteBuilder import org.apache.camel.builder.RouteBuilder
import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorID} import se.scalablesolutions.akka.actor.{ActorUnregistered, ActorRegistered, Actor, ActorRef}
import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager} import se.scalablesolutions.akka.camel.{Consumer, CamelContextManager}
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -81,7 +81,7 @@ class ConsumerRoute(val endpointUri: String, id: String, uuid: Boolean) extends
* *
* @author Martin Krasser * @author Martin Krasser
*/ */
class PublishRequestor(consumerPublisher: ActorID) extends Actor { class PublishRequestor(consumerPublisher: ActorRef) extends Actor {
protected def receive = { protected def receive = {
case ActorUnregistered(actor) => { /* ignore */ } case ActorUnregistered(actor) => { /* ignore */ }
case ActorRegistered(actor) => Publish.forConsumer(actor) match { case ActorRegistered(actor) => Publish.forConsumer(actor) match {
@ -112,23 +112,23 @@ object Publish {
* Creates a list of Publish request messages for all consumer actors in the <code>actors</code> * Creates a list of Publish request messages for all consumer actors in the <code>actors</code>
* list. * list.
*/ */
def forConsumers(actors: List[ActorID]): List[Publish] = def forConsumers(actors: List[ActorRef]): List[Publish] =
for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get for (actor <- actors; pub = forConsumer(actor); if pub.isDefined) yield pub.get
/** /**
* Creates a Publish request message if <code>actor</code> is a consumer actor. * Creates a Publish request message if <code>actor</code> is a consumer actor.
*/ */
def forConsumer(actor: ActorID): Option[Publish] = def forConsumer(actor: ActorRef): Option[Publish] =
forConsumeAnnotated(actor) orElse forConsumerType(actor) forConsumeAnnotated(actor) orElse forConsumerType(actor)
private def forConsumeAnnotated(actorId: ActorID): Option[Publish] = { private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = {
val annotation = actorId.actorClass.getAnnotation(classOf[consume]) val annotation = actorId.actorClass.getAnnotation(classOf[consume])
if (annotation eq null) None if (annotation eq null) None
else if (actorId.remoteAddress.isDefined) None // do not publish proxies else if (actorId.remoteAddress.isDefined) None // do not publish proxies
else Some(Publish(annotation.value, actorId.id, false)) else Some(Publish(annotation.value, actorId.id, false))
} }
private def forConsumerType(actorId: ActorID): Option[Publish] = private def forConsumerType(actorId: ActorRef): Option[Publish] =
if (!actorId.actor.isInstanceOf[Consumer]) None if (!actorId.actor.isInstanceOf[Consumer]) None
else if (actorId.remoteAddress.isDefined) None else if (actorId.remoteAddress.isDefined) None
else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true))

View file

@ -284,7 +284,7 @@ object ActiveObject {
actor.initialize(target, proxy) actor.initialize(target, proxy)
actor.timeout = timeout actor.timeout = timeout
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val actorId = new ActorID(() => actor) val actorId = new ActorRef(() => actor)
AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout))
actorId.start actorId.start
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
@ -295,7 +295,7 @@ object ActiveObject {
actor.initialize(target.getClass, target) actor.initialize(target.getClass, target)
actor.timeout = timeout actor.timeout = timeout
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val actorId = new ActorID(() => actor) val actorId = new ActorRef(() => actor)
AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout))
actorId.start actorId.start
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
@ -304,7 +304,7 @@ object ActiveObject {
/** /**
* Get the underlying dispatcher actor for the given active object. * Get the underlying dispatcher actor for the given active object.
*/ */
def actorFor(obj: AnyRef): Option[ActorID] = def actorFor(obj: AnyRef): Option[ActorRef] =
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj)) ActorRegistry.actorsFor(classOf[Dispatcher]).find(a => a.actor.asInstanceOf[Dispatcher].target == Some(obj))
/** /**
@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry {
private[akka] sealed case class AspectInit( private[akka] sealed case class AspectInit(
val target: Class[_], val target: Class[_],
val actorId: ActorID, val actorId: ActorRef,
val remoteAddress: Option[InetSocketAddress], val remoteAddress: Option[InetSocketAddress],
val timeout: Long) { val timeout: Long) {
def this(target: Class[_], actorId: ActorID, timeout: Long) = this(target, actorId, None, timeout) def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout)
} }
/** /**
@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit(
private[akka] sealed class ActiveObjectAspect { private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false @volatile private var isInitialized = false
private var target: Class[_] = _ private var target: Class[_] = _
private var actorId: ActorID = _ private var actorId: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _ private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _ private var timeout: Long = _
@ -520,7 +520,7 @@ private[akka] sealed class ActiveObjectAspect {
} }
// FIXME Jan Kronquist: started work on issue 121 // FIXME Jan Kronquist: started work on issue 121
private[akka] case class Link(val actor: ActorID) private[akka] case class Link(val actor: ActorRef)
object Dispatcher { object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()

View file

@ -51,9 +51,9 @@ abstract class RemoteActor(hostname: String, port: Int) extends Actor {
@serializable sealed trait LifeCycleMessage @serializable sealed trait LifeCycleMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMessage
case class Restart(reason: Throwable) extends LifeCycleMessage case class Restart(reason: Throwable) extends LifeCycleMessage
case class Exit(dead: ActorID, killer: Throwable) extends LifeCycleMessage case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage
case class Unlink(child: ActorID) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage
case class UnlinkAndStop(child: ActorID) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage
case object Kill extends LifeCycleMessage case object Kill extends LifeCycleMessage
// Exceptions for Actors // Exceptions for Actors
@ -78,7 +78,7 @@ object Actor extends Logging {
} }
/** /**
* Creates a new ActorID out of the Actor with type T. * Creates a new ActorRef out of the Actor with type T.
* <pre> * <pre>
* import Actor._ * import Actor._
* val actor = newActor[MyActor] * val actor = newActor[MyActor]
@ -87,10 +87,10 @@ object Actor extends Logging {
* actor.stop * actor.stop
* </pre> * </pre>
*/ */
def newActor[T <: Actor: Manifest]: ActorID = new ActorID(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) def newActor[T <: Actor: Manifest]: ActorRef = new ActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/** /**
* Creates a new ActorID out of the Actor. Allows you to pass in a factory function * Creates a new ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple * that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted. * times if for example the Actor is supervised and needs to be restarted.
* <p/> * <p/>
@ -103,7 +103,7 @@ object Actor extends Logging {
* actor.stop * actor.stop
* </pre> * </pre>
*/ */
def newActor(factory: () => Actor): ActorID = new ActorID(factory) def newActor(factory: () => Actor): ActorRef = new ActorRef(factory)
/** /**
* Use to create an anonymous event-driven actor. * Use to create an anonymous event-driven actor.
@ -121,8 +121,8 @@ object Actor extends Logging {
* } * }
* </pre> * </pre>
*/ */
def actor(body: PartialFunction[Any, Unit]): ActorID = def actor(body: PartialFunction[Any, Unit]): ActorRef =
new ActorID(() => new Actor() { new ActorRef(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
start start
def receive: PartialFunction[Any, Unit] = body def receive: PartialFunction[Any, Unit] = body
@ -144,8 +144,8 @@ object Actor extends Logging {
* } * }
* </pre> * </pre>
*/ */
def transactor(body: PartialFunction[Any, Unit]): ActorID = def transactor(body: PartialFunction[Any, Unit]): ActorRef =
new ActorID(() => new Transactor() { new ActorRef(() => new Transactor() {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
start start
def receive: PartialFunction[Any, Unit] = body def receive: PartialFunction[Any, Unit] = body
@ -165,8 +165,8 @@ object Actor extends Logging {
* } * }
* </pre> * </pre>
*/ */
def temporaryActor(body: PartialFunction[Any, Unit]): ActorID = def temporaryActor(body: PartialFunction[Any, Unit]): ActorRef =
new ActorID(() => new Actor() { new ActorRef(() => new Actor() {
lifeCycle = Some(LifeCycle(Temporary)) lifeCycle = Some(LifeCycle(Temporary))
start start
def receive = body def receive = body
@ -192,7 +192,7 @@ object Actor extends Logging {
def init[A](body: => Unit) = { def init[A](body: => Unit) = {
def handler[A](body: => Unit) = new { def handler[A](body: => Unit) = new {
def receive(handler: PartialFunction[Any, Unit]) = def receive(handler: PartialFunction[Any, Unit]) =
new ActorID(() => new Actor() { new ActorRef(() => new Actor() {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
start start
body body
@ -243,8 +243,8 @@ object Actor extends Logging {
} }
/** /**
* ActorID is an immutable and serializable handle to an Actor. * ActorRef is an immutable and serializable handle to an Actor.
* Create an ActorID for an Actor by using the factory method on the Actor object. * Create an ActorRef for an Actor by using the factory method on the Actor object.
* Here is an example: * Here is an example:
* <pre> * <pre>
* import Actor._ * import Actor._
@ -257,7 +257,7 @@ object Actor extends Logging {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
final class ActorID private[akka] () { final class ActorRef private[akka] () {
private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
private[akka] def this(clazz: Class[_ <: Actor]) = { private[akka] def this(clazz: Class[_ <: Actor]) = {
@ -286,19 +286,19 @@ final class ActorID private[akka] () {
case _ => case _ =>
throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope") throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope")
} }
if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorID can not be 'null'") if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
actor actor
} }
/** /**
* Returns the class for the Actor instance that is managed by the ActorID. * Returns the class for the Actor instance that is managed by the ActorRef.
*/ */
def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
/** /**
* Starts up the actor and its message queue. * Starts up the actor and its message queue.
*/ */
def start: ActorID = { def start: ActorRef = {
actor.start actor.start
this this
} }
@ -327,7 +327,7 @@ final class ActorID private[akka] () {
/** /**
* Returns the supervisor, if there is one. * Returns the supervisor, if there is one.
*/ */
def supervisor: Option[ActorID] = actor.supervisor def supervisor: Option[ActorRef] = actor.supervisor
/** /**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics. * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
@ -343,7 +343,7 @@ final class ActorID private[akka] () {
* </pre> * </pre>
* <p/> * <p/>
*/ */
def !(message: Any)(implicit sender: Option[ActorID] = None) = { def !(message: Any)(implicit sender: Option[ActorRef] = None) = {
if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (actor.isRunning) actor.postMessageToMailbox(message, sender) if (actor.isRunning) actor.postMessageToMailbox(message, sender)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
@ -421,7 +421,7 @@ final class ActorID private[akka] () {
* <p/> * <p/>
* Works with '!', '!!' and '!!!'. * Works with '!', '!!' and '!!!'.
*/ */
def forward(message: Any)(implicit sender: Some[ActorID]) = { def forward(message: Any)(implicit sender: Some[ActorRef]) = {
if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (actor.isRunning) { if (actor.isRunning) {
sender.get.actor.replyTo match { sender.get.actor.replyTo match {
@ -491,11 +491,11 @@ final class ActorID private[akka] () {
*/ */
def timeout: Long = actor.timeout def timeout: Long = actor.timeout
override def toString: String = "ActorID[" + actor.toString + "]" override def toString: String = "ActorRef[" + actor.toString + "]"
override def hashCode: Int = actor.hashCode override def hashCode: Int = actor.hashCode
override def equals(that: Any): Boolean = actor.equals(that) override def equals(that: Any): Boolean = actor.equals(that)
private[akka] def supervisor_=(sup: Option[ActorID]): Unit = actor._supervisor = sup private[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actor._supervisor = sup
private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit private[akka] def trapExit: List[Class[_ <: Throwable]] = actor.trapExit
private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits private[akka] def trapExit_=(exits: List[Class[_ <: Throwable]]) = actor.trapExit = exits
@ -525,7 +525,7 @@ trait Actor extends TransactionManagement with Logging {
private[akka] var _uuid = UUID.newUuid.toString private[akka] var _uuid = UUID.newUuid.toString
/** /**
* The 'self' field holds the ActorID for this actor. * The 'self' field holds the ActorRef for this actor.
* Can be used to send messages to itself: * Can be used to send messages to itself:
* <pre> * <pre>
* self ! message * self ! message
@ -533,7 +533,7 @@ trait Actor extends TransactionManagement with Logging {
* Note: if you are using the 'self' field in the constructor of the Actor * Note: if you are using the 'self' field in the constructor of the Actor
* then you have to make the fields/operations that are using it 'lazy'. * then you have to make the fields/operations that are using it 'lazy'.
*/ */
implicit val self = new ActorID(() => this) implicit val self = new ActorRef(() => this)
/** For internal use only */ /** For internal use only */
implicit val _selfOption = Some(self) implicit val _selfOption = Some(self)
@ -548,8 +548,8 @@ trait Actor extends TransactionManagement with Logging {
@volatile private[akka] var _isKilled = false @volatile private[akka] var _isKilled = false
private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _hotswap: Option[PartialFunction[Any, Unit]] = None
private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[ActorID]] = None private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
private[akka] var _supervisor: Option[ActorID] = None private[akka] var _supervisor: Option[ActorRef] = None
private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] private[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation]
@ -565,7 +565,7 @@ trait Actor extends TransactionManagement with Logging {
* - Is Some(Left(Actor)) if sender is an actor * - Is Some(Left(Actor)) if sender is an actor
* - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/ */
private[akka] var replyTo: Option[Either[ActorID, CompletableFuture[Any]]] = None private[akka] var replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
// ==================================== // ====================================
// ==== USER CALLBACKS TO OVERRIDE ==== // ==== USER CALLBACKS TO OVERRIDE ====
@ -777,7 +777,7 @@ trait Actor extends TransactionManagement with Logging {
/** /**
* Returns the supervisor, if there is one. * Returns the supervisor, if there is one.
*/ */
def supervisor: Option[ActorID] = _supervisor def supervisor: Option[ActorRef] = _supervisor
/** /**
* Use <code>reply(..)</code> to reply with a message to the original sender of the message currently * Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
@ -879,7 +879,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def link(actorId: ActorID) = { protected[this] def link(actorId: ActorRef) = {
if (actorId.supervisor.isDefined) throw new IllegalStateException( if (actorId.supervisor.isDefined) throw new IllegalStateException(
"Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails")
getLinkedActors.add(actorId) getLinkedActors.add(actorId)
@ -892,7 +892,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def unlink(actorId: ActorID) = { protected[this] def unlink(actorId: ActorRef) = {
if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( if (!getLinkedActors.contains(actorId)) throw new IllegalStateException(
"Actor [" + actorId + "] is not a linked actor, can't unlink") "Actor [" + actorId + "] is not a linked actor, can't unlink")
getLinkedActors.remove(actorId) getLinkedActors.remove(actorId)
@ -905,7 +905,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def startLink(actorId: ActorID) = { protected[this] def startLink(actorId: ActorRef) = {
try { try {
actorId.start actorId.start
} finally { } finally {
@ -918,7 +918,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def startLinkRemote(actorId: ActorID, hostname: String, port: Int) = { protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = {
try { try {
actorId.makeRemote(hostname, port) actorId.makeRemote(hostname, port)
actorId.start actorId.start
@ -932,7 +932,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def spawn[T <: Actor : Manifest]: ActorID = { protected[this] def spawn[T <: Actor : Manifest]: ActorRef = {
val actorId = spawnButDoNotStart[T] val actorId = spawnButDoNotStart[T]
actorId.start actorId.start
actorId actorId
@ -943,7 +943,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorID = { protected[this] def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int): ActorRef = {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
actor.makeRemote(hostname, port) actor.makeRemote(hostname, port)
actor.start actor.start
@ -955,7 +955,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def spawnLink[T <: Actor: Manifest]: ActorID = { protected[this] def spawnLink[T <: Actor: Manifest]: ActorRef = {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
try { try {
actor.start actor.start
@ -970,7 +970,7 @@ trait Actor extends TransactionManagement with Logging {
* <p/> * <p/>
* To be invoked from within the actor itself. * To be invoked from within the actor itself.
*/ */
protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorID = { protected[this] def spawnLinkRemote[T <: Actor : Manifest](hostname: String, port: Int): ActorRef = {
val actor = spawnButDoNotStart[T] val actor = spawnButDoNotStart[T]
try { try {
actor.makeRemote(hostname, port) actor.makeRemote(hostname, port)
@ -999,15 +999,15 @@ trait Actor extends TransactionManagement with Logging {
private[akka] def _resume = _isSuspended = false private[akka] def _resume = _isSuspended = false
private def spawnButDoNotStart[T <: Actor : Manifest]: ActorID = { private def spawnButDoNotStart[T <: Actor : Manifest]: ActorRef = {
val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
actor.dispatcher = dispatcher actor.dispatcher = dispatcher
} }
new ActorID(() => actor) new ActorRef(() => actor)
} }
protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = {
joinTransaction(message) joinTransaction(message)
if (_remoteAddress.isDefined) { if (_remoteAddress.isDefined) {
@ -1201,7 +1201,7 @@ trait Actor extends TransactionManagement with Logging {
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
} }
private[this] def handleTrapExit(dead: ActorID, reason: Throwable): Unit = { private[this] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = {
if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) {
if (faultHandler.isDefined) { if (faultHandler.isDefined) {
faultHandler.get match { faultHandler.get match {
@ -1216,7 +1216,7 @@ trait Actor extends TransactionManagement with Logging {
} }
private[this] def restartLinkedActors(reason: Throwable) = { private[this] def restartLinkedActors(reason: Throwable) = {
getLinkedActors.toArray.toList.asInstanceOf[List[ActorID]].foreach { getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach {
actorId => actorId =>
val actor = actorId.actor val actor = actorId.actor
if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent))
@ -1257,9 +1257,9 @@ trait Actor extends TransactionManagement with Logging {
} else None } else None
} }
protected def getLinkedActors: HashSet[ActorID] = { protected def getLinkedActors: HashSet[ActorRef] = {
if (_linkedActors.isEmpty) { if (_linkedActors.isEmpty) {
val set = new HashSet[ActorID] val set = new HashSet[ActorRef]
_linkedActors = Some(set) _linkedActors = Some(set)
set set
} else _linkedActors.get } else _linkedActors.get
@ -1314,7 +1314,7 @@ object DispatcherType {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class ActorMessageInvoker private[akka] (val actorId: ActorID) extends MessageInvoker { class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker {
def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle)
} }

View file

@ -11,8 +11,8 @@ import scala.reflect.Manifest
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
sealed trait ActorRegistryEvent sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorID) extends ActorRegistryEvent case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
/** /**
* Registry holding all Actor instances in the whole system. * Registry holding all Actor instances in the whole system.
@ -27,16 +27,16 @@ case class ActorUnregistered(actor: ActorID) extends ActorRegistryEvent
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ActorRegistry extends Logging { object ActorRegistry extends Logging {
private val actorsByUUID = new ConcurrentHashMap[String, ActorID] private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
private val actorsById = new ConcurrentHashMap[String, List[ActorID]] private val actorsById = new ConcurrentHashMap[String, List[ActorRef]]
private val actorsByClassName = new ConcurrentHashMap[String, List[ActorID]] private val actorsByClassName = new ConcurrentHashMap[String, List[ActorRef]]
private val registrationListeners = new CopyOnWriteArrayList[ActorID] private val registrationListeners = new CopyOnWriteArrayList[ActorRef]
/** /**
* Returns all actors in the system. * Returns all actors in the system.
*/ */
def actors: List[ActorID] = { def actors: List[ActorRef] = {
val all = new ListBuffer[ActorID] val all = new ListBuffer[ActorRef]
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) all += elements.nextElement while (elements.hasMoreElements) all += elements.nextElement
all.toList all.toList
@ -45,7 +45,7 @@ object ActorRegistry extends Logging {
/** /**
* Invokes a function for all actors. * Invokes a function for all actors.
*/ */
def foreach(f: (ActorID) => Unit) = { def foreach(f: (ActorRef) => Unit) = {
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) f(elements.nextElement) while (elements.hasMoreElements) f(elements.nextElement)
} }
@ -53,8 +53,8 @@ object ActorRegistry extends Logging {
/** /**
* Finds all actors that are subtypes of the class passed in as the Manifest argument. * Finds all actors that are subtypes of the class passed in as the Manifest argument.
*/ */
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorID] = { def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = {
val all = new ListBuffer[ActorID] val all = new ListBuffer[ActorRef]
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val actorId = elements.nextElement val actorId = elements.nextElement
@ -68,7 +68,7 @@ object ActorRegistry extends Logging {
/** /**
* Finds all actors of the exact type specified by the class passed in as the Class argument. * Finds all actors of the exact type specified by the class passed in as the Class argument.
*/ */
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorID] = { def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = {
if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName) if (actorsByClassName.containsKey(clazz.getName)) actorsByClassName.get(clazz.getName)
else Nil else Nil
} }
@ -76,7 +76,7 @@ object ActorRegistry extends Logging {
/** /**
* Finds all actors that has a specific id. * Finds all actors that has a specific id.
*/ */
def actorsFor(id: String): List[ActorID] = { def actorsFor(id: String): List[ActorRef] = {
if (actorsById.containsKey(id)) actorsById.get(id) if (actorsById.containsKey(id)) actorsById.get(id)
else Nil else Nil
} }
@ -84,7 +84,7 @@ object ActorRegistry extends Logging {
/** /**
* Finds the actor that has a specific UUID. * Finds the actor that has a specific UUID.
*/ */
def actorFor(uuid: String): Option[ActorID] = { def actorFor(uuid: String): Option[ActorRef] = {
if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid)) if (actorsByUUID.containsKey(uuid)) Some(actorsByUUID.get(uuid))
else None else None
} }
@ -92,7 +92,7 @@ object ActorRegistry extends Logging {
/** /**
* Registers an actor in the ActorRegistry. * Registers an actor in the ActorRegistry.
*/ */
def register(actorId: ActorID) = { def register(actorId: ActorRef) = {
// UUID // UUID
actorsByUUID.put(actorId.uuid, actorId) actorsByUUID.put(actorId.uuid, actorId)
@ -115,7 +115,7 @@ object ActorRegistry extends Logging {
/** /**
* Unregisters an actor in the ActorRegistry. * Unregisters an actor in the ActorRegistry.
*/ */
def unregister(actor: ActorID) = { def unregister(actor: ActorRef) = {
actorsByUUID remove actor.uuid actorsByUUID remove actor.uuid
actorsById remove actor.id actorsById remove actor.id
actorsByClassName remove actor.getClass.getName actorsByClassName remove actor.getClass.getName
@ -138,7 +138,7 @@ object ActorRegistry extends Logging {
/** /**
* Adds the registration <code>listener</code> this this registry's listener list. * Adds the registration <code>listener</code> this this registry's listener list.
*/ */
def addRegistrationListener(listener: ActorID) = { def addRegistrationListener(listener: ActorRef) = {
listener.start listener.start
registrationListeners.add(listener) registrationListeners.add(listener)
} }
@ -146,12 +146,12 @@ object ActorRegistry extends Logging {
/** /**
* Removes the registration <code>listener</code> this this registry's listener list. * Removes the registration <code>listener</code> this this registry's listener list.
*/ */
def removeRegistrationListener(listener: ActorID) = { def removeRegistrationListener(listener: ActorRef) = {
listener.stop listener.stop
registrationListeners.remove(listener) registrationListeners.remove(listener)
} }
private def foreachListener(f: (ActorID) => Unit) { private def foreachListener(f: (ActorRef) => Unit) {
val iterator = registrationListeners.iterator val iterator = registrationListeners.iterator
while (iterator.hasNext) { while (iterator.hasNext) {
val listener = iterator.next val listener = iterator.next

View file

@ -26,7 +26,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
* Rework of David Pollak's ActorPing class in the Lift Project * Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License. * which is licensed under the Apache 2 License.
*/ */
class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
lifeCycle = Some(LifeCycle(Permanent)) lifeCycle = Some(LifeCycle(Permanent))
def receive = { def receive = {
@ -39,14 +39,14 @@ class ScheduleActor(val receiver: ActorID, val future: ScheduledFuture[AnyRef])
object Scheduler extends Actor { object Scheduler extends Actor {
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorID, ActorID] private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
faultHandler = Some(OneForOneStrategy(5, 5000)) faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable]) trapExit = List(classOf[Throwable])
start start
def schedule(receiver: ActorID, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try { try {
startLink(new ActorID(() => new ScheduleActor( startLink(new ActorRef(() => new ScheduleActor(
receiver, receiver,
service.scheduleAtFixedRate(new java.lang.Runnable { service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message; def run = receiver ! message;
@ -58,7 +58,7 @@ object Scheduler extends Actor {
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
def stopSupervising(actorId: ActorID) = { def stopSupervising(actorId: ActorRef) = {
unlink(actorId) unlink(actorId)
schedulers.remove(actorId) schedulers.remove(actorId)
} }

View file

@ -86,7 +86,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
// FIXME should Supervisor really havea newThreadBasedDispatcher?? // FIXME should Supervisor really havea newThreadBasedDispatcher??
dispatcher = Dispatchers.newThreadBasedDispatcher(this) dispatcher = Dispatchers.newThreadBasedDispatcher(this)
private val actors = new ConcurrentHashMap[String, List[ActorID]] private val actors = new ConcurrentHashMap[String, List[ActorRef]]
// Cheating, should really go through the dispatcher rather than direct access to a CHM // Cheating, should really go through the dispatcher rather than direct access to a CHM
def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]] def getInstance[T](clazz: Class[T]): List[T] = actors.get(clazz.getName).asInstanceOf[List[T]]
@ -103,7 +103,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
override def stop = synchronized { override def stop = synchronized {
super[Actor].stop super[Actor].stop
getLinkedActors.toArray.toList.asInstanceOf[List[ActorID]].foreach { actorId => getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId =>
actorId.stop actorId.stop
log.info("Shutting actor down: %s", actorId) log.info("Shutting actor down: %s", actorId)
} }
@ -123,7 +123,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
val className = actorId.actor.getClass.getName val className = actorId.actor.getClass.getName
val currentActors = { val currentActors = {
val list = actors.get(className) val list = actors.get(className)
if (list eq null) List[ActorID]() if (list eq null) List[ActorRef]()
else list else list
} }
actors.put(className, actorId :: currentActors) actors.put(className, actorId :: currentActors)
@ -143,7 +143,7 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep
val className = supervisor.getClass.getName val className = supervisor.getClass.getName
val currentSupervisors = { val currentSupervisors = {
val list = actors.get(className) val list = actors.get(className)
if (list eq null) List[ActorID]() if (list eq null) List[ActorRef]()
else list else list
} }
actors.put(className, supervisor.self :: currentSupervisors) actors.put(className, supervisor.self :: currentSupervisors)

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config
import com.google.inject._ import com.google.inject._
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorID} import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher, ActorRef}
import se.scalablesolutions.akka.remote.RemoteServer import se.scalablesolutions.akka.remote.RemoteServer
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -94,7 +94,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy) .activeObjects.put(targetClass.getName, proxy)
} }
supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle) supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component)) activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
} }
@ -116,7 +116,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
.activeObjects.put(targetClass.getName, proxy) .activeObjects.put(targetClass.getName, proxy)
} }
supervised ::= Supervise(new ActorID(() => actor), component.lifeCycle) supervised ::= Supervise(new ActorRef(() => actor), component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
} }

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.config package se.scalablesolutions.akka.config
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy sealed abstract class FaultHandlingStrategy
@ -25,12 +25,12 @@ object ScalaConfig {
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
class Supervise(val actorId: ActorID, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
} }
object Supervise { object Supervise {
def apply(actorId: ActorID, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress)
def apply(actorId: ActorID, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null)
def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress))
} }
@ -227,7 +227,7 @@ object JavaConfig {
intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher,
if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newSupervised(actorId: ActorID) = def newSupervised(actorId: ActorRef) =
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform)
} }

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, Queue, List} import java.util.{LinkedList, Queue, List}
import java.util.HashMap import java.util.HashMap
import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorID} import se.scalablesolutions.akka.actor.{ActorMessageInvoker, Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false @volatile protected var active: Boolean = false
@ -18,12 +18,12 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
def dispatch(invocation: MessageInvocation) = queue.append(invocation) def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actorId: ActorID) = synchronized { override def register(actorId: ActorRef) = synchronized {
messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) messageInvokers.put(actorId, new ActorMessageInvoker(actorId))
super.register(actorId) super.register(actorId)
} }
override def unregister(actorId: ActorID) = synchronized { override def unregister(actorId: ActorRef) = synchronized {
messageInvokers.remove(actorId) messageInvokers.remove(actorId)
super.unregister(actorId) super.unregister(actorId)
} }

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/** /**
* Scala API. Dispatcher factory. * Scala API. Dispatcher factory.
@ -40,7 +40,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID}
*/ */
object Dispatchers { object Dispatchers {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") { object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
override def register(actor: ActorID) = { override def register(actor: ActorRef) = {
if (isShutdown) init if (isShutdown) init
super.register(actor) super.register(actor)
} }

View file

@ -6,7 +6,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/** /**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -31,12 +31,12 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID}
class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder {
@volatile private var active: Boolean = false @volatile private var active: Boolean = false
implicit def actorId2actor(actorId: ActorID): Actor = actorId.actor implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor
/** Type of the actors registered in this dispatcher. */ /** Type of the actors registered in this dispatcher. */
private var actorType:Option[Class[_]] = None private var actorType:Option[Class[_]] = None
private val pooledActors = new CopyOnWriteArrayList[ActorID] private val pooledActors = new CopyOnWriteArrayList[ActorRef]
/** The index in the pooled actors list which was last used to steal work */ /** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0 @volatile private var lastThiefIndex = 0
@ -68,7 +68,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* *
* @return true if the mailbox was processed, false otherwise * @return true if the mailbox was processed, false otherwise
*/ */
private def tryProcessMailbox(receiver: ActorID): Boolean = { private def tryProcessMailbox(receiver: ActorRef): Boolean = {
var lockAcquiredOnce = false var lockAcquiredOnce = false
val lock = receiver.actor._dispatcherLock val lock = receiver.actor._dispatcherLock
// this do-wile loop is required to prevent missing new messages between the end of processing // this do-wile loop is required to prevent missing new messages between the end of processing
@ -90,7 +90,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/** /**
* Process the messages in the mailbox of the given actor. * Process the messages in the mailbox of the given actor.
*/ */
private def processMailbox(receiver: ActorID) = { private def processMailbox(receiver: ActorRef) = {
var messageInvocation = receiver._mailbox.poll var messageInvocation = receiver._mailbox.poll
while (messageInvocation != null) { while (messageInvocation != null) {
messageInvocation.invoke messageInvocation.invoke
@ -98,9 +98,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
} }
} }
private def findThief(receiver: ActorID): Option[ActorID] = { private def findThief(receiver: ActorRef): Option[ActorRef] = {
// copy to prevent concurrent modifications having any impact // copy to prevent concurrent modifications having any impact
val actors = pooledActors.toArray(new Array[ActorID](pooledActors.size)) val actors = pooledActors.toArray(new Array[ActorRef](pooledActors.size))
var i = lastThiefIndex var i = lastThiefIndex
if (i > actors.size) if (i > actors.size)
i = 0 i = 0
@ -108,7 +108,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
// the dispatcher is being shut down... // the dispatcher is being shut down...
doFindThief(receiver, actors, i) match { doFindThief(receiver, actors, i) match {
case (thief: Option[ActorID], index: Int) => { case (thief: Option[ActorRef], index: Int) => {
lastThiefIndex = (index + 1) % actors.size lastThiefIndex = (index + 1) % actors.size
return thief return thief
} }
@ -123,7 +123,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* @param startIndex first index to start looking in the list (i.e. for round robin) * @param startIndex first index to start looking in the list (i.e. for round robin)
* @return the thief (or None) and the new index to start searching next time * @return the thief (or None) and the new index to start searching next time
*/ */
private def doFindThief(receiver: ActorID, actors: Array[ActorID], startIndex: Int): (Option[ActorID], Int) = { private def doFindThief(receiver: ActorRef, actors: Array[ActorRef], startIndex: Int): (Option[ActorRef], Int) = {
for (i <- 0 to actors.length) { for (i <- 0 to actors.length) {
val index = (i + startIndex) % actors.length val index = (i + startIndex) % actors.length
val actor = actors(index) val actor = actors(index)
@ -140,7 +140,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
* Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire * Try donating messages to the thief and processing the thiefs mailbox. Doesn't do anything if we can not acquire
* the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox. * the thiefs dispatching lock, because in that case another thread is already processing the thiefs mailbox.
*/ */
private def tryDonateAndProcessMessages(receiver: ActorID, thief: ActorID) = { private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
if (thief._dispatcherLock.tryLock) { if (thief._dispatcherLock.tryLock) {
try { try {
donateAndProcessMessages(receiver, thief) donateAndProcessMessages(receiver, thief)
@ -153,7 +153,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/** /**
* Donate messages to the thief and process them on the thief as long as the receiver has more messages. * Donate messages to the thief and process them on the thief as long as the receiver has more messages.
*/ */
private def donateAndProcessMessages(receiver: ActorID, thief: ActorID): Unit = { private def donateAndProcessMessages(receiver: ActorRef, thief: ActorRef): Unit = {
donateMessage(receiver, thief) match { donateMessage(receiver, thief) match {
case None => { case None => {
// no more messages to donate // no more messages to donate
@ -169,7 +169,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/** /**
* Steal a message from the receiver and give it to the thief. * Steal a message from the receiver and give it to the thief.
*/ */
private def donateMessage(receiver: ActorID, thief: ActorID): Option[MessageInvocation] = { private def donateMessage(receiver: ActorRef, thief: ActorRef): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast val donated = receiver._mailbox.pollLast
if (donated != null) { if (donated != null) {
thief.self ! donated.message thief.self ! donated.message
@ -193,20 +193,20 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
override def register(actorId: ActorID) = { override def register(actorId: ActorRef) = {
verifyActorsAreOfSameType(actorId) verifyActorsAreOfSameType(actorId)
pooledActors.add(actorId) pooledActors.add(actorId)
super.register(actorId) super.register(actorId)
} }
override def unregister(actorId: ActorID) = { override def unregister(actorId: ActorRef) = {
pooledActors.remove(actorId) pooledActors.remove(actorId)
super.unregister(actorId) super.unregister(actorId)
} }
def usesActorMailbox = true def usesActorMailbox = true
private def verifyActorsAreOfSameType(newActorId: ActorID) = { private def verifyActorsAreOfSameType(newActorId: ActorRef) = {
actorType match { actorType match {
case None => { case None => {
actorType = Some(newActorId.actor.getClass) actorType = Some(newActorId.actor.getClass)

View file

@ -7,15 +7,15 @@ package se.scalablesolutions.akka.dispatch
import java.util.List import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging} import se.scalablesolutions.akka.util.{HashCode, Logging}
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: ActorID, final class MessageInvocation(val receiver: ActorRef,
val message: Any, val message: Any,
val replyTo : Option[Either[ActorID, CompletableFuture[Any]]], val replyTo : Option[Either[ActorRef, CompletableFuture[Any]]],
val transactionSet: Option[CountDownCommitBarrier]) { val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null") if (receiver eq null) throw new IllegalArgumentException("receiver is null")
@ -56,12 +56,12 @@ trait MessageInvoker {
} }
trait MessageDispatcher extends Logging { trait MessageDispatcher extends Logging {
protected val references = new ConcurrentHashMap[String, ActorID] protected val references = new ConcurrentHashMap[String, ActorRef]
def dispatch(invocation: MessageInvocation) def dispatch(invocation: MessageInvocation)
def start def start
def shutdown def shutdown
def register(actorId: ActorID) = references.put(actorId.uuid, actorId) def register(actorId: ActorRef) = references.put(actorId.uuid, actorId)
def unregister(actorId: ActorID) = { def unregister(actorId: ActorRef) = {
references.remove(actorId.uuid) references.remove(actorId.uuid)
if (canBeShutDown) if (canBeShutDown)
shutdown // shut down in the dispatcher's references is zero shutdown // shut down in the dispatcher's references is zero

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue import java.util.Queue
import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker} import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorMessageInvoker}
/** /**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@ -17,7 +17,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorID, ActorMessageInvoker}
class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker) class ThreadBasedDispatcher private[akka] (val name: String, val messageHandler: MessageInvoker)
extends MessageDispatcher { extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorID(() => actor))) def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(new ActorRef(() => actor)))
private val queue = new BlockingMessageQueue(name) private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _ private var selectorThread: Thread = _

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.remote package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorID} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.config.Config.config import se.scalablesolutions.akka.config.Config.config
@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap} import scala.collection.mutable.{HashSet, HashMap}
/** /**
* Atomic remote request/reply message id generator.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object RemoteRequestIdFactory { object RemoteRequestIdFactory {
@ -41,6 +43,69 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
/**
* Remote Actor proxy factory.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object RemoteActorProxy {
def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
new ActorRef(() => new RemoteActorProxy(uuid, className, hostname, port, timeout))
}
/**
* Remote Actor proxy.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class RemoteActorProxy private (
uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor {
start
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeOut)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
if (senderOption.isDefined) {
val sender = senderOption.get.actor
requestBuilder.setSourceTarget(sender.getClass.getName)
requestBuilder.setSourceUuid(sender.uuid)
val (host, port) = sender._replyToAddress.map(address =>
(address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send[Any](requestBuilder.build, None)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
def receive = {case _ => {}}
}
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ -53,62 +118,17 @@ object RemoteClient extends Logging {
// FIXME: simplify overloaded methods when we have Scala 2.8 // FIXME: simplify overloaded methods when we have Scala 2.8
def actorFor(className: String, hostname: String, port: Int): ActorID = def actorFor(className: String, hostname: String, port: Int): ActorRef =
actorFor(className, className, 5000L, hostname, port) actorFor(className, className, 5000L, hostname, port)
def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorID = def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef =
actorFor(actorId, className, 5000L, hostname, port) actorFor(actorId, className, 5000L, hostname, port)
def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID = def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(className, className, timeout, hostname, port) actorFor(className, className, timeout, hostname, port)
def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID(() => def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
new Actor { RemoteActorProxy(actorId, className, hostname, port, timeout)
start
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(actorId)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
if (sender.isDefined) {
val sndr = sender.get.actor
requestBuilder.setSourceTarget(sndr.getClass.getName)
requestBuilder.setSourceUuid(sndr.uuid)
val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
requestBuilder.setSourceHostname(host)
requestBuilder.setSourcePort(port)
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send[Any](requestBuilder.build, None)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(actorId)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
def receive = {case _ => {}}
}
)
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
@ -174,8 +194,8 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
@volatile private[remote] var isRunning = false @volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[String, ActorID] private val supervisors = new ConcurrentHashMap[String, ActorRef]
private[remote] val listeners = new ConcurrentSkipListSet[ActorID] private[remote] val listeners = new ConcurrentSkipListSet[ActorRef]
private val channelFactory = new NioClientSocketChannelFactory( private val channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool, Executors.newCachedThreadPool,
@ -200,7 +220,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val channel = connection.awaitUninterruptibly.getChannel val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel) openChannels.add(channel)
if (!connection.isSuccess) { if (!connection.isSuccess) {
listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(connection.getCause)) listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
} }
isRunning = true isRunning = true
@ -232,21 +252,21 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
} }
} else { } else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
throw exception throw exception
} }
def registerSupervisorForActor(actorId: ActorID) = def registerSupervisorForActor(actorId: ActorRef) =
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision")
else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId)
def deregisterSupervisorForActor(actorId: ActorID) = def deregisterSupervisorForActor(actorId: ActorRef) =
if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision")
else supervisors.remove(actorId.supervisor.get.uuid) else supervisors.remove(actorId.supervisor.get.uuid)
def registerListener(actorId: ActorID) = listeners.add(actorId) def registerListener(actorId: ActorRef) = listeners.add(actorId)
def deregisterListener(actorId: ActorID) = listeners.remove(actorId) def deregisterListener(actorId: ActorRef) = listeners.remove(actorId)
} }
/** /**
@ -254,7 +274,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
*/ */
class RemoteClientPipelineFactory(name: String, class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFuture[_]], futures: ConcurrentMap[Long, CompletableFuture[_]],
supervisors: ConcurrentMap[String, ActorID], supervisors: ConcurrentMap[String, ActorRef],
bootstrap: ClientBootstrap, bootstrap: ClientBootstrap,
remoteAddress: SocketAddress, remoteAddress: SocketAddress,
timer: HashedWheelTimer, timer: HashedWheelTimer,
@ -285,7 +305,7 @@ class RemoteClientPipelineFactory(name: String,
@ChannelHandler.Sharable @ChannelHandler.Sharable
class RemoteClientHandler(val name: String, class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFuture[_]], val futures: ConcurrentMap[Long, CompletableFuture[_]],
val supervisors: ConcurrentMap[String, ActorID], val supervisors: ConcurrentMap[String, ActorRef],
val bootstrap: ClientBootstrap, val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress, val remoteAddress: SocketAddress,
val timer: HashedWheelTimer, val timer: HashedWheelTimer,
@ -325,12 +345,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId) futures.remove(reply.getId)
} else { } else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(exception)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
throw exception throw exception
} }
} catch { } catch {
case e: Exception => case e: Exception =>
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(e)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e) log.error("Unexpected exception in remote client handler: %s", e)
throw e throw e
} }
@ -345,7 +365,7 @@ class RemoteClientHandler(val name: String,
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) { if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(client.connection.getCause)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
} }
} }
@ -353,17 +373,17 @@ class RemoteClientHandler(val name: String,
} }
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientConnected(client.hostname, client.port)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientDisconnected(client.hostname, client.port)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
} }
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorID] ! RemoteClientError(event.getCause)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause))
log.error(event.getCause, "Unexpected exception from downstream in remote client") log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close event.getChannel.close
} }

View file

@ -89,7 +89,7 @@ object RemoteServer {
} }
class RemoteActorSet { class RemoteActorSet {
val actors = new ConcurrentHashMap[String, ActorID] val actors = new ConcurrentHashMap[String, ActorRef]
val activeObjects = new ConcurrentHashMap[String, AnyRef] val activeObjects = new ConcurrentHashMap[String, AnyRef]
} }
@ -168,7 +168,8 @@ class RemoteServer extends Logging {
log.info("Starting remote server at [%s:%s]", hostname, port) log.info("Starting remote server at [%s:%s]", hostname, port)
RemoteServer.register(hostname, port, this) RemoteServer.register(hostname, port, this)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) val pipelineFactory = new RemoteServerPipelineFactory(
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.keepAlive", true)
@ -198,9 +199,9 @@ class RemoteServer extends Logging {
/** /**
* Register Remote Actor by the Actor's 'id' field. * Register Remote Actor by the Actor's 'id' field.
*/ */
def register(actor: ActorID) = synchronized { def register(actor: ActorRef) = synchronized {
if (_isRunning) { if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor)
} }
} }
@ -208,9 +209,9 @@ class RemoteServer extends Logging {
/** /**
* Register Remote Actor by a specific 'id' passed as argument. * Register Remote Actor by a specific 'id' passed as argument.
*/ */
def register(id: String, actor: ActorID) = synchronized { def register(id: String, actor: ActorRef) = synchronized {
if (_isRunning) { if (_isRunning) {
log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id)
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor)
} }
} }
@ -225,7 +226,7 @@ class RemoteServerPipelineFactory(
val name: String, val name: String,
val openChannels: ChannelGroup, val openChannels: ChannelGroup,
val loader: Option[ClassLoader], val loader: Option[ClassLoader],
val actors: JMap[String, ActorID], val actors: JMap[String, ActorRef],
val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory { val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._ import RemoteServer._
@ -256,7 +257,7 @@ class RemoteServerHandler(
val name: String, val name: String,
val openChannels: ChannelGroup, val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader], val applicationLoader: Option[ClassLoader],
val actors: JMap[String, ActorID], val actors: JMap[String, ActorRef],
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging { val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
@ -440,7 +441,7 @@ class RemoteServerHandler(
* If actor already created then just return it from the registry. * If actor already created then just return it from the registry.
* Does not start the actor. * Does not start the actor.
*/ */
private def createActor(name: String, uuid: String, timeout: Long): ActorID = { private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
val actorIdOrNull = actors.get(uuid) val actorIdOrNull = actors.get(uuid)
if (actorIdOrNull eq null) { if (actorIdOrNull eq null) {
try { try {
@ -451,7 +452,7 @@ class RemoteServerHandler(
newInstance._uuid = uuid newInstance._uuid = uuid
newInstance.timeout = timeout newInstance.timeout = timeout
newInstance._remoteAddress = None newInstance._remoteAddress = None
val actorId = new ActorID(() => newInstance) val actorId = new ActorRef(() => newInstance)
actors.put(uuid, actorId) actors.put(uuid, actorId)
actorId actorId
} catch { } catch {

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.patterns package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.ActorID import se.scalablesolutions.akka.actor.ActorRef
trait InfiniteIterator[T] extends Iterator[T] trait InfiniteIterator[T] extends Iterator[T]
@ -20,7 +20,7 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
} }
} }
class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { class SmallestMailboxFirstIterator(items : List[ActorRef]) extends InfiniteIterator[ActorRef] {
def hasNext = items != Nil def hasNext = items != Nil
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)

View file

@ -4,16 +4,16 @@
package se.scalablesolutions.akka.patterns package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
sealed trait ListenerMessage sealed trait ListenerMessage
case class Listen(listener: ActorID) extends ListenerMessage case class Listen(listener: ActorRef) extends ListenerMessage
case class Deafen(listener: ActorID) extends ListenerMessage case class Deafen(listener: ActorRef) extends ListenerMessage
case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage case class WithListeners(f: Set[ActorRef] => Unit) extends ListenerMessage
trait Listeners { self : Actor => trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[ActorID]()) private lazy val listeners = Agent(Set[ActorRef]())
protected def listenerManagement : PartialFunction[Any,Unit] = { protected def listenerManagement : PartialFunction[Any,Unit] = {
case Listen(l) => listeners( _ + l) case Listen(l) => listeners( _ + l)

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.patterns package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
object Patterns { object Patterns {
@ -27,24 +27,24 @@ object Patterns {
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
//FIXME 2.8, use default params with CyclicIterator //FIXME 2.8, use default params with CyclicIterator
def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
newActor(() => new Actor with LoadBalancer { newActor(() => new Actor with LoadBalancer {
start start
val seq = actors val seq = actors
}) })
def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
newActor(() => new Actor with Dispatcher { newActor(() => new Actor with Dispatcher {
start start
override def transform(msg: Any) = msgTransformer(msg) override def transform(msg: Any) = msgTransformer(msg)
def routes = routing def routes = routing
}) })
def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = newActor(() => new Actor with Dispatcher {
start start
def routes = routing def routes = routing
}) })
def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef =
dispatcherActor({case _ => actorToLog}, logger) dispatcherActor({case _ => actorToLog}, logger)
} }

View file

@ -4,13 +4,13 @@
package se.scalablesolutions.akka.patterns package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
trait Dispatcher { self: Actor => trait Dispatcher { self: Actor =>
protected def transform(msg: Any): Any = msg protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorID] protected def routes: PartialFunction[Any, ActorRef]
protected def dispatch: PartialFunction[Any, Unit] = { protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) => case a if routes.isDefinedAt(a) =>
@ -22,7 +22,7 @@ trait Dispatcher { self: Actor =>
} }
trait LoadBalancer extends Dispatcher { self: Actor => trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorID] protected def seq: InfiniteIterator[ActorRef]
protected def routes = { case x if seq.hasNext => seq.next } protected def routes = { case x if seq.hasNext => seq.next }
} }

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture import se.scalablesolutions.akka.dispatch.CompletableFuture
@ -61,7 +61,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private case object Get extends DataFlowVariableMessage private case object Get extends DataFlowVariableMessage
private val value = new AtomicReference[Option[T]](None) private val value = new AtomicReference[Option[T]](None)
private val blockedReaders = new ConcurrentLinkedQueue[ActorID] private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
timeout = TIME_OUT timeout = TIME_OUT

View file

@ -17,7 +17,7 @@ object ActorFireForgetRequestReplySpec {
} }
} }
class SenderActor(replyActor: ActorID) extends Actor { class SenderActor(replyActor: ActorRef) extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this) dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = { def receive = {

View file

@ -45,7 +45,7 @@ object SendOneWayAndReplySenderActor {
} }
class SendOneWayAndReplySenderActor extends Actor { class SendOneWayAndReplySenderActor extends Actor {
var state: Option[AnyRef] = None var state: Option[AnyRef] = None
var sendTo: ActorID = _ var sendTo: ActorRef = _
var latch: CountDownLatch = _ var latch: CountDownLatch = _
def sendOff = sendTo ! "Hello" def sendOff = sendTo ! "Hello"

View file

@ -8,7 +8,7 @@ import Actor._
object ForwardActorSpec { object ForwardActorSpec {
object ForwardState { object ForwardState {
var sender: Option[ActorID] = None var sender: Option[ActorRef] = None
} }
class ReceiverActor extends Actor { class ReceiverActor extends Actor {

View file

@ -16,13 +16,13 @@ case class SetMapState(key: String, value: String)
case class SetVectorState(key: String) case class SetVectorState(key: String)
case class SetRefState(key: String) case class SetRefState(key: String)
case class Success(key: String, value: String) case class Success(key: String, value: String)
case class Failure(key: String, value: String, failer: ActorID) case class Failure(key: String, value: String, failer: ActorRef)
case class SetMapStateOneWay(key: String, value: String) case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String) case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String) case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String) case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: ActorID) case class FailureOneWay(key: String, value: String, failer: ActorRef)
case object GetNotifier case object GetNotifier

View file

@ -83,9 +83,9 @@ class RemoteSupervisorSpec extends JUnitSuite {
}).start }).start
Thread.sleep(1000) Thread.sleep(1000)
var pingpong1: ActorID = _ var pingpong1: ActorRef = _
var pingpong2: ActorID = _ var pingpong2: ActorRef = _
var pingpong3: ActorID = _ var pingpong3: ActorRef = _
@Test def shouldStartServer = { @Test def shouldStartServer = {
Log.messageLog.clear Log.messageLog.clear

View file

@ -13,7 +13,7 @@ object ServerInitiatedRemoteActorSpec {
val PORT = 9990 val PORT = 9990
var server: RemoteServer = null var server: RemoteServer = null
case class Send(actor: ActorID) case class Send(actor: ActorRef)
object RemoteActorSpecActorUnidirectional { object RemoteActorSpecActorUnidirectional {
val latch = new CountDownLatch(1) val latch = new CountDownLatch(1)
@ -43,13 +43,13 @@ object ServerInitiatedRemoteActorSpec {
class RemoteActorSpecActorAsyncSender extends Actor { class RemoteActorSpecActorAsyncSender extends Actor {
start start
def receive = { def receive = {
case Send(actor: ActorID) => case Send(actor: ActorRef) =>
actor ! "Hello" actor ! "Hello"
case "World" => case "World" =>
RemoteActorSpecActorAsyncSender.latch.countDown RemoteActorSpecActorAsyncSender.latch.countDown
} }
def send(actor: ActorID) { def send(actor: ActorRef) {
self ! Send(actor) self ! Send(actor)
} }
} }

View file

@ -69,9 +69,9 @@ object SupervisorSpec {
class SupervisorSpec extends JUnitSuite { class SupervisorSpec extends JUnitSuite {
import SupervisorSpec._ import SupervisorSpec._
var pingpong1: ActorID = _ var pingpong1: ActorRef = _
var pingpong2: ActorID = _ var pingpong2: ActorRef = _
var pingpong3: ActorID = _ var pingpong3: ActorRef = _
@Test def shouldStartServer = { @Test def shouldStartServer = {
messageLog.clear messageLog.clear

View file

@ -22,7 +22,7 @@
package se.scalablesolutions.akka.security package se.scalablesolutions.akka.security
import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorID, ActorRegistry} import se.scalablesolutions.akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry}
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config import se.scalablesolutions.akka.config.Config
@ -73,7 +73,7 @@ case class SpnegoCredentials(token: Array[Byte]) extends Credentials
* Jersey Filter for invocation intercept and authorization/authentication * Jersey Filter for invocation intercept and authorization/authentication
*/ */
class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging { class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
class Filter(actor: ActorID, rolesAllowed: Option[List[String]]) class Filter(actor: ActorRef, rolesAllowed: Option[List[String]])
extends ResourceFilter with ContainerRequestFilter with Logging { extends ResourceFilter with ContainerRequestFilter with Logging {
override def getRequestFilter: ContainerRequestFilter = this override def getRequestFilter: ContainerRequestFilter = this
@ -111,7 +111,7 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
* Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin * Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin
* strategy could be implemented in the future * strategy could be implemented in the future
*/ */
def authenticator: ActorID = ActorRegistry.actorsFor(authenticatorFQN).head def authenticator: ActorRef = ActorRegistry.actorsFor(authenticatorFQN).head
def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] = def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] =
java.util.Collections.singletonList(new Filter(authenticator, roles)) java.util.Collections.singletonList(new Filter(authenticator, roles))

View file

@ -1,6 +1,6 @@
package se.scalablesolutions.akka.persistence.cassandra package se.scalablesolutions.akka.persistence.cassandra
import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._ import Actor._
import org.junit.Test import org.junit.Test
@ -17,13 +17,13 @@ case class SetMapState(key: String, value: String)
case class SetVectorState(key: String) case class SetVectorState(key: String)
case class SetRefState(key: String) case class SetRefState(key: String)
case class Success(key: String, value: String) case class Success(key: String, value: String)
case class Failure(key: String, value: String, failer: ActorID) case class Failure(key: String, value: String, failer: ActorRef)
case class SetMapStateOneWay(key: String, value: String) case class SetMapStateOneWay(key: String, value: String)
case class SetVectorStateOneWay(key: String) case class SetVectorStateOneWay(key: String)
case class SetRefStateOneWay(key: String) case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String) case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: ActorID) case class FailureOneWay(key: String, value: String, failer: ActorRef)
class CassandraPersistentActor extends Transactor { class CassandraPersistentActor extends Transactor {
timeout = 100000 timeout = 100000

View file

@ -7,7 +7,7 @@ import org.scalatest.junit.JUnitSuite
import _root_.dispatch.json.{JsNumber, JsValue} import _root_.dispatch.json.{JsNumber, JsValue}
import _root_.dispatch.json.Js._ import _root_.dispatch.json.Js._
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorID} import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
import Actor._ import Actor._
/** /**
@ -24,8 +24,8 @@ import Actor._
*/ */
case class Balance(accountNo: String) case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorID) case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorID) case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt) case class Credit(accountNo: String, amount: BigInt)
case class Log(start: Int, finish: Int) case class Log(start: Int, finish: Int)
case object LogSize case object LogSize

View file

@ -3,7 +3,7 @@ package se.scalablesolutions.akka.persistence.redis
import org.junit.{Test, Before} import org.junit.{Test, Before}
import org.junit.Assert._ import org.junit.Assert._
import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._ import Actor._
/** /**
@ -20,8 +20,8 @@ import Actor._
*/ */
case class Balance(accountNo: String) case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorID) case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorID) case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt) case class Credit(accountNo: String, amount: BigInt)
case class Log(start: Int, finish: Int) case class Log(start: Int, finish: Int)
case object LogSize case object LogSize

View file

@ -3,7 +3,7 @@ package se.scalablesolutions.akka.persistence.redis
import org.junit.{Test, Before} import org.junit.{Test, Before}
import org.junit.Assert._ import org.junit.Assert._
import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._ import Actor._
/** /**
@ -15,7 +15,7 @@ import Actor._
case class NQ(accountNo: String) case class NQ(accountNo: String)
case object DQ case object DQ
case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: ActorID) case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: ActorRef)
case object SZ case object SZ
class QueueActor extends Transactor { class QueueActor extends Transactor {

View file

@ -7,7 +7,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorID, Transactor} import se.scalablesolutions.akka.actor.{Actor, ActorRef, Transactor}
import Actor._ import Actor._
/** /**
@ -43,7 +43,7 @@ case class SCORE(h: Hacker)
case class RANGE(start: Int, end: Int) case class RANGE(start: Int, end: Int)
// add and remove subject to the condition that there will be at least 3 hackers // add and remove subject to the condition that there will be at least 3 hackers
case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorID) case class MULTI(add: List[Hacker], rem: List[Hacker], failer: ActorRef)
class SortedSetActor extends Transactor { class SortedSetActor extends Transactor {
timeout = 100000 timeout = 100000

View file

@ -1,7 +1,7 @@
package sample.camel package sample.camel
import se.scalablesolutions.akka.actor.annotation.consume import se.scalablesolutions.akka.actor.annotation.consume
import se.scalablesolutions.akka.actor.{Actor, ActorID, RemoteActor} import se.scalablesolutions.akka.actor.{Actor, ActorRef, RemoteActor}
import se.scalablesolutions.akka.camel.{Producer, Message, Consumer} import se.scalablesolutions.akka.camel.{Producer, Message, Consumer}
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -51,7 +51,7 @@ class Consumer2 extends Actor {
} }
} }
class Consumer3(transformer: ActorID) extends Actor with Consumer { class Consumer3(transformer: ActorRef) extends Actor with Consumer {
def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome" def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"
def receive = { def receive = {
@ -59,7 +59,7 @@ class Consumer3(transformer: ActorID) extends Actor with Consumer {
} }
} }
class Transformer(producer: ActorID) extends Actor { class Transformer(producer: ActorRef) extends Actor {
protected def receive = { protected def receive = {
case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _)) case msg: Message => producer.forward(msg.transformBody[String]("- %s -" format _))
} }
@ -80,7 +80,7 @@ class Publisher(name: String, uri: String) extends Actor with Producer {
protected def receive = produce protected def receive = produce
} }
class PublisherBridge(uri: String, publisher: ActorID) extends Actor with Consumer { class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
def endpointUri = uri def endpointUri = uri
protected def receive = { protected def receive = {

View file

@ -1,6 +1,6 @@
package sample.camel package sample.camel
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.Message import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient import se.scalablesolutions.akka.remote.RemoteClient
@ -15,7 +15,7 @@ object Application1 {
// //
def main(args: Array[String]) { def main(args: Array[String]) {
implicit val sender: Option[ActorID] = None implicit val sender: Option[ActorRef] = None
val actor1 = newActor[RemoteActor1] val actor1 = newActor[RemoteActor1]
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777) val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)

View file

@ -6,7 +6,7 @@ package sample.chat
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorID, RemoteActor} import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage import se.scalablesolutions.akka.persistence.redis.RedisStorage
@ -72,7 +72,7 @@ class ChatClient(val name: String) {
/** /**
* Internal chat client session. * Internal chat client session.
*/ */
class Session(user: String, storage: ActorID) extends Actor { class Session(user: String, storage: ActorRef) extends Actor {
private val loginTime = System.currentTimeMillis private val loginTime = System.currentTimeMillis
private var userLog: List[String] = Nil private var userLog: List[String] = Nil
@ -124,8 +124,8 @@ class RedisChatStorage extends ChatStorage {
*/ */
trait SessionManagement { this: Actor => trait SessionManagement { this: Actor =>
val storage: ActorID // needs someone to provide the ChatStorage val storage: ActorRef // needs someone to provide the ChatStorage
val sessions = new HashMap[String, ActorID] val sessions = new HashMap[String, ActorRef]
protected def sessionManagement: PartialFunction[Any, Unit] = { protected def sessionManagement: PartialFunction[Any, Unit] = {
case Login(username) => case Login(username) =>
@ -151,7 +151,7 @@ trait SessionManagement { this: Actor =>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. * Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/ */
trait ChatManagement { this: Actor => trait ChatManagement { this: Actor =>
val sessions: HashMap[String, ActorID] // needs someone to provide the Session map val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map
protected def chatManagement: PartialFunction[Any, Unit] = { protected def chatManagement: PartialFunction[Any, Unit] = {
case msg @ ChatMessage(from, _) => sessions(from) ! msg case msg @ ChatMessage(from, _) => sessions(from) ! msg
@ -173,7 +173,7 @@ trait ChatServer extends Actor {
faultHandler = Some(OneForOneStrategy(5, 5000)) faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Exception]) trapExit = List(classOf[Exception])
val storage: ActorID val storage: ActorRef
log.info("Chat service is starting up...") log.info("Chat service is starting up...")