Removing FutureFactory and reintroducing Futures (for Java API)

This commit is contained in:
Viktor Klang 2011-12-12 14:39:10 +01:00
parent 0b6a1a0e54
commit 7eced71a85
2 changed files with 46 additions and 52 deletions

View file

@ -23,14 +23,12 @@ import akka.testkit.AkkaSpec;
public class JavaFutureTests { public class JavaFutureTests {
private static ActorSystem system; private static ActorSystem system;
private volatile static FutureFactory ff;
private static Timeout t; private static Timeout t;
@BeforeClass @BeforeClass
public static void beforeAll() { public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
t = system.settings().ActorTimeout(); t = system.settings().ActorTimeout();
ff = new FutureFactory(system.dispatcher());
} }
@AfterClass @AfterClass
@ -42,11 +40,11 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToMapAFuture() { public void mustBeAbleToMapAFuture() {
Future<String> f1 = ff.future(new Callable<String>() { Future<String> f1 = Futures.future(new Callable<String>() {
public String call() { public String call() {
return "Hello"; return "Hello";
} }
}); }, system.dispatcher());
Future<String> f2 = f1.map(new Function<String, String>() { Future<String> f2 = f1.map(new Function<String, String>() {
public String apply(String s) { public String apply(String s) {
@ -60,7 +58,7 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable { public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onResult(new Procedure<String>() { f.onResult(new Procedure<String>() {
public void apply(String result) { public void apply(String result) {
@ -77,7 +75,7 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable { public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onException(new Procedure<Throwable>() { f.onException(new Procedure<Throwable>() {
public void apply(Throwable t) { public void apply(Throwable t) {
@ -95,7 +93,7 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable { public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onComplete(new Procedure<Future<String>>() { f.onComplete(new Procedure<Future<String>>() {
public void apply(akka.dispatch.Future<String> future) { public void apply(akka.dispatch.Future<String> future) {
@ -111,7 +109,7 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToForeachAFuture() throws Throwable { public void mustBeAbleToForeachAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.foreach(new Procedure<String>() { f.foreach(new Procedure<String>() {
public void apply(String future) { public void apply(String future) {
@ -127,13 +125,13 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToFlatMapAFuture() throws Throwable { public void mustBeAbleToFlatMapAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
cf.completeWithResult("1000"); cf.completeWithResult("1000");
Future<String> f = cf; Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() { Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(String r) { public Future<Integer> apply(String r) {
latch.countDown(); latch.countDown();
Promise<Integer> cf = ff.<Integer>promise(); Promise<Integer> cf = Futures.promise(system.dispatcher());
cf.completeWithResult(Integer.parseInt(r)); cf.completeWithResult(Integer.parseInt(r));
return cf; return cf;
} }
@ -147,7 +145,7 @@ public class JavaFutureTests {
@Test @Test
public void mustBeAbleToFilterAFuture() throws Throwable { public void mustBeAbleToFilterAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = ff.<String>promise(); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
Future<String> r = f.filter(new Function<String, Boolean>() { Future<String> r = f.filter(new Function<String, Boolean>() {
public Boolean apply(String r) { public Boolean apply(String r) {
@ -170,14 +168,14 @@ public class JavaFutureTests {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
listExpected.add("test"); listExpected.add("test");
listFutures.add(ff.future(new Callable<String>() { listFutures.add(Futures.future(new Callable<String>() {
public String call() { public String call() {
return "test"; return "test";
} }
})); }, system.dispatcher()));
} }
Future<Iterable<String>> futureList = ff.sequence(listFutures); Future<Iterable<String>> futureList = Futures.sequence(listFutures, system.dispatcher());
assertEquals(futureList.get(), listExpected); assertEquals(futureList.get(), listExpected);
} }
@ -190,18 +188,18 @@ public class JavaFutureTests {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
expected.append("test"); expected.append("test");
listFutures.add(ff.future(new Callable<String>() { listFutures.add(Futures.future(new Callable<String>() {
public String call() { public String call() {
return "test"; return "test";
} }
})); }, system.dispatcher()));
} }
Future<String> result = ff.fold("", listFutures, new Function2<String, String, String>() { Future<String> result = Futures.fold("", listFutures, new Function2<String, String, String>() {
public String apply(String r, String t) { public String apply(String r, String t) {
return r + t; return r + t;
} }
}); }, system.dispatcher());
assertEquals(result.get(), expected.toString()); assertEquals(result.get(), expected.toString());
} }
@ -213,18 +211,18 @@ public class JavaFutureTests {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
expected.append("test"); expected.append("test");
listFutures.add(ff.future(new Callable<String>() { listFutures.add(Futures.future(new Callable<String>() {
public String call() { public String call() {
return "test"; return "test";
} }
})); }, system.dispatcher()));
} }
Future<String> result = ff.reduce(listFutures, new Function2<String, String, String>() { Future<String> result = Futures.reduce(listFutures, new Function2<String, String, String>() {
public String apply(String r, String t) { public String apply(String r, String t) {
return r + t; return r + t;
} }
}); }, system.dispatcher());
assertEquals(result.get(), expected.toString()); assertEquals(result.get(), expected.toString());
} }
@ -239,15 +237,15 @@ public class JavaFutureTests {
listStrings.add("test"); listStrings.add("test");
} }
Future<Iterable<String>> result = ff.traverse(listStrings, new Function<String, Future<String>>() { Future<Iterable<String>> result = Futures.traverse(listStrings, new Function<String, Future<String>>() {
public Future<String> apply(final String r) { public Future<String> apply(final String r) {
return ff.future(new Callable<String>() { return Futures.future(new Callable<String>() {
public String call() { public String call() {
return r.toUpperCase(); return r.toUpperCase();
} }
}); }, system.dispatcher());
} }
}); }, system.dispatcher());
assertEquals(result.get(), expectedStrings); assertEquals(result.get(), expectedStrings);
} }
@ -257,25 +255,25 @@ public class JavaFutureTests {
LinkedList<Future<Integer>> listFutures = new LinkedList<Future<Integer>>(); LinkedList<Future<Integer>> listFutures = new LinkedList<Future<Integer>>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
final Integer fi = i; final Integer fi = i;
listFutures.add(ff.future(new Callable<Integer>() { listFutures.add(Futures.future(new Callable<Integer>() {
public Integer call() { public Integer call() {
return fi; return fi;
} }
})); }, system.dispatcher()));
} }
final Integer expect = 5; final Integer expect = 5;
Future<Option<Integer>> f = ff.find(listFutures, new Function<Integer, Boolean>() { Future<Option<Integer>> f = Futures.find(listFutures, new Function<Integer, Boolean>() {
public Boolean apply(Integer i) { public Boolean apply(Integer i) {
return i == 5; return i == 5;
} }
}); }, system.dispatcher());
assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS))); assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS)));
} }
@Test @Test
public void BlockMustBeCallable() { public void BlockMustBeCallable() {
Promise<String> p = ff.<String>promise(); Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS); Duration d = Duration.create(1, TimeUnit.SECONDS);
p.completeWithResult("foo"); p.completeWithResult("foo");
Block.on(p, d); Block.on(p, d);

View file

@ -45,40 +45,32 @@ object Block {
def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost) def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost)
} }
class FutureFactory(implicit dispatcher: MessageDispatcher) { object Futures {
/** /**
* Java API, equivalent to Future.apply * Java API, equivalent to Future.apply
*/ */
def future[T](body: Callable[T]): Future[T] = def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher)
Future(body.call)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher)
/** /**
* Java API, equivalent to Promise.apply * Java API, equivalent to Promise.apply
*/ */
def promise[T](): Promise[T] = Promise[T]() def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher)
/** /**
* Java API. * Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/ */
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = { def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = {
val pred: T Boolean = predicate.apply(_) Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_))
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(pred).map(JOption.fromScalaOption(_))
} }
/** /**
* Java API. * Java API.
* Returns a Future to the result of the first future in the list that is completed * Returns a Future to the result of the first future in the list that is completed
*/ */
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures)) Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher)
/** /**
* Java API * Java API
@ -87,22 +79,23 @@ class FutureFactory(implicit dispatcher: MessageDispatcher) {
* the result will be the first failure of any of the futures, or any failure in the actual fold, * the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold. * or the result of the fold.
*/ */
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _) Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher)
/** /**
* Java API. * Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/ */
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher)
/** /**
* Java API. * Java API.
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future. * Useful for reducing many Futures into a single Future.
*/ */
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = { def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = {
implicit val d = dispatcher
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield { for (r fr; a fa) yield {
r add a r add a
@ -116,7 +109,8 @@ class FutureFactory(implicit dispatcher: MessageDispatcher) {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list * This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel. * in parallel.
*/ */
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = { def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = {
implicit val d = dispatcher
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a) val fb = fn(a)
for (r fr; b fb) yield { r add b; r } for (r fr; b fb) yield { r add b; r }
@ -612,6 +606,8 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
object Promise { object Promise {
/** /**
* Creates a non-completed, new, Promise * Creates a non-completed, new, Promise
*
* Scala API
*/ */
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
} }