#1180 - moving the Java API to Futures and Scala API to Future

This commit is contained in:
Viktor Klang 2011-09-08 15:54:06 +02:00
parent 24fb967988
commit d8390a61f6
4 changed files with 128 additions and 128 deletions

View file

@ -291,15 +291,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5))
Futures.firstCompletedOf(futures).get must be(5)
Future.firstCompletedOf(futures).get must be(5)
}
"find" in {
val futures = for (i 1 to 10) yield Future { i }
val result = Futures.find[Int](_ == 3)(futures)
val result = Future.find[Int](_ == 3)(futures)
result.get must be(Some(3))
val notFound = Futures.find[Int](_ == 11)(futures)
val notFound = Future.find[Int](_ == 11)(futures)
notFound.get must be(None)
}
@ -311,7 +311,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
Futures.fold(0, timeout)(futures)(_ + _).get must be(45)
Future.fold(0, timeout)(futures)(_ + _).get must be(45)
}
"fold by composing" in {
@ -338,7 +338,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
Future.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
}
}
@ -346,7 +346,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
import scala.collection.mutable.ArrayBuffer
def test(testNumber: Int) {
val fs = (0 to 1000) map (i Future(i, 10000))
val result = Futures.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) {
val result = Future.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) {
case (l, i) if i % 2 == 0 l += i.asInstanceOf[AnyRef]
case (l, _) l
}.get.asInstanceOf[ArrayBuffer[Int]].sum
@ -358,7 +358,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
"return zero value if folding empty list" in {
Futures.fold(0)(List[Future[Int]]())(_ + _).get must be(0)
Future.fold(0)(List[Future[Int]]())(_ + _).get must be(0)
}
"shouldReduceResults" in {
@ -369,7 +369,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
assert(Futures.reduce(futures, timeout)(_ + _).get === 45)
assert(Future.reduce(futures, timeout)(_ + _).get === 45)
}
"shouldReduceResultsWithException" in {
@ -386,13 +386,13 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
}
"shouldReduceThrowIAEOnEmptyInput" in {
filterException[IllegalArgumentException] {
intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get }
intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get }
}
}

View file

@ -64,45 +64,13 @@ object Futures {
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher, timeout)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
val futureResult = new DefaultPromise[T](timeout)
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
futures.foreach(_ onComplete completeFirst)
futureResult
}
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](predicate: T Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
else {
val result = new DefaultPromise[Option[T]](timeout)
val ref = new AtomicInteger(futures.size)
val search: Future[T] Unit = f try {
f.result.filter(predicate).foreach(r result completeWithResult Some(r))
} finally {
if (ref.decrementAndGet == 0)
result completeWithResult None
}
futures.foreach(_ onComplete search)
result
}
}
/**
* Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
val pred: T Boolean = predicate.apply(_)
find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
}
/**
@ -110,59 +78,7 @@ object Futures {
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
/**
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
* Example:
* <pre>
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) R): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
val result = new DefaultPromise[R](timeout)
val results = new ConcurrentLinkedQueue[T]()
val done = new Switch(false)
val allDone = futures.size
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
f.value.get match {
case Right(value)
val added = results add value
if (added && results.size == allDone) { //Only one thread can get here
if (done.switchOn) {
try {
val i = results.iterator
var currentValue = zero
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
result completeWithResult currentValue
} catch {
case e: Exception
EventHandler.error(e, this, e.getMessage)
result completeWithException e
} finally {
results.clear
}
}
}
case Left(exception)
if (done.switchOn) {
result completeWithException exception
results.clear
}
}
}
futures foreach { _ onComplete aggregate }
result
}
}
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
/**
* Java API
@ -172,50 +88,24 @@ object Futures {
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun)
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
* <pre>
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) T): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
else {
val result = new DefaultPromise[R](timeout)
val seedFound = new AtomicBoolean(false)
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case Right(value) result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
case Left(exception) result.completeWithException(exception)
}
}
}
for (f futures) f onComplete seedFold //Attach the listener to the Futures
result
}
}
/**
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
/**
* Java API.
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] =
@ -298,6 +188,116 @@ object Future {
def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
sequence(in)(cbf, timeout)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
val futureResult = new DefaultPromise[T](timeout)
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
futures.foreach(_ onComplete completeFirst)
futureResult
}
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](predicate: T Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
else {
val result = new DefaultPromise[Option[T]](timeout)
val ref = new AtomicInteger(futures.size)
val search: Future[T] Unit = f try {
f.result.filter(predicate).foreach(r result completeWithResult Some(r))
} finally {
if (ref.decrementAndGet == 0)
result completeWithResult None
}
futures.foreach(_ onComplete search)
result
}
}
/**
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
* Example:
* <pre>
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) R): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
val result = new DefaultPromise[R](timeout)
val results = new ConcurrentLinkedQueue[T]()
val done = new Switch(false)
val allDone = futures.size
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
f.value.get match {
case Right(value)
val added = results add value
if (added && results.size == allDone) { //Only one thread can get here
if (done.switchOn) {
try {
val i = results.iterator
var currentValue = zero
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
result completeWithResult currentValue
} catch {
case e: Exception
EventHandler.error(e, this, e.getMessage)
result completeWithException e
} finally {
results.clear
}
}
}
case Left(exception)
if (done.switchOn) {
result completeWithException exception
results.clear
}
}
}
futures foreach { _ onComplete aggregate }
result
}
}
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
* <pre>
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) T): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
else {
val result = new DefaultPromise[R](timeout)
val seedFound = new AtomicBoolean(false)
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case Right(value) result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
case Left(exception) result.completeWithException(exception)
}
}
}
for (f futures) f onComplete seedFold //Attach the listener to the Futures
result
}
}
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list

View file

@ -574,5 +574,5 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
*/
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results)
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
}

View file

@ -199,7 +199,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Futures.fold(0)(futures)(_ + _)
val futureSum = Future.fold(0)(futures)(_ + _)
That's all it takes!
@ -210,7 +210,7 @@ If the sequence passed to ``fold`` is empty, it will return the start-value, in
val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures
val futureSum = Futures.reduce(futures)(_ + _)
val futureSum = Future.reduce(futures)(_ + _)
Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.