diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 84eddf5ef7..b3c82bc957 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -3,6 +3,7 @@ package akka.dispatch; import akka.actor.Timeout; import akka.actor.ActorSystem; +import akka.japi.*; import akka.util.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -14,10 +15,6 @@ import java.lang.Iterable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import akka.japi.Function; -import akka.japi.Function2; -import akka.japi.Procedure; -import akka.japi.Option; import akka.testkit.AkkaSpec; public class JavaFutureTests { @@ -97,8 +94,8 @@ public class JavaFutureTests { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); Future f = cf; - f.onComplete(new Procedure>() { - public void apply(akka.dispatch.Future future) { + f.onComplete(new Procedure2() { + public void apply(Throwable t, String r) { latch.countDown(); } }); diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 549cac1424..5777f84277 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -330,10 +330,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi if (m.returnsFuture_?) { val s = sender m(me).asInstanceOf[Future[Any]] onComplete { - _.value.get match { - case Left(f) ⇒ s ! Status.Failure(f) - case Right(r) ⇒ s ! r - } + case Left(f) ⇒ s ! Status.Failure(f) + case Right(r) ⇒ s ! r } } else { sender ! m(me) diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index 569c66f03e..cfe5bc1b0d 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -30,8 +30,10 @@ package object actor { implicit def future2actor[T](f: akka.dispatch.Future[T]) = new { def pipeTo(actor: ActorRef): this.type = { - def send(f: akka.dispatch.Future[T]) { f.value.get.fold(f ⇒ actor ! Status.Failure(f), r ⇒ actor ! r) } - if (f.isCompleted) send(f) else f onComplete send + f onComplete { + case Right(r) ⇒ actor ! r + case Left(f) ⇒ actor ! Status.Failure(f) + } this } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 43ddf4e6d1..56cd2058d9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -79,19 +79,19 @@ object Futures { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] = + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] = Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher) /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] = + def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] = Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher) /** * Java API. - * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. + * Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]]. * Useful for reducing many Futures into a single Future. */ def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = { @@ -105,7 +105,7 @@ object Futures { /** * Java API. - * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B]. + * Transforms a JIterable[A] into a Future[JIterable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ @@ -152,10 +152,10 @@ object Future { /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = { + def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = { val futureResult = Promise[T]() - val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) + val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _ futures.foreach(_ onComplete completeFirst) futureResult @@ -164,13 +164,13 @@ object Future { /** * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { + def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { if (futures.isEmpty) Promise.successful[Option[T]](None) else { val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) - val search: Future[T] ⇒ Unit = f ⇒ try { - f.value.get match { + val search: Either[Throwable, T] ⇒ Unit = v ⇒ try { + v match { case Right(r) ⇒ if (predicate(r)) result success Some(r) case _ ⇒ } @@ -195,7 +195,7 @@ object Future { * val result = Futures.fold(0)(futures)(_ + _).await.result * */ - def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { + def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { if (futures.isEmpty) Promise.successful(zero) else { val result = Promise[R]() @@ -203,8 +203,8 @@ object Future { val done = new Switch(false) val allDone = futures.size - val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { - f.value.get match { + val aggregate: Either[Throwable, T] ⇒ Unit = v ⇒ if (done.isOff && !result.isCompleted) { + v match { case Right(value) ⇒ val added = results add value if (added && results.size == allDone) { //Only one thread can get here @@ -240,25 +240,12 @@ object Future { * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first * Example: *
-   *   val result = Futures.reduce(futures)(_ + _).await.result
+   *   val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
    * 
*/ - def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { + def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { if (futures.isEmpty) Promise[R].failure(new UnsupportedOperationException("empty reduce left")) - else { - val result = Promise[R]() - val seedFound = new AtomicBoolean(false) - val seedFold: Future[T] ⇒ Unit = f ⇒ { - if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold - f.value.get match { - case Right(value) ⇒ result.completeWith(fold(futures.filterNot(_ eq f))(value)(op)) - case Left(exception) ⇒ result.failure(exception) - } - } - } - for (f ← futures) f onComplete seedFold //Attach the listener to the Futures - result - } + else sequence(futures).map(_ reduce op) } /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. @@ -394,7 +381,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * callbacks may be registered; there is no guarantee that they will be * executed in a particular order. */ - def onComplete(func: Future[T] ⇒ Unit): this.type + def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type /** * When the future is completed with a valid result, apply the provided @@ -406,11 +393,9 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * } * */ - final def onSuccess(pf: PartialFunction[T, Unit]): this.type = onComplete { - _.value match { - case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r) - case _ ⇒ - } + final def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { + case Right(r) if pf isDefinedAt r ⇒ pf(r) + case _ ⇒ } /** @@ -422,11 +407,9 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * } * */ - final def onFailure(pf: PartialFunction[Throwable, Unit]): this.type = onComplete { - _.value match { - case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex) - case _ ⇒ - } + final def onFailure[U](pf: PartialFunction[Throwable, U]): this.type = onComplete { + case Left(ex) if pf isDefinedAt ex ⇒ pf(ex) + case _ ⇒ } /** @@ -436,10 +419,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { */ final def failed: Future[Throwable] = { val p = Promise[Throwable]() - this.onComplete(_.value.get match { + this.onComplete { case Left(t) ⇒ p success t case Right(r) ⇒ p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r) - }) + } p } @@ -464,10 +447,8 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { val future = Promise[A]() onComplete { - _.value.get match { - case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) - case otherwise ⇒ future complete otherwise - } + case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ future complete otherwise } future } @@ -488,17 +469,15 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { final def map[A](f: T ⇒ A): Future[A] = { val future = Promise[A]() onComplete { - _.value.get match { - case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] - case Right(res) ⇒ - future complete (try { - Right(f(res)) - } catch { - case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage)) - Left(e) - }) - } + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] + case Right(res) ⇒ + future complete (try { + Right(f(res)) + } catch { + case e: Exception ⇒ + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage)) + Left(e) + }) } future } @@ -509,16 +488,14 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { */ final def mapTo[A](implicit m: Manifest[A]): Future[A] = { val fa = Promise[A]() - onComplete { ft ⇒ - fa complete (ft.value.get match { - case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] - case Right(t) ⇒ - try { - Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) - } catch { - case e: ClassCastException ⇒ Left(e) - } - }) + onComplete { + case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] + case Right(t) ⇒ + fa complete (try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + }) } fa } @@ -538,28 +515,25 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { * */ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val future = Promise[A]() + val p = Promise[A]() onComplete { - _.value.get match { - case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] - case Right(r) ⇒ try { - future.completeWith(f(r)) + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] + case Right(r) ⇒ + try { + p completeWith f(r) } catch { case e: Exception ⇒ + p complete Left(e) dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage)) - future complete Left(e) } - } } - future + p } final def foreach(f: T ⇒ Unit): Unit = onComplete { - _.value.get match { - case Right(r) ⇒ f(r) - case _ ⇒ - } + case Right(r) ⇒ f(r) + case _ ⇒ } final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) @@ -571,21 +545,19 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - final def filter(p: T ⇒ Boolean): Future[T] = { - val future = Promise[T]() + final def filter(pred: T ⇒ Boolean): Future[T] = { + val p = Promise[T]() onComplete { - _.value.get match { - case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]] - case r @ Right(res) ⇒ future complete (try { - if (p(res)) r else Left(new MatchError(res)) - } catch { - case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage)) - Left(e) - }) - } + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] + case r @ Right(res) ⇒ p complete (try { + if (pred(res)) r else Left(new MatchError(res)) + } catch { + case e: Exception ⇒ + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage)) + Left(e) + }) } - future + p } } @@ -648,7 +620,7 @@ trait Promise[T] extends Future[T] { * @return this. */ final def completeWith(other: Future[T]): this.type = { - other onComplete { f ⇒ complete(f.value.get) } + other onComplete { complete(_) } this } @@ -656,9 +628,10 @@ trait Promise[T] extends Future[T] { final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ val fr = Promise[Any]() - this completeWith other onComplete { f ⇒ + val thisPromise = this + thisPromise completeWith other onComplete { v ⇒ try { - fr completeWith cont(f) + fr completeWith cont(thisPromise) } catch { case e: Exception ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) @@ -670,7 +643,8 @@ trait Promise[T] extends Future[T] { final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ val fr = Promise[Any]() - stream.dequeue(this).onComplete { f ⇒ + val f = stream.dequeue(this) + f.onComplete { _ ⇒ try { fr completeWith cont(f) } catch { @@ -692,7 +666,7 @@ private[dispatch] object DefaultPromise { */ sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } - case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] { + case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] { def value: Option[Either[Throwable, T]] = None } case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { @@ -752,10 +726,10 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst protected final def getState: FState[T] = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Future[T] ⇒ Unit] = { + val callbacks: List[Either[Throwable, T] ⇒ Unit] = { try { @tailrec - def tryComplete: List[Future[T] ⇒ Unit] = { + def tryComplete: List[Either[Throwable, T] ⇒ Unit] = { val cur = getState cur match { @@ -778,7 +752,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst } } - def onComplete(func: Future[T] ⇒ Unit): this.type = { + def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Boolean = { val cur = getState @@ -795,9 +769,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst this } - private def notifyCompleted(func: Future[T] ⇒ Unit) { - // TODO FIXME catching all and continue isn't good for OOME, ticket #1418 - try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } + private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit) { + try { func(this.value.get) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } } } @@ -809,8 +782,9 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis val value = Some(suppliedValue) def tryComplete(value: Either[Throwable, T]): Boolean = true - def onComplete(func: Future[T] ⇒ Unit): this.type = { - Future dispatchTask (() ⇒ func(this)) + def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = { + val completedAs = value.get + Future dispatchTask (() ⇒ func(completedAs)) this } diff --git a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala index a9b2b2482f..64852912fe 100644 --- a/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/japi/Future.scala @@ -3,17 +3,51 @@ */ package akka.dispatch.japi -import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } import akka.actor.Timeout +import akka.japi.{ Procedure2, Procedure, Function ⇒ JFunc, Option ⇒ JOption } /* Java API */ trait Future[+T] { self: akka.dispatch.Future[T] ⇒ + /** + * Asynchronously called when this Future gets a successful result + */ private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r ⇒ proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit]) + + /** + * Asynchronously called when this Future gets a failed result + */ private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable ⇒ proc(t) }: PartialFunction[Throwable, Unit]) - private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_)) + + /** + * Asynchronously called when this future is completed with either a failed or a successful result + * In case of a success, the first parameter (Throwable) will be null + * In case of a failure, the second parameter (T) will be null + * For no reason will both be null or neither be null + */ + private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t ⇒ proc(t, null.asInstanceOf[T]), r ⇒ proc(null, r))) + + /** + * Asynchronously applies the provided function to the (if any) successful result of this Future + * Any failure of this Future will be propagated to the Future returned by this method. + */ private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) + + /** + * Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it. + * Any failure of this Future will be propagated to the Future returned by this method. + */ private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) + + /** + * Asynchronously applies the provided Procedure to the (if any) successful result of this Future + * Provided Procedure will not be called in case of no-result or in case of failed result + */ private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) + + /** + * Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate + * Any failure of this Future will be propagated to the Future returned by this method. + */ private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] }