Re-introducing 'sender' and 'senderFuture' references. Now 'sender' is available both for !! and !!! message sends

This commit is contained in:
Jonas Bonér 2010-05-19 10:09:30 +02:00
parent feef59f845
commit b36dc5cf79
8 changed files with 72 additions and 70 deletions

View file

@ -114,7 +114,7 @@ trait Producer { this: Actor =>
protected def produceAsync(msg: Any): Unit = { protected def produceAsync(msg: Any): Unit = {
val cmsg = Message.canonicalize(msg) val cmsg = Message.canonicalize(msg)
val sync = new ProducerResponseSender( val sync = new ProducerResponseSender(
cmsg.headers(headersToCopy), self.replyTo, this) cmsg.headers(headersToCopy), self.sender, self.senderFuture, this)
template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync) template.asyncCallback(endpointUri, createInOutExchange.fromRequestMessage(cmsg), sync)
} }
@ -162,7 +162,8 @@ trait Producer { this: Actor =>
*/ */
class ProducerResponseSender( class ProducerResponseSender(
headers: Map[String, Any], headers: Map[String, Any],
replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]],
producer: Actor) extends Synchronization with Logging { producer: Actor) extends Synchronization with Logging {
implicit val producerActor = Some(producer) // the response sender implicit val producerActor = Some(producer) // the response sender
@ -179,10 +180,10 @@ class ProducerResponseSender(
*/ */
def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers)) def onComplete(exchange: Exchange) = reply(exchange.toResponseMessage(headers))
private def reply(message: Any) = replyTo match { private def reply(message: Any) = {
case Some(Left(actor)) => actor ! message if (senderFuture.isDefined) senderFuture.get completeWithResult message
case Some(Right(future)) => future.completeWithResult(message) else if (sender.isDefined) sender.get ! message
case _ => log.warning("No destination for sending response") else log.warning("No destination for sending response")
} }
} }

View file

@ -208,9 +208,16 @@ trait ActorRef extends TransactionManagement {
* - Is Some(Left(Actor)) if sender is an actor * - Is Some(Left(Actor)) if sender is an actor
* - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result * - Is Some(Right(CompletableFuture)) if sender is holding on to a Future for the result
*/ */
protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None // protected[this] var _replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = None
protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo } // protected[akka] def replyTo: Option[Either[ActorRef, CompletableFuture[Any]]] = guard.withGuard { _replyTo }
protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt } // protected[akka] def replyTo_=(rt: Option[Either[ActorRef, CompletableFuture[Any]]]) = guard.withGuard { _replyTo = rt }
protected[akka] var _sender: Option[ActorRef] = None
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
protected[akka] def sender: Option[ActorRef] = guard.withGuard { _sender }
protected[akka] def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s}
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf}
/** /**
* Is the actor being restarted? * Is the actor being restarted?
@ -269,9 +276,9 @@ trait ActorRef extends TransactionManagement {
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code> * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires. * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/ */
def !![T](message: Any, timeout: Long): Option[T] = { def !![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[T] = {
if (isRunning) { if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) val future = postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
val isActiveObject = message.isInstanceOf[Invocation] val isActiveObject = message.isInstanceOf[Invocation]
if (isActiveObject && message.asInstanceOf[Invocation].isVoid) { if (isActiveObject && message.asInstanceOf[Invocation].isVoid) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
@ -283,7 +290,6 @@ trait ActorRef extends TransactionManagement {
if (isActiveObject) throw e if (isActiveObject) throw e
else None else None
} }
if (future.exception.isDefined) throw future.exception.get._2 if (future.exception.isDefined) throw future.exception.get._2
else future.result else future.result
} else throw new ActorInitializationException( } else throw new ActorInitializationException(
@ -304,7 +310,7 @@ trait ActorRef extends TransactionManagement {
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code> * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires. * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/ */
def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout) // def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, timeout)
/** /**
* Sends a message asynchronously returns a future holding the eventual reply message. * Sends a message asynchronously returns a future holding the eventual reply message.
@ -315,8 +321,8 @@ trait ActorRef extends TransactionManagement {
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code> * If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires. * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/ */
def !!![T](message: Any): Future[T] = { def !!![T](message: Any)(implicit sender: Option[ActorRef] = None): Future[T] = {
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, None) if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
else throw new ActorInitializationException( else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start' before using it") "Actor has not been started, you need to invoke 'actor.start' before using it")
} }
@ -328,11 +334,10 @@ trait ActorRef extends TransactionManagement {
*/ */
def forward(message: Any)(implicit sender: Some[ActorRef]) = { def forward(message: Any)(implicit sender: Some[ActorRef]) = {
if (isRunning) { if (isRunning) {
sender.get.replyTo match { if (sender.get.senderFuture.isDefined) postMessageToMailboxAndCreateFutureResultWithTimeout(
case Some(Left(actorRef)) => postMessageToMailbox(message, Some(actorRef)) message, timeout, sender.get.sender, sender.get.senderFuture)
case Some(Right(future)) => postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, Some(future)) else if (sender.get.sender.isDefined) postMessageToMailbox(message, Some(sender.get.sender.get))
case _ => throw new IllegalStateException("Can't forward message when initial sender is not an actor") else throw new IllegalStateException("Can't forward message when initial sender is not an actor")
}
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start' before using it")
} }
@ -358,10 +363,14 @@ trait ActorRef extends TransactionManagement {
* <p/> * <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to. * Returns true if reply was sent, and false if unable to determine what to reply to.
*/ */
def reply_?(message: Any): Boolean = replyTo match { def reply_?(message: Any): Boolean = {
case Some(Left(actor)) => actor ! message; true if (senderFuture.isDefined) {
case Some(Right(future: Future[Any])) => future completeWithResult message; true senderFuture.get completeWithResult message
case _ => false true
} else if (sender.isDefined) {
sender.get ! message
true
} else false
} }
/** /**
@ -527,6 +536,7 @@ trait ActorRef extends TransactionManagement {
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
protected[this] def actorInstance: AtomicReference[Actor] protected[this] def actorInstance: AtomicReference[Actor]
@ -596,7 +606,7 @@ sealed class LocalActorRef private[akka](
@volatile private var runActorInitialization = false @volatile private var runActorInitialization = false
// Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor // Needed to be able to null out the 'val self: ActorRef' member variables to make the Actor
// instance eligeble for garbage collection // instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass) private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization) initializeActorInstance if (runActorInitialization) initializeActorInstance
@ -893,6 +903,7 @@ sealed class LocalActorRef private[akka](
} }
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
sender = senderOption
joinTransaction(message) joinTransaction(message)
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
@ -912,7 +923,7 @@ sealed class LocalActorRef private[akka](
RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None) RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None)
} else { } else {
val invocation = new MessageInvocation(this, message, senderOption.map(Left(_)), transactionSet.get) val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
if (dispatcher.usesActorMailbox) { if (dispatcher.usesActorMailbox) {
_mailbox.add(invocation) _mailbox.add(invocation)
invocation.send invocation.send
@ -923,7 +934,9 @@ sealed class LocalActorRef private[akka](
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
sender = senderOption
joinTransaction(message) joinTransaction(message)
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
@ -949,7 +962,7 @@ sealed class LocalActorRef private[akka](
val future = if (senderFuture.isDefined) senderFuture.get val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](timeout) else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation( val invocation = new MessageInvocation(
this, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
if (dispatcher.usesActorMailbox) _mailbox.add(invocation) if (dispatcher.usesActorMailbox) _mailbox.add(invocation)
invocation.send invocation.send
future future
@ -968,6 +981,8 @@ sealed class LocalActorRef private[akka](
*/ */
protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized { protected[akka] def invoke(messageHandle: MessageInvocation) = actor.synchronized {
try { try {
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle) else dispatch(messageHandle)
} catch { } catch {
@ -978,11 +993,8 @@ sealed class LocalActorRef private[akka](
} }
private def dispatch[T](messageHandle: MessageInvocation) = { private def dispatch[T](messageHandle: MessageInvocation) = {
setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message) val message = messageHandle.message //serializeMessage(messageHandle.message)
_replyTo = messageHandle.replyTo setTransactionSet(messageHandle.transactionSet)
try { try {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException( else throw new IllegalArgumentException(
@ -993,16 +1005,14 @@ sealed class LocalActorRef private[akka](
Actor.log.error(e, "Could not invoke actor [%s]", toString) Actor.log.error(e, "Could not invoke actor [%s]", toString)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
replyTo match { senderFuture.foreach(_.completeWithException(this, e))
case Some(Right(future)) => future.completeWithException(this, e)
case _ =>
}
} finally { } finally {
clearTransaction clearTransaction
} }
} }
private def transactionalDispatch[T](messageHandle: MessageInvocation) = { private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] = val txSet: Option[CountDownCommitBarrier] =
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
@ -1017,9 +1027,6 @@ sealed class LocalActorRef private[akka](
} }
setTransactionSet(txSet) setTransactionSet(txSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
_replyTo = messageHandle.replyTo
def proceed = { def proceed = {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException( else throw new IllegalArgumentException(
@ -1044,10 +1051,7 @@ sealed class LocalActorRef private[akka](
} catch { case e: IllegalStateException => {} } } catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
replyTo match { senderFuture.foreach(_.completeWithException(this, e))
case Some(Right(future)) => future.completeWithException(this, e)
case _ =>
}
clearTransaction clearTransaction
if (topLevelTransaction) clearTransactionSet if (topLevelTransaction) clearTransactionSet
@ -1238,6 +1242,7 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any, message: Any,
timeout: Long, timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequestProtocol.newBuilder val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId) .setId(RemoteRequestProtocolIdFactory.nextId)

View file

@ -143,12 +143,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
val donated = receiver.mailbox.pollLast val donated = receiver.mailbox.pollLast
if (donated ne null) { if (donated ne null) {
donated.replyTo match { if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
case None => thief.self.postMessageToMailbox(donated.message, None) donated.message, receiver.timeout, donated.sender, donated.senderFuture)
case Some(Left(actor)) => thief.self.postMessageToMailbox(donated.message, Some(actor.asInstanceOf[ActorRef])) else if (donated.sender.isDefined) thief.self.postMessageToMailbox(donated.message, donated.sender)
case Some(Right(future)) => thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( else thief.self.postMessageToMailbox(donated.message, None)
donated.message, receiver.timeout, Some(future.asInstanceOf[CompletableFuture[Any]]))
}
true true
} else false } else false
} }

View file

@ -15,7 +15,8 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier
final class MessageInvocation(val receiver: ActorRef, final class MessageInvocation(val receiver: ActorRef,
val message: Any, val message: Any,
val replyTo : Option[Either[ActorRef, CompletableFuture[Any]]], val sender: Option[ActorRef],
val senderFuture: Option[CompletableFuture[Any]],
val transactionSet: Option[CountDownCommitBarrier]) { val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null") if (receiver eq null) throw new IllegalArgumentException("receiver is null")
@ -41,7 +42,8 @@ final class MessageInvocation(val receiver: ActorRef,
"MessageInvocation[" + "MessageInvocation[" +
"\n\tmessage = " + message + "\n\tmessage = " + message +
"\n\treceiver = " + receiver + "\n\treceiver = " + receiver +
"\n\treplyTo = " + replyTo + "\n\tsender = " + sender +
"\n\tsenderFuture = " + senderFuture +
"\n\ttransactionSet = " + transactionSet + "\n\ttransactionSet = " + transactionSet +
"\n]" "\n]"
} }

View file

@ -6,10 +6,10 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.{Actor, ActorRef}
/** A Dispatcher is a trait whose purpose is to route incoming messages to actors /**
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
*/ */
trait Dispatcher { this: Actor => trait Dispatcher { this: Actor =>
// implicit val sender = Some(self)
protected def transform(msg: Any): Any = msg protected def transform(msg: Any): Any = msg
@ -17,16 +17,18 @@ trait Dispatcher { this: Actor =>
protected def dispatch: Receive = { protected def dispatch: Receive = {
case a if routes.isDefinedAt(a) => case a if routes.isDefinedAt(a) =>
if (self.replyTo.isDefined) routes(a).forward(transform(a))(Some(self)) if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None) else routes(a).!(transform(a))(None)
} }
def receive = dispatch def receive = dispatch
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
} }
/** A LoadBalancer is a specialized kind of Dispatcher, /**
* that is supplied an InfiniteIterator of targets * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to * to dispatch incoming messages to.
*/ */
trait LoadBalancer extends Dispatcher { self: Actor => trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorRef] protected def seq: InfiniteIterator[ActorRef]

View file

@ -83,14 +83,8 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
def receive = { def receive = {
case Get => case Get =>
val ref = dataFlow.value.get val ref = dataFlow.value.get
if (ref.isDefined) if (ref.isDefined) self.reply(ref.get)
self.reply(ref.get) else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]]
else {
readerFuture = self.replyTo match {
case Some(Right(future)) => Some(future.asInstanceOf[CompletableFuture[T]])
case _ => None
}
}
case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => exit case Exit => exit
} }

View file

@ -15,7 +15,7 @@ object ActorFireForgetRequestReplySpec {
case "Send" => case "Send" =>
self.reply("Reply") self.reply("Reply")
case "SendImplicit" => case "SendImplicit" =>
self.replyTo.get.left.get ! "ReplyImplicit" self.sender.get ! "ReplyImplicit"
} }
} }

View file

@ -15,7 +15,7 @@ object ForwardActorSpec {
val latch = new CountDownLatch(1) val latch = new CountDownLatch(1)
def receive = { def receive = {
case "SendBang" => { case "SendBang" => {
ForwardState.sender = Some(self.replyTo.get.left.get) ForwardState.sender = self.sender
latch.countDown latch.countDown
} }
case "SendBangBang" => self.reply("SendBangBang") case "SendBangBang" => self.reply("SendBangBang")