unify sender/senderFuture into channel (++)

(squashed merge from the various bits and pieces already part of
release-1.2, everything related to Channel & Future)
This commit is contained in:
Roland 2011-06-13 22:36:46 +02:00
parent cee934a99a
commit 7712c20620
28 changed files with 679 additions and 252 deletions

View file

@ -23,7 +23,7 @@ object ActorFireForgetRequestReplySpec {
case "Send"
self.reply("Reply")
case "SendImplicit"
self.sender.get ! "ReplyImplicit"
self.channel ! "ReplyImplicit"
}
}

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.{ WordSpec, BeforeAndAfterAll }
import org.scalatest.matchers.MustMatchers
import akka.testkit.TestKit
import akka.dispatch.FutureTimeoutException
import akka.util.duration._
class ActorTimeoutSpec
extends WordSpec
with BeforeAndAfterAll
with MustMatchers
with TestKit {
val echo = Actor.actorOf(new Actor {
def receive = {
case x
}
}).start()
val testTimeout = if (Actor.defaultTimeout.duration < 400.millis) 500 millis else 100 millis
override def afterAll { echo.stop() }
"An Actor-based Future" must {
"use the global default timeout if no implicit in scope" in {
echo.timeout = 12
within((Actor.TIMEOUT - 100).millis, (Actor.TIMEOUT + 300).millis) {
val f = echo ? "hallo"
intercept[FutureTimeoutException] { f.await }
}
}
"use implicitly supplied timeout" in {
implicit val timeout = Actor.Timeout(testTimeout)
within(testTimeout - 100.millis, testTimeout + 300.millis) {
val f = (echo ? "hallo").mapTo[String]
intercept[FutureTimeoutException] { f.await }
f.value must be(None)
}
}
"use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) {
(echo.?("hallo")(timeout = testTimeout)).as[String] must be(None)
}
}
}
}

View file

@ -102,7 +102,7 @@ object Chameneos {
}
} else {
waitingChameneo.foreach(_ ! Exit)
self.sender.get ! Exit
self.channel ! Exit
}
}
}

View file

@ -4,7 +4,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.actor.{ Actor, ActorRegistry }
import akka.actor.{ Actor, ActorRegistry, NullChannel }
import akka.actor.Actor.{ actorOf }
import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue }
import java.util.{ Queue }
@ -84,7 +84,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
new MessageInvocation(
actorOf(new Actor { //Dummy actor
def receive = { case _ }
}), msg, None, None)
}), msg, NullChannel)
}
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {

View file

@ -193,11 +193,11 @@ class RoutingSpec extends WordSpec with MustMatchers {
}).start()
val successes = TestLatch(2)
val successCounter = Some(actorOf(new Actor {
val successCounter = actorOf(new Actor {
def receive = {
case "success" successes.countDown()
}
}).start())
}).start()
implicit val replyTo = successCounter
pool ! "a"

View file

@ -24,6 +24,7 @@ import scala.reflect.BeanProperty
import com.eaio.uuid.UUID
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.TimeUnit
/**
* Life-cycle messages for the Actors
@ -110,9 +111,6 @@ object Status {
*/
object Actor extends ListenerManagement {
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
* A Receive is a convenience type that defines actor message behavior currently modeled as
* a PartialFunction[Any, Unit].
@ -140,6 +138,20 @@ object Actor extends ListenerManagement {
override def initialValue = Stack[ActorRef]()
}
case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
object Timeout {
def apply(timeout: Long) = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
}
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
val defaultTimeout = Timeout(TIMEOUT)
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
* Handle to the ActorRegistry.
*/
@ -495,14 +507,14 @@ trait Actor {
*/
type Receive = Actor.Receive
/*
/**
* Some[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
@transient
implicit val someSelf: Some[ActorRef] = {
val someSelf: Some[ActorRef] = {
val refStack = Actor.actorRefInCreation.get
if (refStack.isEmpty) throw new ActorInitializationException(
"ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
@ -528,7 +540,7 @@ trait Actor {
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
implicit def optionSelf: Option[ActorRef] = someSelf
def optionSelf: Option[ActorRef] = someSelf
/**
* The 'self' field holds the ActorRef for this actor.
@ -558,7 +570,7 @@ trait Actor {
* </pre>
*/
@transient
val self: ScalaActorRef = someSelf.get
implicit val self: ScalaActorRef = someSelf.get
/**
* User overridable callback/setting.
@ -645,8 +657,7 @@ trait Actor {
private[akka] final def apply(msg: Any) = {
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
throw new InvalidMessageException("Message from [" + self.channel + "] to [" + self.toString + "] is null")
val behaviorStack = self.hotswap
msg match {
@ -675,9 +686,9 @@ trait Actor {
case Restart(reason) throw reason
case Kill throw new ActorKilledException("Kill")
case PoisonPill
val f = self.senderFuture()
val ch = self.channel
self.stop()
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
ch.sendException(new ActorKilledException("PoisonPill"))
}
}

View file

@ -33,27 +33,6 @@ private[akka] object ActorRefInternals {
object SHUTDOWN extends StatusType
}
/**
* Abstraction for unification of sender and senderFuture for later reply.
* Can be stored away and used at a later point in time.
*/
abstract class Channel[T] {
/**
* Scala API. <p/>
* Sends the specified message to the channel.
*/
def !(msg: T)
/**
* Java API. <p/>
* Sends the specified message to the channel.
*/
def sendOneWay(msg: T) {
this.!(msg)
}
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
* <p/>
@ -86,7 +65,7 @@ abstract class Channel[T] {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
protected[akka] var _uuid = newUuid
@ -241,40 +220,18 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
_uuid = uid
}
/**
* Akka Java API. <p/>
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
* <pre>
* actor.sendOneWay(message);
* </pre>
* <p/>
*/
def sendOneWay(message: AnyRef): Unit = {
sendOneWay(message, null)
}
/**
* Akka Java API. <p/>
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
* <p/>
* Allows you to pass along the sender of the message.
* <p/>
* <pre>
* actor.sendOneWay(message, context);
* </pre>
* <p/>
*/
def sendOneWay(message: AnyRef, sender: ActorRef) {
this.!(message)(Option(sender))
}
/**
* Akka Java API. <p/>
* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
* Uses the default timeout of the Actor (setTimeout()) and omits the sender reference
*/
def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null)
def sendRequestReply(message: AnyRef): AnyRef = {
!!(message, timeout).getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tfrom [nowhere]\n\twith timeout [" + timeout +
"]\n\ttimed out."))
.asInstanceOf[AnyRef]
}
/**
* Akka Java API. <p/>
@ -298,7 +255,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = {
!!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException(
?(message)(sender, Actor.Timeout(timeout)).as[AnyRef].getOrElse(throw new ActorTimeoutException(
"Message [" + message +
"]\n\tfrom [" + (if (sender ne null) sender.address else "nowhere") +
"]\n\twith timeout [" + timeout +
@ -311,14 +268,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
def sendRequestReplyFuture(message: AnyRef): Future[Any] = ?(message)
/**
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[Any] = ?(message)(sender)
/**
* Akka Java API. <p/>
@ -331,7 +288,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[Any] = ?(message)(sender, Actor.Timeout(timeout))
/**
* Akka Java API. <p/>
@ -339,7 +296,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
*/
def forward(message: AnyRef, sender: ActorRef) {
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(Some(sender))
else forward(message)(sender)
}
/**
@ -448,36 +405,36 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
/**
* Abstraction for unification of sender and senderFuture for later reply
*/
def channel: Channel[Any] = {
if (senderFuture.isDefined) {
new Channel[Any] {
val future = senderFuture.get
def !(msg: Any) = future completeWithResult msg
}
} else if (sender.isDefined) {
val someSelf = Some(this)
new Channel[Any] {
val client = sender.get
def !(msg: Any) = client.!(msg)(someSelf)
}
} else throw new IllegalActorStateException("No channel available")
def channel: UntypedChannel = {
val msg = currentMessage
if (msg ne null) msg.channel
else NullChannel
}
/*
* Implementation of ForwardableChannel
*/
def sendException(ex: Throwable) {}
def isUsableOnlyOnce = false
def isUsable = true
def isReplyable = true
def canSendException = false
/**
* Java API. <p/>
* Abstraction for unification of sender and senderFuture for later reply
*/
def getChannel: Channel[Any] = channel
def getChannel: UntypedChannel = channel
protected[akka] def invoke(messageHandle: MessageInvocation)
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef])
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T]
channel: UntypedChannel): Future[Any]
protected[akka] def actorInstance: AtomicReference[Actor]
@ -692,18 +649,19 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
_supervisor = sup
}
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
dispatcher dispatchMessage MessageInvocation(this, message, senderOption, None)
}
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
dispatcher dispatchMessage new MessageInvocation(this, message, channel)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T] = {
val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout))
dispatcher dispatchMessage MessageInvocation(this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]])
future.get
channel: UntypedChannel): Future[Any] = {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
}
/**
@ -888,7 +846,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
senderFuture.foreach(_.completeWithException(reason))
channel.sendException(reason)
if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
else {
@ -995,19 +953,28 @@ private[akka] case class RemoteActorRef private[akka] (
start()
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, loader)
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader)
}
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T] = {
val future = Actor.remote.send[T](
message, senderOption, senderFuture,
remoteAddress, timeout, false, this, loader)
if (future.isDefined) future.get
channel: UntypedChannel): Future[Any] = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
val chFuture = channel match {
case f: Promise[Any] Some(f)
case _ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
@ -1096,7 +1063,7 @@ trait ActorRefShared {
* There are implicit conversions in ../actor/Implicits.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared { ref: ActorRef
trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef
/**
* Address for actor, must be a unique one.
@ -1134,20 +1101,28 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def sender: Option[ActorRef] = {
val msg = currentMessage
if (msg eq null) None
else msg.sender
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
}
}
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def senderFuture(): Option[Promise[Any]] = {
val msg = currentMessage
if (msg eq null) None
else msg.senderFuture
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
}
}
/**
@ -1164,8 +1139,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* </pre>
* <p/>
*/
def !(message: Any)(implicit sender: Option[ActorRef] = None) {
if (isRunning) postMessageToMailbox(message, sender)
def !(message: Any)(implicit channel: UntypedChannel = NullChannel): Unit = {
if (isRunning) postMessageToMailbox(message, channel)
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start()' before using it")
}
@ -1182,9 +1157,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
@deprecated("use `(actor ? msg).as[T]` instead", "1.2")
def !!(message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Option[Any] = {
if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
try { future.await.resultOrException } catch { case e: FutureTimeoutException None }
} else throw new ActorInitializationException(
@ -1200,8 +1176,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = {
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
@deprecated("return type is an illusion, use the more honest ? method", "1.2")
def !!![T](message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Future[T] =
this.?(message)(channel, Actor.Timeout(timeout)).asInstanceOf[Future[T]]
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
*/
def ?(message: Any)(implicit channel: UntypedChannel = NullChannel, timeout: Actor.Timeout = Actor.defaultTimeout): Future[Any] = {
if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout.duration.toMillis, channel)
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start()' before using it")
}
@ -1211,12 +1194,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* <p/>
* Works with '!', '!!' and '!!!'.
*/
def forward(message: Any)(implicit sender: Some[ActorRef]) = {
def forward(message: Any)(implicit channel: ForwardableChannel) = {
if (isRunning) {
if (sender.get.senderFuture.isDefined)
postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture)
else
postMessageToMailbox(message, sender.get.sender)
postMessageToMailbox(message, channel.channel)
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it")
}
@ -1226,14 +1206,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) {
if (!reply_?(message)) throw new IllegalActorStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
}
def reply(message: Any) = channel.!(message)(this)
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
@ -1241,16 +1214,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒
* <p/>
* Returns true if reply was sent, and false if unable to determine what to reply to.
*/
def reply_?(message: Any): Boolean = {
if (senderFuture.isDefined) {
senderFuture.get completeWithResult message
true
} else if (sender.isDefined) {
//TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature
sender.get.!(message)(Some(this))
true
} else false
}
def reply_?(message: Any): Boolean = channel.safe_!(message)(this)
}
case class SerializedActorRef(val uuid: Uuid,

View file

@ -0,0 +1,173 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
/**
* Abstraction for unification of sender and senderFuture for later reply.
* Can be stored away and used at a later point in time.
*
* Channel cannot be contravariant because of Future providing its value in
* covariant position.
*
* The possible reply channel which can be passed into ! and safe_! is always
* untyped, as there is no way to utilize its real static type without
* requiring runtime-costly manifests.
*/
trait Channel[T] {
/**
* Scala API. <p/>
* Sends the specified message to the channel.
*/
def !(msg: T)(implicit channel: UntypedChannel = NullChannel): Unit
/**
* Try to send an exception. Not all channel types support this, one notable
* positive example is Future. Failure to send is silent.
*/
def sendException(ex: Throwable): Unit
/**
* Scala API.<p/>
* Try to send message to the channel, return whether successful.
*/
def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean
/**
* Indicates whether this channel may be used only once, e.g. a Future.
*/
def isUsableOnlyOnce: Boolean
/**
* Indicates whether this channel may still be used (only useful if
* isUsableOnlyOnce returns true).
*/
def isUsable: Boolean
/**
* Indicates whether this channel carries reply information, e.g. an
* ActorRef.
*/
def isReplyable: Boolean
/**
* Indicates whether this channel is capable of sending exceptions to its
* recipient.
*/
def canSendException: Boolean
/**
* Java API.<p/>
* Sends the specified message to the channel, i.e. fire-and-forget semantics.<p/>
* <pre>
* actor.sendOneWay(message);
* </pre>
*/
def sendOneWay(msg: T): Unit = this.!(msg)
/**
* Java API. <p/>
* Sends the specified message to the channel, i.e. fire-and-forget
* semantics, including the sender reference if possible (not supported on
* all channels).<p/>
* <pre>
* actor.sendOneWay(message, context);
* </pre>
*/
def sendOneWay(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender)
/**
* Java API.<p/>
* Try to send the specified message to the channel, i.e. fire-and-forget semantics.<p/>
* <pre>
* actor.sendOneWay(message);
* </pre>
*/
def sendOneWaySafe(msg: T): Boolean = this.safe_!(msg)
/**
* Java API. <p/>
* Try to send the specified message to the channel, i.e. fire-and-forget
* semantics, including the sender reference if possible (not supported on
* all channels).<p/>
* <pre>
* actor.sendOneWay(message, context);
* </pre>
*/
def sendOneWaySafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender)
}
/**
* This trait represents a channel that a priori does have sending capability,
* i.e. ! is not guaranteed to fail (e.g. NullChannel would be a
* counter-example).
*/
trait AvailableChannel[T] { self: Channel[T]
def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean = {
if (isUsable) {
try {
this ! msg
true
} catch {
case _ false
}
} else false
}
}
/**
* All channels used in conjunction with MessageInvocation are untyped by
* design, so make this explicit.
*/
trait UntypedChannel extends Channel[Any]
object UntypedChannel {
implicit def senderOption2Channel(sender: Option[ActorRef]): UntypedChannel =
sender match {
case Some(actor) actor
case None NullChannel
}
}
/**
* Default channel when none available.
*/
case object NullChannel extends UntypedChannel {
def !(msg: Any)(implicit channel: UntypedChannel = NullChannel) {
throw new IllegalActorStateException("""
No sender in scope, can't reply.
You have probably:
1. Sent a message to an Actor from an instance that is NOT an Actor.
2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
You may want to have a look at safe_! for a variant returning a Boolean""")
}
def safe_!(msg: Any)(implicit channel: UntypedChannel = NullChannel): Boolean = false
def sendException(ex: Throwable) {}
def isUsableOnlyOnce = false
def isUsable = false
def isReplyable = false
def canSendException = false
}
/**
* A channel which may be forwarded: a message received with such a reply
* channel attached can be passed on transparently such that a reply from a
* later processing stage is sent directly back to the origin. Keep in mind
* that not all channels can be used multiple times.
*/
trait ForwardableChannel extends UntypedChannel with AvailableChannel[Any] {
/**
* Get channel by which this channel would reply (ActorRef.forward takes an
* implicit ForwardableChannel and uses its .channel as message origin)
*/
def channel: UntypedChannel
}
object ForwardableChannel {
implicit def someS2FC(sender: Some[ActorRef]): ForwardableChannel = sender.get
implicit def someIS2FC(implicit sender: Some[ActorRef]): ForwardableChannel = sender.get
}

View file

@ -494,10 +494,7 @@ trait FSM[S, D] extends ListenerManagement {
* @return this state transition descriptor
*/
def replying(replyValue: Any): State = {
self.sender match {
case Some(sender) sender ! replyValue
case None
}
self.channel safe_! replyValue
this
}

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.japi.{ Creator, Option JOption }
import akka.actor.Actor.{ actorOf, futureToAnyOptionAsTypedOption }
import akka.dispatch.{ MessageDispatcher, Dispatchers, Future }
import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutException }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
@ -41,6 +41,7 @@ object TypedActor {
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" actor.hashCode.asInstanceOf[AnyRef]
case _
implicit val timeout = Actor.Timeout(actor.timeout)
MethodCall(method, args) match {
case m if m.isOneWay
actor ! m
@ -48,9 +49,12 @@ object TypedActor {
case m if m.returnsFuture_?
actor !!! m
case m if m.returnsJOption_? || m.returnsOption_?
(actor !!! m).as[AnyRef] match {
case Some(null) | None if (m.returnsJOption_?) JOption.none[Any] else None
case Some(joption) joption
val f = actor ? m
try { f.await } catch { case _: FutureTimeoutException }
f.value match {
case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex
}
case m
(actor !!! m).get

View file

@ -129,10 +129,7 @@ class BalancingDispatcher(
*/
protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
if (organ ne null) {
if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
organ.message, recipient.timeout, organ.sender, organ.senderFuture)
else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
else recipient.postMessageToMailbox(organ.message, None)
recipient.postMessageToMailbox(organ.message, organ.channel)
true
} else false
}

View file

@ -164,10 +164,7 @@ class Dispatcher(
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
while (invocation ne null) {
val f = invocation.senderFuture
if (f.isDefined)
f.get.completeWithException(exception)
invocation.channel.sendException(exception)
invocation = m.dequeue
}
}

View file

@ -6,8 +6,8 @@ package akka.dispatch
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.{ Actor, Channel }
import akka.util.Duration
import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef }
import akka.util.{ Duration, BoxedType }
import akka.japi.{ Procedure, Function JFunc }
import scala.util.continuations._
@ -308,6 +308,22 @@ sealed trait Future[+T] {
*/
def await(atMost: Duration): Future[T]
/**
* Await completion of this Future (as `await`) and return its value if it
* conforms to A's erased type.
*/
def as[A](implicit m: Manifest[A]): Option[A] =
try {
await
value match {
case None None
case Some(_: Left[_, _]) None
case Some(Right(v)) Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
}
} catch {
case _: Exception None
}
/**
* Tests whether this Future has been completed.
*/
@ -357,7 +373,7 @@ sealed trait Future[+T] {
* Future. If the Future has already been completed, this will apply
* immediately.
*/
def onComplete(func: Future[T] Unit): Future[T]
def onComplete(func: Future[T] Unit): this.type
/**
* When the future is completed with a valid result, apply the provided
@ -369,7 +385,7 @@ sealed trait Future[+T] {
* }
* </pre>
*/
final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f
final def onResult(pf: PartialFunction[Any, Unit]): this.type = onComplete { f
val optr = f.result
if (optr.isDefined) {
val r = optr.get
@ -496,6 +512,26 @@ sealed trait Future[+T] {
fa
}
/**
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
onComplete { ft
fa complete (ft.value.get match {
case l: Left[_, _] l.asInstanceOf[Either[Throwable, A]]
case Right(t)
try {
Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
} catch {
case e: ClassCastException Left(e)
}
})
}
fa
}
/**
* Creates a new Future by applying a function to the successful result of
* this Future, and returns the result of the function as the new Future.
@ -586,7 +622,7 @@ sealed trait Future[+T] {
}
/* Java API */
final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))
final def onComplete[A >: T](proc: Procedure[Future[A]]): this.type = onComplete(proc(_))
final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_))
@ -607,10 +643,7 @@ object Promise {
/**
* Construct a completable channel
*/
def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
val promise = Promise[Any](timeout)
def !(msg: Any) = promise completeWithResult msg
}
def channel(timeout: Long = Actor.TIMEOUT): ActorPromise = new ActorPromise(timeout)
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() Unit]]]() {
override def initialValue = None
@ -625,26 +658,26 @@ trait Promise[T] extends Future[T] {
* Completes this Future with the specified result, if not already completed.
* @return this
*/
def complete(value: Either[Throwable, T]): Future[T]
def complete(value: Either[Throwable, T]): this.type
/**
* Completes this Future with the specified result, if not already completed.
* @return this
*/
final def completeWithResult(result: T): Future[T] = complete(Right(result))
final def completeWithResult(result: T): this.type = complete(Right(result))
/**
* Completes this Future with the specified exception, if not already completed.
* @return this
*/
final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception))
final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed.
* @return this.
*/
final def completeWith(other: Future[T]): Future[T] = {
final def completeWith(other: Future[T]): this.type = {
other onComplete { f complete(f.value.get) }
this
}
@ -725,7 +758,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
}
}
def complete(value: Either[Throwable, T]): DefaultPromise[T] = {
def complete(value: Either[Throwable, T]): this.type = {
_lock.lock
val notifyTheseListeners = try {
if (_value.isEmpty) { //Only complete if we aren't expired
@ -772,7 +805,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
this
}
def onComplete(func: Future[T] Unit): Promise[T] = {
def onComplete(func: Future[T] Unit): this.type = {
_lock.lock
val notifyNow = try {
if (_value.isEmpty) {
@ -804,6 +837,36 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
}
class ActorPromise(timeout: Long, timeunit: TimeUnit)
extends DefaultPromise[Any](timeout, timeunit)
with ForwardableChannel {
def this() = this(0, MILLIS)
def this(timeout: Long) = this(timeout, MILLIS)
def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message)
def sendException(ex: Throwable) = completeWithException(ex)
def channel: UntypedChannel = this
def isUsableOnlyOnce = true
def isUsable = !isCompleted
def isReplyable = false
def canSendException = true
@deprecated("ActorPromise merged with Channel[Any], just use 'this'", "1.2")
def future = this
}
object ActorPromise {
def apply(f: Promise[Any]): ActorPromise =
new ActorPromise(f.timeoutInNanos, NANOS) {
completeWith(f)
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
override def sendException(ex: Throwable) = f completeWithException ex
}
}
/**
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
@ -811,10 +874,10 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
val value = Some(suppliedValue)
def complete(value: Either[Throwable, T]): Promise[T] = this
def onComplete(func: Future[T] Unit): Future[T] = { func(this); this }
def await(atMost: Duration): Future[T] = this
def await: Future[T] = this
def complete(value: Either[Throwable, T]): this.type = this
def onComplete(func: Future[T] Unit): this.type = { func(this); this }
def await(atMost: Duration): this.type = this
def await: this.type = this
def isExpired: Boolean = true
def timeoutInNanos: Long = 0
}

View file

@ -16,10 +16,9 @@ import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class MessageInvocation(receiver: ActorRef,
message: Any,
sender: Option[ActorRef],
senderFuture: Option[Promise[Any]]) {
final case class MessageInvocation(val receiver: ActorRef,
val message: Any,
val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() {

View file

@ -5,6 +5,7 @@
package akka.routing
import akka.actor.{ Actor, ActorRef, PoisonPill }
import akka.dispatch.{ Promise }
/**
* Actor pooling
@ -195,7 +196,7 @@ trait MailboxPressureCapacitor {
*/
trait ActiveFuturesPressureCapacitor {
def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.senderFuture.isDefined }
delegates count { _.channel.isInstanceOf[Promise[Any]] }
}
/**

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.util
import java.{ lang jl }
object BoxedType {
private val toBoxed = Map[Class[_], Class[_]](
classOf[Boolean] -> classOf[jl.Boolean],
classOf[Byte] -> classOf[jl.Byte],
classOf[Char] -> classOf[jl.Character],
classOf[Short] -> classOf[jl.Short],
classOf[Int] -> classOf[jl.Integer],
classOf[Long] -> classOf[jl.Long],
classOf[Float] -> classOf[jl.Float],
classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
def apply(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
}

View file

@ -10,6 +10,7 @@ import org.apache.camel._
import org.apache.camel.processor.SendProcessor
import akka.actor.{ Actor, ActorRef, UntypedActor }
import akka.dispatch.ActorPromise
/**
* Support trait for producing messages to Camel endpoints.
@ -96,10 +97,9 @@ trait ProducerSupport { this: Actor ⇒
val exchange = createExchange(pattern).fromRequestMessage(cmsg)
processor.process(exchange, new AsyncCallback {
val producer = self
// Need copies of sender and senderFuture references here
// since the callback could be done later by another thread.
val sender = self.sender
val senderFuture = self.senderFuture
// Need copies of channel reference here since the callback could be done
// later by another thread.
val channel = self.channel
def done(doneSync: Boolean): Unit = {
(doneSync, exchange.isFailed) match {
@ -114,10 +114,12 @@ trait ProducerSupport { this: Actor ⇒
receiveAfterProduce(result)
private def dispatchAsync(result: Any) = {
if (senderFuture.isDefined)
producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, sender, senderFuture)
else
producer.postMessageToMailbox(result, sender)
channel match {
case _: ActorPromise
producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, channel)
case _
producer.postMessageToMailbox(result, channel)
}
}
})
}

View file

@ -285,7 +285,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
* @param message reply message
* @param sender ignored
*/
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) = {
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
message match {
case Ack { /* no response message to set */ }
case msg: Failure exchange.fromFailureMessage(msg)
@ -312,7 +312,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
def shutdownLinkedActors: Unit = unsupported
def supervisor: Option[ActorRef] = unsupported
def homeAddress: Option[InetSocketAddress] = None
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[Promise[T]]) = unsupported
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long, channel: UntypedChannel) = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(msg: AnyRef): AnyRef = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported

View file

@ -8,7 +8,7 @@ import Cluster._
import akka.actor._
import akka.actor.Actor._
import akka.event.EventHandler
import akka.dispatch.Promise
import akka.dispatch.Future
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
@ -37,15 +37,23 @@ class ClusterActorRef private[akka] (
def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
route(message)(senderOption)
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
route(message)(sender)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T] = {
route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]]
channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
route[Any](message, timeout)(sender)
}
private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) {

View file

@ -79,14 +79,13 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
def startLink(actorRef: ActorRef): ActorRef = actorRef.startLink(actorRef)
def supervisor: Option[ActorRef] = actorRef.supervisor
def linkedActors: JMap[Uuid, ActorRef] = actorRef.linkedActors
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
actorRef.postMessageToMailbox(message, senderOption)
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) {
actorRef.postMessageToMailbox(message, channel)
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
channel: UntypedChannel): Future[Any] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
actorRef.supervisor_=(sup)

View file

@ -94,7 +94,7 @@ case class DurableDispatcher(
override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.senderFuture.isDefined)
if (invocation.channel.isInstanceOf[ActorPromise])
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!")
super.dispatch(invocation)
}
@ -131,7 +131,7 @@ case class DurablePinnedDispatcher(
override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.senderFuture.isDefined)
if (invocation.channel.isInstanceOf[ActorPromise])
throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or !!!")
super.dispatch(invocation)
}

View file

@ -5,7 +5,7 @@ package akka.actor.mailbox
import MailboxProtocol._
import akka.actor.{Actor, ActorRef}
import akka.actor.{Actor, ActorRef, NullChannel}
import akka.dispatch._
import akka.event.EventHandler
import akka.remote.MessageSerializer
@ -48,7 +48,10 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi
val builder = DurableMailboxMessageProtocol.newBuilder
.setOwnerAddress(ownerAddress)
.setMessage(message.toByteString)
if (durableMessage.sender.isDefined) builder.setSenderAddress(durableMessage.sender.get.address)
durableMessage.channel match {
case a : ActorRef => builder.setSenderAddress(a.address)
case _ =>
}
builder.build.toByteArray
}
@ -62,10 +65,14 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi
throw new DurableMailboxException("No actor could be found for address [" + ownerAddress + "], could not deserialize message."))
val sender = if (durableMessage.hasSenderAddress) {
val senderOption = if (durableMessage.hasSenderAddress) {
Actor.registry.actorFor(durableMessage.getSenderAddress)
} else None
val sender = senderOption match {
case Some(ref) => ref
case None => NullChannel
}
new MessageInvocation(owner, message, sender, None)
new MessageInvocation(owner, message, sender)
}
}

View file

@ -4,7 +4,7 @@
package akka.remote.netty
import akka.dispatch.{ DefaultPromise, Promise, Future }
import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future }
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
import akka.remote.protocol.RemoteProtocol._
import akka.serialization.RemoteActorSerialization
@ -913,8 +913,7 @@ class RemoteServerHandler(
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(
message,
request.getActorInfo.getTimeout,
None,
Some(new DefaultPromise[Any](request.getActorInfo.getTimeout).
new ActorPromise(request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case l: Left[Throwable, Any] write(channel, createErrorReplyMessage(l.a, request))
case r: Right[Throwable, Any]
@ -931,7 +930,7 @@ class RemoteServerHandler(
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, RemoteEncoder.encode(messageBuilder.build))
})))
}))
}
}

View file

@ -5,7 +5,7 @@ package akka.testkit
import akka.event.EventHandler
import akka.actor.ActorRef
import akka.dispatch.{ MessageDispatcher, MessageInvocation, FutureInvocation }
import akka.dispatch.{ MessageDispatcher, MessageInvocation, FutureInvocation, Promise, ActorPromise }
import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import java.util.concurrent.RejectedExecutionException
@ -140,14 +140,14 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
val queue = mbox.queue
val execute = mbox.suspended.ifElseYield {
queue.push(handle)
if (warnings && handle.senderFuture.isDefined) {
if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver)
}
false
} {
queue.push(handle)
if (queue.isActive) {
if (warnings && handle.senderFuture.isDefined) {
if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
EventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver)
}
false
@ -186,14 +186,18 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
if (handle ne null) {
try {
handle.invoke
val f = handle.senderFuture
if (warnings && f.isDefined && !f.get.isCompleted) {
if (warnings) handle.channel match {
case f: ActorPromise if !f.isCompleted
EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message))
}
} catch {
case _ queue.leave
case _
}
true
} catch {
case e
EventHandler.error(this, e)
queue.leave
false
}
} else if (queue.isActive) {
queue.leave
false

View file

@ -3,7 +3,7 @@
*/
package akka.testkit
import akka.actor.{ Actor, FSM }
import akka.actor._
import Actor._
import akka.util.Duration
import akka.util.duration._
@ -17,9 +17,19 @@ object TestActor {
case class SetTimeout(d: Duration)
case class SetIgnore(i: Ignore)
trait Message {
def msg: AnyRef
def channel: UntypedChannel
}
case class RealMessage(msg: AnyRef, channel: UntypedChannel) extends Message
case object NullMessage extends Message {
override def msg: AnyRef = throw new IllegalActorStateException("last receive did not dequeue a message")
override def channel: UntypedChannel = throw new IllegalActorStateException("last receive did not dequeue a message")
}
}
class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestActor.Ignore] {
class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[Int, TestActor.Ignore] {
import FSM._
import TestActor._
@ -36,7 +46,7 @@ class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestAc
case Event(x: AnyRef, ign)
val ignore = ign map (z if (z isDefinedAt x) z(x) else false) getOrElse false
if (!ignore) {
queue.offerLast(x)
queue.offerLast(RealMessage(x, self.channel))
}
stay
}
@ -76,19 +86,23 @@ class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestAc
*/
trait TestKit {
private val queue = new LinkedBlockingDeque[AnyRef]()
import TestActor.{ Message, RealMessage, NullMessage }
private val queue = new LinkedBlockingDeque[Message]()
private[akka] var lastMessage: Message = NullMessage
/**
* ActorRef of the test actor. Access is provided to enable e.g.
* registration as message target.
*/
protected val testActor = actorOf(new TestActor(queue)).start()
implicit val testActor = actorOf(new TestActor(queue)).start()
/**
* Implicit sender reference so that replies are possible for messages sent
* from the test class.
*/
protected implicit val senderOption = Some(testActor)
@deprecated("will be removed after 1.2, replaced by implicit testActor", "1.2")
val senderOption = Some(testActor)
private var end: Duration = Duration.Inf
/*
@ -183,6 +197,14 @@ trait TestKit {
*/
def within[T](max: Duration)(f: T): T = within(0 seconds, max)(f)
/**
* Send reply to the last dequeued message. Will throw
* IllegalActorStateException if no message has been dequeued, yet. Dequeuing
* means reception of the message as part of an expect... or receive... call,
* not reception by the testActor.
*/
def reply(msg: AnyRef) { lastMessage.channel ! msg }
/**
* Same as `expectMsg`, but takes the maximum wait time from the innermost
* enclosing `within` block.
@ -396,16 +418,21 @@ trait TestKit {
*/
def receiveWhile[T](max: Duration)(f: PartialFunction[AnyRef, T]): Seq[T] = {
val stop = now + max
var msg: Message = NullMessage
@tailrec
def doit(acc: List[T]): List[T] = {
receiveOne(stop - now) match {
case null
receiveOne(stop - now)
lastMessage match {
case NullMessage
lastMessage = msg
acc.reverse
case o if (f isDefinedAt o)
case RealMessage(o, _) if (f isDefinedAt o)
msg = lastMessage
doit(f(o) :: acc)
case o
queue.offerFirst(o)
case RealMessage(o, _)
queue.offerFirst(lastMessage)
lastMessage = msg
acc.reverse
}
}
@ -415,7 +442,10 @@ trait TestKit {
ret
}
private def receiveN(n: Int, stop: Duration): Seq[AnyRef] = {
/**
* Receive N messages in a row before the given deadline.
*/
def receiveN(n: Int, stop: Duration): Seq[AnyRef] = {
for { x 1 to n } yield {
val timeout = stop - now
val o = receiveOne(timeout)
@ -424,7 +454,12 @@ trait TestKit {
}
}
private def receiveOne(max: Duration): AnyRef = {
/**
* Receive one message from the internal queue of the TestActor. If the given
* duration is zero, the queue is polled (non-blocking).
*/
def receiveOne(max: Duration): AnyRef = {
val message =
if (max == 0.seconds) {
queue.pollFirst
} else if (max.finite_?) {
@ -432,9 +467,52 @@ trait TestKit {
} else {
queue.takeFirst
}
message match {
case null
lastMessage = NullMessage
null
case RealMessage(msg, _)
lastMessage = message
msg
}
}
private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase)
}
// vim: set ts=2 sw=2 et:
/**
* TestKit-based probe which allows sending, reception and reply.
*/
class TestProbe extends TestKit {
/**
* Shorthand to get the testActor.
*/
def ref = testActor
/**
* Send message to an actor while using the probe's TestActor as the sender.
* Replies will be available for inspection with all of TestKit's assertion
* methods.
*/
def send(actor: ActorRef, msg: AnyRef) = {
actor ! msg
}
/**
* Forward this message as if in the TestActor's receive method with self.forward.
*/
def forward(actor: ActorRef, msg: AnyRef = lastMessage.msg) {
actor.!(msg)(lastMessage.channel)
}
/**
* Get channel of last received message.
*/
def channel = lastMessage.channel
}
object TestProbe {
def apply() = new TestProbe
}

View file

@ -190,7 +190,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"support futures" in {
val a = TestActorRef[WorkerActor].start()
val f: Future[String] = a !!! "work"
val f = a ? "work" mapTo manifest[String]
f must be('completed)
f.get must equal("workDone")
}
@ -239,9 +239,8 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
intercept[IllegalActorStateException] { ref("work") }
val ch = Promise.channel()
ref ! ch
val f = ch.promise
f must be('completed)
f.get must be("complexReply")
ch must be('completed)
ch.get must be("complexReply")
}
}

View file

@ -0,0 +1,45 @@
package akka.testkit
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.config.Supervision.OneForOneStrategy
import akka.event.EventHandler
import akka.dispatch.Future
import akka.util.duration._
class TestProbeSpec extends WordSpec with MustMatchers {
"A TestProbe" must {
"reply to futures" in {
val tk = TestProbe()
val future = tk.ref ? "hello"
tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
tk.reply("world")
future must be('completed)
future.get must equal("world")
}
"reply to messages" in {
val tk1 = TestProbe()
val tk2 = TestProbe()
tk1.ref.!("hello")(tk2.ref)
tk1.expectMsg(0 millis, "hello")
tk1.reply("world")
tk2.expectMsg(0 millis, "world")
}
"properly send and reply to messages" in {
val probe1 = TestProbe()
val probe2 = TestProbe()
probe1.send(probe2.ref, "hello")
probe2.expectMsg(0 millis, "hello")
probe2.reply("world")
probe1.expectMsg(0 millis, "world")
}
}
}

View file

@ -184,10 +184,10 @@ public class Pi {
// send calculate message
long timeout = 60000;
Future<Double> replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null);
Option<Double> result = replyFuture.await().resultOrException();
Future<Object> replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null);
Option<Object> result = replyFuture.await().resultOrException();
if (result.isDefined()) {
double pi = result.get();
double pi = (Double) result.get();
// TODO java api for EventHandler?
// EventHandler.info(this, String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start)));
System.out.println(String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start)));