From 072bbe461cc705343fb8bbef7c16f3d5e8a800a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sat, 8 May 2010 12:13:44 +0200 Subject: [PATCH] Added isOrRemoteNode field to ActorRef --- akka-camel/src/main/scala/Producer.scala | 2 +- akka-core/src/main/scala/actor/Actor.scala | 54 +++++++++++++++---- akka-core/src/main/scala/actor/Agent.scala | 2 +- .../src/main/scala/routing/Routers.scala | 5 +- 4 files changed, 50 insertions(+), 13 deletions(-) diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 743b3dfe06..8e4b8b5064 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -114,7 +114,7 @@ trait Producer { self: Actor => protected def produceAsync(msg: Any): Unit = { val cmsg = Message.canonicalize(msg) val sync = new ProducerResponseSender( - cmsg.headers(headersToCopy), this.replyTo, this) + cmsg.headers(headersToCopy), self.replyTo, this) template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index df3b237029..4254e98191 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -25,6 +25,23 @@ import java.net.InetSocketAddress import java.util.concurrent.locks.{Lock, ReentrantLock} import java.util.{HashSet => JHashSet} +/* +trait ActorWithNestedReceive extends Actor { + import Actor.actor + private var nestedReactsProcessors: List[ActorRef] = Nil + private val processNestedReacts: PartialFunction[Any, Unit] = { + case message if !nestedReactsProcessors.isEmpty => + val processors = nestedReactsProcessors.reverse + processors.head forward message + nestedReactsProcessors = processors.tail.reverse + } + + protected def react: PartialFunction[Any, Unit] + protected def reactAgain(pf: PartialFunction[Any, Unit]) = nestedReactsProcessors ::= actor(pf) + protected def receive = processNestedReacts orElse react +} +*/ + /** * Implements the Transactor abstraction. E.g. a transactional actor. *

@@ -71,6 +88,8 @@ object Actor extends Logging { val PORT = config.getInt("akka.remote.server.port", 9999) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None) + // FIXME remove next release object Sender { @deprecated("import Actor.Sender.Self is not needed anymore, just use 'actor ! msg'") @@ -783,6 +802,7 @@ sealed class ActorRef private[akka] () extends TransactionManagement { } private[this] def newActor: Actor = { + Actor.actorRefInCreation.value = Some(this) val actor = actorFactory match { case Left(Some(clazz)) => try { @@ -798,7 +818,6 @@ sealed class ActorRef private[akka] () extends TransactionManagement { case _ => throw new ActorInitializationException("Can't create Actor, no Actor class or factory function in scope") } - actor._selfOption = Some(this) if (actor eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") actor } @@ -1096,16 +1115,22 @@ trait Actor extends Logging { * Note: if you are using the 'self' field in the constructor of the Actor * then you have to make the fields/operations that are using it 'lazy'. */ - protected[akka] def self: ActorRef = _selfOption.getOrElse(throw new IllegalStateException( - "ActorRef for instance of Actor [" + getClass.getName + "] is not in scope." + - "\n\tAre you using 'self' within the constructor (the class body) of the Actor?" + - "\n\tIf so you have to refactor and make all fields that uses the 'self' reference lazy," + - "\n\tand move all operations that uses 'self' out of the constructor.")) + def self: ActorRef = _selfSenderRef.getOrElse(throw new IllegalStateException( + "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + + "\n\tEither use:" + + "\n\t\t'val actor = Actor.newActor[MyActor]', or" + + "\n\t\t'val actor = Actor.newActor(() => new MyActor(..))'")) /** * For internal use only. */ - protected[akka] implicit var _selfOption: Option[ActorRef] = None + protected implicit var _selfSenderRef: Option[ActorRef] = { + val ref = Actor.actorRefInCreation.value + Actor.actorRefInCreation.value = None + ref + } /** * Holds the hot swapped partial function. @@ -1116,6 +1141,13 @@ trait Actor extends Logging { // ==== USER API ==== // ================== + /** + * Forwards the message and passes the original sender actor as the sender. + *

+ * Works with '!', '!!' and '!!!'. + */ + def forward(message: Any)(implicit sender: Some[ActorRef]) = self.forward(message)(sender) + /** * Use to override the default dispatcher. */ @@ -1449,7 +1481,9 @@ class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends Message * @author Jonas Bonér */ private[akka] class RemoteActorRef private ( +// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends ActorRef { + val remoteClient = RemoteClient.clientFor(hostname, port) override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { @@ -1492,7 +1526,9 @@ private[akka] class RemoteActorRef private ( * @author Jonas Bonér */ private[akka] object RemoteActorRef { - def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = - (new RemoteActorRef(uuid, className, hostname, port, timeout)).start + // def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long, isOnRemoteHost: Boolean): ActorRef = + // (new RemoteActorRef(uuid, className, hostname, port, timeout, isOnRemoteHost)).start + def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorRef = + (new RemoteActorRef(uuid, className, hostname, port, timeout)).start } diff --git a/akka-core/src/main/scala/actor/Agent.scala b/akka-core/src/main/scala/actor/Agent.scala index 2949c41706..419f3ef218 100644 --- a/akka-core/src/main/scala/actor/Agent.scala +++ b/akka-core/src/main/scala/actor/Agent.scala @@ -101,7 +101,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes sealed class Agent[T] private (initialValue: T) extends Transactor { import Agent._ import Actor._ - _selfOption = Some(newActor(() => this).start) + _selfSenderRef = Some(newActor(() => this).start) log.debug("Starting up Agent [%s]", uuid) diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala index 991820d314..a26d9addcd 100644 --- a/akka-core/src/main/scala/routing/Routers.scala +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -8,7 +8,8 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} /** A Dispatcher is a trait whose purpose is to route incoming messages to actors */ -trait Dispatcher { self: Actor => +trait Dispatcher { this: Actor => + implicit val sender = Some(self) protected def transform(msg: Any): Any = msg @@ -16,7 +17,7 @@ trait Dispatcher { self: Actor => protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => - if (self.self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self.self)) + if (self.replyTo.isDefined) routes(a) forward transform(a) else routes(a).!(transform(a))(None) }