From 7eced71a85a1b3737669d3a0d9d52938eda1f32c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 12 Dec 2011 14:39:10 +0100 Subject: [PATCH] Removing FutureFactory and reintroducing Futures (for Java API) --- .../java/akka/dispatch/JavaFutureTests.java | 60 +++++++++---------- .../src/main/scala/akka/dispatch/Future.scala | 38 ++++++------ 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index a97da8fe56..0f06dbd5c7 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -23,14 +23,12 @@ import akka.testkit.AkkaSpec; public class JavaFutureTests { private static ActorSystem system; - private volatile static FutureFactory ff; private static Timeout t; @BeforeClass public static void beforeAll() { system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); t = system.settings().ActorTimeout(); - ff = new FutureFactory(system.dispatcher()); } @AfterClass @@ -42,11 +40,11 @@ public class JavaFutureTests { @Test public void mustBeAbleToMapAFuture() { - Future f1 = ff.future(new Callable() { + Future f1 = Futures.future(new Callable() { public String call() { return "Hello"; } - }); + }, system.dispatcher()); Future f2 = f1.map(new Function() { public String apply(String s) { @@ -60,7 +58,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onResult(new Procedure() { public void apply(String result) { @@ -77,7 +75,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onException(new Procedure() { public void apply(Throwable t) { @@ -95,7 +93,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onComplete(new Procedure>() { public void apply(akka.dispatch.Future future) { @@ -111,7 +109,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToForeachAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.foreach(new Procedure() { public void apply(String future) { @@ -127,13 +125,13 @@ public class JavaFutureTests { @Test public void mustBeAbleToFlatMapAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); cf.completeWithResult("1000"); Future f = cf; Future r = f.flatMap(new Function>() { public Future apply(String r) { latch.countDown(); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); cf.completeWithResult(Integer.parseInt(r)); return cf; } @@ -147,7 +145,7 @@ public class JavaFutureTests { @Test public void mustBeAbleToFilterAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); - Promise cf = ff.promise(); + Promise cf = Futures.promise(system.dispatcher()); Future f = cf; Future r = f.filter(new Function() { public Boolean apply(String r) { @@ -170,14 +168,14 @@ public class JavaFutureTests { for (int i = 0; i < 10; i++) { listExpected.add("test"); - listFutures.add(ff.future(new Callable() { + listFutures.add(Futures.future(new Callable() { public String call() { return "test"; } - })); + }, system.dispatcher())); } - Future> futureList = ff.sequence(listFutures); + Future> futureList = Futures.sequence(listFutures, system.dispatcher()); assertEquals(futureList.get(), listExpected); } @@ -190,18 +188,18 @@ public class JavaFutureTests { for (int i = 0; i < 10; i++) { expected.append("test"); - listFutures.add(ff.future(new Callable() { + listFutures.add(Futures.future(new Callable() { public String call() { return "test"; } - })); + }, system.dispatcher())); } - Future result = ff.fold("", listFutures, new Function2() { + Future result = Futures.fold("", listFutures, new Function2() { public String apply(String r, String t) { return r + t; } - }); + }, system.dispatcher()); assertEquals(result.get(), expected.toString()); } @@ -213,18 +211,18 @@ public class JavaFutureTests { for (int i = 0; i < 10; i++) { expected.append("test"); - listFutures.add(ff.future(new Callable() { + listFutures.add(Futures.future(new Callable() { public String call() { return "test"; } - })); + }, system.dispatcher())); } - Future result = ff.reduce(listFutures, new Function2() { + Future result = Futures.reduce(listFutures, new Function2() { public String apply(String r, String t) { return r + t; } - }); + }, system.dispatcher()); assertEquals(result.get(), expected.toString()); } @@ -239,15 +237,15 @@ public class JavaFutureTests { listStrings.add("test"); } - Future> result = ff.traverse(listStrings, new Function>() { + Future> result = Futures.traverse(listStrings, new Function>() { public Future apply(final String r) { - return ff.future(new Callable() { + return Futures.future(new Callable() { public String call() { return r.toUpperCase(); } - }); + }, system.dispatcher()); } - }); + }, system.dispatcher()); assertEquals(result.get(), expectedStrings); } @@ -257,25 +255,25 @@ public class JavaFutureTests { LinkedList> listFutures = new LinkedList>(); for (int i = 0; i < 10; i++) { final Integer fi = i; - listFutures.add(ff.future(new Callable() { + listFutures.add(Futures.future(new Callable() { public Integer call() { return fi; } - })); + }, system.dispatcher())); } final Integer expect = 5; - Future> f = ff.find(listFutures, new Function() { + Future> f = Futures.find(listFutures, new Function() { public Boolean apply(Integer i) { return i == 5; } - }); + }, system.dispatcher()); assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS))); } @Test public void BlockMustBeCallable() { - Promise p = ff.promise(); + Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.completeWithResult("foo"); Block.on(p, d); diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e4ff9d1a14..241c00ee65 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -45,40 +45,32 @@ object Block { def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost) } -class FutureFactory(implicit dispatcher: MessageDispatcher) { +object Futures { /** * Java API, equivalent to Future.apply */ - def future[T](body: Callable[T]): Future[T] = - Future(body.call) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = - Future(body.call)(dispatcher) + def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) /** * Java API, equivalent to Promise.apply */ - def promise[T](): Promise[T] = Promise[T]() + def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher) /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = { - val pred: T ⇒ Boolean = predicate.apply(_) - Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(pred).map(JOption.fromScalaOption(_)) + def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = { + Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_)) } /** * Java API. * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = - Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures)) + def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] = + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher) /** * Java API @@ -87,22 +79,23 @@ class FutureFactory(implicit dispatcher: MessageDispatcher) { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _) + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] = + Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher) /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = - Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] = + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher) /** * Java API. * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = { + def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = { + implicit val d = dispatcher scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ for (r ← fr; a ← fa) yield { r add a @@ -116,7 +109,8 @@ class FutureFactory(implicit dispatcher: MessageDispatcher) { * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = { + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = { + implicit val d = dispatcher scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒ val fb = fn(a) for (r ← fr; b ← fb) yield { r add b; r } @@ -612,6 +606,8 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { object Promise { /** * Creates a non-completed, new, Promise + * + * Scala API */ def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() }