diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0b833f47b0..370bad6d1c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -55,6 +55,13 @@ object Futures { futureResult } + /** + * Java API + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = + firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout) + /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -95,6 +102,16 @@ object Futures { } } + /** + * Java API + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + */ + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ ) + /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ @@ -119,6 +136,13 @@ object Futures { } } + /** + * Java API + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + */ + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = + reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _) + import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -162,6 +186,10 @@ object Futures { } object Future { + /** + * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body + * The execution is performed by the specified Dispatcher. + */ def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) @@ -170,6 +198,23 @@ object Future { } sealed trait Future[+T] { + + /** + * Returns the result of this future after waiting for it to complete, + * this method will throw any throwable that this Future was completed with + * and will throw a java.util.concurrent.TimeoutException if there is no result + * within the Futures timeout + */ + def apply(): T = this.await.resultOrException match { + case None => throw new java.util.concurrent.TimeoutException("Future timed out!") + case s: Some[T] => s.get + } + + /** + * Java API for apply() + */ + def get: T = apply() + /** * Blocks the current thread until the Future has been completed or the * timeout has expired. In the case of the timeout expiring a @@ -410,13 +455,43 @@ sealed trait Future[+T] { * Essentially this is the Promise (or write-side) of a Future (read-side) */ trait CompletableFuture[T] extends Future[T] { + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ def complete(value: Either[Throwable, T]): CompletableFuture[T] + + /** + * Completes this Future with the specified result, if not already completed, + * returns this + */ final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result)) + + /** + * Completes this Future with the specified exception, if not already completed, + * returns this + */ final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception)) + + /** + * Completes this Future with the specified other Future, when that Future is completed, + * unless this Future has already been completed + * returns this + */ final def completeWith(other: Future[T]): CompletableFuture[T] = { other onComplete { f => complete(f.value.get) } this } + + /** + * Alias for complete(Right(value)) + */ + final def << (value: T): CompletableFuture[T] = complete(Right(value)) + + /** + * Alias for completeWith(other) + */ + final def << (other : Future[T]): CompletableFuture[T] = completeWith(other) } /** diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 4454ed117a..20cd33b311 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -7,6 +7,13 @@ trait Function[T,R] { def apply(param: T): R } +/** + * A Function interface. Used to create 2-arg first-class-functions is Java (sort of). + */ +trait Function2[T1, T2, R] { + def apply(arg1: T1, arg2: T2): R +} + /** A Procedure is like a Function, but it doesn't produce a return value */ trait Procedure[T] { diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 9e47ecdbe2..2cdd9b340d 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -324,4 +324,34 @@ class FutureSpec extends JUnitSuite { // make sure all futures are completed in dispatcher assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) } + + @Test def shouldBlockUntilResult { + val latch = new StandardLatch + + val f = Future({ latch.await; 5}) + val f2 = Future({ f() + 5 }) + + assert(f2.resultOrException === None) + latch.open + assert(f2() === 10) + } + + @Test def lesslessIsMore { + import akka.actor.Actor.spawn + val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue) + val begin, end = new StandardLatch + spawn { + begin.await + dataflowVar2 << dataflowVar + end.open + } + + spawn { + dataflowVar << 5 + } + begin.open + end.await + assert(dataflowVar2() === 5) + assert(dataflowVar.get === 5) + } }