diff --git a/akka-camel/src/main/scala/Producer.scala b/akka-camel/src/main/scala/Producer.scala index 8bc0a82611..72fd88658f 100644 --- a/akka-camel/src/main/scala/Producer.scala +++ b/akka-camel/src/main/scala/Producer.scala @@ -114,7 +114,7 @@ trait Producer { this: Actor => protected def produceAsync(msg: Any): Unit = { val cmsg = Message.canonicalize(msg) val sync = new ProducerResponseSender( - cmsg.headers(headersToCopy), self.replyTo, this) + cmsg.headers(headersToCopy), self.sender, self.senderFuture, this) template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) } @@ -162,7 +162,8 @@ trait Producer { this: Actor => */ class ProducerResponseSender( headers: Map[String, Any], - replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], + sender: Option[ActorRef], + senderFuture: Option[CompletableFuture[Any]], producer: Actor) extends Synchronization with Logging { implicit val producerActor = Some(producer) // the response sender @@ -179,10 +180,10 @@ class ProducerResponseSender( */ def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) - private def reply(message: Any) = replyTo match { - case Some(Left(actor)) => actor ! message - case Some(Right(future)) => future.completeWithResult(message) - case _ => log.warning("No destination for sending response") + private def reply(message: Any) = { + if (senderFuture.isDefined) senderFuture.get completeWithResult message + else if (sender.isDefined) sender.get ! message + else log.warning("No destination for sending response") } } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 5a5f08adc7..01da50c4b0 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -208,10 +208,17 @@ trait ActorRef extends TransactionManagement { * - Is Some(Left(Actor)) if sender is an actor * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result */ - protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None - protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo } - protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } - +// protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None +// protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo } +// protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } + + protected[akka] var _sender: Option[ActorRef] = None + protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None + protected[akka] def sender: Option[ActorRef] = guard.withGuard { _sender } + protected[akka] def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture } + protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s} + protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf} + /** * Is the actor being restarted? */ @@ -269,9 +276,9 @@ trait ActorRef extends TransactionManagement { * If you are sending messages using !! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: Any, timeout: Long): Option[T] = { + def !![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[T] = { if (isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) val isActiveObject = message.isInstanceOf[Invocation] if (isActiveObject && message.asInstanceOf[Invocation].isVoid) { future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) @@ -283,7 +290,6 @@ trait ActorRef extends TransactionManagement { if (isActiveObject) throw e else None } - if (future.exception.isDefined) throw future.exception.get._2 else future.result } else throw new ActorInitializationException( @@ -304,7 +310,7 @@ trait ActorRef extends TransactionManagement { * If you are sending messages using !! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout) +// def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout) /** * Sends a message asynchronously returns a future holding the eventual reply message. @@ -315,8 +321,8 @@ trait ActorRef extends TransactionManagement { * If you are sending messages using !!! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !!![T](message: Any): Future[T] = { - if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) + def !!![T](message: Any)(implicit sender: Option[ActorRef] = None): Future[T] = { + if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -328,11 +334,10 @@ trait ActorRef extends TransactionManagement { */ def forward(message: Any)(implicit sender: Some[ActorRef]) = { if (isRunning) { - sender.get.replyTo match { - case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef)) - case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) - case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") - } + if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout( + message, timeout, sender.get.sender, sender.get.senderFuture) + else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get)) + else throw new IllegalStateException("Can't forward message when initial sender is not an actor") } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it") } @@ -358,12 +363,16 @@ trait ActorRef extends TransactionManagement { *

* Returns true if reply was sent, and false if unable to determine what to reply to. */ - def reply_?(message: Any): Boolean = replyTo match { - case Some(Left(actor)) => actor ! message; true - case Some(Right(future: Future[Any])) => future completeWithResult message; true - case _ => false + def reply_?(message: Any): Boolean = { + if (senderFuture.isDefined) { + senderFuture.get completeWithResult message + true + } else if (sender.isDefined) { + sender.get ! message + true + } else false } - + /** * Serializes the ActorRef instance into a byte array (Array[Byte]). */ @@ -527,6 +536,7 @@ trait ActorRef extends TransactionManagement { protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, + senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] protected[this] def actorInstance: AtomicReference[Actor] @@ -596,7 +606,7 @@ sealed class LocalActorRef private[akka]( @volatile private var runActorInitialization = false // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor - // instance eligeble for garbage collection + // instance elegible for garbage collection private val actorSelfFields = findActorSelfField(actor.getClass) if (runActorInitialization) initializeActorInstance @@ -893,6 +903,7 @@ sealed class LocalActorRef private[akka]( } protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { + sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -912,7 +923,7 @@ sealed class LocalActorRef private[akka]( RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) if (dispatcher.usesActorMailbox) { _mailbox.add(invocation) invocation.send @@ -923,7 +934,9 @@ sealed class LocalActorRef private[akka]( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, + senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -949,7 +962,7 @@ sealed class LocalActorRef private[akka]( val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( - this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get) if (dispatcher.usesActorMailbox) _mailbox.add(invocation) invocation.send future @@ -968,6 +981,8 @@ sealed class LocalActorRef private[akka]( */ protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { try { + sender = messageHandle.sender + senderFuture = messageHandle.senderFuture if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) } catch { @@ -978,11 +993,8 @@ sealed class LocalActorRef private[akka]( } private def dispatch[T](messageHandle: MessageInvocation) = { - setTransactionSet(messageHandle.transactionSet) - val message = messageHandle.message //serializeMessage(messageHandle.message) - _replyTo = messageHandle.replyTo - + setTransactionSet(messageHandle.transactionSet) try { if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException( @@ -993,16 +1005,14 @@ sealed class LocalActorRef private[akka]( Actor.log.error(e, "Could not invoke actor [%s]", toString) // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) - replyTo match { - case Some(Right(future)) => future.completeWithException(this, e) - case _ => - } + senderFuture.foreach(_.completeWithException(this, e)) } finally { clearTransaction } } private def transactionalDispatch[T](messageHandle: MessageInvocation) = { + val message = messageHandle.message //serializeMessage(messageHandle.message) var topLevelTransaction = false val txSet: Option[CountDownCommitBarrier] = if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet @@ -1017,9 +1027,6 @@ sealed class LocalActorRef private[akka]( } setTransactionSet(txSet) - val message = messageHandle.message //serializeMessage(messageHandle.message) - _replyTo = messageHandle.replyTo - def proceed = { if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException( @@ -1044,10 +1051,7 @@ sealed class LocalActorRef private[akka]( } catch { case e: IllegalStateException => {} } Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - replyTo match { - case Some(Right(future)) => future.completeWithException(this, e) - case _ => - } + senderFuture.foreach(_.completeWithException(this, e)) clearTransaction if (topLevelTransaction) clearTransactionSet @@ -1238,6 +1242,7 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, + senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val requestBuilder = RemoteRequestProtocol.newBuilder .setId(RemoteRequestProtocolIdFactory.nextId) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 1d28d1eb04..7f95ebcc9b 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -143,12 +143,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { val donated = receiver.mailbox.pollLast if (donated ne null) { - donated.replyTo match { - case None => thief.self.postMessageToMailbox(donated.message, None) - case Some(Left(actor)) => thief.self.postMessageToMailbox(donated.message, Some(actor.asInstanceOf[ActorRef])) - case Some(Right(future)) => thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( - donated.message, receiver.timeout, Some(future.asInstanceOf[CompletableFuture[Any]])) - } + if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( + donated.message, receiver.timeout, donated.sender, donated.senderFuture) + else if (donated.sender.isDefined) thief.self.postMessageToMailbox(donated.message, donated.sender) + else thief.self.postMessageToMailbox(donated.message, None) true } else false } diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 3ba6e319df..8e094fe108 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -15,7 +15,8 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier final class MessageInvocation(val receiver: ActorRef, val message: Any, - val replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], + val sender: Option[ActorRef], + val senderFuture: Option[CompletableFuture[Any]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("receiver is null") @@ -41,7 +42,8 @@ final class MessageInvocation(val receiver: ActorRef, "MessageInvocation[" + "\n\tmessage = " + message + "\n\treceiver = " + receiver + - "\n\treplyTo = " + replyTo + + "\n\tsender = " + sender + + "\n\tsenderFuture = " + senderFuture + "\n\ttransactionSet = " + transactionSet + "\n]" } diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala index a4b6978250..31402dab5a 100644 --- a/akka-core/src/main/scala/routing/Routers.scala +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -6,27 +6,29 @@ package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.{Actor, ActorRef} -/** A Dispatcher is a trait whose purpose is to route incoming messages to actors +/** + * A Dispatcher is a trait whose purpose is to route incoming messages to actors. */ trait Dispatcher { this: Actor => -// implicit val sender = Some(self) protected def transform(msg: Any): Any = msg protected def routes: PartialFunction[Any, ActorRef] protected def dispatch: Receive = { - case a if routes.isDefinedAt(a) => - if (self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self)) - else routes(a).!(transform(a))(None) + case a if routes.isDefinedAt(a) => + if (isSenderDefined) routes(a).forward(transform(a))(someSelf) + else routes(a).!(transform(a))(None) } def receive = dispatch + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined } -/** A LoadBalancer is a specialized kind of Dispatcher, - * that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to +/** + * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. */ trait LoadBalancer extends Dispatcher { self: Actor => protected def seq: InfiniteIterator[ActorRef] diff --git a/akka-core/src/main/scala/stm/DataFlowVariable.scala b/akka-core/src/main/scala/stm/DataFlowVariable.scala index a1256d715f..752c71cead 100644 --- a/akka-core/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-core/src/main/scala/stm/DataFlowVariable.scala @@ -83,14 +83,8 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture def receive = { case Get => val ref = dataFlow.value.get - if (ref.isDefined) - self.reply(ref.get) - else { - readerFuture = self.replyTo match { - case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]]) - case _ => None - } - } + if (ref.isDefined) self.reply(ref.get) + else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]] case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) case Exit => exit } diff --git a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala index cabe94e080..7c3a827ab5 100644 --- a/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala +++ b/akka-core/src/test/scala/ActorFireForgetRequestReplySpec.scala @@ -15,7 +15,7 @@ object ActorFireForgetRequestReplySpec { case "Send" => self.reply("Reply") case "SendImplicit" => - self.replyTo.get.left.get ! "ReplyImplicit" + self.sender.get ! "ReplyImplicit" } } diff --git a/akka-core/src/test/scala/ForwardActorSpec.scala b/akka-core/src/test/scala/ForwardActorSpec.scala index 76f4808706..d8813a3fb8 100644 --- a/akka-core/src/test/scala/ForwardActorSpec.scala +++ b/akka-core/src/test/scala/ForwardActorSpec.scala @@ -15,7 +15,7 @@ object ForwardActorSpec { val latch = new CountDownLatch(1) def receive = { case "SendBang" => { - ForwardState.sender = Some(self.replyTo.get.left.get) + ForwardState.sender = self.sender latch.countDown } case "SendBangBang" => self.reply("SendBangBang")