2011-09-19 15:21:18 +02:00
|
|
|
|
2009-05-25 14:48:43 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2009-05-25 14:48:43 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.AkkaException
|
2011-10-27 12:23:01 +02:00
|
|
|
import akka.event.Logging.Error
|
2011-12-14 17:30:54 +01:00
|
|
|
import akka.util.Timeout
|
2011-08-26 17:25:18 +02:00
|
|
|
import scala.Option
|
|
|
|
|
import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption }
|
2010-10-31 19:27:55 +01:00
|
|
|
|
2011-04-21 15:12:47 -06:00
|
|
|
import scala.util.continuations._
|
|
|
|
|
|
2011-12-02 17:13:46 +01:00
|
|
|
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.lang.{ Iterable ⇒ JIterable }
|
|
|
|
|
import java.util.{ LinkedList ⇒ JLinkedList }
|
2011-04-27 00:38:10 +02:00
|
|
|
|
|
|
|
|
import scala.annotation.tailrec
|
2011-04-28 20:53:45 -06:00
|
|
|
import scala.collection.mutable.Stack
|
2011-07-10 20:18:43 +02:00
|
|
|
import akka.util.{ Switch, Duration, BoxedType }
|
2011-11-09 15:25:14 +01:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
|
2011-12-11 00:40:52 +01:00
|
|
|
import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable }
|
2011-12-12 22:50:08 +01:00
|
|
|
import akka.dispatch.Await.CanAwait
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2011-12-12 22:50:08 +01:00
|
|
|
/**
 * Entry point for blocking on an [[akka.dispatch.Await.Awaitable]].
 *
 * The `ready` and `result` methods of `Awaitable` require an implicit
 * `CanAwait` permit. `CanAwait` is sealed and the only instance created here
 * is private to this object, so the intended way to invoke them is through
 * `Await.ready` and `Await.result`.
 */
object Await {

  /**
   * Permit type demanded by the `Awaitable` methods; sealed, so it cannot be
   * implemented outside of this source file.
   */
  sealed trait CanAwait

  /**
   * Something whose completion, or result of type `T`, can be waited for.
   */
  trait Awaitable[+T] {

    /**
     * Waits at most `atMost` for this Awaitable to become ready.
     * Should throw java.util.concurrent.TimeoutException if it times out.
     * This method should not be called directly; use `Await.ready`.
     */
    def ready(atMost: Duration)(implicit permit: CanAwait): this.type

    /**
     * Waits at most `atMost` and returns the result.
     * Throws exceptions if it cannot produce a T within the specified time.
     * This method should not be called directly; use `Await.result`.
     */
    def result(atMost: Duration)(implicit permit: CanAwait): T
  }

  // The single permit instance; handed explicitly to the Awaitable methods below.
  private[this] implicit final val permit: CanAwait = new CanAwait {}

  /**
   * Blocks until `awaitable` is ready or `atMost` has elapsed, and returns
   * the very same awaitable.
   */
  def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T =
    awaitable.ready(atMost)(permit)

  /**
   * Blocks until `awaitable` has produced its result or `atMost` has elapsed,
   * and returns that result.
   */
  def result[T](awaitable: Awaitable[T], atMost: Duration): T =
    awaitable.result(atMost)(permit)
}
|
|
|
|
|
|
2011-12-15 16:56:53 +01:00
|
|
|
/**
 * Futures is the Java API for Futures and Promises.
 *
 * Every method takes the MessageDispatcher as an explicit last argument and
 * delegates to the corresponding method on the Scala `Future` / `Promise`
 * companions.
 */
object Futures {

  // Views a Java Iterable as a Scala Iterable without copying it.
  private def asScala[A](in: JIterable[A]): Iterable[A] =
    scala.collection.JavaConversions.iterableAsScalaIterable(in)

  /**
   * Java API, equivalent to Future.apply
   */
  def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher)

  /**
   * Java API, equivalent to Promise.apply
   */
  def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher)

  /**
   * Java API, creates an already completed Promise with the specified exception
   */
  def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher)

  /**
   * Java API, creates an already completed Promise with the specified result
   */
  def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher)

  /**
   * Java API.
   * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
   */
  def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = {
    Future.find[T](asScala(futures))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_))
  }

  /**
   * Java API.
   * Returns a Future to the result of the first future in the list that is completed
   */
  def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] =
    Future.firstCompletedOf(asScala(futures))(dispatcher)

  /**
   * Java API
   * A non-blocking fold over the specified futures, with the start value of the given zero.
   * The result will be a failure of one of the futures, or a failure in the actual fold,
   * or the result of the fold.
   */
  def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
    Future.fold(asScala(futures))(zero)(fun.apply _)(dispatcher)

  /**
   * Java API.
   * Reduces the results of the supplied futures with the given function, starting from the
   * result of the first Future in iteration order. Delegates to Future.reduce.
   */
  def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
    Future.reduce(asScala(futures))(fun.apply _)(dispatcher)

  /**
   * Java API.
   * Simple version of Futures.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]].
   * Useful for reducing many Futures into a single Future.
   */
  def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = {
    implicit val d: MessageDispatcher = dispatcher
    // The accumulator is already known, so start from a completed Promise
    // instead of dispatching a task merely to allocate an empty list.
    val empty: Future[JLinkedList[A]] = Promise.successful(new JLinkedList[A]())
    asScala(in).foldLeft(empty) { (fr, fa) ⇒
      for (r ← fr; a ← fa) yield {
        r add a
        r
      }
    }
  }

  /**
   * Java API.
   * Transforms a JIterable[A] into a Future[JIterable[B]] using the provided Function A ⇒ Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel.
   */
  def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = {
    implicit val d: MessageDispatcher = dispatcher
    // Same seeding as in `sequence`: no task is needed to create the empty result list.
    val empty: Future[JLinkedList[B]] = Promise.successful(new JLinkedList[B]())
    asScala(in).foldLeft(empty) { (fr, a) ⇒
      // fn is applied before waiting on the accumulator, so all the Futures get started up front.
      val fb = fn(a)
      for (r ← fr; b ← fb) yield { r add b; r }
    }
  }
}
|
|
|
|
|
|
2011-02-28 11:47:39 -07:00
|
|
|
object Future {

  /**
   * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
   * The execution is performed by the specified Dispatcher.
   * Anything thrown by the body becomes the failure of the returned Future.
   */
  def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher): Future[T] = {
    val promise = Promise[T]()
    dispatcher dispatchTask { () ⇒
      promise complete {
        try {
          Right(body)
        } catch {
          // FIXME catching all and continue isn't good for OOME, ticket #1418
          case e ⇒ Left(e)
        }
      }
    }
    promise
  }

  import scala.collection.mutable.Builder
  import scala.collection.generic.CanBuildFrom

  /**
   * Simple version of Future.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
   * Useful for reducing many Futures into a single Future.
   */
  def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
    in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)

  /**
   * Returns a Future to the result of the first future in the list that is completed.
   * Note that nothing completes the returned Future if `futures` is empty.
   */
  def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
    val futureResult = Promise[T]()

    // `complete` only takes effect if the Promise is not already completed, so the first outcome wins.
    val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _
    futures.foreach(_ onComplete completeFirst)

    futureResult
  }

  /**
   * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate.
   * The result is None if `futures` is empty or if all of them have completed without a match.
   */
  def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
    if (futures.isEmpty) Promise.successful[Option[T]](None)
    else {
      val result = Promise[Option[T]]()
      // Number of futures that have not reported back yet.
      val ref = new AtomicInteger(futures.size)
      val search: Either[Throwable, T] ⇒ Unit = v ⇒ try {
        v match {
          case Right(r) ⇒ if (predicate(r)) result success Some(r)
          case _        ⇒
        }
      } finally {
        // Runs even if the predicate throws; the last future to report settles on None
        // (which has no effect if a match has already completed the result).
        if (ref.decrementAndGet == 0)
          result success None
      }

      futures.foreach(_ onComplete search)

      result
    }
  }

  /**
   * A non-blocking fold over the specified futures, with the start value of the given zero.
   * The fold itself is applied once all of the futures have completed successfully;
   * the result will be a failure of one of the futures, or a failure in the actual fold,
   * or the result of the fold.
   * Example:
   * <pre>
   *   val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
   * </pre>
   */
  def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = {
    if (futures.isEmpty) Promise.successful(zero)
    else sequence(futures).map(_.foldLeft(zero)(foldFun))
  }

  /**
   * Reduces the results of the supplied futures with `op`, starting from the result of the
   * first Future in iteration order. The returned Future is failed with a
   * NoSuchElementException if `futures` is empty.
   * Example:
   * <pre>
   *   val result = Await.result(Future.reduce(futures)(_ + _), 5 seconds)
   * </pre>
   */
  def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = {
    // The failure is known up front, so hand out an already completed Promise.
    if (futures.isEmpty) Promise.failed[R](new NoSuchElementException("reduce attempted on empty collection"))
    else sequence(futures).map(_ reduce op)
  }

  /**
   * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
   * This is useful for performing a parallel map. For example, to apply a function to all items of a list
   * in parallel:
   * <pre>
   * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x)))
   * </pre>
   */
  def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
    in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒
      val fb = fn(a.asInstanceOf[A])
      for (r ← fr; b ← fb) yield (r += b)
    }.map(_.result)

  /**
   * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
   * Continuations plugin.
   *
   * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
   * execution is suspended with the rest of the block being stored in a continuation until the result
   * of the Future is available. If an Exception is thrown while processing, it will be contained
   * within the resulting Future.
   *
   * This allows working with Futures in an imperative style without blocking for each result.
   *
   * Completing a Future using 'Promise << Future' will also suspend execution until the
   * value of the other Future is available.
   *
   * The Delimited Continuations compiler plugin must be enabled in order to use this method.
   */
  def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = {
    val future = Promise[A]()
    // force = true: always hand the block to the dispatcher rather than
    // queueing it on the current thread's task stack.
    dispatchTask({ () ⇒
      (reify(body) foreachFull (future success, future failure): Future[Any]) onFailure {
        case e: Exception ⇒ future failure e
      }
    }, force = true)
    future
  }

  /**
   * Assures that any Future tasks initiated in the current thread will be
   * executed asynchronously, including any tasks currently queued to be
   * executed in the current thread. This is needed if the current task may
   * block, causing delays in executing the remaining tasks which in some
   * cases may cause a deadlock.
   *
   * Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method.
   *
   * For example, in the following block of code the call to 'latch.open'
   * might not be executed until after the call to 'latch.await', causing
   * a deadlock. By adding 'Future.blocking()' the call to 'latch.open'
   * will instead be dispatched separately from the current block, allowing
   * it to be run in parallel:
   * <pre>
   * val latch = new StandardLatch
   * val future = Future() map { _ ⇒
   *   Future.blocking()
   *   val nested = Future()
   *   nested foreach (_ ⇒ latch.open)
   *   latch.await
   * }
   * </pre>
   */
  def blocking(implicit dispatcher: MessageDispatcher): Unit =
    _taskStack.get match {
      case Some(taskStack) if taskStack.nonEmpty ⇒
        val tasks = taskStack.elems
        taskStack.clear()
        _taskStack set None
        // The task below runs inside dispatchTask's wrapper, which has already installed a
        // fresh Some(stack) for the executing thread, so `.get.get` is defined there; assigning
        // `elems` re-queues the captured tasks on that new stack.
        dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, force = true)
      case Some(_) ⇒ _taskStack set None
      case _       ⇒ // already None
    }

  // Per-thread stack of pending tasks; Some(...) only while dispatchTask's wrapper is running on the thread.
  private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
    override def initialValue = None
  }

  /**
   * Internal API, do not call
   *
   * Runs `task` via the dispatcher. If the current thread is already running such a batch
   * (its task stack is defined) and `force` is false, the task is pushed onto that stack
   * instead and gets executed by the running loop.
   */
  private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
    _taskStack.get match {
      case Some(taskStack) if !force ⇒ taskStack push task
      case _ ⇒
        dispatcher dispatchTask { () ⇒
          try {
            val taskStack = Stack[() ⇒ Unit](task)
            _taskStack set Some(taskStack)
            // Tasks executed here may push further tasks onto the stack; keep going until it is drained.
            while (taskStack.nonEmpty) {
              val next = taskStack.pop()
              try {
                next.apply()
              } catch {
                case e ⇒
                  // FIXME catching all and continue isn't good for OOME, ticket #1418
                  dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage))
              }
            }
          } finally { _taskStack set None }
        }
    }
}
|
|
|
|
|
|
2011-12-12 22:50:08 +01:00
|
|
|
/**
 * The read-side of an asynchronous computation producing a `T`: exposes the
 * (possibly not yet available) outcome through `value`, lets callbacks be
 * registered through `onComplete`, and provides combinators built on top of
 * those two.
 */
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {

  /**
   * The dispatcher used by the combinators of this Future when they create new Promises.
   */
  implicit def dispatcher: MessageDispatcher

  /**
   * Normalizes an outcome: a captured non-local return is turned into a successful
   * result carrying the returned value, an InterruptedException is wrapped in a
   * RuntimeException, and everything else is passed through unchanged.
   */
  protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
    case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X])
    case Left(t: InterruptedException) ⇒ Left(new RuntimeException("Boxed InterruptedException", t))
    case _ ⇒ source
  }

  /**
   * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
   *
   * Returns the result of this Future without blocking, by suspending execution and storing it as a
   * continuation until the result is available.
   */
  def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))

  /**
   * Tests whether this Future has been completed.
   */
  final def isCompleted: Boolean = value.isDefined

  /**
   * The contained value of this Future. Before this Future is completed
   * the value will be None. After completion the value will be Some(Right(t))
   * if it contains a valid result, or Some(Left(error)) if it contains
   * an exception.
   */
  def value: Option[Either[Throwable, T]]

  /**
   * When this Future is completed, apply the provided function to the
   * Future. If the Future has already been completed, this will apply
   * immediately. Multiple
   * callbacks may be registered; there is no guarantee that they will be
   * executed in a particular order.
   */
  def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type

  /**
   * When the future is completed with a valid result, apply the provided
   * PartialFunction to the result. See `onComplete` for more details.
   * <pre>
   *   future onSuccess {
   *     case Foo ⇒ target ! "foo"
   *     case Bar ⇒ target ! "bar"
   *   }
   * </pre>
   */
  final def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
    case Right(r) if pf isDefinedAt r ⇒ pf(r)
    case _                            ⇒
  }

  /**
   * When the future is completed with an exception, apply the provided
   * PartialFunction to the exception. See `onComplete` for more details.
   * <pre>
   *   future onFailure {
   *     case e: NumberFormatException ⇒ target ! "wrong format"
   *   }
   * </pre>
   */
  final def onFailure[U](pf: PartialFunction[Throwable, U]): this.type = onComplete {
    case Left(ex) if pf isDefinedAt ex ⇒ pf(ex)
    case _                             ⇒
  }

  /**
   * Returns a failure projection of this Future
   * If `this` becomes completed with a failure, that failure will be the success of the returned Future
   * If `this` becomes completed with a result, then the returned future will fail with a NoSuchElementException
   */
  final def failed: Future[Throwable] = {
    val p = Promise[Throwable]()
    this.onComplete {
      case Left(t)  ⇒ p success t
      case Right(r) ⇒ p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r)
    }
    p
  }

  /**
   * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
   * This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
   */
  //FIXME implement as The result of any of the Futures, or if both failed, the first failure
  def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize

  /**
   * Creates a new Future that will handle any matching Throwable that this
   * Future might contain. If there is no match, or if this Future contains
   * a valid result then the new Future will contain the same.
   * Example:
   * <pre>
   * Future(6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
   * Future(6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
   * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
   * </pre>
   */
  final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
    val future = Promise[A]()
    onComplete {
      // An Exception thrown by the handler itself becomes the failure of the new Future.
      case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
      case otherwise                   ⇒ future complete otherwise
    }
    future
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future. If this Future is completed with an exception then the new
   * Future will also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor ? "Hello" // returns 5
   *   b: String <- actor ? a       // returns "10"
   *   c: String <- actor ? 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def map[A](f: T ⇒ A): Future[A] = {
    val future = Promise[A]()
    onComplete {
      // A failure carries no value of type T, so the same Left instance can be reused as-is.
      case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
      case Right(res) ⇒
        future complete (try {
          Right(f(res))
        } catch {
          case e: Exception ⇒
            dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
            Left(e)
        })
    }
    future
  }

  /**
   * Creates a new Future[A] which is completed with this Future's result if
   * that conforms to A's erased type or a ClassCastException otherwise.
   */
  final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
    val fa = Promise[A]()
    onComplete {
      case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]]
      case Right(t) ⇒
        fa complete (try {
          Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
        } catch {
          case e: ClassCastException ⇒ Left(e)
        })
    }
    fa
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future, and returns the result of the function as the new Future.
   * If this Future is completed with an exception then the new Future will
   * also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor ? "Hello" // returns 5
   *   b: String <- actor ? a       // returns "10"
   *   c: String <- actor ? 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def flatMap[A](f: T ⇒ Future[A]): Future[A] = {
    val p = Promise[A]()

    onComplete {
      case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]]
      case Right(r) ⇒
        try {
          p completeWith f(r)
        } catch {
          case e: Exception ⇒
            p complete Left(e)
            dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
        }
    }
    p
  }

  /**
   * Applies the function to the result of this Future once it has completed
   * successfully; nothing happens if it completes with an exception.
   */
  final def foreach(f: T ⇒ Unit): Unit = onComplete {
    case Right(r) ⇒ f(r)
    case _        ⇒
  }

  /**
   * Used by for-comprehensions with guards: the predicate is only applied
   * (via `filter`) once one of the FutureWithFilter operations is invoked.
   */
  final def withFilter(p: T ⇒ Boolean): FutureWithFilter[T] = new FutureWithFilter[T](this, p)

  /**
   * A Future paired with a not yet applied predicate; each operation first
   * filters the underlying Future and then delegates to the operation of the same name.
   */
  final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) {
    def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
    def map[B](f: A ⇒ B): Future[B] = self filter p map f
    def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
    def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
  }

  /**
   * Creates a new Future that contains the result of this Future if it satisfies
   * the predicate, and fails with a MatchError otherwise. If this Future is
   * completed with an exception then the new Future will also contain this exception.
   */
  final def filter(pred: T ⇒ Boolean): Future[T] = {
    val p = Promise[T]()
    onComplete {
      case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]]
      case r @ Right(res) ⇒ p complete (try {
        if (pred(res)) r else Left(new MatchError(res))
      } catch {
        case e: Exception ⇒
          dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
          Left(e)
      })
    }
    p
  }
}
|
2010-11-12 14:04:06 +01:00
|
|
|
|
2011-05-03 18:48:42 -06:00
|
|
|
/**
 * Factory methods for Promises: one for a Promise that is still to be
 * completed and two for Promises that are created already completed.
 */
object Promise {

  /**
   * Creates a non-completed Promise
   *
   * Scala API
   */
  def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = {
    val pending = new DefaultPromise[A]()
    pending
  }

  /**
   * Creates an already completed Promise with the specified exception
   */
  def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = {
    val outcome: Either[Throwable, T] = Left(exception)
    new KeptPromise[T](outcome)
  }

  /**
   * Creates an already completed Promise with the specified result
   */
  def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = {
    val outcome: Either[Throwable, T] = Right(result)
    new KeptPromise[T](outcome)
  }
}
|
|
|
|
|
|
2011-03-18 17:37:25 +01:00
|
|
|
/**
 * The write side of a Future: a Promise is the handle through which the value exposed by
 * the Future (the read side) gets completed.
 */
trait Promise[T] extends Future[T] {

  /**
   * The Future associated with this Promise, which is this very instance.
   */
  def future: Future[T] = this

  /**
   * Completes this Promise with the specified result, if not already completed.
   * @return whether this call completed the Promise
   */
  def tryComplete(value: Either[Throwable, T]): Boolean

  /**
   * Completes this Promise with the specified result, if not already completed.
   * The outcome of the attempt is discarded.
   * @return this
   */
  final def complete(value: Either[Throwable, T]): this.type = {
    tryComplete(value)
    this
  }

  /**
   * Completes this Promise with the specified successful result, if not already completed.
   * @return this
   */
  final def success(result: T): this.type = complete(Right(result))

  /**
   * Completes this Promise with the specified exception, if not already completed.
   * @return this
   */
  final def failure(exception: Throwable): this.type = complete(Left(exception))

  /**
   * Registers a callback on `other` that completes this Promise with the outcome of `other`,
   * unless this Promise has already been completed by then.
   * @return this
   */
  final def completeWith(other: Future[T]): this.type = {
    other.onComplete(outcome ⇒ complete(outcome))
    this
  }

  /**
   * Continuation (cps) API: completes this Promise with `value` and hands this Promise to
   * the captured continuation.
   */
  final def <<(value: T): Future[T] @cps[Future[Any]] = shift { k: (Future[T] ⇒ Future[Any]) ⇒ k(success(value)) }

  /**
   * Continuation (cps) API: completes this Promise with `other` and, once this Promise is
   * completed, runs the captured continuation with this Promise. The returned Future is
   * completed with the continuation's Future, or failed with any Exception the continuation
   * throws (which is also published to the event stream).
   */
  final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { k: (Future[T] ⇒ Future[Any]) ⇒
    val continued = Promise[Any]()
    val target = this
    target.completeWith(other).onComplete { _ ⇒
      try {
        continued completeWith k(target)
      } catch {
        case e: Exception ⇒
          dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
          continued failure e
      }
    }
    continued
  }

  /**
   * Continuation (cps) API: dequeues from `stream` into this Promise and, once the dequeued
   * Future is completed, runs the captured continuation with it. The returned Future is
   * completed with the continuation's Future, or failed with any Exception the continuation
   * throws (which is also published to the event stream).
   */
  final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { k: (Future[T] ⇒ Future[Any]) ⇒
    val continued = Promise[Any]()
    val dequeued = stream.dequeue(this)
    dequeued.onComplete { _ ⇒
      try {
        continued completeWith k(dequeued)
      } catch {
        case e: Exception ⇒
          dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
          continued failure e
      }
    }
    continued
  }
}
|
|
|
|
|
|
2011-08-05 22:27:16 +02:00
|
|
|
//Companion object of DefaultPromise: holds its state types plus a cheap, immutable, shared empty Pending entry
private[dispatch] object DefaultPromise {

  /**
   * Represents the internal state of a DefaultPromise; `value` is the completed result, if any.
   */
  sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }

  /** Not completed yet: carries the registered listeners and never exposes a value. */
  case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] {
    def value: Option[Either[Throwable, T]] = None
  }

  /** Completed state; `result` extracts the Right side of the stored value. */
  case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
    def result: T = {
      val completed = value.get
      completed.right.get
    }
  }

  /** Completed state; `exception` extracts the Left side of the stored value. */
  case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
    def exception: Throwable = {
      val completed = value.get
      completed.left.get
    }
  }

  // Single listener-less Pending instance, handed out for every element type by EmptyPending
  private val emptyPendingValue = Pending[Nothing](Nil)

  /** Returns the shared empty Pending state, cast to the requested element type. */
  def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
}
|
2011-08-05 22:27:16 +02:00
|
|
|
|
2011-02-28 10:17:44 -07:00
|
|
|
/**
 * The default concrete Future implementation.
 *
 * The state (Pending with its listeners, or Success/Failure with the value) is read and
 * swapped through the AtomicReferenceFieldUpdater exposed by AbstractPromise. Threads
 * blocked in `ready`/`result` wait on this object's monitor and are woken by `tryComplete`.
 */
class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
  self ⇒

  import DefaultPromise.{ FState, Success, Failure, Pending }

  /**
   * Blocks the calling thread until this Promise has a value or `atMost` has elapsed.
   * A non-finite `atMost` is treated as Long.MaxValue nanoseconds.
   * An InterruptedException raised while waiting is swallowed and the wait resumes with
   * the remaining time.
   *
   * @return whether a value is available when the wait ends
   */
  protected final def tryAwait(atMost: Duration): Boolean = {
    // NOTE(review): Future.blocking is defined elsewhere in this file; presumably it signals that the caller is about to block — confirm
    Future.blocking

    @tailrec
    def awaitUnsafe(waitTimeNanos: Long): Boolean = {
      if (value.isEmpty && waitTimeNanos > 0) {
        val ms = NANOSECONDS.toMillis(waitTimeNanos)
        val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
        val start = System.nanoTime()
        // Re-check emptiness under the monitor so a completion that raced us is not missed
        try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }

        // Loop with whatever time is left; also covers spurious wake-ups
        awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
      } else
        value.isDefined
    }
    awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
  }

  /**
   * Waits at most `atMost` for this Promise to be completed.
   *
   * @return this
   * @throws TimeoutException if no value is available after waiting
   */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
    if (value.isDefined || tryAwait(atMost)) this
    else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")

  /**
   * Waits at most `atMost` for completion, then returns the successful result or rethrows
   * the failure.
   *
   * @throws TimeoutException if no value is available after waiting
   */
  def result(atMost: Duration)(implicit permit: CanAwait): T =
    ready(atMost).value.get match {
      case Left(e)  ⇒ throw e
      case Right(r) ⇒ r
    }

  /** The completed value, or None while the state is still Pending. */
  def value: Option[Either[Throwable, T]] = getState.value

  @inline
  private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]

  /** Compare-and-set of the state; returns whether the swap happened. */
  @inline
  protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)

  /** Reads the current state through the field updater. */
  @inline
  protected final def getState: FState[T] = updater.get(this)

  /**
   * Completes this Promise with the resolved form of `value` unless it is already completed.
   * Listeners registered before completion are notified, in a dispatched task, with the same
   * resolved value that is stored in the state, so they observe exactly what later
   * `onComplete` registrations and `value` observe.
   *
   * @return whether this call completed the Promise
   */
  def tryComplete(value: Either[Throwable, T]): Boolean = {
    // Resolve exactly once: this is both the value stored in the state and the one listeners get
    val resolved = resolve(value)

    val callbacks: List[Either[Throwable, T] ⇒ Unit] = {
      try {
        // Retries the CAS until it wins (returning the listeners) or the state is no longer Pending (null)
        @tailrec
        def transition(v: Either[Throwable, T]): List[Either[Throwable, T] ⇒ Unit] = {
          getState match {
            case cur @ Pending(listeners) ⇒
              if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
              else transition(v)
            case _ ⇒ null
          }
        }
        transition(resolved)
      } finally {
        synchronized { notifyAll() } //Notify any evil blockers
      }
    }

    callbacks match {
      case null             ⇒ false
      case cs if cs.isEmpty ⇒ true
      case cs               ⇒ Future.dispatchTask(() ⇒ cs.foreach(f ⇒ notifyCompleted(f, resolved))); true
    }
  }

  /**
   * Registers `func` to be invoked with the completed value. If this Promise is already
   * completed, the invocation is dispatched right away; otherwise `func` is prepended to the
   * pending listeners and invoked by `tryComplete`.
   *
   * @return this
   */
  def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = {
    @tailrec //Returns whether the future has already been completed or not
    def tryAddCallback(): Boolean = {
      val cur = getState
      cur match {
        case _: Success[_] | _: Failure[_] ⇒ true
        case p: Pending[_] ⇒
          val pt = p.asInstanceOf[Pending[T]]
          if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
      }
    }

    if (tryAddCallback()) {
      val result = value.get
      Future.dispatchTask(() ⇒ notifyCompleted(func, result))
    }

    this
  }

  /**
   * Invokes one callback with the completed value. Anything it throws is published to the
   * event stream instead of propagating, so the remaining callbacks still run.
   */
  private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) {
    try { func(result) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
  }
}
|
2011-03-18 17:26:53 +01:00
|
|
|
|
2011-03-18 17:37:25 +01:00
|
|
|
/**
 * An already completed Future is seeded with its result at creation; it is useful when you are
 * participating in a Future-composition but already have a value to contribute.
 */
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {

  /** The resolved form of the supplied value; always defined. */
  val value = Some(resolve(suppliedValue))

  /**
   * A KeptPromise is completed at construction, so no later call can ever complete it and
   * `value` is ignored.
   *
   * @return always false, because this call did not complete the Promise
   */
  def tryComplete(value: Either[Throwable, T]): Boolean = false

  /**
   * Dispatches a task that invokes `func` with the value this Promise was created with.
   *
   * @return this
   */
  def onComplete(func: Either[Throwable, T] ⇒ Unit): this.type = {
    val completedAs = value.get
    Future dispatchTask (() ⇒ func(completedAs))
    this
  }

  /** Already completed, so this returns immediately without waiting; `atMost` is unused. */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this

  /** Returns the successful result or throws the stored failure, without waiting; `atMost` is unused. */
  def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
    case Left(e)  ⇒ throw e
    case Right(r) ⇒ r
  }
}
|