From 7712c206205b85d8c90621315a161d292de09cd2 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Jun 2011 22:36:46 +0200 Subject: [PATCH] unify sender/senderFuture into channel (++) (squashed merge from the various bits and pieces already part of release-1.2, everything related to Channel & Future) --- .../ActorFireForgetRequestReplySpec.scala | 2 +- .../akka/actor/actor/ActorTimeoutSpec.scala | 55 +++++ .../test/scala/akka/actor/actor/Bench.scala | 2 +- .../akka/dispatch/MailboxConfigSpec.scala | 6 +- .../test/scala/akka/routing/RoutingSpec.scala | 4 +- .../src/main/scala/akka/actor/Actor.scala | 35 ++- .../src/main/scala/akka/actor/ActorRef.scala | 214 ++++++++---------- .../src/main/scala/akka/actor/Channel.scala | 173 ++++++++++++++ .../src/main/scala/akka/actor/FSM.scala | 5 +- .../main/scala/akka/actor/TypedActor.scala | 14 +- .../akka/dispatch/BalancingDispatcher.scala | 5 +- .../main/scala/akka/dispatch/Dispatcher.scala | 5 +- .../src/main/scala/akka/dispatch/Future.scala | 101 +++++++-- .../scala/akka/dispatch/MessageHandling.scala | 7 +- .../src/main/scala/akka/routing/Pool.scala | 3 +- .../src/main/scala/akka/util/BoxedType.scala | 25 ++ .../src/main/scala/akka/camel/Producer.scala | 18 +- .../akka/camel/component/ActorComponent.scala | 4 +- .../scala/akka/cluster/ClusterActorRef.scala | 22 +- .../akka/cluster/ReplicatedClusterRef.scala | 9 +- .../actor/mailbox/DurableDispatcher.scala | 4 +- .../akka/actor/mailbox/DurableMailbox.scala | 15 +- .../remote/netty/NettyRemoteSupport.scala | 7 +- .../testkit/CallingThreadDispatcher.scala | 20 +- .../src/main/scala/akka/testkit/TestKit.scala | 118 ++++++++-- .../scala/akka/testkit/TestActorRefSpec.scala | 7 +- .../scala/akka/testkit/TestProbeSpec.scala | 45 ++++ .../java/akka/tutorial/java/second/Pi.java | 6 +- 28 files changed, 679 insertions(+), 252 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala create mode 100644 akka-actor/src/main/scala/akka/actor/Channel.scala create mode 100644 akka-actor/src/main/scala/akka/util/BoxedType.scala create mode 100644 akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala index dd0cb87990..3f2f8e57db 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala @@ -23,7 +23,7 @@ object ActorFireForgetRequestReplySpec { case "Send" ⇒ self.reply("Reply") case "SendImplicit" ⇒ - self.sender.get ! "ReplyImplicit" + self.channel ! "ReplyImplicit" } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala new file mode 100644 index 0000000000..b42ac75bcb --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor + +import org.scalatest.{ WordSpec, BeforeAndAfterAll } +import org.scalatest.matchers.MustMatchers +import akka.testkit.TestKit +import akka.dispatch.FutureTimeoutException +import akka.util.duration._ + +class ActorTimeoutSpec + extends WordSpec + with BeforeAndAfterAll + with MustMatchers + with TestKit { + + val echo = Actor.actorOf(new Actor { + def receive = { + case x ⇒ + } + }).start() + + val testTimeout = if (Actor.defaultTimeout.duration < 400.millis) 500 millis else 100 millis + + override def afterAll { echo.stop() } + + "An Actor-based Future" must { + + "use the global default timeout if no implicit in scope" in { + echo.timeout = 12 + within((Actor.TIMEOUT - 100).millis, (Actor.TIMEOUT + 300).millis) { + val f = echo ? "hallo" + intercept[FutureTimeoutException] { f.await } + } + } + + "use implicitly supplied timeout" in { + implicit val timeout = Actor.Timeout(testTimeout) + within(testTimeout - 100.millis, testTimeout + 300.millis) { + val f = (echo ? "hallo").mapTo[String] + intercept[FutureTimeoutException] { f.await } + f.value must be(None) + } + } + + "use explicitly supplied timeout" in { + within(testTimeout - 100.millis, testTimeout + 300.millis) { + (echo.?("hallo")(timeout = testTimeout)).as[String] must be(None) + } + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala index 258809c5b6..315798cc19 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala @@ -102,7 +102,7 @@ object Chameneos { } } else { waitingChameneo.foreach(_ ! Exit) - self.sender.get ! Exit + self.channel ! Exit } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index e71ca14721..678cbd2e86 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -4,7 +4,7 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import akka.actor.{ Actor, ActorRegistry } +import akka.actor.{ Actor, ActorRegistry, NullChannel } import akka.actor.Actor.{ actorOf } import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } import java.util.{ Queue } @@ -84,7 +84,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte new MessageInvocation( actorOf(new Actor { //Dummy actor def receive = { case _ ⇒ } - }), msg, None, None) + }), msg, NullChannel) } def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { @@ -158,4 +158,4 @@ class PriorityMailboxSpec extends MailboxSpec { case UnboundedMailbox() ⇒ new UnboundedPriorityMessageQueue(comparator) case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator) } -} \ No newline at end of file +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index ddd37f760c..de8cbae96d 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -193,11 +193,11 @@ class RoutingSpec extends WordSpec with MustMatchers { }).start() val successes = TestLatch(2) - val successCounter = Some(actorOf(new Actor { + val successCounter = actorOf(new Actor { def receive = { case "success" ⇒ successes.countDown() } - }).start()) + }).start() implicit val replyTo = successCounter pool ! "a" diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index dd0642893b..50e6be2011 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -24,6 +24,7 @@ import scala.reflect.BeanProperty import com.eaio.uuid.UUID import java.lang.reflect.InvocationTargetException +import java.util.concurrent.TimeUnit /** * Life-cycle messages for the Actors @@ -110,9 +111,6 @@ object Status { */ object Actor extends ListenerManagement { - private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis - private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) - /** * A Receive is a convenience type that defines actor message behavior currently modeled as * a PartialFunction[Any, Unit]. @@ -140,6 +138,20 @@ object Actor extends ListenerManagement { override def initialValue = Stack[ActorRef]() } + case class Timeout(duration: Duration) { + def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) + def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) + } + object Timeout { + def apply(timeout: Long) = new Timeout(timeout) + def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + implicit def durationToTimeout(duration: Duration) = new Timeout(duration) + } + + private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis + val defaultTimeout = Timeout(TIMEOUT) + private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + /** * Handle to the ActorRegistry. */ @@ -495,14 +507,14 @@ trait Actor { */ type Receive = Actor.Receive - /* + /** * Some[ActorRef] representation of the 'self' ActorRef reference. *

* Mainly for internal use, functions as the implicit sender references when invoking * the 'forward' function. */ @transient - implicit val someSelf: Some[ActorRef] = { + val someSelf: Some[ActorRef] = { val refStack = Actor.actorRefInCreation.get if (refStack.isEmpty) throw new ActorInitializationException( "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + @@ -528,7 +540,7 @@ trait Actor { * Mainly for internal use, functions as the implicit sender references when invoking * one of the message send functions ('!', '!!' and '!!!'). */ - implicit def optionSelf: Option[ActorRef] = someSelf + def optionSelf: Option[ActorRef] = someSelf /** * The 'self' field holds the ActorRef for this actor. @@ -558,7 +570,7 @@ trait Actor { * */ @transient - val self: ScalaActorRef = someSelf.get + implicit val self: ScalaActorRef = someSelf.get /** * User overridable callback/setting. @@ -645,8 +657,7 @@ trait Actor { private[akka] final def apply(msg: Any) = { if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null)) - throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null") - + throw new InvalidMessageException("Message from [" + self.channel + "] to [" + self.toString + "] is null") val behaviorStack = self.hotswap msg match { @@ -675,9 +686,9 @@ trait Actor { case Restart(reason) ⇒ throw reason case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ - val f = self.senderFuture() + val ch = self.channel self.stop() - if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) + ch.sendException(new ActorKilledException("PoisonPill")) } } @@ -697,4 +708,4 @@ private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { * ClassCastException and return None in that case. */ def asSilently[T: Manifest]: Option[T] = narrowSilently[T](anyOption) -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9cb2c14513..f05fc7fa6b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -33,27 +33,6 @@ private[akka] object ActorRefInternals { object SHUTDOWN extends StatusType } -/** - * Abstraction for unification of sender and senderFuture for later reply. - * Can be stored away and used at a later point in time. - */ -abstract class Channel[T] { - - /** - * Scala API.

- * Sends the specified message to the channel. - */ - def !(msg: T) - - /** - * Java API.

- * Sends the specified message to the channel. - */ - def sendOneWay(msg: T) { - this.!(msg) - } -} - /** * ActorRef is an immutable and serializable handle to an Actor. *

@@ -86,7 +65,7 @@ abstract class Channel[T] { * * @author Jonas Bonér */ -trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef ⇒ +trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef ⇒ // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = newUuid @@ -241,40 +220,18 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S _uuid = uid } - /** - * Akka Java API.

- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - *

- *

-   * actor.sendOneWay(message);
-   * 
- *

- */ - def sendOneWay(message: AnyRef): Unit = { - sendOneWay(message, null) - } - - /** - * Akka Java API.

- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - *

- * Allows you to pass along the sender of the message. - *

- *

-   * actor.sendOneWay(message, context);
-   * 
- *

- */ - def sendOneWay(message: AnyRef, sender: ActorRef) { - this.!(message)(Option(sender)) - } - /** * Akka Java API.

* @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) * Uses the default timeout of the Actor (setTimeout()) and omits the sender reference */ - def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) + def sendRequestReply(message: AnyRef): AnyRef = { + !!(message, timeout).getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tfrom [nowhere]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + } /** * Akka Java API.

@@ -298,7 +255,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = { - !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( + ?(message)(sender, Actor.Timeout(timeout)).as[AnyRef].getOrElse(throw new ActorTimeoutException( "Message [" + message + "]\n\tfrom [" + (if (sender ne null) sender.address else "nowhere") + "]\n\twith timeout [" + timeout + @@ -311,14 +268,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) and omits the sender */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] + def sendRequestReplyFuture(message: AnyRef): Future[Any] = ?(message) /** * Akka Java API.

* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] + def sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[Any] = ?(message)(sender) /** * Akka Java API.

@@ -331,7 +288,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] + def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: ActorRef): Future[Any] = ?(message)(sender, Actor.Timeout(timeout)) /** * Akka Java API.

@@ -339,7 +296,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S */ def forward(message: AnyRef, sender: ActorRef) { if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else forward(message)(Some(sender)) + else forward(message)(sender) } /** @@ -448,36 +405,36 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S /** * Abstraction for unification of sender and senderFuture for later reply */ - def channel: Channel[Any] = { - if (senderFuture.isDefined) { - new Channel[Any] { - val future = senderFuture.get - def !(msg: Any) = future completeWithResult msg - } - } else if (sender.isDefined) { - val someSelf = Some(this) - new Channel[Any] { - val client = sender.get - def !(msg: Any) = client.!(msg)(someSelf) - } - } else throw new IllegalActorStateException("No channel available") + def channel: UntypedChannel = { + val msg = currentMessage + if (msg ne null) msg.channel + else NullChannel } + /* + * Implementation of ForwardableChannel + */ + + def sendException(ex: Throwable) {} + def isUsableOnlyOnce = false + def isUsable = true + def isReplyable = true + def canSendException = false + /** * Java API.

* Abstraction for unification of sender and senderFuture for later reply */ - def getChannel: Channel[Any] = channel + def getChannel: UntypedChannel = channel protected[akka] def invoke(messageHandle: MessageInvocation) - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] + channel: UntypedChannel): Future[Any] protected[akka] def actorInstance: AtomicReference[Actor] @@ -692,18 +649,19 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, _supervisor = sup } - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { - dispatcher dispatchMessage MessageInvocation(this, message, senderOption, None) - } + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = + dispatcher dispatchMessage new MessageInvocation(this, message, channel) - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] = { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout)) - dispatcher dispatchMessage MessageInvocation(this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]]) - future.get + channel: UntypedChannel): Future[Any] = { + val future = channel match { + case f: ActorPromise ⇒ f + case _ ⇒ new ActorPromise(timeout) + } + dispatcher dispatchMessage new MessageInvocation(this, message, future) + future } /** @@ -888,7 +846,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, //Prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - senderFuture.foreach(_.completeWithException(reason)) + channel.sendException(reason) if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) else { @@ -995,19 +953,28 @@ private[akka] case class RemoteActorRef private[akka] ( start() - def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { - Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, loader) + def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { + val chSender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader) } - def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] = { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, - remoteAddress, timeout, false, this, loader) - if (future.isDefined) future.get + channel: UntypedChannel): Future[Any] = { + val chSender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + val chFuture = channel match { + case f: Promise[Any] ⇒ Some(f) + case _ ⇒ None + } + val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader) + if (future.isDefined) ActorPromise(future.get) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } @@ -1096,7 +1063,7 @@ trait ActorRefShared { * There are implicit conversions in ../actor/Implicits.scala * from ActorRef -> ScalaActorRef and back */ -trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ +trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef ⇒ /** * Address for actor, must be a unique one. @@ -1134,20 +1101,28 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ + @deprecated("will be removed in 2.0, use channel instead", "1.2") def sender: Option[ActorRef] = { val msg = currentMessage if (msg eq null) None - else msg.sender + else msg.channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } } /** * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ + @deprecated("will be removed in 2.0, use channel instead", "1.2") def senderFuture(): Option[Promise[Any]] = { val msg = currentMessage if (msg eq null) None - else msg.senderFuture + else msg.channel match { + case f: ActorPromise ⇒ Some(f) + case _ ⇒ None + } } /** @@ -1164,8 +1139,8 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * *

*/ - def !(message: Any)(implicit sender: Option[ActorRef] = None) { - if (isRunning) postMessageToMailbox(message, sender) + def !(message: Any)(implicit channel: UntypedChannel = NullChannel): Unit = { + if (isRunning) postMessageToMailbox(message, channel) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") } @@ -1182,9 +1157,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * If you are sending messages using !! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = { + @deprecated("use `(actor ? msg).as[T]` instead", "1.2") + def !!(message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Option[Any] = { if (isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None) + val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) try { future.await.resultOrException } catch { case e: FutureTimeoutException ⇒ None } } else throw new ActorInitializationException( @@ -1200,8 +1176,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * If you are sending messages using !!! then you have to use self.reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = { - if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) + @deprecated("return type is an illusion, use the more honest ? method", "1.2") + def !!![T](message: Any, timeout: Long = this.timeout)(implicit channel: UntypedChannel = NullChannel): Future[T] = + this.?(message)(channel, Actor.Timeout(timeout)).asInstanceOf[Future[T]] + + /** + * Sends a message asynchronously, returning a future which may eventually hold the reply. + */ + def ?(message: Any)(implicit channel: UntypedChannel = NullChannel, timeout: Actor.Timeout = Actor.defaultTimeout): Future[Any] = { + if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout.duration.toMillis, channel) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") } @@ -1211,12 +1194,9 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ *

* Works with '!', '!!' and '!!!'. */ - def forward(message: Any)(implicit sender: Some[ActorRef]) = { + def forward(message: Any)(implicit channel: ForwardableChannel) = { if (isRunning) { - if (sender.get.senderFuture.isDefined) - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture) - else - postMessageToMailbox(message, sender.get.sender) + postMessageToMailbox(message, channel.channel) } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it") } @@ -1226,14 +1206,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ *

* Throws an IllegalStateException if unable to determine what to reply to. */ - def reply(message: Any) { - if (!reply_?(message)) throw new IllegalActorStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably: " + - "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + - "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + - "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope") - } + def reply(message: Any) = channel.!(message)(this) /** * Use reply_?(..) to reply with a message to the original sender of the message currently @@ -1241,16 +1214,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ *

* Returns true if reply was sent, and false if unable to determine what to reply to. */ - def reply_?(message: Any): Boolean = { - if (senderFuture.isDefined) { - senderFuture.get completeWithResult message - true - } else if (sender.isDefined) { - //TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature - sender.get.!(message)(Some(this)) - true - } else false - } + def reply_?(message: Any): Boolean = channel.safe_!(message)(this) } case class SerializedActorRef(val uuid: Uuid, diff --git a/akka-actor/src/main/scala/akka/actor/Channel.scala b/akka-actor/src/main/scala/akka/actor/Channel.scala new file mode 100644 index 0000000000..80134f2df8 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Channel.scala @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +/** + * Abstraction for unification of sender and senderFuture for later reply. + * Can be stored away and used at a later point in time. + * + * Channel cannot be contravariant because of Future providing its value in + * covariant position. + * + * The possible reply channel which can be passed into ! and safe_! is always + * untyped, as there is no way to utilize its real static type without + * requiring runtime-costly manifests. + */ +trait Channel[T] { + + /** + * Scala API.

+ * Sends the specified message to the channel. + */ + def !(msg: T)(implicit channel: UntypedChannel = NullChannel): Unit + + /** + * Try to send an exception. Not all channel types support this, one notable + * positive example is Future. Failure to send is silent. + */ + def sendException(ex: Throwable): Unit + + /** + * Scala API.

+ * Try to send message to the channel, return whether successful. + */ + def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean + + /** + * Indicates whether this channel may be used only once, e.g. a Future. + */ + def isUsableOnlyOnce: Boolean + + /** + * Indicates whether this channel may still be used (only useful if + * isUsableOnlyOnce returns true). + */ + def isUsable: Boolean + + /** + * Indicates whether this channel carries reply information, e.g. an + * ActorRef. + */ + def isReplyable: Boolean + + /** + * Indicates whether this channel is capable of sending exceptions to its + * recipient. + */ + def canSendException: Boolean + + /** + * Java API.

+ * Sends the specified message to the channel, i.e. fire-and-forget semantics.

+ *

+   * actor.sendOneWay(message);
+   * 
+ */ + def sendOneWay(msg: T): Unit = this.!(msg) + + /** + * Java API.

+ * Sends the specified message to the channel, i.e. fire-and-forget + * semantics, including the sender reference if possible (not supported on + * all channels).

+ *

+   * actor.sendOneWay(message, context);
+   * 
+ */ + def sendOneWay(msg: T, sender: UntypedChannel): Unit = this.!(msg)(sender) + + /** + * Java API.

+ * Try to send the specified message to the channel, i.e. fire-and-forget semantics.

+ *

+   * actor.sendOneWay(message);
+   * 
+ */ + def sendOneWaySafe(msg: T): Boolean = this.safe_!(msg) + + /** + * Java API.

+ * Try to send the specified message to the channel, i.e. fire-and-forget + * semantics, including the sender reference if possible (not supported on + * all channels).

+ *

+   * actor.sendOneWay(message, context);
+   * 
+ */ + def sendOneWaySafe(msg: T, sender: UntypedChannel): Boolean = this.safe_!(msg)(sender) + +} + +/** + * This trait represents a channel that a priori does have sending capability, + * i.e. ! is not guaranteed to fail (e.g. NullChannel would be a + * counter-example). + */ +trait AvailableChannel[T] { self: Channel[T] ⇒ + def safe_!(msg: T)(implicit channel: UntypedChannel = NullChannel): Boolean = { + if (isUsable) { + try { + this ! msg + true + } catch { + case _ ⇒ false + } + } else false + } +} + +/** + * All channels used in conjunction with MessageInvocation are untyped by + * design, so make this explicit. + */ +trait UntypedChannel extends Channel[Any] + +object UntypedChannel { + implicit def senderOption2Channel(sender: Option[ActorRef]): UntypedChannel = + sender match { + case Some(actor) ⇒ actor + case None ⇒ NullChannel + } +} + +/** + * Default channel when none available. + */ +case object NullChannel extends UntypedChannel { + def !(msg: Any)(implicit channel: UntypedChannel = NullChannel) { + throw new IllegalActorStateException(""" + No sender in scope, can't reply. + You have probably: + 1. Sent a message to an Actor from an instance that is NOT an Actor. + 2. Invoked a method on an TypedActor from an instance NOT an TypedActor. + You may want to have a look at safe_! for a variant returning a Boolean""") + } + def safe_!(msg: Any)(implicit channel: UntypedChannel = NullChannel): Boolean = false + def sendException(ex: Throwable) {} + def isUsableOnlyOnce = false + def isUsable = false + def isReplyable = false + def canSendException = false +} + +/** + * A channel which may be forwarded: a message received with such a reply + * channel attached can be passed on transparently such that a reply from a + * later processing stage is sent directly back to the origin. Keep in mind + * that not all channels can be used multiple times. + */ +trait ForwardableChannel extends UntypedChannel with AvailableChannel[Any] { + /** + * Get channel by which this channel would reply (ActorRef.forward takes an + * implicit ForwardableChannel and uses its .channel as message origin) + */ + def channel: UntypedChannel +} + +object ForwardableChannel { + implicit def someS2FC(sender: Some[ActorRef]): ForwardableChannel = sender.get + implicit def someIS2FC(implicit sender: Some[ActorRef]): ForwardableChannel = sender.get +} + diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 60e2b10c67..8317fc5fe3 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -494,10 +494,7 @@ trait FSM[S, D] extends ListenerManagement { * @return this state transition descriptor */ def replying(replyValue: Any): State = { - self.sender match { - case Some(sender) ⇒ sender ! replyValue - case None ⇒ - } + self.channel safe_! replyValue this } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 85a388e24e..4e97855026 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -6,7 +6,7 @@ package akka.actor import akka.japi.{ Creator, Option ⇒ JOption } import akka.actor.Actor.{ actorOf, futureToAnyOptionAsTypedOption } -import akka.dispatch.{ MessageDispatcher, Dispatchers, Future } +import akka.dispatch.{ MessageDispatcher, Dispatchers, Future, FutureTimeoutException } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Duration } import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } @@ -41,6 +41,7 @@ object TypedActor { case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ + implicit val timeout = Actor.Timeout(actor.timeout) MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m @@ -48,9 +49,12 @@ object TypedActor { case m if m.returnsFuture_? ⇒ actor !!! m case m if m.returnsJOption_? || m.returnsOption_? ⇒ - (actor !!! m).as[AnyRef] match { - case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None - case Some(joption) ⇒ joption + val f = actor ? m + try { f.await } catch { case _: FutureTimeoutException ⇒ } + f.value match { + case None | Some(Right(null)) ⇒ if (m.returnsJOption_?) JOption.none[Any] else None + case Some(Right(joption: AnyRef)) ⇒ joption + case Some(Left(ex)) ⇒ throw ex } case m ⇒ (actor !!! m).get @@ -164,4 +168,4 @@ object TypedActor { } private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 0460720a73..98b7465e5a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -129,10 +129,7 @@ class BalancingDispatcher( */ protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = { if (organ ne null) { - if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( - organ.message, recipient.timeout, organ.sender, organ.senderFuture) - else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender) - else recipient.postMessageToMailbox(organ.message, None) + recipient.postMessageToMailbox(organ.message, organ.channel) true } else false } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bf02af5997..9e7cb8e754 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -164,10 +164,7 @@ class Dispatcher( var invocation = m.dequeue lazy val exception = new ActorKilledException("Actor has been stopped") while (invocation ne null) { - val f = invocation.senderFuture - if (f.isDefined) - f.get.completeWithException(exception) - + invocation.channel.sendException(exception) invocation = m.dequeue } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e760f97bb7..4e8e908049 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -6,8 +6,8 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler -import akka.actor.{ Actor, Channel } -import akka.util.Duration +import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef } +import akka.util.{ Duration, BoxedType } import akka.japi.{ Procedure, Function ⇒ JFunc } import scala.util.continuations._ @@ -308,6 +308,22 @@ sealed trait Future[+T] { */ def await(atMost: Duration): Future[T] + /** + * Await completion of this Future (as `await`) and return its value if it + * conforms to A's erased type. + */ + def as[A](implicit m: Manifest[A]): Option[A] = + try { + await + value match { + case None ⇒ None + case Some(_: Left[_, _]) ⇒ None + case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + } + } catch { + case _: Exception ⇒ None + } + /** * Tests whether this Future has been completed. */ @@ -357,7 +373,7 @@ sealed trait Future[+T] { * Future. If the Future has already been completed, this will apply * immediately. */ - def onComplete(func: Future[T] ⇒ Unit): Future[T] + def onComplete(func: Future[T] ⇒ Unit): this.type /** * When the future is completed with a valid result, apply the provided @@ -369,7 +385,7 @@ sealed trait Future[+T] { * } * */ - final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒ + final def onResult(pf: PartialFunction[Any, Unit]): this.type = onComplete { f ⇒ val optr = f.result if (optr.isDefined) { val r = optr.get @@ -496,6 +512,26 @@ sealed trait Future[+T] { fa } + /** + * Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[A](implicit m: Manifest[A]): Future[A] = { + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { ft ⇒ + fa complete (ft.value.get match { + case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] + case Right(t) ⇒ + try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + } + }) + } + fa + } + /** * Creates a new Future by applying a function to the successful result of * this Future, and returns the result of the function as the new Future. @@ -586,7 +622,7 @@ sealed trait Future[+T] { } /* Java API */ - final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_)) + final def onComplete[A >: T](proc: Procedure[Future[A]]): this.type = onComplete(proc(_)) final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_)) @@ -607,10 +643,7 @@ object Promise { /** * Construct a completable channel */ - def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { - val promise = Promise[Any](timeout) - def !(msg: Any) = promise completeWithResult msg - } + def channel(timeout: Long = Actor.TIMEOUT): ActorPromise = new ActorPromise(timeout) private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None @@ -625,26 +658,26 @@ trait Promise[T] extends Future[T] { * Completes this Future with the specified result, if not already completed. * @return this */ - def complete(value: Either[Throwable, T]): Future[T] + def complete(value: Either[Throwable, T]): this.type /** * Completes this Future with the specified result, if not already completed. * @return this */ - final def completeWithResult(result: T): Future[T] = complete(Right(result)) + final def completeWithResult(result: T): this.type = complete(Right(result)) /** * Completes this Future with the specified exception, if not already completed. * @return this */ - final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception)) + final def completeWithException(exception: Throwable): this.type = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, * unless this Future has already been completed. * @return this. */ - final def completeWith(other: Future[T]): Future[T] = { + final def completeWith(other: Future[T]): this.type = { other onComplete { f ⇒ complete(f.value.get) } this } @@ -725,7 +758,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { } } - def complete(value: Either[Throwable, T]): DefaultPromise[T] = { + def complete(value: Either[Throwable, T]): this.type = { _lock.lock val notifyTheseListeners = try { if (_value.isEmpty) { //Only complete if we aren't expired @@ -772,7 +805,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { this } - def onComplete(func: Future[T] ⇒ Unit): Promise[T] = { + def onComplete(func: Future[T] ⇒ Unit): this.type = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -804,6 +837,36 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) } +class ActorPromise(timeout: Long, timeunit: TimeUnit) + extends DefaultPromise[Any](timeout, timeunit) + with ForwardableChannel { + def this() = this(0, MILLIS) + def this(timeout: Long) = this(timeout, MILLIS) + + def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message) + + def sendException(ex: Throwable) = completeWithException(ex) + + def channel: UntypedChannel = this + + def isUsableOnlyOnce = true + def isUsable = !isCompleted + def isReplyable = false + def canSendException = true + + @deprecated("ActorPromise merged with Channel[Any], just use 'this'", "1.2") + def future = this +} + +object ActorPromise { + def apply(f: Promise[Any]): ActorPromise = + new ActorPromise(f.timeoutInNanos, NANOS) { + completeWith(f) + override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message + override def sendException(ex: Throwable) = f completeWithException ex + } +} + /** * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. @@ -811,10 +874,10 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): Promise[T] = this - def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } - def await(atMost: Duration): Future[T] = this - def await: Future[T] = this + def complete(value: Either[Throwable, T]): this.type = this + def onComplete(func: Future[T] ⇒ Unit): this.type = { func(this); this } + def await(atMost: Duration): this.type = this + def await: this.type = this def isExpired: Boolean = true def timeoutInNanos: Long = 0 } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 0c87581e5e..d109d274d4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -16,10 +16,9 @@ import akka.actor._ /** * @author Jonas Bonér */ -final case class MessageInvocation(receiver: ActorRef, - message: Any, - sender: Option[ActorRef], - senderFuture: Option[Promise[Any]]) { +final case class MessageInvocation(val receiver: ActorRef, + val message: Any, + val channel: UntypedChannel) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") final def invoke() { diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index c036616521..097f2d720a 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -5,6 +5,7 @@ package akka.routing import akka.actor.{ Actor, ActorRef, PoisonPill } +import akka.dispatch.{ Promise } /** * Actor pooling @@ -195,7 +196,7 @@ trait MailboxPressureCapacitor { */ trait ActiveFuturesPressureCapacitor { def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.senderFuture.isDefined } + delegates count { _.channel.isInstanceOf[Promise[Any]] } } /** diff --git a/akka-actor/src/main/scala/akka/util/BoxedType.scala b/akka-actor/src/main/scala/akka/util/BoxedType.scala new file mode 100644 index 0000000000..7bcacaa5f9 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/BoxedType.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.util + +import java.{ lang ⇒ jl } + +object BoxedType { + + private val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) + + def apply(c: Class[_]): Class[_] = { + if (c.isPrimitive) toBoxed(c) else c + } + +} diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index f4f745a294..041f3397ff 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -10,6 +10,7 @@ import org.apache.camel._ import org.apache.camel.processor.SendProcessor import akka.actor.{ Actor, ActorRef, UntypedActor } +import akka.dispatch.ActorPromise /** * Support trait for producing messages to Camel endpoints. @@ -96,10 +97,9 @@ trait ProducerSupport { this: Actor ⇒ val exchange = createExchange(pattern).fromRequestMessage(cmsg) processor.process(exchange, new AsyncCallback { val producer = self - // Need copies of sender and senderFuture references here - // since the callback could be done later by another thread. - val sender = self.sender - val senderFuture = self.senderFuture + // Need copies of channel reference here since the callback could be done + // later by another thread. + val channel = self.channel def done(doneSync: Boolean): Unit = { (doneSync, exchange.isFailed) match { @@ -114,10 +114,12 @@ trait ProducerSupport { this: Actor ⇒ receiveAfterProduce(result) private def dispatchAsync(result: Any) = { - if (senderFuture.isDefined) - producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, sender, senderFuture) - else - producer.postMessageToMailbox(result, sender) + channel match { + case _: ActorPromise ⇒ + producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, channel) + case _ ⇒ + producer.postMessageToMailbox(result, channel) + } } }) } diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index d06bdcb70e..0000a74503 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -285,7 +285,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall * @param message reply message * @param sender ignored */ - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) = { + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) = { message match { case Ack ⇒ { /* no response message to set */ } case msg: Failure ⇒ exchange.fromFailureMessage(msg) @@ -312,7 +312,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def shutdownLinkedActors: Unit = unsupported def supervisor: Option[ActorRef] = unsupported def homeAddress: Option[InetSocketAddress] = None - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[Promise[T]]) = unsupported + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Long, channel: UntypedChannel) = unsupported protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(msg: AnyRef): AnyRef = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index f107904892..5698ac7be0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -8,7 +8,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.event.EventHandler -import akka.dispatch.Promise +import akka.dispatch.Future import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -37,15 +37,23 @@ class ClusterActorRef private[akka] ( def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get - override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - route(message)(senderOption) + override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { + val sender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + route(message)(sender) + } - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + override def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] = { - route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] + channel: UntypedChannel): Future[Any] = { + val sender = channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + route[Any](message, timeout)(sender) } private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) { diff --git a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala index 4b075c7f91..b9ea77043c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala @@ -79,14 +79,13 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String) def startLink(actorRef: ActorRef): ActorRef = actorRef.startLink(actorRef) def supervisor: Option[ActorRef] = actorRef.supervisor def linkedActors: JMap[Uuid, ActorRef] = actorRef.linkedActors - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { - actorRef.postMessageToMailbox(message, senderOption) + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel) { + actorRef.postMessageToMailbox(message, channel) } - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) + channel: UntypedChannel): Future[Any] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance protected[akka] def supervisor_=(sup: Option[ActorRef]) { actorRef.supervisor_=(sup) diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala index 142fbea84f..adc3a12beb 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -94,7 +94,7 @@ case class DurableDispatcher( override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef) private[akka] override def dispatch(invocation: MessageInvocation): Unit = { - if (invocation.senderFuture.isDefined) + if (invocation.channel.isInstanceOf[ActorPromise]) throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from !! and !!!") super.dispatch(invocation) } @@ -131,7 +131,7 @@ case class DurablePinnedDispatcher( override def createMailbox(actorRef: ActorRef): AnyRef = _storage.createFor(actorRef) private[akka] override def dispatch(invocation: MessageInvocation): Unit = { - if (invocation.senderFuture.isDefined) + if (invocation.channel.isInstanceOf[ActorPromise]) throw new IllegalArgumentException("Actor has a durable mailbox that does not support !! or !!!") super.dispatch(invocation) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 5b83f6a8c4..ec1a3a6e62 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -5,7 +5,7 @@ package akka.actor.mailbox import MailboxProtocol._ -import akka.actor.{Actor, ActorRef} +import akka.actor.{Actor, ActorRef, NullChannel} import akka.dispatch._ import akka.event.EventHandler import akka.remote.MessageSerializer @@ -48,7 +48,10 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi val builder = DurableMailboxMessageProtocol.newBuilder .setOwnerAddress(ownerAddress) .setMessage(message.toByteString) - if (durableMessage.sender.isDefined) builder.setSenderAddress(durableMessage.sender.get.address) + durableMessage.channel match { + case a : ActorRef => builder.setSenderAddress(a.address) + case _ => + } builder.build.toByteArray } @@ -62,10 +65,14 @@ abstract class DurableExecutableMailbox(owner: ActorRef) extends MessageQueue wi throw new DurableMailboxException("No actor could be found for address [" + ownerAddress + "], could not deserialize message.")) - val sender = if (durableMessage.hasSenderAddress) { + val senderOption = if (durableMessage.hasSenderAddress) { Actor.registry.actorFor(durableMessage.getSenderAddress) } else None + val sender = senderOption match { + case Some(ref) => ref + case None => NullChannel + } - new MessageInvocation(owner, message, sender, None) + new MessageInvocation(owner, message, sender) } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5ffc4b16f3..e107f45d49 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.dispatch.{ DefaultPromise, Promise, Future } +import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future } import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization @@ -913,8 +913,7 @@ class RemoteServerHandler( else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout( message, request.getActorInfo.getTimeout, - None, - Some(new DefaultPromise[Any](request.getActorInfo.getTimeout). + new ActorPromise(request.getActorInfo.getTimeout). onComplete(_.value.get match { case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) case r: Right[Throwable, Any] ⇒ @@ -931,7 +930,7 @@ class RemoteServerHandler( if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) write(channel, RemoteEncoder.encode(messageBuilder.build)) - }))) + })) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 90d9bfda83..cd8a778c93 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -5,7 +5,7 @@ package akka.testkit import akka.event.EventHandler import akka.actor.ActorRef -import akka.dispatch.{ MessageDispatcher, MessageInvocation, FutureInvocation } +import akka.dispatch.{ MessageDispatcher, MessageInvocation, FutureInvocation, Promise, ActorPromise } import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import java.util.concurrent.RejectedExecutionException @@ -140,14 +140,14 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa val queue = mbox.queue val execute = mbox.suspended.ifElseYield { queue.push(handle) - if (warnings && handle.senderFuture.isDefined) { + if (warnings && handle.channel.isInstanceOf[Promise[_]]) { EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver) } false } { queue.push(handle) if (queue.isActive) { - if (warnings && handle.senderFuture.isDefined) { + if (warnings && handle.channel.isInstanceOf[Promise[_]]) { EventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) } false @@ -186,14 +186,18 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa if (handle ne null) { try { handle.invoke - val f = handle.senderFuture - if (warnings && f.isDefined && !f.get.isCompleted) { - EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) + if (warnings) handle.channel match { + case f: ActorPromise if !f.isCompleted ⇒ + EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) + case _ ⇒ } + true } catch { - case _ ⇒ queue.leave + case e ⇒ + EventHandler.error(this, e) + queue.leave + false } - true } else if (queue.isActive) { queue.leave false diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c13787fb0e..4598749ae2 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -3,7 +3,7 @@ */ package akka.testkit -import akka.actor.{ Actor, FSM } +import akka.actor._ import Actor._ import akka.util.Duration import akka.util.duration._ @@ -17,9 +17,19 @@ object TestActor { case class SetTimeout(d: Duration) case class SetIgnore(i: Ignore) + + trait Message { + def msg: AnyRef + def channel: UntypedChannel + } + case class RealMessage(msg: AnyRef, channel: UntypedChannel) extends Message + case object NullMessage extends Message { + override def msg: AnyRef = throw new IllegalActorStateException("last receive did not dequeue a message") + override def channel: UntypedChannel = throw new IllegalActorStateException("last receive did not dequeue a message") + } } -class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestActor.Ignore] { +class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[Int, TestActor.Ignore] { import FSM._ import TestActor._ @@ -36,7 +46,7 @@ class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestAc case Event(x: AnyRef, ign) ⇒ val ignore = ign map (z ⇒ if (z isDefinedAt x) z(x) else false) getOrElse false if (!ignore) { - queue.offerLast(x) + queue.offerLast(RealMessage(x, self.channel)) } stay } @@ -76,19 +86,23 @@ class TestActor(queue: BlockingDeque[AnyRef]) extends Actor with FSM[Int, TestAc */ trait TestKit { - private val queue = new LinkedBlockingDeque[AnyRef]() + import TestActor.{ Message, RealMessage, NullMessage } + + private val queue = new LinkedBlockingDeque[Message]() + private[akka] var lastMessage: Message = NullMessage /** * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - protected val testActor = actorOf(new TestActor(queue)).start() + implicit val testActor = actorOf(new TestActor(queue)).start() /** * Implicit sender reference so that replies are possible for messages sent * from the test class. */ - protected implicit val senderOption = Some(testActor) + @deprecated("will be removed after 1.2, replaced by implicit testActor", "1.2") + val senderOption = Some(testActor) private var end: Duration = Duration.Inf /* @@ -183,6 +197,14 @@ trait TestKit { */ def within[T](max: Duration)(f: ⇒ T): T = within(0 seconds, max)(f) + /** + * Send reply to the last dequeued message. Will throw + * IllegalActorStateException if no message has been dequeued, yet. Dequeuing + * means reception of the message as part of an expect... or receive... call, + * not reception by the testActor. + */ + def reply(msg: AnyRef) { lastMessage.channel ! msg } + /** * Same as `expectMsg`, but takes the maximum wait time from the innermost * enclosing `within` block. @@ -396,16 +418,21 @@ trait TestKit { */ def receiveWhile[T](max: Duration)(f: PartialFunction[AnyRef, T]): Seq[T] = { val stop = now + max + var msg: Message = NullMessage @tailrec def doit(acc: List[T]): List[T] = { - receiveOne(stop - now) match { - case null ⇒ + receiveOne(stop - now) + lastMessage match { + case NullMessage ⇒ + lastMessage = msg acc.reverse - case o if (f isDefinedAt o) ⇒ + case RealMessage(o, _) if (f isDefinedAt o) ⇒ + msg = lastMessage doit(f(o) :: acc) - case o ⇒ - queue.offerFirst(o) + case RealMessage(o, _) ⇒ + queue.offerFirst(lastMessage) + lastMessage = msg acc.reverse } } @@ -415,7 +442,10 @@ trait TestKit { ret } - private def receiveN(n: Int, stop: Duration): Seq[AnyRef] = { + /** + * Receive N messages in a row before the given deadline. + */ + def receiveN(n: Int, stop: Duration): Seq[AnyRef] = { for { x ← 1 to n } yield { val timeout = stop - now val o = receiveOne(timeout) @@ -424,17 +454,65 @@ trait TestKit { } } - private def receiveOne(max: Duration): AnyRef = { - if (max == 0.seconds) { - queue.pollFirst - } else if (max.finite_?) { - queue.pollFirst(max.length, max.unit) - } else { - queue.takeFirst + /** + * Receive one message from the internal queue of the TestActor. If the given + * duration is zero, the queue is polled (non-blocking). + */ + def receiveOne(max: Duration): AnyRef = { + val message = + if (max == 0.seconds) { + queue.pollFirst + } else if (max.finite_?) { + queue.pollFirst(max.length, max.unit) + } else { + queue.takeFirst + } + message match { + case null ⇒ + lastMessage = NullMessage + null + case RealMessage(msg, _) ⇒ + lastMessage = message + msg } } private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase) } -// vim: set ts=2 sw=2 et: +/** + * TestKit-based probe which allows sending, reception and reply. + */ +class TestProbe extends TestKit { + + /** + * Shorthand to get the testActor. + */ + def ref = testActor + + /** + * Send message to an actor while using the probe's TestActor as the sender. + * Replies will be available for inspection with all of TestKit's assertion + * methods. + */ + def send(actor: ActorRef, msg: AnyRef) = { + actor ! msg + } + + /** + * Forward this message as if in the TestActor's receive method with self.forward. + */ + def forward(actor: ActorRef, msg: AnyRef = lastMessage.msg) { + actor.!(msg)(lastMessage.channel) + } + + /** + * Get channel of last received message. + */ + def channel = lastMessage.channel + +} + +object TestProbe { + def apply() = new TestProbe +} diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 5d74db9b34..27e8a5fbd7 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -190,7 +190,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac "support futures" in { val a = TestActorRef[WorkerActor].start() - val f: Future[String] = a !!! "work" + val f = a ? "work" mapTo manifest[String] f must be('completed) f.get must equal("workDone") } @@ -239,9 +239,8 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac intercept[IllegalActorStateException] { ref("work") } val ch = Promise.channel() ref ! ch - val f = ch.promise - f must be('completed) - f.get must be("complexReply") + ch must be('completed) + ch.get must be("complexReply") } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala new file mode 100644 index 0000000000..f12d8a2e19 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -0,0 +1,45 @@ +package akka.testkit + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterEach, WordSpec } +import akka.actor._ +import akka.config.Supervision.OneForOneStrategy +import akka.event.EventHandler +import akka.dispatch.Future +import akka.util.duration._ + +class TestProbeSpec extends WordSpec with MustMatchers { + + "A TestProbe" must { + + "reply to futures" in { + val tk = TestProbe() + val future = tk.ref ? "hello" + tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher + tk.reply("world") + future must be('completed) + future.get must equal("world") + } + + "reply to messages" in { + val tk1 = TestProbe() + val tk2 = TestProbe() + tk1.ref.!("hello")(tk2.ref) + tk1.expectMsg(0 millis, "hello") + tk1.reply("world") + tk2.expectMsg(0 millis, "world") + } + + "properly send and reply to messages" in { + val probe1 = TestProbe() + val probe2 = TestProbe() + probe1.send(probe2.ref, "hello") + probe2.expectMsg(0 millis, "hello") + probe2.reply("world") + probe1.expectMsg(0 millis, "world") + } + + } + +} diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 5caae5e365..12c16d0142 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -184,10 +184,10 @@ public class Pi { // send calculate message long timeout = 60000; - Future replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null); - Option result = replyFuture.await().resultOrException(); + Future replyFuture = master.sendRequestReplyFuture(new Calculate(), timeout, null); + Option result = replyFuture.await().resultOrException(); if (result.isDefined()) { - double pi = result.get(); + double pi = (Double) result.get(); // TODO java api for EventHandler? // EventHandler.info(this, String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start))); System.out.println(String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (currentTimeMillis() - start)));