2009-05-25 14:48:43 +02:00
|
|
|
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.AkkaException
|
2011-03-23 15:12:09 +01:00
|
|
|
import akka.event.EventHandler
|
|
|
|
|
import akka.actor.Actor
|
2010-10-31 19:27:55 +01:00
|
|
|
import akka.routing.Dispatcher
|
2011-03-11 10:46:36 -07:00
|
|
|
import akka.japi.{ Procedure, Function => JFunc }
|
2010-10-31 19:27:55 +01:00
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
import java.util.concurrent.locks.ReentrantLock
|
2011-02-28 16:29:03 -07:00
|
|
|
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
|
2011-02-12 12:26:56 -07:00
|
|
|
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
|
2011-01-24 16:37:08 +01:00
|
|
|
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
2011-03-02 00:14:45 +01:00
|
|
|
import annotation.tailrec
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
/**
 * Signals that a Future was not completed before its timeout elapsed.
 * In this file it is thrown by DefaultCompletableFuture.await.
 *
 * @param message human readable description of the timeout
 */
class FutureTimeoutException(message: String) extends AkkaException(message)
|
2009-06-10 20:04:33 +02:00
|
|
|
|
2010-02-23 10:05:47 +01:00
|
|
|
/**
 * Factory methods and combinators that operate on collections of Futures,
 * together with the Java-friendly counterparts of those operations.
 */
object Futures {

  /**
   * Java API, equivalent to Future.apply
   *
   * @param body the computation whose result will complete the returned Future
   */
  def future[T](body: Callable[T]): Future[T] =
    Future(body.call)

  /**
   * Java API, equivalent to Future.apply
   *
   * @param body    the computation whose result will complete the returned Future
   * @param timeout timeout handed on to Future.apply
   */
  def future[T](body: Callable[T], timeout: Long): Future[T] =
    Future(body.call, timeout)

  /**
   * Java API, equivalent to Future.apply
   *
   * @param body       the computation whose result will complete the returned Future
   * @param dispatcher the dispatcher the computation is handed to
   */
  def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
    Future(body.call)(dispatcher)

  /**
   * Java API, equivalent to Future.apply
   *
   * @param body       the computation whose result will complete the returned Future
   * @param timeout    timeout handed on to Future.apply
   * @param dispatcher the dispatcher the computation is handed to
   */
  def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
    Future(body.call, timeout)(dispatcher)

  /**
   * Returns a Future to the result of the first future in the list that is completed.
   * The outcome may be either a result or an exception, whichever arrives first.
   *
   * @param futures the candidates to race against each other
   * @param timeout timeout of the returned Future (defaults to Long.MaxValue)
   */
  def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
    val futureResult = new DefaultCompletableFuture[T](timeout)

    // Every candidate tries to complete the result; only the first attempt has any effect.
    val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _)
    for (f <- futures) f onComplete completeFirst

    futureResult
  }

  /**
   * Java API.
   * Returns a Future to the result of the first future in the list that is completed
   */
  def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
    firstCompletedOf(scala.collection.JavaConversions.asScalaIterable(futures), timeout)

  /**
   * A non-blocking fold over the specified futures.
   * The fold is performed on the thread where the last future is completed,
   * the result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   * Values are folded in the order in which the futures completed, which is not
   * necessarily the order of the supplied collection.
   * Example:
   * <pre>
   *   val result = Futures.fold(0)(futures)(_ + _).await.result
   * </pre>
   *
   * @param zero    the start value of the fold, also the result when `futures` is empty
   * @param timeout timeout of the returned Future
   * @param futures the futures whose results are folded
   * @param foldFun the function combining the accumulated value with the next result
   */
  def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
    if (futures.isEmpty) {
      new AlreadyCompletedFuture[R](Right(zero))
    } else {
      val result = new DefaultCompletableFuture[R](timeout)
      val results = new ConcurrentLinkedQueue[T]()
      val allDone = futures.size
      // Counts successful completions. Unlike results.size, incrementAndGet hands out each
      // count to exactly one thread (and is O(1)), so the fold below cannot run twice.
      val succeeded = new AtomicInteger(0)

      val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature?
        f.value.get match {
          case Right(value) =>
            results add value
            if (succeeded.incrementAndGet == allDone) { //Only one thread can get here
              try {
                val folded = scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
                result completeWithResult folded
              } catch {
                case e: Exception =>
                  EventHandler.error(e, this, e.getMessage)
                  result completeWithException e
              } finally {
                results.clear
              }
            }
          case Left(cause) =>
            // The first failure decides the outcome; collected values are no longer needed.
            result completeWithException cause
            results.clear
        }
      }

      futures foreach { _ onComplete aggregate }
      result
    }
  }

  /**
   * Java API
   * A non-blocking fold over the specified futures.
   * The fold is performed on the thread where the last future is completed,
   * the result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   */
  def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
    fold(zero, timeout)(scala.collection.JavaConversions.asScalaIterable(futures))( fun.apply _ )

  /**
   * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first.
   * If that first completed Future failed, the returned Future is completed with its exception.
   * An empty collection yields a Future that is already failed with an UnsupportedOperationException.
   * Example:
   * <pre>
   *   val result = Futures.reduce(futures)(_ + _).await.result
   * </pre>
   *
   * @param futures the futures whose results are reduced
   * @param timeout timeout of the returned Future
   * @param op      the function combining the accumulated value with the next result
   */
  def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = {
    if (futures.isEmpty)
      new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left")))
    else {
      val result = new DefaultCompletableFuture[R](timeout)
      val seedFound = new AtomicBoolean(false)
      val seedFold: Future[T] => Unit = f => {
        if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
          f.value.get match {
            case Right(seed) =>
              // Fold the remaining futures (everything except the seed instance itself).
              result.completeWith(fold(seed, timeout)(futures.filterNot(_ eq f))(op))
            case Left(cause) =>
              result.completeWithException(cause)
          }
        }
      }
      for (f <- futures) f onComplete seedFold //Attach the listener to the Futures
      result
    }
  }

  /**
   * Java API.
   * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
   */
  def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
    reduce(scala.collection.JavaConversions.asScalaIterable(futures), timeout)(fun.apply _)

  import scala.collection.mutable.Builder
  import scala.collection.generic.CanBuildFrom

  /**
   * Turns a collection of Futures into a single Future holding a collection of their results.
   * The results are appended to a builder obtained from `cbf` by chaining the futures in
   * traversal order, so a failure of any element fails the returned Future.
   *
   * @param in      the collection of Futures to combine
   * @param timeout timeout of the Future that seeds the chain
   * @param cbf     builder factory for the resulting collection
   */
  def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
    in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)

  /**
   * Applies `fn` to every element of `in` and combines the resulting Futures into a single
   * Future holding a collection of their results. `fn` is invoked for each element while
   * traversing `in`; the results are appended to a builder obtained from `cbf`.
   *
   * @param in      the elements to transform
   * @param timeout timeout of the Future that seeds the chain
   * @param fn      the function producing a Future for each element
   * @param cbf     builder factory for the resulting collection
   */
  def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
    in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
      val fb = fn(a.asInstanceOf[A])
      for (r <- fr; b <-fb) yield (r += b)
    }.map(_.result)

  // =====================================
  // Deprecations
  // =====================================

  /**
   * Awaits every future in the list, one after the other.
   * (Blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)")
  def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)

  /**
   * Returns the First Future that is completed (blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await")
  def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await

  /**
   * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
   */
  @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }")
  def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
    in map { f => fun(f.await) }

  /**
   * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
   */
  @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException")
  def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException
}
|
|
|
|
|
|
2011-02-28 11:47:39 -07:00
|
|
|
/**
 * Companion of the Future trait, hosting the factory for dispatcher-executed Futures.
 */
object Future {

  /**
   * Builds a Future that will eventually carry the outcome of evaluating `body`.
   * The evaluation itself is handed to the supplied dispatcher via `dispatchFuture`.
   *
   * @param body       the by-name computation to run
   * @param timeout    timeout of the returned Future (defaults to Actor.TIMEOUT)
   * @param dispatcher the dispatcher that receives the invocation
   * @return the Future that the dispatcher is asked to complete
   */
  def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = {
    val promise = new DefaultCompletableFuture[T](timeout)
    val untypedPromise = promise.asInstanceOf[CompletableFuture[Any]]
    dispatcher.dispatchFuture(FutureInvocation(untypedPromise, () => body))
    promise
  }
}
|
|
|
|
|
|
2011-03-11 10:46:36 -07:00
|
|
|
/**
 * The read side of an asynchronous computation: a placeholder for a value
 * (or an exception) that may become available later.
 */
sealed trait Future[+T] {

  /**
   * Waits for this Future to complete and returns its result.
   * Any throwable this Future was completed with is rethrown, and whatever
   * `await` throws on timeout (a FutureTimeoutException in DefaultCompletableFuture)
   * propagates to the caller.
   */
  def apply(): T = await.resultOrException.get

  /**
   * Java API for apply()
   */
  def get: T = apply()

  /**
   * Blocks the current thread until the Future has been completed or the
   * timeout has expired. In the case of the timeout expiring a
   * FutureTimeoutException will be thrown.
   */
  def await : Future[T]

  /**
   * Blocks the current thread until the Future has been completed. Use
   * caution with this method as it ignores the timeout and will block
   * indefinitely if the Future is never completed.
   */
  def awaitBlocking : Future[T]

  /**
   * Tests whether this Future has been completed.
   */
  final def isCompleted: Boolean = value.isDefined

  /**
   * Tests whether this Future's timeout has expired.
   *
   * Note that an expired Future may still contain a value, or it may be
   * completed with a value.
   */
  def isExpired: Boolean

  /**
   * This Future's timeout in nanoseconds.
   */
  def timeoutInNanos: Long

  /**
   * The contained value of this Future. Before this Future is completed
   * the value will be None. After completion the value will be Some(Right(t))
   * if it contains a valid result, or Some(Left(error)) if it contains
   * an exception.
   */
  def value: Option[Either[Throwable, T]]

  /**
   * Returns the successful result of this Future if it exists.
   */
  final def result: Option[T] = value.flatMap(_.right.toOption)

  /**
   * Waits for the completion of this Future, then returns the completed value.
   * If the Future's timeout expires while waiting a FutureTimeoutException
   * will be thrown.
   *
   * Equivalent to calling future.await.value.
   */
  def awaitValue: Option[Either[Throwable, T]]

  /**
   * Returns the result of the Future if one is available within the specified
   * time, if the time left on the future is less than the specified time, the
   * time left on the future will be used instead of the specified time.
   * returns None if no result, Some(Right(t)) if a result, or
   * Some(Left(error)) if there was an exception
   */
  def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]

  /**
   * Returns the contained exception of this Future if it exists.
   */
  final def exception: Option[Throwable] = value.flatMap(_.left.toOption)

  /**
   * When this Future is completed, apply the provided function to the
   * Future. If the Future has already been completed, this will apply
   * immediately.
   */
  def onComplete(func: Future[T] => Unit): Future[T]

  /**
   * When the future is completed with a valid result, apply the provided
   * PartialFunction to the result. Results the PartialFunction is not
   * defined at are ignored.
   * <pre>
   *   val result = future receive {
   *     case Foo => "foo"
   *     case Bar => "bar"
   *   }.await.result
   * </pre>
   */
  final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { completed =>
    completed.result foreach { r =>
      if (pf isDefinedAt r) pf(r)
    }
  }

  /**
   * Creates a new Future by applying a PartialFunction to the successful
   * result of this Future if a match is found, or else return a MatchError.
   * If this Future is completed with an exception then the new Future will
   * also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a <- actor !!! Req("Hello") collect { case Res(x: Int)    => x }
   *   b <- actor !!! Req(a)       collect { case Res(x: String) => x }
   *   c <- actor !!! Req(7)       collect { case Res(x: String) => x }
   * } yield b + "-" + c
   * </pre>
   */
  final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
    val collected = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { completed =>
      completed.value foreach { outcome =>
        val next: Either[Throwable, A] = outcome match {
          case Left(failure) =>
            Left(failure)
          case Right(success) =>
            try {
              if (pf isDefinedAt success) Right(pf(success))
              else Left(new MatchError(success))
            } catch {
              case e: Exception =>
                EventHandler.error(e, this, e.getMessage)
                Left(e)
            }
        }
        collected complete next
      }
    }
    collected
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future. If this Future is completed with an exception then the new
   * Future will also contain this exception. An exception thrown by the
   * function itself is logged and becomes the outcome of the new Future.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor !!! "Hello" // returns 5
   *   b: String <- actor !!! a       // returns "10"
   *   c: String <- actor !!! 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def map[A](f: T => A): Future[A] = {
    val mapped = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { completed =>
      completed.value foreach {
        case Left(failure) =>
          mapped complete Left(failure)
        case Right(success) =>
          val next: Either[Throwable, A] =
            try {
              Right(f(success))
            } catch {
              case e: Exception =>
                EventHandler.error(e, this, e.getMessage)
                Left(e)
            }
          mapped complete next
      }
    }
    mapped
  }

  /**
   * Creates a new Future by applying a function to the successful result of
   * this Future, and returns the result of the function as the new Future.
   * If this Future is completed with an exception then the new Future will
   * also contain this exception.
   * Example:
   * <pre>
   * val future1 = for {
   *   a: Int    <- actor !!! "Hello" // returns 5
   *   b: String <- actor !!! a       // returns "10"
   *   c: String <- actor !!! 7       // returns "14"
   * } yield b + "-" + c
   * </pre>
   */
  final def flatMap[A](f: T => Future[A]): Future[A] = {
    val flattened = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
    onComplete { completed =>
      completed.value foreach {
        case Left(failure) =>
          flattened complete Left(failure)
        case Right(success) =>
          try {
            flattened.completeWith(f(success))
          } catch {
            case e: Exception =>
              EventHandler.error(e, this, e.getMessage)
              flattened completeWithException e
          }
      }
    }
    flattened
  }

  /**
   * Registers a side effect to run on the successful result of this Future.
   * Nothing happens if this Future is completed with an exception.
   */
  final def foreach(f: T => Unit): Unit = onComplete { completed =>
    completed.result foreach f
  }

  /**
   * Creates a new Future that carries the successful result of this Future
   * only if it satisfies the predicate; otherwise the new Future contains a
   * MatchError. An exception in this Future, or thrown by the predicate, is
   * carried over to the new Future.
   */
  final def filter(p: T => Boolean): Future[T] = {
    val filtered = new DefaultCompletableFuture[T](timeoutInNanos, NANOS)
    onComplete { completed =>
      completed.value foreach {
        case Left(failure) =>
          filtered complete Left(failure)
        case Right(success) =>
          val next: Either[Throwable, T] =
            try {
              if (p(success)) Right(success)
              else Left(new MatchError(success))
            } catch {
              case e: Exception =>
                EventHandler.error(e, this, e.getMessage)
                Left(e)
            }
          filtered complete next
      }
    }
    filtered
  }

  /**
   * Returns the current result, throws the exception if one has been raised, else returns None
   */
  final def resultOrException: Option[T] = value match {
    case Some(Left(failure))  => throw failure
    case Some(Right(success)) => Some(success)
    case None                 => None
  }

  /* Java API */
  final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))

  final def map[A >: T, B](f: JFunc[A,B]): Future[B] = map(f(_))

  final def flatMap[A >: T, B](f: JFunc[A,Future[B]]): Future[B] = flatMap(f(_))

  final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_))

  final def filter[A >: T](p: JFunc[A,Boolean]): Future[T] = filter(p(_))

}
|
|
|
|
|
|
2011-03-18 17:37:25 +01:00
|
|
|
/**
 * The write side (the "Promise") that belongs to the read side offered by Future.
 */
trait CompletableFuture[T] extends Future[T] {

  /**
   * Completes this Future with the specified result, if not already completed.
   *
   * @param value either the exception (Left) or the result (Right) to complete with
   * @return this
   */
  def complete(value: Either[Throwable, T]): CompletableFuture[T]

  /**
   * Completes this Future with the specified result, if not already completed.
   *
   * @return this
   */
  final def completeWithResult(result: T): CompletableFuture[T] =
    complete(Right(result))

  /**
   * Completes this Future with the specified exception, if not already completed.
   *
   * @return this
   */
  final def completeWithException(exception: Throwable): CompletableFuture[T] =
    complete(Left(exception))

  /**
   * Completes this Future with the outcome of the other Future once that Future
   * is completed, unless this Future has already been completed by then.
   *
   * @return this.
   */
  final def completeWith(other: Future[T]): CompletableFuture[T] = {
    other onComplete { done => complete(done.value.get) }
    this
  }

  /**
   * Alias for complete(Right(value)).
   */
  final def << (value: T): CompletableFuture[T] = completeWithResult(value)

  /**
   * Alias for completeWith(other).
   */
  final def << (other : Future[T]): CompletableFuture[T] = completeWith(other)
}
|
|
|
|
|
|
2011-02-28 10:17:44 -07:00
|
|
|
/**
 * The default concrete Future implementation.
 *
 * The value and the listener list are guarded by a ReentrantLock; threads that wait
 * for completion park on a Condition of that lock. Listeners are always invoked
 * after the lock has been released.
 *
 * @param timeout  how long waiters may block, counted from construction
 * @param timeunit the unit of `timeout`
 */
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {

  /** Creates a Future with a timeout of zero milliseconds. */
  def this() = this(0, MILLIS)

  /** Creates a Future with the given timeout in milliseconds. */
  def this(timeout: Long) = this(timeout, MILLIS)

  val timeoutInNanos = timeunit.toNanos(timeout)
  private val _startTimeInNanos = currentTimeInNanos
  private val _lock = new ReentrantLock
  private val _signal = _lock.newCondition
  private var _value: Option[Either[Throwable, T]] = None
  private var _listeners: List[Future[T] => Unit] = Nil

  /**
   * Waits on the condition until a value is present or `wait` nanoseconds have passed.
   * Must be called with `_lock` held. An InterruptedException does not abort the wait;
   * the remaining time is recomputed and waiting continues.
   *
   * @return true if the Future holds a value when waiting ends
   */
  @tailrec
  private def awaitUnsafe(wait: Long): Boolean = {
    if (_value.isEmpty && wait > 0) {
      val start = currentTimeInNanos
      val remaining = try {
        _signal.awaitNanos(wait)
      } catch {
        case e: InterruptedException =>
          wait - (currentTimeInNanos - start)
      }
      awaitUnsafe(remaining)
    } else {
      _value.isDefined
    }
  }

  /**
   * Waits until completion or until the timeout has elapsed and returns the value
   * held at that point (None if still incomplete). Does not throw on timeout.
   */
  def awaitValue: Option[Either[Throwable, T]] = {
    _lock.lock
    try {
      awaitUnsafe(timeLeftInNanos)
      _value
    } finally {
      _lock.unlock
    }
  }

  /**
   * Waits at most the given time, or the time left on this Future if that is shorter,
   * and returns the value held at that point (None if still incomplete).
   */
  def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = {
    _lock.lock
    try {
      awaitUnsafe(unit.toNanos(time).min(timeLeftInNanos))
      _value
    } finally {
      _lock.unlock
    }
  }

  /**
   * Blocks until this Future is completed and returns it; throws a
   * FutureTimeoutException if the timeout elapses first.
   */
  def await = {
    _lock.lock
    if (try { awaitUnsafe(timeLeftInNanos) } finally { _lock.unlock }) this
    else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
  }

  /**
   * Blocks until this Future is completed, ignoring the timeout.
   */
  def awaitBlocking = {
    _lock.lock
    try {
      while (_value.isEmpty) {
        _signal.await
      }
      this
    } finally {
      _lock.unlock
    }
  }

  /** True once the timeout, counted from construction, has elapsed. */
  def isExpired: Boolean = timeLeftInNanos <= 0

  /** The current value, read under the lock; None while incomplete. */
  def value: Option[Either[Throwable, T]] = {
    _lock.lock
    try {
      _value
    } finally {
      _lock.unlock
    }
  }

  /**
   * Stores the value if none is present yet, wakes up all waiting threads and then
   * notifies the registered listeners in registration order. Later calls leave the
   * stored value untouched.
   *
   * @return this
   */
  def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
    _lock.lock
    val notifyTheseListeners = try {
      if (_value.isEmpty) {
        _value = Some(value)
        val existingListeners = _listeners
        _listeners = Nil
        existingListeners
      } else Nil
    } finally {
      _signal.signalAll
      _lock.unlock
    }

    // Listeners were prepended on registration, so reverse to restore registration order.
    if (notifyTheseListeners.nonEmpty)
      notifyTheseListeners.reverse foreach notify

    this
  }

  /**
   * Registers a listener. If this Future is already completed the listener is invoked
   * immediately on the calling thread, otherwise it is invoked by the completing thread.
   *
   * @return this
   */
  def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
    _lock.lock
    val notifyNow = try {
      if (_value.isEmpty) {
        _listeners ::= func
        false
      } else true
    } finally {
      _lock.unlock
    }

    if (notifyNow) notify(func)

    this
  }

  /** Invokes a listener; anything it throws is reported to the EventHandler instead of propagating. */
  private def notify(func: Future[T] => Unit): Unit = {
    try {
      func(this)
    } catch {
      case e => EventHandler notify EventHandler.Error(e, this)
    }
  }

  /** Nanoseconds left before the timeout elapses; zero or negative once expired. */
  private def timeLeftInNanos: Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)

  // System.nanoTime is monotonic, so elapsed time can never become negative. With the
  // wall clock a backwards adjustment made (now - start) negative, which overflowed the
  // remaining-time subtraction for very large timeouts and expired the Future at once.
  private def currentTimeInNanos: Long = System.nanoTime
}
|
2011-03-18 17:26:53 +01:00
|
|
|
|
2011-03-18 17:37:25 +01:00
|
|
|
/**
 * A Future that is seeded with its outcome at creation. Useful when you take part in a
 * Future-composition but already have a value to contribute.
 *
 * @param suppliedValue the outcome this Future holds from the start
 */
sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] {

  val value = Some(suppliedValue)

  /** Reported as zero. */
  def timeoutInNanos: Long = 0

  /** Always reports true. */
  def isExpired: Boolean = true

  /** Returns this Future right away, without waiting. */
  def await : Future[T] = this

  /** Returns this Future right away, without waiting. */
  def awaitBlocking : Future[T] = this

  /** Returns the seeded value right away, without waiting. */
  def awaitValue: Option[Either[Throwable, T]] = value

  /** Returns the seeded value right away; the time arguments are not used. */
  def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value

  /** Ignores the argument and keeps the seeded value. */
  def complete(value: Either[Throwable, T]): CompletableFuture[T] = this

  /** Invokes the listener immediately on the calling thread, then returns this Future. */
  def onComplete(func: Future[T] => Unit): Future[T] = {
    func(this)
    this
  }
}
|