diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index b0f13280d7..15bf70b2e6 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -15,6 +15,7 @@ import java.lang.ArithmeticException import akka.testkit.DefaultTimeout import akka.testkit.TestLatch import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } +import scala.runtime.NonLocalReturnControl object FutureSpec { class TestActor extends Actor { @@ -63,6 +64,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = Promise[String]().complete(Left(new RuntimeException(message))) behave like futureWithException[RuntimeException](_(future, message)) } + "completed with a j.u.c.TimeoutException" must { + val message = "Boxed TimeoutException" + val future = Promise[String]().complete(Left(new TimeoutException(message))) + behave like futureWithException[RuntimeException](_(future, message)) + } + "completed with a NonLocalReturnControl" must { + val result = "test value" + val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))) + behave like futureWithResult(_(future, result)) + } } "A Future" when { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 479de8731b..71c4822e35 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -305,6 +305,9 @@ object Future { override def initialValue = None } + /** + * Internal API, do not call + */ private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit = _taskStack.get match { case Some(taskStack) if !force ⇒ taskStack push task @@ -332,6 +335,13 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { implicit def dispatcher: MessageDispatcher + protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { + case r: Right[_, _] ⇒ r.asInstanceOf[Either[Throwable, X]] + case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) + case Left(t: TimeoutException) ⇒ Left(new RuntimeException("Boxed TimeoutException", t)) + case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, X]] + } + /** * For use only within a Future.flow block or another compatible Delimited Continuations reset block. * @@ -708,17 +718,15 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst val callbacks: List[Either[Throwable, T] ⇒ Unit] = { try { @tailrec - def tryComplete: List[Either[Throwable, T] ⇒ Unit] = { - val cur = getState - - cur match { - case Pending(listeners) ⇒ - if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners - else tryComplete + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { + getState match { + case cur @ Pending(listeners) ⇒ + if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners + else tryComplete(v) case _ ⇒ null } } - tryComplete + tryComplete(resolve(value)) } finally { synchronized { notifyAll() } //Notify any evil blockers } @@ -761,7 +769,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst * a Future-composition but you already have a value to contribute. */ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { - val value = Some(suppliedValue) + val value = Some(resolve(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = true def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = {