Add Java API versions of Future.{traverse, sequence}, closes #786
This commit is contained in:
parent
414122bf6c
commit
ff711a4253
4 changed files with 93 additions and 27 deletions
|
|
@ -3,28 +3,50 @@ package akka.dispatch;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.LinkedList;
|
||||||
import akka.japi.Function;
|
import akka.japi.Function;
|
||||||
import akka.japi.Procedure;
|
import akka.japi.Procedure;
|
||||||
import scala.Some;
|
import scala.Some;
|
||||||
import scala.Right;
|
import scala.Right;
|
||||||
import static akka.dispatch.Futures.future;
|
import static akka.dispatch.Futures.future;
|
||||||
|
import static akka.dispatch.Futures.traverse;
|
||||||
|
import static akka.dispatch.Futures.sequence;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") public class JavaFutureTests {
|
public class JavaFutureTests {
|
||||||
|
|
||||||
@Test public void mustBeAbleToMapAFuture() {
|
@Test public void mustBeAbleToMapAFuture() {
|
||||||
Future f1 = future(new Callable<String>() {
|
Future<String> f1 = future(new Callable<String>() {
|
||||||
public String call() {
|
public String call() {
|
||||||
return "Hello";
|
return "Hello";
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Future f2 = f1.map(new Function<String, String>() {
|
Future<String> f2 = f1.map(new Function<String, String>() {
|
||||||
public String apply(String s) {
|
public String apply(String s) {
|
||||||
return s + " World";
|
return s + " World";
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
assertEquals(new Some(new Right("Hello World")), f2.await().value());
|
assertEquals("Hello World", f2.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Improve this test, perhaps with an Actor
|
||||||
|
@Test public void mustSequenceAFutureList() {
|
||||||
|
LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>();
|
||||||
|
LinkedList<String> listExpected = new LinkedList<String>();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
listExpected.add("test");
|
||||||
|
listFutures.add(future(new Callable<String>() {
|
||||||
|
public String call() {
|
||||||
|
return "test";
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<LinkedList<String>> futureList = sequence(listFutures);
|
||||||
|
|
||||||
|
assertEquals(futureList.get(), listExpected);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -316,11 +316,11 @@ class FutureSpec extends JUnitSuite {
|
||||||
}).start()
|
}).start()
|
||||||
|
|
||||||
val oddFutures: List[Future[Int]] = List.fill(100)(oddActor !!! 'GetNext)
|
val oddFutures: List[Future[Int]] = List.fill(100)(oddActor !!! 'GetNext)
|
||||||
assert(Futures.sequence(oddFutures).get.sum === 10000)
|
assert(Future.sequence(oddFutures).get.sum === 10000)
|
||||||
oddActor.stop()
|
oddActor.stop()
|
||||||
|
|
||||||
val list = (1 to 100).toList
|
val list = (1 to 100).toList
|
||||||
assert(Futures.traverse(list)(x => Future(x * 2 - 1)).get.sum === 10000)
|
assert(Future.traverse(list)(x => Future(x * 2 - 1)).get.sum === 10000)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def shouldHandleThrowables {
|
@Test def shouldHandleThrowables {
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@ import java.util.concurrent.locks.ReentrantLock
|
||||||
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
|
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
|
||||||
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
||||||
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
||||||
|
import java.lang.{Iterable => JIterable}
|
||||||
|
import java.util.{LinkedList => JLinkedList}
|
||||||
import annotation.tailrec
|
import annotation.tailrec
|
||||||
|
|
||||||
class FutureTimeoutException(message: String) extends AkkaException(message)
|
class FutureTimeoutException(message: String) extends AkkaException(message)
|
||||||
|
|
@ -152,29 +154,47 @@ object Futures {
|
||||||
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
|
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
|
||||||
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
|
||||||
|
|
||||||
import scala.collection.mutable.Builder
|
|
||||||
import scala.collection.generic.CanBuildFrom
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
|
* Java API.
|
||||||
|
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
|
||||||
* Useful for reducing many Futures into a single Future.
|
* Useful for reducing many Futures into a single Future.
|
||||||
*/
|
*/
|
||||||
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
|
def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] =
|
||||||
in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
|
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) =>
|
||||||
|
for (r <- fr; a <- fa) yield {
|
||||||
|
r add a
|
||||||
|
r
|
||||||
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
|
* Java API.
|
||||||
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
|
||||||
* in parallel:
|
* Useful for reducing many Futures into a single Future.
|
||||||
* <pre>
|
|
||||||
* val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
|
|
||||||
* </pre>
|
|
||||||
*/
|
*/
|
||||||
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
|
def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT)
|
||||||
in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
|
|
||||||
val fb = fn(a.asInstanceOf[A])
|
/**
|
||||||
for (r <- fr; b <-fb) yield (r += b)
|
* Java API.
|
||||||
}.map(_.result)
|
* Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
|
||||||
|
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||||
|
* in parallel.
|
||||||
|
*/
|
||||||
|
def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A,Future[B]]): Future[JLinkedList[B]] =
|
||||||
|
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())){(fr, a) =>
|
||||||
|
val fb = fn(a)
|
||||||
|
for (r <- fr; b <- fb) yield {
|
||||||
|
r add b
|
||||||
|
r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API.
|
||||||
|
* Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
|
||||||
|
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||||
|
* in parallel.
|
||||||
|
*/
|
||||||
|
def traverse[A, B](in: JIterable[A], fn: JFunc[A,Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn)
|
||||||
|
|
||||||
// =====================================
|
// =====================================
|
||||||
// Deprecations
|
// Deprecations
|
||||||
|
|
@ -217,6 +237,30 @@ object Future {
|
||||||
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
|
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
|
||||||
f
|
f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
import scala.collection.mutable.Builder
|
||||||
|
import scala.collection.generic.CanBuildFrom
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
|
||||||
|
* Useful for reducing many Futures into a single Future.
|
||||||
|
*/
|
||||||
|
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
|
||||||
|
in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
|
||||||
|
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
|
||||||
|
* in parallel:
|
||||||
|
* <pre>
|
||||||
|
* val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
|
||||||
|
in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
|
||||||
|
val fb = fn(a.asInstanceOf[A])
|
||||||
|
for (r <- fr; b <-fb) yield (r += b)
|
||||||
|
}.map(_.result)
|
||||||
}
|
}
|
||||||
|
|
||||||
sealed trait Future[+T] {
|
sealed trait Future[+T] {
|
||||||
|
|
|
||||||
|
|
@ -158,24 +158,24 @@ This is fine when dealing with a known amount of Actors, but can grow unwieldly
|
||||||
val listOfFutures: List[Future[Int]] = List.fill(100)(oddActor !!! GetNext)
|
val listOfFutures: List[Future[Int]] = List.fill(100)(oddActor !!! GetNext)
|
||||||
|
|
||||||
// now we have a Future[List[Int]]
|
// now we have a Future[List[Int]]
|
||||||
val futureList = Futures.sequence(listOfFutures)
|
val futureList = Future.sequence(listOfFutures)
|
||||||
|
|
||||||
// Find the sum of the odd numbers
|
// Find the sum of the odd numbers
|
||||||
val oddSum = futureList.map(_.sum).apply
|
val oddSum = futureList.map(_.sum).apply
|
||||||
|
|
||||||
To better explain what happened in the example, Futures.sequence is taking the List[Future[Int]] and turning it into a Future[List[Int]]. We can then use 'map' to work with the List[Int] directly, and we find the sum of the List.
|
To better explain what happened in the example, Future.sequence is taking the List[Future[Int]] and turning it into a Future[List[Int]]. We can then use 'map' to work with the List[Int] directly, and we find the sum of the List.
|
||||||
|
|
||||||
The 'traverse' method is similar to 'sequence', but it takes a Traversable[A] and a Function T => Future[B] to return a Future[Traversable[B]]. For example, to use 'traverse' to sum the first 100 odd numbers:
|
The 'traverse' method is similar to 'sequence', but it takes a Traversable[A] and a Function T => Future[B] to return a Future[Traversable[B]]. For example, to use 'traverse' to sum the first 100 odd numbers:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val oddSum = Futures.traverse((1 to 100).toList)(x => Future(x * 2 - 1)).map(_.sum).apply
|
val oddSum = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1)).map(_.sum).apply
|
||||||
|
|
||||||
This is the same result as this example:
|
This is the same result as this example:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
val oddSum = Futures.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))).map(_.sum).apply
|
val oddSum = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1))).map(_.sum).apply
|
||||||
|
|
||||||
But it may be faster to use 'traverse' as it doesn't have to create an intermediate List[Future[Int]].
|
But it may be faster to use 'traverse' as it doesn't have to create an intermediate List[Future[Int]].
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue