diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index d35946ce60..cdec7f5631 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -3,28 +3,50 @@ package akka.dispatch; import org.junit.Test; import static org.junit.Assert.*; import java.util.concurrent.Callable; +import java.util.LinkedList; import akka.japi.Function; import akka.japi.Procedure; import scala.Some; import scala.Right; import static akka.dispatch.Futures.future; +import static akka.dispatch.Futures.traverse; +import static akka.dispatch.Futures.sequence; -@SuppressWarnings("unchecked") public class JavaFutureTests { +public class JavaFutureTests { @Test public void mustBeAbleToMapAFuture() { - Future f1 = future(new Callable() { + Future f1 = future(new Callable() { public String call() { return "Hello"; } }); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Function() { public String apply(String s) { return s + " World"; } }); - assertEquals(new Some(new Right("Hello World")), f2.await().value()); + assertEquals("Hello World", f2.get()); + } + + // TODO: Improve this test, perhaps with an Actor + @Test public void mustSequenceAFutureList() { + LinkedList> listFutures = new LinkedList>(); + LinkedList listExpected = new LinkedList(); + + for (int i = 0; i < 10; i++) { + listExpected.add("test"); + listFutures.add(future(new Callable() { + public String call() { + return "test"; + } + })); + } + + Future> futureList = sequence(listFutures); + + assertEquals(futureList.get(), listExpected); } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 6fc96bb6d2..e12294a70d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -316,11 +316,11 @@ class FutureSpec extends JUnitSuite { }).start() val oddFutures: List[Future[Int]] = List.fill(100)(oddActor !!! 'GetNext) - assert(Futures.sequence(oddFutures).get.sum === 10000) + assert(Future.sequence(oddFutures).get.sum === 10000) oddActor.stop() val list = (1 to 100).toList - assert(Futures.traverse(list)(x => Future(x * 2 - 1)).get.sum === 10000) + assert(Future.traverse(list)(x => Future(x * 2 - 1)).get.sum === 10000) } @Test def shouldHandleThrowables { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 1f86613b47..c69ca82bad 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -14,6 +14,8 @@ import java.util.concurrent.locks.ReentrantLock import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable} import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS} import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger} +import java.lang.{Iterable => JIterable} +import java.util.{LinkedList => JLinkedList} import annotation.tailrec class FutureTimeoutException(message: String) extends AkkaException(message) @@ -152,29 +154,47 @@ object Futures { def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) - import scala.collection.mutable.Builder - import scala.collection.generic.CanBuildFrom - /** - * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. + * Java API. + * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] = + scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) => + for (r <- fr; a <- fa) yield { + r add a + r + }) /** - * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. - * This is useful for performing a parallel map. For example, to apply a function to all items of a list - * in parallel: - *
-   * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
-   * 
+ * Java API. + * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. + * Useful for reducing many Futures into a single Future. */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => - val fb = fn(a.asInstanceOf[A]) - for (r <- fr; b <-fb) yield (r += b) - }.map(_.result) + def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT) + + /** + * Java API. + * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel. + */ + def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A,Future[B]]): Future[JLinkedList[B]] = + scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())){(fr, a) => + val fb = fn(a) + for (r <- fr; b <- fb) yield { + r add b + r + } + } + + /** + * Java API. + * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel. + */ + def traverse[A, B](in: JIterable[A], fn: JFunc[A,Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn) // ===================================== // Deprecations @@ -217,6 +237,30 @@ object Future { dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) f } + + import scala.collection.mutable.Builder + import scala.collection.generic.CanBuildFrom + + /** + * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. + * Useful for reducing many Futures into a single Future. + */ + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = + in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + + /** + * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + *
+   * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
+   * 
+ */ + def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = + in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + val fb = fn(a.asInstanceOf[A]) + for (r <- fr; b <-fb) yield (r += b) + }.map(_.result) } sealed trait Future[+T] { diff --git a/akka-docs/pending/futures-scala.rst b/akka-docs/pending/futures-scala.rst index 5cbfc08cea..3426ce0ff6 100644 --- a/akka-docs/pending/futures-scala.rst +++ b/akka-docs/pending/futures-scala.rst @@ -158,24 +158,24 @@ This is fine when dealing with a known amount of Actors, but can grow unwieldly val listOfFutures: List[Future[Int]] = List.fill(100)(oddActor !!! GetNext) // now we have a Future[List[Int]] - val futureList = Futures.sequence(listOfFutures) + val futureList = Future.sequence(listOfFutures) // Find the sum of the odd numbers val oddSum = futureList.map(_.sum).apply -To better explain what happened in the example, Futures.sequence is taking the List[Future[Int]] and turning it into a Future[List[Int]]. We can then use 'map' to work with the List[Int] directly, and we find the sum of the List. +To better explain what happened in the example, Future.sequence is taking the List[Future[Int]] and turning it into a Future[List[Int]]. We can then use 'map' to work with the List[Int] directly, and we find the sum of the List. The 'traverse' method is similar to 'sequence', but it takes a Traversable[A] and a Function T => Future[B] to return a Future[Traversable[B]]. For example, to use 'traverse' to sum the first 100 odd numbers: .. code-block:: scala - val oddSum = Futures.traverse((1 to 100).toList)(x => Future(x * 2 - 1)).map(_.sum).apply + val oddSum = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1)).map(_.sum).apply This is the same result as this example: .. code-block:: scala - val oddSum = Futures.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))).map(_.sum).apply + val oddSum = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))).map(_.sum).apply But it may be faster to use 'traverse' as it doesn't have to create an intermediate List[Future[Int]].