diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index a87b7933d8..812ff052a2 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -14,6 +14,7 @@ import java.util.LinkedList; import java.lang.Iterable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static akka.japi.Util.manifest; import akka.testkit.AkkaSpec; @@ -45,7 +46,7 @@ public class JavaFutureTests { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public String apply(String s) { return s + " World"; } @@ -59,8 +60,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onSuccess(new Procedure() { - public void apply(String result) { + f.onSuccess(new OnSuccess() { + public void onSuccess(String result) { if (result.equals("foo")) latch.countDown(); } @@ -76,8 +77,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onFailure(new Procedure() { - public void apply(Throwable t) { + f.onFailure(new OnFailure() { + public void onFailure(Throwable t) { if (t instanceof NullPointerException) latch.countDown(); } @@ -94,8 +95,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onComplete(new Procedure2() { - public void apply(Throwable t, String r) { + f.onComplete(new OnComplete() { + public void onComplete(Throwable t, String r) { latch.countDown(); } }); @@ -110,8 +111,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.foreach(new Procedure() { - public void apply(String future) { + f.foreach(new Foreach() { + public void each(String future) { latch.countDown(); } }); @@ -127,7 +128,7 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); cf.success("1000"); Future f = cf; - Future r = f.flatMap(new Function>() { + Future r = f.flatMap(new Mapper>() { public Future apply(String r) { latch.countDown(); Promise cf = Futures.promise(system.dispatcher()); @@ -146,8 +147,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - Future r = f.filter(new Function() { - public Boolean apply(String r) { + Future r = f.filter(new Filter() { + public boolean filter(String r) { latch.countDown(); return r.equals("foo"); } @@ -267,15 +268,55 @@ public class JavaFutureTests { } }, system.dispatcher()); - assertEquals(expect, Await.result(f, timeout)); + assertEquals(expect, Await.result(f, timeout).get()); } @Test - public void BlockMustBeCallable() { + public void blockMustBeCallable() { Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); p.success("foo"); Await.ready(p, d); assertEquals(Await.result(p, d), "foo"); } + + @Test + public void mapToMustBeCallable() { + Promise p = Futures.promise(system.dispatcher()); + Future f = p.future().mapTo(manifest(String.class)); + Duration d = Duration.create(1, TimeUnit.SECONDS); + p.success("foo"); + Await.ready(p, d); + assertEquals(Await.result(p, d), "foo"); + } + + @Test + public void recoverToMustBeCallable() { + final IllegalStateException fail = new IllegalStateException("OHNOES"); + Promise p = Futures.promise(system.dispatcher()); + Future f = p.future().recover(new Recover() { + public Object recover(Throwable t) throws Throwable { + if (t == fail) return "foo"; + else throw t; + } + }); + Duration d = Duration.create(1, TimeUnit.SECONDS); + p.failure(fail); + assertEquals(Await.result(f, d), "foo"); + } + + @Test + public void tryRecoverToMustBeCallable() { + final IllegalStateException fail = new IllegalStateException("OHNOES"); + Promise p = Futures.promise(system.dispatcher()); + Future f = p.future().tryRecover(new Recover>() { + public Future recover(Throwable t) throws Throwable { + if (t == fail) return Futures.successful("foo", system.dispatcher()).future(); + else throw t; + } + }); + Duration d = Duration.create(1, TimeUnit.SECONDS); + p.failure(fail); + assertEquals(Await.result(f, d), "foo"); + } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 71db22cd9a..89735d1a94 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -13,10 +13,10 @@ import akka.testkit.AkkaSpec import org.scalatest.junit.JUnitSuite import akka.testkit.DefaultTimeout import akka.testkit.TestLatch -import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch } import scala.runtime.NonLocalReturnControl import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } +import java.util.concurrent._ object FutureSpec { class TestActor extends Actor { @@ -39,7 +39,6 @@ object FutureSpec { } } -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class JavaFutureSpec extends JavaFutureTests with JUnitSuite @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -303,6 +302,32 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } } + "tryRecover from exceptions" in { + val o = new IllegalStateException("original") + val r = new IllegalStateException("recovered") + + intercept[IllegalStateException] { + Await.result(Promise.failed[String](o) tryRecover { case _ if false == true ⇒ Promise.successful("yay!") }, timeout.duration) + } must be(o) + + Await.result(Promise.failed[String](o) tryRecover { case _ ⇒ Promise.successful("yay!") }, timeout.duration) must equal("yay!") + + intercept[IllegalStateException] { + Await.result(Promise.failed[String](o) tryRecover { case _ ⇒ Promise.failed[String](r) }, timeout.duration) + } must be(r) + } + + "andThen like a boss" in { + val q = new LinkedBlockingQueue[Int] + for (i ← 1 to 1000) { + Await.result(Future { q.add(1); 3 } andThen { case _ ⇒ q.add(2) } andThen { case Right(0) ⇒ q.add(Int.MaxValue) } andThen { case _ ⇒ q.add(3); }, timeout.duration) must be(3) + q.poll() must be(1) + q.poll() must be(2) + q.poll() must be(3) + q.clear() + } + } + "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5) Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5) @@ -856,7 +881,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "be completed" in { f((future, _) ⇒ future must be('completed)) } "contain a value" in { f((future, result) ⇒ future.value must be(Some(Right(result)))) } "return result with 'get'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } - "return result with 'Await.sync'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } + "return result with 'Await.result'" in { f((future, result) ⇒ Await.result(future, timeout.duration) must be(result)) } "not timeout" in { f((future, _) ⇒ Await.ready(future, 0 millis)) } "filter result" in { f { (future, result) ⇒ @@ -907,7 +932,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } - "throw exception with 'Await.sync'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } + "throw exception with 'Await.result'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "retain exception with filter" in { f { (future, message) ⇒ (evaluating { Await.result(future filter (_ ⇒ true), timeout.duration) } must produce[E]).getMessage must be(message) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 495faba5d6..90eb6a0b8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -340,9 +340,9 @@ object Future { } } -sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { +sealed trait Future[+T] extends Await.Awaitable[T] { - implicit def executor: ExecutionContext + protected implicit def executor: ExecutionContext protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) @@ -362,7 +362,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case Right(r) ⇒ that onSuccess { case r2 ⇒ p success ((r, r2)) } } that onFailure { case f ⇒ p failure f } - p + p.future } /** @@ -435,7 +435,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case Left(t) ⇒ p success t case Right(r) ⇒ p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r) } - p + p.future } /** @@ -448,7 +448,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case r @ Right(_) ⇒ p complete r case _ ⇒ p completeWith that } - p + p.future } /** @@ -463,12 +463,59 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * */ final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val future = Promise[A]() + val p = Promise[A]() onComplete { - case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) - case otherwise ⇒ future complete otherwise + case Left(e) if pf isDefinedAt e ⇒ p.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ p complete otherwise } - future + p.future + } + + /** + * Returns a new Future that will, in case this future fails, + * be completed with the resulting Future of the given PartialFunction, + * if the given PartialFunction matches the failure of the original Future. + * + * If the PartialFunction throws, that Throwable will be propagated to the returned Future. + * + * Example: + * + * {{{ + * val f = Future { Int.MaxValue } + * Future (6 / 0) tryRecover { case e: ArithmeticException => f } // result: Int.MaxValue + * }}} + */ + def tryRecover[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { + val p = Promise[U]() + + onComplete { + case Left(t) if pf isDefinedAt t ⇒ + try { p completeWith pf(t) } catch { case t: Throwable ⇒ p complete resolve(Left(t)) } + case otherwise ⇒ p complete otherwise + } + + p.future + } + + /** + * Returns a new Future that will contain the completed result of this Future, + * and which will invoke the supplied PartialFunction when completed. + * + * This allows for establishing order of side-effects. + * + * {{{ + * Future { 5 } andThen { + * case something => assert(something is awesome) + * } andThen { + * case Left(t) => handleProblem(t) + * case Right(v) => dealWithSuccess(v) + * } + * }}} + */ + def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = { + val p = Promise[T]() + onComplete { case r ⇒ try if (pf isDefinedAt r) pf(r) finally p complete r } + p.future } /** @@ -503,6 +550,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { /** * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. + * + * When used from Java, to create the Manifest, use: + * import static akka.japi.Util.manifest; + * future.mapTo(manifest(MyClass.class)); */ final def mapTo[A](implicit m: Manifest[A]): Future[A] = { val fa = Promise[A]() @@ -515,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { case e: ClassCastException ⇒ Left(e) }) } - fa + fa.future } /** @@ -546,7 +597,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { logError("Future.flatMap", e) } } - p + p.future } /** @@ -586,7 +637,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { Left(e) }) } - p + p.future } protected def logError(msg: String, problem: Throwable): Unit = { @@ -818,3 +869,158 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe case Right(r) ⇒ r } } + +/** + * This class contains bridge classes between Scala and Java. + * Internal use only. + */ +object japi { + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class CallbackBridge[-T] extends PartialFunction[T, Unit] { + override final def isDefinedAt(t: T): Boolean = true + override final def apply(t: T): Unit = internal(t) + protected def internal(result: T): Unit = () + } + + @deprecated("Do not use this directly, use 'Recover'", "2.0") + class RecoverBridge[+T] extends PartialFunction[Throwable, T] { + override final def isDefinedAt(t: Throwable): Boolean = true + override final def apply(t: Throwable): T = internal(t) + protected def internal(result: Throwable): T = null.asInstanceOf[T] + } + + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class BooleanFunctionBridge[-T] extends scala.Function1[T, Boolean] { + override final def apply(t: T): Boolean = internal(t) + protected def internal(result: T): Boolean = false + } + + @deprecated("Do not use this directly, use subclasses of this", "2.0") + class UnitFunctionBridge[-T] extends (T ⇒ Unit) { + override final def apply(t: T): Unit = internal(t) + protected def internal(result: T): Unit = () + } +} + +/** + * Callback for when a Future is completed successfully + * SAM (Single Abstract Method) class + * + * Java API + */ +abstract class OnSuccess[-T] extends japi.CallbackBridge[T] { + protected final override def internal(result: T) = onSuccess(result) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes successfully completed + */ + def onSuccess(result: T): Unit +} + +/** + * Callback for when a Future is completed with a failure + * SAM (Single Abstract Method) class + * + * Java API + */ +abstract class OnFailure extends japi.CallbackBridge[Throwable] { + protected final override def internal(failure: Throwable) = onFailure(failure) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a failure + */ + def onFailure(failure: Throwable): Unit +} + +/** + * Callback for when a Future is completed with either failure or a success + * SAM (Single Abstract Method) class + * + * Java API + */ +abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] { + protected final override def internal(value: Either[Throwable, T]): Unit = value match { + case Left(t) ⇒ onComplete(t, null.asInstanceOf[T]) + case Right(r) ⇒ onComplete(null, r) + } + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a failure or a success. + * In the case of success then "failure" will be null, and in the case of failure the "success" will be null. + */ + def onComplete(failure: Throwable, success: T): Unit +} + +/** + * Callback for the Future.recover operation that conditionally turns failures into successes. + * + * SAM (Single Abstract Method) class + * + * Java API + */ +abstract class Recover[+T] extends japi.RecoverBridge[T] { + protected final override def internal(result: Throwable): T = recover(result) + + /** + * This method will be invoked once when/if the Future this recover callback is registered on + * becomes completed with a failure. + * + * @returns a successful value for the passed in failure + * @throws the passed in failure to propagate it. + * + * Java API + */ + @throws(classOf[Throwable]) + def recover(failure: Throwable): T +} + +/** + * Callback for the Future.filter operation that creates a new Future which will + * conditionally contain the success of another Future. + * + * SAM (Single Abstract Method) class + * Java API + */ +abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] { + override final def internal(t: T): Boolean = filter(t) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes completed with a success. + * + * @returns true if the successful value should be propagated to the new Future or not + */ + def filter(result: T): Boolean +} + +/** + * Callback for the Future.foreach operation that will be invoked if the Future that this callback + * is registered on becomes completed with a success. This method is essentially the same operation + * as onSuccess. + * + * SAM (Single Abstract Method) class + * Java API + */ +abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] { + override final def internal(t: T): Unit = each(t) + + /** + * This method will be invoked once when/if a Future that this callback is registered on + * becomes successfully completed + */ + def each(result: T): Unit +} + +/** + * Callback for the Future.map and Future.flatMap operations that will be invoked + * if the Future that this callback is registered on becomes completed with a success. + * This callback is the equivalent of an akka.japi.Function + * + * SAM (Single Abstract Method) class + * + * Java API + */ +abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R] diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala deleted file mode 100644 index a237c0c647..0000000000 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.dispatch.japi - -import akka.japi.{ Procedure2, Procedure, Function ⇒ JFunc } - -/* Java API */ -trait Future[+T] { self: akka.dispatch.Future[T] ⇒ - /** - * Asynchronously called when this Future gets a successful result - */ - private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r ⇒ proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit]) - - /** - * Asynchronously called when this Future gets a failed result - */ - private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable ⇒ proc(t) }: PartialFunction[Throwable, Unit]) - - /** - * Asynchronously called when this future is completed with either a failed or a successful result - * In case of a success, the first parameter (Throwable) will be null - * In case of a failure, the second parameter (T) will be null - * For no reason will both be null or neither be null - */ - private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t ⇒ proc(t, null.asInstanceOf[T]), r ⇒ proc(null, r))) - - /** - * Asynchronously applies the provided function to the (if any) successful result of this Future - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) - - /** - * Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it. - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) - - /** - * Asynchronously applies the provided Procedure to the (if any) successful result of this Future - * Provided Procedure will not be called in case of no-result or in case of failed result - */ - private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) - - /** - * Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate - * Any failure of this Future will be propagated to the Future returned by this method. - */ - private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = - self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] - - /** - * Returns a new Future whose value will be of the specified type if it really is - * Or a failure with a ClassCastException if it wasn't. - */ - private[japi] final def mapTo[A](clazz: Class[A]): akka.dispatch.Future[A] = { - implicit val manifest: Manifest[A] = Manifest.classType(clazz) - self.mapTo[A] - } -} - diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index e414d0fee6..47ce667759 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -119,3 +119,13 @@ object Option { implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] = if (o.isDefined) some(o.get) else none } + +/** + * This class hold common utilities for Java + */ +object Util { + /** + * Given a Class returns a Scala Manifest of that Class + */ + def manifest[T](clazz: Class[T]): Manifest[T] = Manifest.classType(clazz) +} diff --git a/akka-actor/src/main/scala/akka/util/BoxedType.scala b/akka-actor/src/main/scala/akka/util/BoxedType.scala index d2c5092be4..f5f95096d9 100644 --- a/akka-actor/src/main/scala/akka/util/BoxedType.scala +++ b/akka-actor/src/main/scala/akka/util/BoxedType.scala @@ -3,9 +3,8 @@ */ package akka.util -import java.{ lang ⇒ jl } - object BoxedType { + import java.{ lang ⇒ jl } private val toBoxed = Map[Class[_], Class[_]]( classOf[Boolean] -> classOf[jl.Boolean], @@ -18,8 +17,5 @@ object BoxedType { classOf[Double] -> classOf[jl.Double], classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) - def apply(c: Class[_]): Class[_] = { - if (c.isPrimitive) toBoxed(c) else c - } - + final def apply(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c } diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index ff14c4e09b..b85599c3a0 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -12,6 +12,7 @@ import akka.actor.Props; //#import-future import akka.dispatch.Future; import akka.dispatch.Futures; +import akka.dispatch.Mapper; import akka.dispatch.Await; import akka.util.Duration; import akka.util.Timeout; @@ -236,8 +237,8 @@ public class UntypedActorDocTestBase { futures.add(ask(actorB, "reqeest", t)); // using timeout from above final Future> aggregate = Futures.sequence(futures, system.dispatcher()); - - final Future transformed = aggregate.map(new akka.japi.Function, Result>() { + + final Future transformed = aggregate.map(new Mapper, Result>() { public Result apply(Iterable coll) { final Iterator it = coll.iterator(); final String s = (String) it.next(); diff --git a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java index 265f005059..9da5c91248 100644 --- a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import akka.actor.*; +import akka.dispatch.Mapper; import akka.japi.Function; import akka.util.Duration; import akka.util.Timeout; @@ -19,6 +20,8 @@ import akka.event.LoggingAdapter; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import static akka.japi.Util.manifest; + import static akka.actor.SupervisorStrategy.*; import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipeTo; @@ -142,10 +145,12 @@ public class FaultHandlingDocSample { counterService.tell(new Increment(1), getSelf()); // Send current progress to the initial sender - pipeTo(ask(counterService, GetCurrentCount, askTimeout).map(new Function() { - public Progress apply(CurrentCount c) { - return new Progress(100.0 * c.count / totalCount); - } + pipeTo(ask(counterService, GetCurrentCount, askTimeout) + .mapTo(manifest(CurrentCount.class)) + .map(new Mapper() { + public Progress apply(CurrentCount c) { + return new Progress(100.0 * c.count / totalCount); + } }), progressListener); } else { unhandled(msg); diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index e642047709..d9a5308050 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -4,12 +4,10 @@ package akka.docs.future; //#imports1 -import akka.dispatch.Promise; +import akka.dispatch.*; import akka.japi.Procedure; import akka.japi.Procedure2; import akka.util.Timeout; -import akka.dispatch.Await; -import akka.dispatch.Future; //#imports1 @@ -57,7 +55,6 @@ import akka.actor.ActorSystem; import akka.actor.UntypedActor; import akka.actor.ActorRef; import akka.actor.Props; -import akka.dispatch.Futures; import akka.pattern.Patterns; import static org.junit.Assert.*; @@ -110,7 +107,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -131,7 +128,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -153,7 +150,7 @@ public class FutureDocTestBase { Thread.sleep(100); - Future f2 = f1.map(new Function() { + Future f2 = f1.map(new Mapper() { public Integer apply(String s) { return s.length(); } @@ -173,7 +170,7 @@ public class FutureDocTestBase { } }, system.dispatcher()); - Future f2 = f1.flatMap(new Function>() { + Future f2 = f1.flatMap(new Mapper>() { public Future apply(final String s) { return future(new Callable() { public Integer call() { @@ -204,7 +201,7 @@ public class FutureDocTestBase { // Find the sum of the odd numbers Future futureSum = futureListOfInts.map( - new Function, Long>() { + new Mapper, Long>() { public Long apply(Iterable ints) { long sum = 0; for (Integer i : ints) @@ -306,24 +303,87 @@ public class FutureDocTestBase { //#filter Future future1 = Futures.successful(4, system.dispatcher()); Future successfulFilter = - future1.filter(new Function() { - public Boolean apply(Integer i) { return i % 2 == 0; } + future1.filter(new Filter() { + public boolean filter(Integer i) { return i % 2 == 0; } }); Future failedFilter = - future1.filter(new Function() { - public Boolean apply(Integer i) { return i % 2 != 0; } + future1.filter(new Filter() { + public boolean filter(Integer i) { return i % 2 != 0; } }); //When filter fails, the returned Future will be failed with a scala.MatchError //#filter } + public void sendToTheInternetz(String s) { + + } + + public void sendToIssueTracker(Throwable t) { + + } + + @Test public void useAndThen() { + //#and-then + Future future1 = Futures.successful("value", system.dispatcher()). + andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) sendToIssueTracker(failure); + } + }).andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (result != null) sendToTheInternetz(result); + } + }); + //#and-then + } + + @Test public void useRecover() { + //#recover + Future future = future(new Callable() { + public Integer call() { + return 1 / 0; + } + }, system.dispatcher()).recover(new Recover() { + public Integer recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) return 0; + else throw problem; + } + }); + int result = Await.result(future, Duration.create(1, SECONDS)); + assertEquals(result, 0); + //#recover + } + + @Test public void useTryRecover() { + //#try-recover + Future future = future(new Callable() { + public Integer call() { + return 1 / 0; + } + }, system.dispatcher()).tryRecover(new Recover>() { + public Future recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) { + return future(new Callable() { + public Integer call() { + return 0; + } + }, system.dispatcher()); + } + else throw problem; + } + }); + int result = Await.result(future, Duration.create(1, SECONDS)); + assertEquals(result, 0); + //#try-recover + } + @Test public void useOnSuccessOnFailureAndOnComplete() { { Future future = Futures.successful("foo", system.dispatcher()); //#onSuccess - future.onSuccess(new Procedure() { - public void apply(String result) { + future.onSuccess(new OnSuccess() { + public void onSuccess(String result) { if ("bar" == result) { //Do something if it resulted in "bar" } else { @@ -337,8 +397,8 @@ public class FutureDocTestBase { Future future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); //#onFailure - future.onFailure( new Procedure() { - public void apply(Throwable failure) { + future.onFailure( new OnFailure() { + public void onFailure(Throwable failure) { if (failure instanceof IllegalStateException) { //Do something if it was this particular failure } else { @@ -351,8 +411,8 @@ public class FutureDocTestBase { { Future future = Futures.successful("foo", system.dispatcher()); //#onComplete - future.onComplete(new Procedure2() { - public void apply(Throwable failure, String result) { + future.onComplete(new OnComplete() { + public void onComplete(Throwable failure, String result) { if (failure != null) { //We got a failure, handle it here } else { @@ -370,7 +430,7 @@ public class FutureDocTestBase { Future future1 = Futures.successful("foo", system.dispatcher()); Future future2 = Futures.successful("bar", system.dispatcher()); Future future3 = - future1.zip(future2).map(new Function, String>() { + future1.zip(future2).map(new Mapper, String>() { public String apply(scala.Tuple2 zipped) { return zipped._1() + " " + zipped._2(); } diff --git a/akka-docs/java/futures.rst b/akka-docs/java/futures.rst index e9b743535a..a75fb21ba5 100644 --- a/akka-docs/java/futures.rst +++ b/akka-docs/java/futures.rst @@ -67,7 +67,7 @@ These allow you to create 'pipelines' or 'streams' that the result will travel t Future is a Monad ^^^^^^^^^^^^^^^^^ -The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs +The first method for working with ``Future`` functionally is ``map``. This method takes a ``Mapper`` which performs some operation on the result of the ``Future``, and returning a new result. The return value of the ``map`` method is another ``Future`` that will contain the new result: @@ -176,6 +176,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which .. includecode:: code/akka/docs/future/FutureDocTestBase.java :include: onComplete +Ordering +-------- + +Since callbacks are executed in any order and potentially in parallel, +it can be tricky at the times when you need sequential ordering of operations. +But there's a solution! And it's name is ``andThen``, and it creates a new Future with +the specified callback, a Future that will have the same result as the Future it's called on, +which allows for ordering like in the following sample: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: and-then + Auxiliary methods ----------------- @@ -197,4 +209,22 @@ Exceptions Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, -calling ``Await.result`` will cause it to be thrown again so it can be handled properly. \ No newline at end of file +calling ``Await.result`` will cause it to be thrown again so it can be handled properly. + +It is also possible to handle an ``Exception`` by returning a different result. +This is done with the ``recover`` method. For example: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: recover + +In this example, if the actor replied with a ``akka.actor.Status.Failure`` containing the ``ArithmeticException``, +our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, +so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way +it will behave as if we hadn't used the ``recover`` method. + +You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +and is use like this: + +.. includecode:: code/akka/docs/future/FutureDocTestBase.java + :include: try-recover + diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index 175fc08ff5..023bdd8df7 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -13,6 +13,7 @@ import akka.dispatch.Future import akka.dispatch.Await import akka.util.duration._ import akka.dispatch.Promise +import java.lang.IllegalStateException object FutureDocSpec { @@ -266,6 +267,19 @@ class FutureDocSpec extends AkkaSpec { Await.result(future, 1 second) must be(0) } + "demonstrate usage of tryRecover" in { + implicit val timeout = system.settings.ActorTimeout + val actor = system.actorOf(Props[MyActor]) + val msg1 = -1 + //#try-recover + val future = akka.pattern.ask(actor, msg1) tryRecover { + case e: ArithmeticException ⇒ Promise.successful(0) + case foo: IllegalArgumentException ⇒ Promise.failed[Int](new IllegalStateException("All br0ken!")) + } + //#try-recover + Await.result(future, 1 second) must be(0) + } + "demonstrate usage of zip" in { val future1 = Future { "foo" } val future2 = Future { "bar" } @@ -275,6 +289,21 @@ class FutureDocSpec extends AkkaSpec { Await.result(future3, 1 second) must be("foo bar") } + "demonstrate usage of andThen" in { + def loadPage(s: String) = s + val url = "foo bar" + def log(cause: Throwable) = () + def watchSomeTV = () + //#and-then + val result = Future { loadPage(url) } andThen { + case Left(exception) ⇒ log(exception) + } andThen { + case _ ⇒ watchSomeTV + } + //#and-then + Await.result(result, 1 second) must be("foo bar") + } + "demonstrate usage of or" in { val future1 = Future { "foo" } val future2 = Future { "bar" } diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index c46db30927..38edee51af 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -198,6 +198,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which .. includecode:: code/akka/docs/future/FutureDocSpec.scala :include: onComplete +Ordering +-------- + +Since callbacks are executed in any order and potentially in parallel, +it can be tricky at the times when you need sequential ordering of operations. +But there's a solution! And it's name is ``andThen``, and it creates a new Future with +the specified callback, a Future that will have the same result as the Future it's called on, +which allows for ordering like in the following sample: + +.. includecode:: code/akka/docs/future/FutureDocSpec.scala + :include: and-then + Auxiliary methods ----------------- @@ -232,3 +244,9 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will behave as if we hadn't used the ``recover`` method. +You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``, +and is use like this: + +.. includecode:: code/akka/docs/future/FutureDocSpec.scala + :include: try-recover +