From 5f83340d5e08b71009735a99256831d03fccea0f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Jan 2012 22:36:03 +0100 Subject: [PATCH 01/10] wip --- .../src/main/scala/akka/dispatch/Future.scala | 11 +++- .../scala/akka/dispatch/japi/Future.scala | 50 +++++++++++++++---- .../src/main/scala/akka/util/BoxedType.scala | 8 +-- .../akka/docs/future/FutureDocTestBase.java | 21 ++++---- 4 files changed, 64 insertions(+), 26 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 09ce22d6b8..19466c2a93 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -507,13 +507,20 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. */ - final def mapTo[A](implicit m: Manifest[A]): Future[A] = { + final def mapTo[A](implicit m: Manifest[A]): Future[A] = + mapTo[A](m.erasure.asInstanceOf[Class[A]]) + + /** + * Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[A](clazz: Class[A]): Future[A] = { val fa = Promise[A]() onComplete { case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] case Right(t) ⇒ fa complete (try { - Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + Right(BoxedType(clazz).cast(t).asInstanceOf[A]) } catch { case e: ClassCastException ⇒ Left(e) }) diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala index ac4ef7694e..ca28099d8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala @@ -6,6 +6,47 @@ package akka.dispatch.japi import akka.util.Timeout import akka.japi.{ Procedure2, Procedure, Function ⇒ JFunc, Option ⇒ JOption } +class Callback[-T] extends PartialFunction[T, Unit] { + override final def isDefinedAt(t: T): Boolean = true + override final def apply(t: T): Unit = on(t) + protected def on(result: T): Unit = () +} + +abstract class OnSuccess[-T] extends Callback[T] { + protected final override def on(result: T) = onSuccess(result) + def onSuccess(result: T): Unit +} + +abstract class OnFailure extends Callback[Throwable] { + protected final override def on(failure: Throwable) = onFailure(failure) + def onFailure(failure: Throwable): Unit +} + +abstract class OnComplete[-T] extends Callback[Either[Throwable, T]] { + protected final override def on(value: Either[Throwable, T]): Unit = value match { + case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) + case Right(r) ⇒ onComplete(null, r) + } + def onComplete(failure: Throwable, success: T): Unit +} + +abstract class Filter[-T] extends (T ⇒ Boolean) { + override final def apply(t: T): Boolean = filter(t) + def filter(result: T): Boolean +} + +abstract class Foreach[-T] extends (T ⇒ Unit) { + override final def apply(t: T): Unit = each(t) + def each(result: T): Unit +} + +abstract class Mapper[-T, +R] extends (T ⇒ R) + +/* +map => A => B +flatMap => A => F[B] +foreach +*/ /* Java API */ trait Future[+T] { self: akka.dispatch.Future[T] ⇒ /** @@ -50,14 +91,5 @@ trait Future[+T] { self: akka.dispatch.Future[T] ⇒ */ private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] - - /** - * Returns a new Future whose value will be of the specified type if it really is - * Or a failure with a ClassCastException if it wasn't. - */ - private[japi] final def mapTo[A](clazz: Class[A]): akka.dispatch.Future[A] = { - implicit val manifest: Manifest[A] = Manifest.classType(clazz) - self.mapTo[A] - } } diff --git a/akka-actor/src/main/scala/akka/util/BoxedType.scala b/akka-actor/src/main/scala/akka/util/BoxedType.scala index d2c5092be4..f5f95096d9 100644 --- a/akka-actor/src/main/scala/akka/util/BoxedType.scala +++ b/akka-actor/src/main/scala/akka/util/BoxedType.scala @@ -3,9 +3,8 @@ */ package akka.util -import java.{ lang ⇒ jl } - object BoxedType { + import java.{ lang ⇒ jl } private val toBoxed = Map[Class[_], Class[_]]( classOf[Boolean] -> classOf[jl.Boolean], @@ -18,8 +17,5 @@ object BoxedType { classOf[Double] -> classOf[jl.Double], classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) - def apply(c: Class[_]): Class[_] = { - if (c.isPrimitive) toBoxed(c) else c - } - + final def apply(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c } diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index e642047709..8c600440f3 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -10,6 +10,9 @@ import akka.japi.Procedure2; import akka.util.Timeout; import akka.dispatch.Await; import akka.dispatch.Future; +import akka.dispatch.japi.Mapper; +import akka.dispatch.japi.OnSuccess; +import akka.dispatch.japi.OnFailure; //#imports1 @@ -110,7 +113,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -131,7 +134,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -153,7 +156,7 @@ public class FutureDocTestBase { Thread.sleep(100); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -173,7 +176,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.flatMap(new Function>() { + Future f2 = f1.flatMap(new Mapper>() { public Future apply(final String s) { return future(new Callable() { public Integer call() { @@ -322,8 +325,8 @@ public class FutureDocTestBase { { Future future = Futures.successful("foo", system.dispatcher()); //#onSuccess - future.onSuccess(new Procedure() { - public void apply(String result) { + future.onSuccess(new OnSuccess() { + public void onSuccess(String result) { if ("bar" == result) { //Do something if it resulted in "bar" } else { @@ -337,8 +340,8 @@ public class FutureDocTestBase { Future future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); //#onFailure - future.onFailure( new Procedure() { - public void apply(Throwable failure) { + future.onFailure( new OnFailure() { + public void onFailure(Throwable failure) { if (failure instanceof IllegalStateException) { //Do something if it was this particular failure } else { @@ -370,7 +373,7 @@ public class FutureDocTestBase { Future future1 = Futures.successful("foo", system.dispatcher()); Future future2 = Futures.successful("bar", system.dispatcher()); Future future3 = - future1.zip(future2).map(new Function, String>() { + future1.zip(future2).map(new Mapper, String>() { public String apply(scala.Tuple2 zipped) { return zipped._1() + " " + zipped._2(); } From f00c4f61be66591a0ceb8bc071ea87607a5b71a7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Jan 2012 22:49:31 +0100 Subject: [PATCH 02/10] more wip --- .../java/code/akka/docs/future/FutureDocTestBase.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 8c600440f3..37349c186b 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -13,6 +13,7 @@ import akka.dispatch.Future; import akka.dispatch.japi.Mapper; import akka.dispatch.japi.OnSuccess; import akka.dispatch.japi.OnFailure; +import akka.dispatch.japi.Filter; //#imports1 @@ -207,7 +208,7 @@ public class FutureDocTestBase { // Find the sum of the odd numbers Future futureSum = futureListOfInts.map( - new Function, Long>() { + new Mapper, Long>() { public Long apply(Iterable ints) { long sum = 0; for (Integer i : ints) @@ -309,13 +310,13 @@ public class FutureDocTestBase { //#filter Future future1 = Futures.successful(4, system.dispatcher()); Future successfulFilter = - future1.filter(new Function() { - public Boolean apply(Integer i) { return i % 2 == 0; } + future1.filter(new Filter() { + public boolean filter(Integer i) { return i % 2 == 0; } }); Future failedFilter = - future1.filter(new Function() { - public Boolean apply(Integer i) { return i % 2 != 0; } + future1.filter(new Filter() { + public boolean filter(Integer i) { return i % 2 != 0; } }); //When filter fails, the returned Future will be failed with a scala.MatchError //#filter From 87cb83f0d73616f73a64620ed83daa28fcb072b4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Jan 2012 12:41:50 +0100 Subject: [PATCH 03/10] wip --- .../src/main/scala/akka/dispatch/japi/Future.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala index ca28099d8d..5f32ac1f35 100644 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala @@ -6,6 +6,7 @@ package akka.dispatch.japi import akka.util.Timeout import akka.japi.{ Procedure2, Procedure, Function ⇒ JFunc, Option ⇒ JOption } +@deprecated("Do not use this directly, use subclasses of this", "2.0") class Callback[-T] extends PartialFunction[T, Unit] { override final def isDefinedAt(t: T): Boolean = true override final def apply(t: T): Unit = on(t) @@ -30,6 +31,18 @@ abstract class OnComplete[-T] extends Callback[Either[Throwable, T]] { def onComplete(failure: Throwable, success: T): Unit } +@deprecated("Do not use this directly, use 'Recover'", "2.0") +class RecoverBridge[+T] extends PartialFunction[Throwable, T] { + override final def isDefinedAt(t: Throwable): Boolean = true + override final def apply(t: Throwable): T = on(t) + protected def on(result: Throwable): T = null.asInstanceOf[T] +} + +abstract class Recover[+T] extends RecoverBridge[T] { + protected final override def on(result: Throwable): T = recover(result) + def recover(failure: Throwable): T +} + abstract class Filter[-T] extends (T ⇒ Boolean) { override final def apply(t: T): Boolean = filter(t) def filter(result: T): Boolean From b31040733411889bed30f57c98e15edd1d73ea36 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Jan 2012 14:15:25 +0100 Subject: [PATCH 04/10] wip --- .../java/akka/dispatch/JavaFutureTests.java | 24 ++-- .../src/main/scala/akka/dispatch/Future.scala | 64 ++++++++++- .../scala/akka/dispatch/japi/Future.scala | 108 ------------------ .../docs/actor/UntypedActorDocTestBase.java | 3 +- .../akka/docs/future/FutureDocTestBase.java | 13 +-- 5 files changed, 80 insertions(+), 132 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/dispatch/japi/Future.scala diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index a87b7933d8..9b89a2b476 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -45,7 +45,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public String apply(String s) { return s + " World"; } @@ -59,8 +59,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onSuccess(new Procedure() { - public void apply(String result) { + f.onSuccess(new OnSuccess() { + public void onSuccess(String result) { if (result.equals("foo")) latch.countDown(); } @@ -76,8 +76,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onFailure(new Procedure() { - public void apply(Throwable t) { + f.onFailure(new OnFailure() { + public void onFailure(Throwable t) { if (t instanceof NullPointerException) latch.countDown(); } @@ -94,8 +94,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onComplete(new Procedure2() { - public void apply(Throwable t, String r) { + f.onComplete(new OnComplete() { + public void onComplete(Throwable t, String r) { latch.countDown(); } }); @@ -110,8 +110,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.foreach(new Procedure() { - public void apply(String future) { + f.foreach(new Foreach() { + public void each(String future) { latch.countDown(); } }); @@ -127,7 +127,7 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); cf.success("1000"); Future f = cf; - Future r = f.flatMap(new Function>() { + Future r = f.flatMap(new Mapper>() { public Future apply(String r) { latch.countDown(); Promise cf = Futures.promise(system.dispatcher()); @@ -146,8 +146,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - Future r = f.filter(new Function() { - public Boolean apply(String r) { + Future r = f.filter(new Filter() { + public boolean filter(String r) { latch.countDown(); return r.equals("foo"); } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 19466c2a93..454a045c9c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -343,7 +343,7 @@ object Future { } } -sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { +sealed trait Future[+T] extends Await.Awaitable[T] { implicit def executor: ExecutionContext @@ -828,3 +828,65 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe case Right(r) ⇒ r } } +object japi { + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class CallbackBridge[-T] extends PartialFunction[T, Unit] { + override final def isDefinedAt(t: T): Boolean = true + override final def apply(t: T): Unit = internal(t) + protected def internal(result: T): Unit = () + } + + @deprecated("Do not use this directly, use 'Recover'", "2.0") + class RecoverBridge[+T] extends PartialFunction[Throwable, T] { + override final def isDefinedAt(t: Throwable): Boolean = true + override final def apply(t: Throwable): T = internal(t) + protected def internal(result: Throwable): T = null.asInstanceOf[T] + } + + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class BooleanFunctionBridge[-T] extends scala.Function1[T, Boolean] { + override final def apply(t: T): Boolean = internal(t) + protected def internal(result: T): Boolean = false + } + + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class UnitFunctionBridge[-T] extends (T ⇒ Unit) { + override final def apply(t: T): Unit = internal(t) + protected def internal(result: T): Unit = () + } +} + +abstract class OnSuccess[-T] extends japi.CallbackBridge[T] { + protected final override def internal(result: T) = onSuccess(result) + def onSuccess(result: T): Unit +} + +abstract class OnFailure extends japi.CallbackBridge[Throwable] { + protected final override def internal(failure: Throwable) = onFailure(failure) + def onFailure(failure: Throwable): Unit +} + +abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] { + protected final override def internal(value: Either[Throwable, T]): Unit = value match { + case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) + case Right(r) ⇒ onComplete(null, r) + } + def onComplete(failure: Throwable, success: T): Unit +} + +abstract class Recover[+T] extends japi.RecoverBridge[T] { + protected final override def internal(result: Throwable): T = recover(result) + def recover(failure: Throwable): T +} + +abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] { + override final def internal(t: T): Boolean = filter(t) + def filter(result: T): Boolean +} + +abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] { + override final def internal(t: T): Unit = each(t) + def each(result: T): Unit +} + +abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R] \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala deleted file mode 100644 index 5f32ac1f35..0000000000 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.dispatch.japi - -import akka.util.Timeout -import akka.japi.{ Procedure2, Procedure, Function ⇒ JFunc, Option ⇒ JOption } - -@deprecated("Do not use this directly, use subclasses of this", "2.0") -class Callback[-T] extends PartialFunction[T, Unit] { - override final def isDefinedAt(t: T): Boolean = true - override final def apply(t: T): Unit = on(t) - protected def on(result: T): Unit = () -} - -abstract class OnSuccess[-T] extends Callback[T] { - protected final override def on(result: T) = onSuccess(result) - def onSuccess(result: T): Unit -} - -abstract class OnFailure extends Callback[Throwable] { - protected final override def on(failure: Throwable) = onFailure(failure) - def onFailure(failure: Throwable): Unit -} - -abstract class OnComplete[-T] extends Callback[Either[Throwable, T]] { - protected final override def on(value: Either[Throwable, T]): Unit = value match { - case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) - case Right(r) ⇒ onComplete(null, r) - } - def onComplete(failure: Throwable, success: T): Unit -} - -@deprecated("Do not use this directly, use 'Recover'", "2.0") -class RecoverBridge[+T] extends PartialFunction[Throwable, T] { - override final def isDefinedAt(t: Throwable): Boolean = true - override final def apply(t: Throwable): T = on(t) - protected def on(result: Throwable): T = null.asInstanceOf[T] -} - -abstract class Recover[+T] extends RecoverBridge[T] { - protected final override def on(result: Throwable): T = recover(result) - def recover(failure: Throwable): T -} - -abstract class Filter[-T] extends (T ⇒ Boolean) { - override final def apply(t: T): Boolean = filter(t) - def filter(result: T): Boolean -} - -abstract class Foreach[-T] extends (T ⇒ Unit) { - override final def apply(t: T): Unit = each(t) - def each(result: T): Unit -} - -abstract class Mapper[-T, +R] extends (T ⇒ R) - -/* -map => A => B -flatMap => A => F[B] -foreach -*/ -/* Java API */ -trait Future[+T] { self: akka.dispatch.Future[T] ⇒ - /** - * Asynchronously called when this Future gets a successful result - */ - private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r ⇒ proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit]) - - /** - * Asynchronously called when this Future gets a failed result - */ - private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable ⇒ proc(t) }: PartialFunction[Throwable, Unit]) - - /** - * Asynchronously called when this future is completed with either a failed or a successful result - * In case of a success, the first parameter (Throwable) will be null - * In case of a failure, the second parameter (T) will be null - * For no reason will both be null or neither be null - */ - private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t ⇒ proc(t, null.asInstanceOf[T]), r ⇒ proc(null, r))) - - /** - * Asynchronously applies the provided function to the (if any) successful result of this Future - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) - - /** - * Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it. - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) - - /** - * Asynchronously applies the provided Procedure to the (if any) successful result of this Future - * Provided Procedure will not be called in case of no-result or in case of failed result - */ - private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) - - /** - * Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = - self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] -} - diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index a72c828862..9abd96b0a7 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -12,6 +12,7 @@ import akka.actor.Props; //#import-future import akka.dispatch.Future; import akka.dispatch.Futures; +import akka.dispatch.Mapper; import akka.dispatch.Await; import akka.util.Duration; import akka.util.Timeout; @@ -236,7 +237,7 @@ public class UntypedActorDocTestBase { final Future> aggregate = Futures.sequence(futures, system.dispatcher()); - final Future transformed = aggregate.map(new akka.japi.Function, Result>() { + final Future transformed = aggregate.map(new Mapper, Result>() { public Result apply(Iterable coll) { final Iterator it = coll.iterator(); final String s = (String) it.next(); diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 37349c186b..8ecfccbeac 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -4,16 +4,10 @@ package akka.docs.future; //#imports1 -import akka.dispatch.Promise; +import akka.dispatch.*; import akka.japi.Procedure; import akka.japi.Procedure2; import akka.util.Timeout; -import akka.dispatch.Await; -import akka.dispatch.Future; -import akka.dispatch.japi.Mapper; -import akka.dispatch.japi.OnSuccess; -import akka.dispatch.japi.OnFailure; -import akka.dispatch.japi.Filter; //#imports1 @@ -61,7 +55,6 @@ import akka.actor.ActorSystem; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.Futures; import akka.pattern.Patterns; import static org.junit.Assert.*; @@ -355,8 +348,8 @@ public class FutureDocTestBase { { Future future = Futures.successful("foo", system.dispatcher()); //#onComplete - future.onComplete(new Procedure2() { - public void apply(Throwable failure, String result) { + future.onComplete(new OnComplete() { + public void onComplete(Throwable failure, String result) { if (failure != null) { //We got a failure, handle it here } else { From 5ddf1afb2045c3319f039fb36b28f6c70dee1613 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Jan 2012 15:11:49 +0100 Subject: [PATCH 05/10] Adding tests for recover and mapTo, adding API for creating manifests from Java and doccing things --- .../java/akka/dispatch/JavaFutureTests.java | 27 +++++ .../src/main/scala/akka/dispatch/Future.scala | 108 ++++++++++++++++-- .../src/main/scala/akka/japi/JavaAPI.scala | 7 ++ 3 files changed, 133 insertions(+), 9 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 9b89a2b476..ca21b9a6fc 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -14,6 +14,7 @@ import java.util.LinkedList; import java.lang.Iterable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static akka.japi.util.manifest; import akka.testkit.AkkaSpec; @@ -278,4 +279,30 @@ public class JavaFutureTests { Await.ready(p, d); assertEquals(Await.result(p, d), "foo"); } + + @Test + public void MapToMustBeCallable() { + Promise p = Futures.promise(system.dispatcher()); + Future f = p.future().mapTo(manifest(String.class)); + Duration d = Duration.create(1, TimeUnit.SECONDS); + p.success("foo"); + Await.ready(p, d); + assertEquals(Await.result(p, d), "foo"); + } + + @Test + public void RecoverToMustBeCallable() { + final IllegalStateException fail = new IllegalStateException("OHNOES"); + Promise p = Futures.promise(system.dispatcher()); + Future f = p.future().recover(new Recover() { + public Object recover(Throwable t) throws Throwable { + if (t == fail) return "foo"; + else throw t; + } + }); + Duration d = Duration.create(1, TimeUnit.SECONDS); + p.failure(fail); + Await.ready(p, d); + assertEquals(Await.result(p, d), "foo"); + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 555ad86055..70768133a0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -503,21 +503,18 @@ sealed trait Future[+T] extends Await.Awaitable[T] { /** * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. + * + * When used from Java, to create the Manifest, use: + * import static akka.japi.util.manifest; + * future.mapTo(manifest(MyClass.class)); */ - final def mapTo[A](implicit m: Manifest[A]): Future[A] = - mapTo[A](m.erasure.asInstanceOf[Class[A]]) - - /** - * Creates a new Future[A] which is completed with this Future's result if - * that conforms to A's erased type or a ClassCastException otherwise. - */ - final def mapTo[A](clazz: Class[A]): Future[A] = { + final def mapTo[A](implicit m: Manifest[A]): Future[A] = { val fa = Promise[A]() onComplete { case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] case Right(t) ⇒ fa complete (try { - Right(BoxedType(clazz).cast(t).asInstanceOf[A]) + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) } catch { case e: ClassCastException ⇒ Left(e) }) @@ -825,6 +822,11 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe case Right(r) ⇒ r } } + +/** + * This class contains bridge classes between Scala and Java. + * Internal use only. + */ object japi { @deprecated("Do not use this directly, use subclasses of this", "2.0") class CallbackBridge[-T] extends PartialFunction[T, Unit] { @@ -853,37 +855,125 @@ object japi { } } +/** + * Callback for when a Future is completed successfully + * SAM (Single Abstract Method) class + * + * Java API + */ abstract class OnSuccess[-T] extends japi.CallbackBridge[T] { protected final override def internal(result: T) = onSuccess(result) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes successfully completed + */ def onSuccess(result: T): Unit } +/** + * Callback for when a Future is completed with a failure + * SAM (Single Abstract Method) class + * + * Java API + */ abstract class OnFailure extends japi.CallbackBridge[Throwable] { protected final override def internal(failure: Throwable) = onFailure(failure) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a failure + */ def onFailure(failure: Throwable): Unit } +/** + * Callback for when a Future is completed with either failure or a success + * SAM (Single Abstract Method) class + * + * Java API + */ abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] { protected final override def internal(value: Either[Throwable, T]): Unit = value match { case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) case Right(r) ⇒ onComplete(null, r) } + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a failure or a success. + * In the case of success then "failure" will be null, and in the case of failure the "success" will be null. + */ def onComplete(failure: Throwable, success: T): Unit } +/** + * Callback for the Future.recover operation that conditionally turns failures into successes. + * + * SAM (Single Abstract Method) class + * + * Java API + */ abstract class Recover[+T] extends japi.RecoverBridge[T] { protected final override def internal(result: Throwable): T = recover(result) + + /** + * This method will be invoked once when/if the Future this recover callback is registered on + * becomes completed with a failure. + * + * @returns a successful value for the passed in failure + * @throws the passed in failure to propagate it. + * + * Java API + */ + @throws(classOf[Throwable]) def recover(failure: Throwable): T } +/** + * Callback for the Future.filter operation that creates a new Future which will + * conditionally contain the success of another Future. + * + * SAM (Single Abstract Method) class + * Java API + */ abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] { override final def internal(t: T): Boolean = filter(t) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a success. + * + * @returns true if the successful value should be propagated to the new Future or not + */ def filter(result: T): Boolean } +/** + * Callback for the Future.foreach operation that will be invoked if the Future that this callback + * is registered on becomes completed with a success. This method is essentially the same operation + * as onSuccess. + * + * SAM (Single Abstract Method) class + * Java API + */ abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] { override final def internal(t: T): Unit = each(t) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes successfully completed + */ def each(result: T): Unit } +/** + * Callback for the Future.map and Future.flatMap operations that will be invoked + * if the Future that this callback is registered on becomes completed with a success. + * This callback is the equivalent of an akka.japi.Function + * + * SAM (Single Abstract Method) class + * + * Java API + */ abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R] diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index e414d0fee6..94a347f653 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -119,3 +119,10 @@ object Option { implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] = if (o.isDefined) some(o.get) else none } + +object util { + /** + * Given a Class returns a Scala Manifest of that Class + */ + def manifest[T](clazz: Class[T]): Manifest[T] = Manifest.classType(clazz) +} From 1ebdcaca1adc728165bb1f4e5dd7622fa72e6d33 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Jan 2012 17:47:31 +0100 Subject: [PATCH 06/10] Fixes after review --- .../src/test/java/akka/dispatch/JavaFutureTests.java | 8 ++++---- akka-actor/src/main/scala/akka/dispatch/Future.scala | 2 +- akka-actor/src/main/scala/akka/japi/JavaAPI.scala | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index ca21b9a6fc..4ccdd46dc1 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -14,7 +14,7 @@ import java.util.LinkedList; import java.lang.Iterable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static akka.japi.util.manifest; +import static akka.japi.Util.manifest; import akka.testkit.AkkaSpec; @@ -272,7 +272,7 @@ public class JavaFutureTests { } @Test - public void BlockMustBeCallable() { + public void blockMustBeCallable() { Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.success("foo"); @@ -281,7 +281,7 @@ public class JavaFutureTests { } @Test - public void MapToMustBeCallable() { + public void mapToMustBeCallable() { Promise p = Futures.promise(system.dispatcher()); Future f = p.future().mapTo(manifest(String.class)); Duration d = Duration.create(1, TimeUnit.SECONDS); @@ -291,7 +291,7 @@ public class JavaFutureTests { } @Test - public void RecoverToMustBeCallable() { + public void recoverToMustBeCallable() { final IllegalStateException fail = new IllegalStateException("OHNOES"); Promise p = Futures.promise(system.dispatcher()); Future f = p.future().recover(new Recover() { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 70768133a0..c6fff48f34 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -505,7 +505,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { * that conforms to A's erased type or a ClassCastException otherwise. * * When used from Java, to create the Manifest, use: - * import static akka.japi.util.manifest; + * import static akka.japi.Util.manifest; * future.mapTo(manifest(MyClass.class)); */ final def mapTo[A](implicit m: Manifest[A]): Future[A] = { diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 94a347f653..47ce667759 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -120,7 +120,10 @@ object Option { implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] = if (o.isDefined) some(o.get) else none } -object util { +/** + * This class hold common utilities for Java + */ +object Util { /** * Given a Class returns a Scala Manifest of that Class */ From 1a5e7590ea719988ca45c6c01379e8d1d8bca712 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Jan 2012 17:32:24 +0100 Subject: [PATCH 07/10] Adding tests for tryRecover and andThen --- .../test/scala/akka/dispatch/FutureSpec.scala | 28 +++++++++- .../src/main/scala/akka/dispatch/Future.scala | 51 +++++++++++++------ 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 0e162708d3..62e3cb8e22 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -13,10 +13,10 @@ import akka.testkit.AkkaSpec import org.scalatest.junit.JUnitSuite import akka.testkit.DefaultTimeout import akka.testkit.TestLatch -import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import scala.runtime.NonLocalReturnControl import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } +import java.util.concurrent._ object FutureSpec { class TestActor extends Actor { @@ -302,6 +302,32 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } } + "tryRecover from exceptions" in { + val o = new IllegalStateException("original") + val r = new IllegalStateException("recovered") + + intercept[IllegalStateException] { + Await.result(Promise.failed[String](o) tryRecover { case _ if false == true ⇒ Promise.successful("yay!") }, timeout.duration) + } must be(o) + + Await.result(Promise.failed[String](o) tryRecover { case _ ⇒ Promise.successful("yay!") }, timeout.duration) must equal("yay!") + + intercept[IllegalStateException] { + Await.result(Promise.failed[String](o) tryRecover { case _ ⇒ Promise.failed[String](r) }, timeout.duration) + } must be(r) + } + + "andThen like a boss" in { + val q = new LinkedBlockingQueue[Int] + for (i ← 1 to 1000) { + Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Right(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) must be(3) + q.poll() must be(1) + q.poll() must be(2) + q.poll() must be(3) + q.clear() + } + } + "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5) Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 4918253002..90eb6a0b8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -362,7 +362,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } } that onFailure { case f ⇒ p failure f } - p + p.future } /** @@ -435,7 +435,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case Left(t) ⇒ p success t case Right(r) ⇒ p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r) } - p + p.future } /** @@ -448,7 +448,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case r @ Right(_) ⇒ p complete r case _ ⇒ p completeWith that } - p + p.future } /** @@ -463,26 +463,26 @@ sealed trait Future[+T] extends Await.Awaitable[T] { * */ final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val future = Promise[A]() + val p = Promise[A]() onComplete { - case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) - case otherwise ⇒ future complete otherwise + case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ p complete otherwise } - future + p.future } /** - * Creates a new future that will handle any matching throwable that this - * future might contain by assigning it a value of another future. + * Returns a new Future that will, in case this future fails, + * be completed with the resulting Future of the given PartialFunction, + * if the given PartialFunction matches the failure of the original Future. * - * If there is no match, or if this future contains - * a valid result then the new future will contain the same result. + * If the PartialFunction throws, that Throwable will be propagated to the returned Future. * * Example: * * {{{ * val f = Future { Int.MaxValue } - * future (6 / 0) tryRecover { case e: ArithmeticException => f } // result: Int.MaxValue + * Future (6 / 0) tryRecover { case e: ArithmeticException => f } // result: Int.MaxValue * }}} */ def tryRecover[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { @@ -497,6 +497,27 @@ sealed trait Future[+T] extends Await.Awaitable[T] { p.future } + /** + * Returns a new Future that will contain the completed result of this Future, + * and which will invoke the supplied PartialFunction when completed. + * + * This allows for establishing order of side-effects. + * + * {{{ + * Future { 5 } andThen { + * case something => assert(something is awesome) + * } andThen { + * case Left(t) => handleProblem(t) + * case Right(v) => dealWithSuccess(v) + * } + * }}} + */ + def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { + val p = Promise[T]() + onComplete { case r ⇒ try if (pf isDefinedAt r) pf(r) finally p complete r } + p.future + } + /** * Creates a new Future by applying a function to the successful result of * this Future. If this Future is completed with an exception then the new @@ -545,7 +566,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { case e: ClassCastException ⇒ Left(e) }) } - fa + fa.future } /** @@ -576,7 +597,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { logError("Future.flatMap", e) } } - p + p.future } /** @@ -616,7 +637,7 @@ sealed trait Future[+T] extends Await.Awaitable[T] { Left(e) }) } - p + p.future } protected def logError(msg: String, problem: Throwable): Unit = { From 32b5e5314600f8339d48754ad7d5d2433b09b7ce Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Jan 2012 17:42:50 +0100 Subject: [PATCH 08/10] Renaming tests to reflect current APIs --- .../src/test/scala/akka/dispatch/FutureSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 62e3cb8e22..89735d1a94 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -881,7 +881,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "be completed" in { f((future, _) ⇒ future must be('completed)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } - "return result with 'Await.sync'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } + "return result with 'Await.result'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } "not timeout" in { f((future, _) ⇒ Await.ready(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ @@ -932,7 +932,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } - "throw exception with 'Await.sync'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } + "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with filter" in { f { (future, message) ⇒ (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) From e32adebfd95cdeb9a12c28849a5733823093a042 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 31 Jan 2012 15:23:00 +0100 Subject: [PATCH 09/10] Adding doc to andThen and tryRecover --- .../code/akka/docs/future/FutureDocSpec.scala | 29 +++++++++++++++++++ akka-docs/scala/futures.rst | 18 ++++++++++++ 2 files changed, 47 insertions(+) diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index 175fc08ff5..023bdd8df7 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -13,6 +13,7 @@ import akka.dispatch.Future import akka.dispatch.Await import akka.util.duration._ import akka.dispatch.Promise +import java.lang.IllegalStateException object FutureDocSpec { @@ -266,6 +267,19 @@ class FutureDocSpec extends AkkaSpec { Await.result(future, 1 second) must be(0) } + "demonstrate usage of tryRecover" in { + implicit val timeout = system.settings.ActorTimeout + val actor = system.actorOf(Props[MyActor]) + val msg1 = -1 + //#try-recover + val future = akka.pattern.ask(actor, msg1) tryRecover { + case e: ArithmeticException ⇒ Promise.successful(0) + case foo: IllegalArgumentException ⇒ Promise.failed[Int](new IllegalStateException("All br0ken!")) + } + //#try-recover + Await.result(future, 1 second) must be(0) + } + "demonstrate usage of zip" in { val future1 = Future { "foo" } val future2 = Future { "bar" } @@ -275,6 +289,21 @@ class FutureDocSpec extends AkkaSpec { Await.result(future3, 1 second) must be("foo bar") } + "demonstrate usage of andThen" in { + def loadPage(s: String) = s + val url = "foo bar" + def log(cause: Throwable) = () + def watchSomeTV = () + //#and-then + val result = Future { loadPage(url) } andThen { + case Left(exception) ⇒ log(exception) + } andThen { + case _ ⇒ watchSomeTV + } + //#and-then + Await.result(result, 1 second) must be("foo bar") + } + "demonstrate usage of or" in { val future1 = Future { "foo" } val future2 = Future { "bar" } diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index c46db30927..38edee51af 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -198,6 +198,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: onComplete +Ordering +-------- + +Since callbacks are executed in any order and potentially in parallel, +it can be tricky at the times when you need sequential ordering of operations. +But there's a solution! And it's name is ``andThen``, and it creates a new Future with +the specified callback, a Future that will have the same result as the Future it's called on, +which allows for ordering like in the following sample: + +.. includecode:: code/akka/docs/future/FutureDocSpec.scala + :include: and-then + Auxiliary methods ----------------- @@ -232,3 +244,9 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will behave as if we hadn't used the ``recover`` method. +You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +and is use like this: + +.. includecode:: code/akka/docs/future/FutureDocSpec.scala + :include: try-recover + From aa1c7ea9b9fa9a48fd525ef043e100ba80b0edbe Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 31 Jan 2012 16:00:46 +0100 Subject: [PATCH 10/10] Adding java documentation for andThen, recover and tryRecover --- .../akka/docs/future/FutureDocTestBase.java | 63 +++++++++++++++++++ akka-docs/java/futures.rst | 34 +++++++++- 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 8ecfccbeac..d9a5308050 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -315,6 +315,69 @@ public class FutureDocTestBase { //#filter } + public void sendToTheInternetz(String s) { + + } + + public void sendToIssueTracker(Throwable t) { + + } + + @Test public void useAndThen() { + //#and-then + Future future1 = Futures.successful("value", system.dispatcher()). + andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) sendToIssueTracker(failure); + } + }).andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (result != null) sendToTheInternetz(result); + } + }); + //#and-then + } + + @Test public void useRecover() { + //#recover + Future future = future(new Callable() { + public Integer call() { + return 1 / 0; + } + }, system.dispatcher()).recover(new Recover() { + public Integer recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) return 0; + else throw problem; + } + }); + int result = Await.result(future, Duration.create(1, SECONDS)); + assertEquals(result, 0); + //#recover + } + + @Test public void useTryRecover() { + //#try-recover + Future future = future(new Callable() { + public Integer call() { + return 1 / 0; + } + }, system.dispatcher()).tryRecover(new Recover>() { + public Future recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) { + return future(new Callable() { + public Integer call() { + return 0; + } + }, system.dispatcher()); + } + else throw problem; + } + }); + int result = Await.result(future, Duration.create(1, SECONDS)); + assertEquals(result, 0); + //#try-recover + } + @Test public void useOnSuccessOnFailureAndOnComplete() { { Future future = Futures.successful("foo", system.dispatcher()); diff --git a/akka-docs/java/futures.rst b/akka-docs/java/futures.rst index e9b743535a..a75fb21ba5 100644 --- a/akka-docs/java/futures.rst +++ b/akka-docs/java/futures.rst @@ -67,7 +67,7 @@ These allow you to create 'pipelines' or 'streams' that the result will travel t Future is a Monad ^^^^^^^^^^^^^^^^^ -The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs +The first method for working with ``Future`` functionally is ``map``. This method takes a ``Mapper`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result: @@ -176,6 +176,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which .. includecode:: code/akka/docs/future/FutureDocTestBase.java :include: onComplete +Ordering +-------- + +Since callbacks are executed in any order and potentially in parallel, +it can be tricky at the times when you need sequential ordering of operations. +But there's a solution! And it's name is ``andThen``, and it creates a new Future with +the specified callback, a Future that will have the same result as the Future it's called on, +which allows for ordering like in the following sample: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: and-then + Auxiliary methods ----------------- @@ -197,4 +209,22 @@ Exceptions Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, -calling ``Await.result`` will cause it to be thrown again so it can be handled properly. \ No newline at end of file +calling ``Await.result`` will cause it to be thrown again so it can be handled properly. + +It is also possible to handle an ``Exception`` by returning a different result. +This is done with the ``recover`` method. For example: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: recover + +In this example, if the actor replied with a ``akka.actor.Status.Failure`` containing the ``ArithmeticException``, +our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, +so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way +it will behave as if we hadn't used the ``recover`` method. + +You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +and is use like this: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: try-recover +