diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index f8aa76a7ca..943a24025b 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -11,11 +11,11 @@ import akka.testkit.{ EventFilter, filterEvents, filterException } import akka.util.duration._ import akka.testkit.AkkaSpec import org.scalatest.junit.JUnitSuite -import java.lang.ArithmeticException import akka.testkit.DefaultTimeout import akka.testkit.TestLatch import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import scala.runtime.NonLocalReturnControl +import java.lang.{ IllegalStateException, ArithmeticException } object FutureSpec { class TestActor extends Actor { @@ -327,6 +327,24 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) } + "zip" in { + val timeout = 10000 millis + val f = new IllegalStateException("test") + intercept[IllegalStateException] { + Await.result(Promise.failed[String](f) zip Promise.successful("foo"), timeout) + } must be(f) + + intercept[IllegalStateException] { + Await.result(Promise.successful("foo") zip Promise.failed[String](f), timeout) + } must be(f) + + intercept[IllegalStateException] { + Await.result(Promise.failed[String](f) zip Promise.failed[String](f), timeout) + } must be(f) + + Await.result(Promise.successful("foo") zip Promise.successful("foo"), timeout) must be(("foo", "foo")) + } + "fold by composing" in { val actors = (1 to 10).toList map { _ ⇒ system.actorOf(Props(new Actor { @@ -859,6 +877,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Await.result(p, timeout.duration) must be(result) } } + "zip properly" in { + f { (future, result) ⇒ + Await.result(future zip Promise.successful("foo"), timeout.duration) must be((result, "foo")) + (evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")), timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes") + } + } "not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) must be(result)) } "perform action on result" in { f { (future, result) ⇒ @@ -892,6 +916,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) } "not perform action with foreach" is pending + + "zip properly" in { + f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo"), timeout.duration) } must produce[E]).getMessage must be(message) } + } "recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) } "not perform action on result" is pending "project a failure" in { f((future, message) ⇒ Await.result(future.failed, timeout.duration).getMessage must be(message)) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 452bf02927..17d7e095b7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -346,6 +346,21 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case _ ⇒ source } + /** + * @returns a new Future that will contain a tuple containing the successful result of this and that Future. + * If this or that fail, they will race to complete the returned Future with their failure. + * The returned Future will not be completed if neither this nor that are completed. + */ + def zip[U](that: Future[U]): Future[(T, U)] = { + val p = Promise[(T, U)]() + onComplete { + case Left(t) ⇒ p failure t + case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } + } + that onFailure { case f ⇒ p failure f } + p + } + /** * For use only within a Future.flow block or another compatible Delimited Continuations reset block. * @@ -357,7 +372,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { /** * Tests whether this Future has been completed. */ - final def isCompleted: Boolean = value.isDefined + def isCompleted: Boolean /** * The contained value of this Future. Before this Future is completed @@ -676,23 +691,7 @@ trait Promise[T] extends Future[T] { //Companion object to FState, just to provide a cheap, immutable default entry private[dispatch] object DefaultPromise { - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** - * Represents the internal state of the DefaultCompletableFuture - */ - - sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } - case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] { - def value: Option[Either[Throwable, T]] = None - } - case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def result: T = value.get.right.get - } - case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def exception: Throwable = value.get.left.get - } - private val emptyPendingValue = Pending[Nothing](Nil) + def EmptyPending[T](): List[T] = Nil } /** @@ -701,28 +700,25 @@ private[dispatch] object DefaultPromise { class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self ⇒ - import DefaultPromise.{ FState, Success, Failure, Pending } - protected final def tryAwait(atMost: Duration): Boolean = { Future.blocking @tailrec def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (value.isEmpty && waitTimeNanos > 0) { + if (!isCompleted && waitTimeNanos > 0) { val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = System.nanoTime() - try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + try { synchronized { if (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) - } else - value.isDefined + } else isCompleted } awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this + if (isCompleted || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") def result(atMost: Duration)(implicit permit: CanAwait): T = @@ -731,16 +727,24 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac case Right(r) ⇒ r } - def value: Option[Either[Throwable, T]] = getState.value + def value: Option[Either[Throwable, T]] = getState match { + case _: List[_] ⇒ None + case c: Either[_, _] ⇒ Some(c.asInstanceOf[Either[Throwable, T]]) + } + + def isCompleted(): Boolean = getState match { + case _: Either[_, _] ⇒ true + case _ ⇒ false + } @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]] @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = updater.get(this) + protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { val callbacks: List[Either[Throwable, T] ⇒ Unit] = { @@ -748,9 +752,9 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac @tailrec def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = { getState match { - case cur @ Pending(listeners) ⇒ - if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners - else tryComplete(v) + case raw: List[_] ⇒ + val cur = raw.asInstanceOf[List[Either[Throwable, T] ⇒ Unit]] + if (updateState(cur, v)) cur else tryComplete(v) case _ ⇒ null } } @@ -769,22 +773,20 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { @tailrec //Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { + def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case _: Success[_] | _: Failure[_] ⇒ true - case p: Pending[_] ⇒ - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + case r: Either[_, _] ⇒ r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] ⇒ if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } - if (tryAddCallback()) { - val result = value.get - Future.dispatchTask(() ⇒ notifyCompleted(func, result)) + tryAddCallback() match { + case null ⇒ this + case completed ⇒ + Future.dispatchTask(() ⇒ notifyCompleted(func, completed)) + this } - - this } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { @@ -805,7 +807,7 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe Future dispatchTask (() ⇒ func(completedAs)) this } - + def isCompleted(): Boolean = true def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { case Left(e) ⇒ throw e