Changing Akka Futures to better conform to spec

This commit is contained in:
Viktor Klang 2011-12-14 01:24:55 +01:00
parent 48adb3c2b6
commit b3e5da2377
5 changed files with 123 additions and 118 deletions

View file

@ -3,6 +3,7 @@ package akka.dispatch;
import akka.actor.Timeout;
import akka.actor.ActorSystem;
import akka.japi.*;
import akka.util.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -14,10 +15,6 @@ import java.lang.Iterable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import akka.japi.Function;
import akka.japi.Function2;
import akka.japi.Procedure;
import akka.japi.Option;
import akka.testkit.AkkaSpec;
public class JavaFutureTests {
@ -97,8 +94,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onComplete(new Procedure<Future<String>>() {
public void apply(akka.dispatch.Future<String> future) {
f.onComplete(new Procedure2<Throwable,String>() {
public void apply(Throwable t, String r) {
latch.countDown();
}
});

View file

@ -330,11 +330,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
if (m.returnsFuture_?) {
val s = sender
m(me).asInstanceOf[Future[Any]] onComplete {
_.value.get match {
case Left(f) s ! Status.Failure(f)
case Right(r) s ! r
}
}
} else {
sender ! m(me)
}

View file

@ -30,8 +30,10 @@ package object actor {
implicit def future2actor[T](f: akka.dispatch.Future[T]) = new {
def pipeTo(actor: ActorRef): this.type = {
def send(f: akka.dispatch.Future[T]) { f.value.get.fold(f actor ! Status.Failure(f), r actor ! r) }
if (f.isCompleted) send(f) else f onComplete send
f onComplete {
case Right(r) actor ! r
case Left(f) actor ! Status.Failure(f)
}
this
}
}

View file

@ -79,19 +79,19 @@ object Futures {
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher)
/**
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher)
/**
* Java API.
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = {
@ -105,7 +105,7 @@ object Futures {
/**
* Java API.
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A Future[B].
* Transforms a JIterable[A] into a Future[JIterable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
@ -152,10 +152,10 @@ object Future {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
val futureResult = Promise[T]()
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
val completeFirst: Either[Throwable, T] Unit = futureResult complete _
futures.foreach(_ onComplete completeFirst)
futureResult
@ -164,13 +164,13 @@ object Future {
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Iterable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
if (futures.isEmpty) Promise.successful[Option[T]](None)
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
val search: Future[T] Unit = f try {
f.value.get match {
val search: Either[Throwable, T] Unit = v try {
v match {
case Right(r) if (predicate(r)) result success Some(r)
case _
}
@ -195,7 +195,7 @@ object Future {
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty) Promise.successful(zero)
else {
val result = Promise[R]()
@ -203,8 +203,8 @@ object Future {
val done = new Switch(false)
val allDone = futures.size
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) {
f.value.get match {
val aggregate: Either[Throwable, T] Unit = v if (done.isOff && !result.isCompleted) {
v match {
case Right(value)
val added = results add value
if (added && results.size == allDone) { //Only one thread can get here
@ -240,25 +240,12 @@ object Future {
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
* <pre>
* val result = Futures.reduce(futures)(_ + _).await.result
* val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty) Promise[R].failure(new UnsupportedOperationException("empty reduce left"))
else {
val result = Promise[R]()
val seedFound = new AtomicBoolean(false)
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case Right(value) result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
case Left(exception) result.failure(exception)
}
}
}
for (f futures) f onComplete seedFold //Attach the listener to the Futures
result
}
else sequence(futures).map(_ reduce op)
}
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A Future[B].
@ -394,7 +381,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
*/
def onComplete(func: Future[T] Unit): this.type
def onComplete(func: Either[Throwable, T] Unit): this.type
/**
* When the future is completed with a valid result, apply the provided
@ -406,12 +393,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* }
* </pre>
*/
final def onSuccess(pf: PartialFunction[T, Unit]): this.type = onComplete {
_.value match {
case Some(Right(r)) if pf isDefinedAt r pf(r)
final def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
case Right(r) if pf isDefinedAt r pf(r)
case _
}
}
/**
* When the future is completed with an exception, apply the provided
@ -422,12 +407,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* }
* </pre>
*/
final def onFailure(pf: PartialFunction[Throwable, Unit]): this.type = onComplete {
_.value match {
case Some(Left(ex)) if pf isDefinedAt ex pf(ex)
final def onFailure[U](pf: PartialFunction[Throwable, U]): this.type = onComplete {
case Left(ex) if pf isDefinedAt ex pf(ex)
case _
}
}
/**
* Returns a failure projection of this Future
@ -436,10 +419,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
*/
final def failed: Future[Throwable] = {
val p = Promise[Throwable]()
this.onComplete(_.value.get match {
this.onComplete {
case Left(t) p success t
case Right(r) p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r)
})
}
p
}
@ -464,11 +447,9 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val future = Promise[A]()
onComplete {
_.value.get match {
case Left(e) if pf isDefinedAt e future.complete(try { Right(pf(e)) } catch { case x: Exception Left(x) })
case otherwise future complete otherwise
}
}
future
}
@ -488,7 +469,6 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
final def map[A](f: T A): Future[A] = {
val future = Promise[A]()
onComplete {
_.value.get match {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, A]]
case Right(res)
future complete (try {
@ -499,7 +479,6 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
Left(e)
})
}
}
future
}
@ -509,15 +488,13 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
*/
final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
val fa = Promise[A]()
onComplete { ft
fa complete (ft.value.get match {
case l: Left[_, _] l.asInstanceOf[Either[Throwable, A]]
onComplete {
case l: Left[_, _] fa complete l.asInstanceOf[Either[Throwable, A]]
case Right(t)
try {
fa complete (try {
Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
} catch {
case e: ClassCastException Left(e)
}
})
}
fa
@ -538,29 +515,26 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* </pre>
*/
final def flatMap[A](f: T Future[A]): Future[A] = {
val future = Promise[A]()
val p = Promise[A]()
onComplete {
_.value.get match {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, A]]
case Right(r) try {
future.completeWith(f(r))
case l: Left[_, _] p complete l.asInstanceOf[Either[Throwable, A]]
case Right(r)
try {
p completeWith f(r)
} catch {
case e: Exception
p complete Left(e)
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
future complete Left(e)
}
}
}
future
p
}
final def foreach(f: T Unit): Unit = onComplete {
_.value.get match {
case Right(r) f(r)
case _
}
}
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p)
@ -571,21 +545,19 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
final def filter(p: T Boolean): Future[T] = {
val future = Promise[T]()
final def filter(pred: T Boolean): Future[T] = {
val p = Promise[T]()
onComplete {
_.value.get match {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, T]]
case r @ Right(res) future complete (try {
if (p(res)) r else Left(new MatchError(res))
case l: Left[_, _] p complete l.asInstanceOf[Either[Throwable, T]]
case r @ Right(res) p complete (try {
if (pred(res)) r else Left(new MatchError(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
Left(e)
})
}
}
future
p
}
}
@ -648,7 +620,7 @@ trait Promise[T] extends Future[T] {
* @return this.
*/
final def completeWith(other: Future[T]): this.type = {
other onComplete { f complete(f.value.get) }
other onComplete { complete(_) }
this
}
@ -656,9 +628,10 @@ trait Promise[T] extends Future[T] {
final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = Promise[Any]()
this completeWith other onComplete { f
val thisPromise = this
thisPromise completeWith other onComplete { v
try {
fr completeWith cont(f)
fr completeWith cont(thisPromise)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
@ -670,7 +643,8 @@ trait Promise[T] extends Future[T] {
final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = Promise[Any]()
stream.dequeue(this).onComplete { f
val f = stream.dequeue(this)
f.onComplete { _
try {
fr completeWith cont(f)
} catch {
@ -692,7 +666,7 @@ private[dispatch] object DefaultPromise {
*/
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
case class Pending[T](listeners: List[Future[T] Unit] = Nil) extends FState[T] {
case class Pending[T](listeners: List[Either[Throwable, T] Unit] = Nil) extends FState[T] {
def value: Option[Either[Throwable, T]] = None
}
case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
@ -752,10 +726,10 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
protected final def getState: FState[T] = updater.get(this)
def tryComplete(value: Either[Throwable, T]): Boolean = {
val callbacks: List[Future[T] Unit] = {
val callbacks: List[Either[Throwable, T] Unit] = {
try {
@tailrec
def tryComplete: List[Future[T] Unit] = {
def tryComplete: List[Either[Throwable, T] Unit] = {
val cur = getState
cur match {
@ -778,7 +752,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
}
}
def onComplete(func: Future[T] Unit): this.type = {
def onComplete(func: Either[Throwable, T] Unit): this.type = {
@tailrec //Returns whether the future has already been completed or not
def tryAddCallback(): Boolean = {
val cur = getState
@ -795,9 +769,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
this
}
private def notifyCompleted(func: Future[T] Unit) {
// TODO FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
private final def notifyCompleted(func: Either[Throwable, T] Unit) {
try { func(this.value.get) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
}
}
@ -809,8 +782,9 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis
val value = Some(suppliedValue)
def tryComplete(value: Either[Throwable, T]): Boolean = true
def onComplete(func: Future[T] Unit): this.type = {
Future dispatchTask (() func(this))
def onComplete(func: Either[Throwable, T] Unit): this.type = {
val completedAs = value.get
Future dispatchTask (() func(completedAs))
this
}

View file

@ -3,17 +3,51 @@
*/
package akka.dispatch.japi
import akka.japi.{ Procedure, Function JFunc, Option JOption }
import akka.actor.Timeout
import akka.japi.{ Procedure2, Procedure, Function JFunc, Option JOption }
/* Java API */
trait Future[+T] { self: akka.dispatch.Future[T]
/**
* Asynchronously called when this Future gets a successful result
*/
private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit])
/**
* Asynchronously called when this Future gets a failed result
*/
private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_))
/**
* Asynchronously called when this future is completed with either a failed or a successful result
* In case of a success, the first parameter (Throwable) will be null
* In case of a failure, the second parameter (T) will be null
* For no reason will both be null or neither be null
*/
private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t proc(t, null.asInstanceOf[T]), r proc(null, r)))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it.
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
/**
* Asynchronously applies the provided Procedure to the (if any) successful result of this Future
* Provided Procedure will not be called in case of no-result or in case of failed result
*/
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
/**
* Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
}