diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index e82bb57997..2b7d053457 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -114,7 +114,7 @@ trait Producer { self: Actor => protected def produceAsync(msg: Any): Unit = { val cmsg = Message.canonicalize(msg) val sync = new ProducerResponseSender( - cmsg.headers(headersToCopy), this.sender, this.senderFuture, this) + cmsg.headers(headersToCopy), this.replyTo, this) template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) } @@ -162,8 +162,7 @@ trait Producer { self: Actor => */ class ProducerResponseSender( headers: Map[String, Any], - sender: Option[Actor], - senderFuture: Option[CompletableFuture], + replyTo : Option[Either[Actor,CompletableFuture]], producer: Actor) extends Synchronization with Logging { implicit val producerActor = Some(producer) // the response sender @@ -180,14 +179,10 @@ class ProducerResponseSender( */ def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) - private def reply(message: Any) = { - sender match { - case Some(actor) => actor ! message - case None => senderFuture match { - case Some(future) => future.completeWithResult(message) - case None => log.warning("No destination for sending response") - } - } + private def reply(message: Any) = replyTo match { + case Some(Left(actor)) => actor ! message + case Some(Right(future)) => future.completeWithResult(message) + case _ => log.warning("No destination for sending response") } } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 51fb20c87b..065423b6bc 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -274,23 +274,9 @@ trait Actor extends TransactionManagement with Logging { // ==================================== /** - * The 'sender' field holds the sender of the message currently being processed. - *

- * If the sender was an actor then it is defined as 'Some(senderActor)' and - * if the sender was of some other instance then it is defined as 'None'. - *

- * This sender reference can be used together with the '!' method for request/reply - * message exchanges and which is in many ways better than using the '!!' method - * which will make the sender wait for a reply using a *blocking* future. + * TODO: Document replyTo */ - protected var sender: Option[Actor] = None - - /** - * The 'senderFuture' field should normally not be touched by user code, which should instead use the 'reply' method. - * But it can be used for advanced use-cases when one might want to store away the future and - * resolve it later and/or somewhere else. - */ - protected var senderFuture: Option[CompletableFuture] = None + protected var replyTo: Option[Either[Actor,CompletableFuture]] = None // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== @@ -573,9 +559,11 @@ trait Actor extends TransactionManagement with Logging { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")) - if (forwarder.getSenderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, forwarder.getSenderFuture) - else if (forwarder.getSender.isDefined) postMessageToMailbox(message, forwarder.getSender) - else throw new IllegalStateException("Can't forward message when initial sender is not an actor") + forwarder.replyTo match { + case Some(Left(actor)) => postMessageToMailbox(message, Some(actor)) + case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) + case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") + } } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -583,14 +571,10 @@ trait Actor extends TransactionManagement with Logging { * Use reply(..) to reply with a message to the original sender of the message currently * being processed. */ - protected[this] def reply(message: Any) = { - sender match { - case Some(senderActor) => - senderActor ! message - case None => - senderFuture match { - case None => - throw new IllegalStateException( + protected[this] def reply(message: Any) = replyTo match { + case Some(Left(actor)) => actor ! message + case Some(Right(future)) => future.completeWithResult(message) + case _ => throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + "\n\tYou have probably used the '!' method to either; " + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + @@ -599,12 +583,8 @@ trait Actor extends TransactionManagement with Logging { "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + "\n\tthat will be bound by the argument passed to 'reply'." + "\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.") - case Some(future) => - future.completeWithResult(message) - } - } } - + /** * Get the dispatcher for this actor. */ @@ -793,10 +773,6 @@ trait Actor extends TransactionManagement with Logging { private[akka] def _resume = _isSuspended = false - private[akka] def getSender = sender - - private[akka] def getSenderFuture = senderFuture - private def spawnButDoNotStart[T <: Actor : Manifest] : T = { val actor = manifest[T].erasure.asInstanceOf[Class[T]].newInstance if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) { @@ -844,7 +820,7 @@ trait Actor extends TransactionManagement with Logging { RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None) } else { - val invocation = new MessageInvocation(this, message, None, sender, transactionSet.get) + val invocation = new MessageInvocation(this, message, sender.map(Left(_)), transactionSet.get) if (_isEventBased) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -877,7 +853,7 @@ trait Actor extends TransactionManagement with Logging { } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture(timeout) - val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get) + val invocation = new MessageInvocation(this, message, Some(Right(future)), transactionSet.get) if (_isEventBased) { _mailbox.add(invocation) invocation.send @@ -911,8 +887,7 @@ trait Actor extends TransactionManagement with Logging { setTransactionSet(messageHandle.transactionSet) val message = messageHandle.message //serializeMessage(messageHandle.message) - senderFuture = messageHandle.future - sender = messageHandle.sender + replyTo = messageHandle.replyTo try { if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function @@ -923,7 +898,7 @@ trait Actor extends TransactionManagement with Logging { Actor.log.error(e, "Could not invoke actor [%s]", this) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) - if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) + if (replyTo.isDefined && replyTo.get.isRight) replyTo.get.right.get.completeWithException(this, e) } finally { clearTransaction } @@ -944,8 +919,7 @@ trait Actor extends TransactionManagement with Logging { setTransactionSet(txSet) val message = messageHandle.message //serializeMessage(messageHandle.message) - senderFuture = messageHandle.future - sender = messageHandle.sender + replyTo = messageHandle.replyTo def proceed = { if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function @@ -970,7 +944,7 @@ trait Actor extends TransactionManagement with Logging { } catch { case e: IllegalStateException => {} } Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e) + if (replyTo.isDefined && replyTo.get.isRight) replyTo.get.right.get.completeWithException(this, e) clearTransaction if (topLevelTransaction) clearTransactionSet diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala index 8aee0075ad..f9db74190f 100644 --- a/akka-core/src/main/scala/dispatch/Reactor.scala +++ b/akka-core/src/main/scala/dispatch/Reactor.scala @@ -15,8 +15,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier final class MessageInvocation(val receiver: Actor, val message: Any, - val future: Option[CompletableFuture], - val sender: Option[Actor], + val replyTo : Option[Either[Actor,CompletableFuture]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") @@ -42,8 +41,7 @@ final class MessageInvocation(val receiver: Actor, "MessageInvocation[" + "\n\tmessage = " + message + "\n\treceiver = " + receiver + - "\n\tsender = " + sender + - "\n\tfuture = " + future + + "\n\treplyTo = " + replyTo + "\n\ttransactionSet = " + transactionSet + "\n]" } diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index 742d074129..7b2084aec6 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -84,8 +84,14 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture def receive = { case Get => val ref = dataFlow.value.get - if (ref.isDefined) reply(ref.get) - else readerFuture = senderFuture + if (ref.isDefined) + reply(ref.get) + else { + readerFuture = replyTo match { + case Some(Right(future)) => Some(future) + case _ => None + } + } case Set(v) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) case Exit => exit } diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala index 0ed4698033..e4789bc2a6 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala @@ -18,7 +18,7 @@ class ActorFireForgetRequestReplySpec extends JUnitSuite { def receive = { case "Send" => reply("Reply") - case "SendImplicit" => sender.get ! "ReplyImplicit" + case "SendImplicit" => replyTo.get.left.get ! "ReplyImplicit" } } diff --git a/akka-core/src/test/scala/ForwardActorSpec.scala b/akka-core/src/test/scala/ForwardActorSpec.scala index 575bbab7dd..581a189d70 100644 --- a/akka-core/src/test/scala/ForwardActorSpec.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -15,7 +15,7 @@ class ForwardActorSpec extends JUnitSuite { class ReceiverActor extends Actor { def receive = { case "SendBang" => { - ForwardState.sender = sender.get + ForwardState.sender = replyTo.get.left.get ForwardState.finished.countDown } case "SendBangBang" => reply("SendBangBang") diff --git a/akka-core/src/test/scala/PerformanceSpec.scala b/akka-core/src/test/scala/PerformanceSpec.scala index b10eb4de62..742a560f06 100644 --- a/akka-core/src/test/scala/PerformanceSpec.scala +++ b/akka-core/src/test/scala/PerformanceSpec.scala @@ -55,6 +55,11 @@ class PerformanceSpec extends JUnitSuite { i = i + 1 } } + + protected def sender : Option[Actor] = replyTo match { + case Some(Left(actor)) => Some(actor) + case _ => None + } def receive = { case MeetingCount(i) => { @@ -97,6 +102,11 @@ class PerformanceSpec extends JUnitSuite { mall ! Meet(this, colour) r } + + protected def sender : Option[Actor] = replyTo match { + case Some(Left(actor)) => Some(actor) + case _ => None + } override def receive: PartialFunction[Any, Unit] = { case Meet(from, otherColour) => diff --git a/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala b/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala index d382e6ff06..91edac5d9e 100644 --- a/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala +++ b/akka-core/src/test/scala/ThreadBasedDispatcherSpec.scala @@ -57,7 +57,7 @@ class ThreadBasedDispatcherSpec extends JUnitSuite { val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { - dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None, None)) + dispatcher.dispatch(new MessageInvocation(key1, new Object, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) @@ -78,7 +78,7 @@ class ThreadBasedDispatcherSpec extends JUnitSuite { }) dispatcher.start for (i <- 0 until 100) { - dispatcher.dispatch(new MessageInvocation(key1, i, None, None, None)) + dispatcher.dispatch(new MessageInvocation(key1, i, None, None)) } assert(handleLatch.await(5, TimeUnit.SECONDS)) assert(!threadingIssueDetected.get) diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index b477adb10e..fc50292676 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -48,7 +48,7 @@ trait Dispatcher { self: Actor => protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => - if (self.sender.isDefined) routes(a) forward transform(a) + if (self.replyTo.isDefined) routes(a) forward transform(a) else routes(a) ! transform(a) }