Added isOrRemoteNode field to ActorRef

This commit is contained in:
Jonas Bonér 2010-05-08 12:13:44 +02:00
parent 4de5302804
commit 072bbe461c
4 changed files with 50 additions and 13 deletions

View file

@ -114,7 +114,7 @@ trait Producer { self: Actor =>
protected def produceAsync(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg)
val sync = new ProducerResponseSender(
cmsg.headers(headersToCopy), this.replyTo, this)
cmsg.headers(headersToCopy), self.replyTo, this)
template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync)
}

View file

@ -25,6 +25,23 @@ import java.net.InetSocketAddress
import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util.{HashSet => JHashSet}
/*
trait ActorWithNestedReceive extends Actor {
import Actor.actor
private var nestedReactsProcessors: List[ActorRef] = Nil
private val processNestedReacts: PartialFunction[Any, Unit] = {
case message if !nestedReactsProcessors.isEmpty =>
val processors = nestedReactsProcessors.reverse
processors.head forward message
nestedReactsProcessors = processors.tail.reverse
}
protected def react: PartialFunction[Any, Unit]
protected def reactAgain(pf: PartialFunction[Any, Unit]) = nestedReactsProcessors ::= actor(pf)
protected def receive = processNestedReacts orElse react
}
*/
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
* <p/>
@ -71,6 +88,8 @@ object Actor extends Logging {
val PORT = config.getInt("akka.remote.server.port", 9999)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
// FIXME remove next release
object Sender {
@deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'")
@ -783,6 +802,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
}
private[this] def newActor: Actor = {
Actor.actorRefInCreation.value = Some(this)
val actor = actorFactory match {
case Left(Some(clazz)) =>
try {
@ -798,7 +818,6 @@ sealed class ActorRef private[akka] () extends TransactionManagement {
case _ =>
throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope")
}
actor._selfOption = Some(this)
if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
actor
}
@ -1096,16 +1115,22 @@ trait Actor extends Logging {
* Note: if you are using the 'self' field in the constructor of the Actor
* then you have to make the fields/operations that are using it 'lazy'.
*/
protected[akka] def self: ActorRef = _selfOption.getOrElse(throw new IllegalStateException(
"ActorRef for instance of Actor [" + getClass.getName + "] is not in scope." +
"\n\tAre you using 'self' within the constructor (the class body) of the Actor?" +
"\n\tIf so you have to refactor and make all fields that uses the 'self' reference lazy," +
"\n\tand move all operations that uses 'self' out of the constructor."))
def self: ActorRef = _selfSenderRef.getOrElse(throw new IllegalStateException(
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
"\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.newActor[MyActor]', or" +
"\n\t\t'val actor = Actor.newActor(() => new MyActor(..))'"))
/**
* For internal use only.
*/
protected[akka] implicit var _selfOption: Option[ActorRef] = None
protected implicit var _selfSenderRef: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
ref
}
/**
* Holds the hot swapped partial function.
@ -1116,6 +1141,13 @@ trait Actor extends Logging {
// ==== USER API ====
// ==================
/**
* Forwards the message and passes the original sender actor as the sender.
* <p/>
* Works with '!', '!!' and '!!!'.
*/
def forward(message: Any)(implicit sender: Some[ActorRef]) = self.forward(message)(sender)
/**
* Use to override the default dispatcher.
*/
@ -1449,7 +1481,9 @@ class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends Message
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class RemoteActorRef private (
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends ActorRef {
val remoteClient = RemoteClient.clientFor(hostname, port)
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
@ -1492,7 +1526,9 @@ private[akka] class RemoteActorRef private (
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] object RemoteActorRef {
def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
(new RemoteActorRef(uuid, className, hostname, port, timeout)).start
// def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long, isOnRemoteHost: Boolean): ActorRef =
// (new RemoteActorRef(uuid, className, hostname, port, timeout, isOnRemoteHost)).start
def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef =
(new RemoteActorRef(uuid, className, hostname, port, timeout)).start
}

View file

@ -101,7 +101,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
sealed class Agent[T] private (initialValue: T) extends Transactor {
import Agent._
import Actor._
_selfOption = Some(newActor(() => this).start)
_selfSenderRef = Some(newActor(() => this).start)
log.debug("Starting up Agent [%s]", uuid)

View file

@ -8,7 +8,8 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/** A Dispatcher is a trait whose purpose is to route incoming messages to actors
*/
trait Dispatcher { self: Actor =>
trait Dispatcher { this: Actor =>
implicit val sender = Some(self)
protected def transform(msg: Any): Any = msg
@ -16,7 +17,7 @@ trait Dispatcher { self: Actor =>
protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) =>
if (self.self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self.self))
if (self.replyTo.isDefined) routes(a) forward transform(a)
else routes(a).!(transform(a))(None)
}