Adding Java API for reduce, fold, apply and firstCompletedOf, adding << and apply() to CompletableFuture + a lot of docs

This commit is contained in:
Viktor Klang 2011-03-22 22:12:16 +01:00
parent 331f3e6469
commit e23ba6e5bb
3 changed files with 112 additions and 0 deletions

View file

@ -55,6 +55,13 @@ object Futures {
futureResult
}
/**
* Java API
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures),timeout)
/**
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
@ -95,6 +102,16 @@ object Futures {
}
}
/**
* Java API
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ )
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
@ -119,6 +136,13 @@ object Futures {
}
}
/**
* Java API
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@ -162,6 +186,10 @@ object Futures {
}
object Future {
/**
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout)
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
@ -170,6 +198,23 @@ object Future {
}
sealed trait Future[+T] {
/**
* Returns the result of this future after waiting for it to complete,
* this method will throw any throwable that this Future was completed with
* and will throw a java.util.concurrent.TimeoutException if there is no result
* within the Futures timeout
*/
def apply(): T = this.await.resultOrException match {
case None => throw new java.util.concurrent.TimeoutException("Future timed out!")
case s: Some[T] => s.get
}
/**
* Java API for apply()
*/
def get: T = apply()
/**
* Blocks the current thread until the Future has been completed or the
* timeout has expired. In the case of the timeout expiring a
@ -410,13 +455,43 @@ sealed trait Future[+T] {
* Essentially this is the Promise (or write-side) of a Future (read-side)
*/
trait CompletableFuture[T] extends Future[T] {
/**
* Completes this Future with the specified result, if not already completed,
* returns this
*/
def complete(value: Either[Throwable, T]): CompletableFuture[T]
/**
* Completes this Future with the specified result, if not already completed,
* returns this
*/
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
/**
* Completes this Future with the specified exception, if not already completed,
* returns this
*/
final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed
* returns this
*/
final def completeWith(other: Future[T]): CompletableFuture[T] = {
other onComplete { f => complete(f.value.get) }
this
}
/**
* Alias for complete(Right(value))
*/
final def << (value: T): CompletableFuture[T] = complete(Right(value))
/**
* Alias for completeWith(other)
*/
final def << (other : Future[T]): CompletableFuture[T] = completeWith(other)
}
/**

View file

@ -7,6 +7,13 @@ trait Function[T,R] {
def apply(param: T): R
}
/**
* A Function interface. Used to create 2-arg first-class-functions is Java (sort of).
*/
trait Function2[T1, T2, R] {
def apply(arg1: T1, arg2: T2): R
}
/** A Procedure is like a Function, but it doesn't produce a return value
*/
trait Procedure[T] {

View file

@ -324,4 +324,34 @@ class FutureSpec extends JUnitSuite {
// make sure all futures are completed in dispatcher
assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0)
}
@Test def shouldBlockUntilResult {
val latch = new StandardLatch
val f = Future({ latch.await; 5})
val f2 = Future({ f() + 5 })
assert(f2.resultOrException === None)
latch.open
assert(f2() === 10)
}
@Test def lesslessIsMore {
import akka.actor.Actor.spawn
val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue)
val begin, end = new StandardLatch
spawn {
begin.await
dataflowVar2 << dataflowVar
end.open
}
spawn {
dataflowVar << 5
}
begin.open
end.await
assert(dataflowVar2() === 5)
assert(dataflowVar.get === 5)
}
}