Merge pull request #189 from jboner/wip-1568-time-out-asks-√

Changing boxing of TimeoutException to boxing of InterruptedException
This commit is contained in:
viktorklang 2011-12-27 04:01:32 -08:00
commit ca96cb3973
6 changed files with 28 additions and 15 deletions

View file

@ -64,9 +64,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = Promise[String]().complete(Left(new RuntimeException(message)))
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with a j.u.c.TimeoutException" must {
val message = "Boxed TimeoutException"
val future = Promise[String]().complete(Left(new TimeoutException(message)))
"completed with an InterruptedException" must {
val message = "Boxed InterruptedException"
val future = Promise[String]().complete(Left(new InterruptedException(message)))
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with a NonLocalReturnControl" must {

View file

@ -13,7 +13,7 @@ import java.util.concurrent.TimeUnit
import akka.event.EventStream
import akka.event.DeathWatch
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ ConcurrentHashMap, TimeoutException }
import akka.event.LoggingAdapter
import java.util.concurrent.atomic.AtomicBoolean
@ -106,6 +106,8 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* Akka Java API.
*
* Sends a message asynchronously returns a future holding the eventual reply message.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'tell' together with the sender
@ -177,6 +179,9 @@ trait ScalaActorRef { ref: ActorRef ⇒
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
* sender parameter to implement non-blocking request/response message exchanges.
@ -489,6 +494,14 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal
}
}
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
class AskActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,

View file

@ -6,14 +6,12 @@ package akka.actor
import java.util.concurrent.atomic.AtomicLong
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
import akka.util.Timeout
import akka.util.Timeout.intToTimeout
import akka.config.ConfigurationException
import akka.dispatch._
import akka.routing._
import akka.util.Timeout
import akka.AkkaException
import akka.util.{ Duration, Switch, Helpers }
import akka.util.{ Duration, Switch, Helpers, Timeout }
import akka.event._
import java.io.Closeable
@ -485,15 +483,15 @@ class LocalActorRefProvider(
def ask(within: Timeout): Option[AskActorRef] = {
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0
None
case t if t.duration.length <= 0 None
case t
val path = tempPath()
val name = path.name
val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch)
tempContainer.addChild(name, a)
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.removeChild(name); a.stop() }
a.result onComplete { _
val result = a.result
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { tempContainer.removeChild(name) }
}

View file

@ -337,7 +337,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
case Left(t: TimeoutException) Left(new RuntimeException("Boxed TimeoutException", t))
case Left(t: InterruptedException) Left(new RuntimeException("Boxed InterruptedException", t))
case _ source
}

View file

@ -280,8 +280,9 @@ If invoked without the sender parameter the sender will be
Ask: Send-And-Receive-Future
----------------------------
Using ``ask`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future`:
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: java

View file

@ -318,7 +318,8 @@ Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future`:
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: scala