2009-05-25 14:48:43 +02:00
|
|
|
/**
|
2010-12-22 15:35:50 +01:00
|
|
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-05-25 14:48:43 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.AkkaException
|
|
|
|
|
import akka.actor.Actor.spawn
|
2010-10-31 19:27:55 +01:00
|
|
|
import akka.routing.Dispatcher
|
|
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
import java.util.concurrent.locks.ReentrantLock
|
2010-11-12 12:11:53 +01:00
|
|
|
import akka.japi.Procedure
|
2011-01-24 12:14:50 +01:00
|
|
|
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
|
|
|
|
|
import akka.actor.Actor
|
2011-01-24 16:37:08 +01:00
|
|
|
import annotation.tailrec
|
|
|
|
|
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
/**
 * Exception signalling that a Future was not completed before its timeout elapsed.
 * Within this file it is thrown by DefaultCompletableFuture.await.
 */
class FutureTimeoutException(message: String) extends AkkaException(message)
|
2009-06-10 20:04:33 +02:00
|
|
|
|
2010-02-23 10:05:47 +01:00
|
|
|
/**
 * Module with utility methods for working with Futures.
 */
object Futures {

  /**
   * Creates a Future that is completed with the value of `body`, or with whatever
   * `body` throws. The body is handed to `spawn` together with the supplied dispatcher
   * (presumably so that it runs asynchronously on that dispatcher -- confirm against
   * Actor.spawn).
   * <pre>
   * val future = Futures.future(1000) {
   *  ... // do stuff
   * }
   * </pre>
   *
   * @param timeout    timeout of the returned Future, in milliseconds
   * @param dispatcher dispatcher passed on to `spawn`
   * @param body       the computation producing the Future's value
   */
  def future[T](timeout: Long,
                dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
               (body: => T): Future[T] = {
    val f = new DefaultCompletableFuture[T](timeout)
    spawn({
      // Everything thrown by the body is captured in the Future rather than propagated.
      try { f completeWithResult body }
      catch { case e => f completeWithException e }
    })(dispatcher)
    f
  }

  /**
   * Awaits every future in the list, one after the other, in list order (blocking!).
   * Whatever `await` throws for a future propagates to the caller.
   */
  def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)

  /**
   * Blocks until the first of the supplied futures is completed (or `timeout`
   * milliseconds have passed) and returns a Future holding that first outcome.
   * Note that the returned Future is a new aggregate Future created by
   * firstCompletedOf, not one of the elements of `futures`.
   */
  def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await

  /**
   * Returns a Future to the result of the first future in the list that is completed.
   * Non-blocking: a completion listener is attached to every supplied future and the
   * first one to fire decides the outcome (later completions are ignored because a
   * Future can only be completed once).
   *
   * @param timeout timeout of the returned Future, in milliseconds
   */
  def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = {
    val futureResult = new DefaultCompletableFuture[Any](timeout)
    val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]])
    for (f <- futures) f onComplete completeFirst
    futureResult
  }

  /**
   * Applies the supplied function to the specified collection of Futures after awaiting
   * each future to be completed (blocking!). The function receives the awaited Future
   * itself, not its value.
   */
  def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
    in map { f => fun(f.await) }

  /**
   * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
   */
  def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException

  /**
   * A non-blocking fold over the specified futures.
   * The fold is performed on the thread where the last future is completed,
   * the result will be the first failure of any of the futures, or any failure in the actual fold,
   * or the result of the fold.
   *
   * Values are folded in the order in which the futures complete, which is not
   * necessarily the iteration order of `futures`; `foldFun` should therefore not depend
   * on a particular ordering.
   *
   * @param zero    the start value of the fold; also the result when `futures` is empty
   * @param timeout timeout of the returned Future, in milliseconds
   * @param futures the futures whose results are folded
   * @param foldFun the function combining the accumulated value with the next result
   */
  def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
    if (futures.isEmpty) {
      (new DefaultCompletableFuture[R](timeout)) completeWithResult zero
    } else {
      val result = new DefaultCompletableFuture[R](timeout)
      val results = new ConcurrentLinkedQueue[T]()
      val waitingFor = new AtomicInteger(futures.size)

      val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature?
        // Read the outcome once; listeners are only invoked for completed futures.
        f.value.get match {
          case Left(error) =>
            result completeWithException error
          case Right(value) =>
            results add value
            if (waitingFor.decrementAndGet == 0) { //Only one thread can get here
              try {
                val folded =
                  try {
                    scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
                  } finally {
                    //Do not retain the values since someone can hold onto the Future for a long time;
                    //this must also happen when foldFun throws
                    results.clear()
                  }
                result completeWithResult folded
              } catch {
                case e: Exception => result completeWithException e
              }
            }
        }
      }

      futures foreach { _ onComplete aggregate }
      result
    }
  }

  /**
   * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first.
   * If `futures` is empty the returned Future is completed with an
   * UnsupportedOperationException; if the first completed future failed, the returned
   * Future is completed with that failure.
   *
   * @param timeout timeout, in milliseconds, of the returned Future and of the internal fold
   */
  def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = {
    if (futures.isEmpty)
      (new DefaultCompletableFuture[R](timeout)).completeWithException(new UnsupportedOperationException("empty reduce left"))
    else {
      val result = new DefaultCompletableFuture[R](timeout)
      val seedFound = new AtomicBoolean(false)
      val seedFold: Future[T] => Unit = f => {
        if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
          if (f.exception.isDefined) result completeWithException f.exception.get //If the seed is a failure, we're done here
          else (fold[T,R](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed
        }
        () //Without this Unit value, the compiler crashes
      }
      for (f <- futures) f onComplete seedFold //Attach the listener to the Futures
      result
    }
  }
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
/**
 * A handle to a value that may not be available yet. Once completed, the outcome is
 * either a Throwable (Left) or a result of type T (Right).
 */
sealed trait Future[T] {

  /**
   * Blocks until this Future is completed and then returns it. The implementation in
   * this file (DefaultCompletableFuture) throws FutureTimeoutException if the timeout
   * elapses first.
   */
  def await: Future[T]

  /**
   * Blocks until this Future is completed and then returns it. The implementation in
   * this file (DefaultCompletableFuture) disregards the timeout here.
   */
  def awaitBlocking: Future[T]

  /**
   * True once an outcome (result or exception) is available.
   */
  def isCompleted: Boolean = value.isDefined

  /**
   * True when the timeout of this Future has run out.
   */
  def isExpired: Boolean

  /**
   * The timeout of this Future, expressed in nanoseconds.
   */
  def timeoutInNanos: Long

  /**
   * The current outcome: None while not completed, Some(Right(t)) for a result and
   * Some(Left(error)) for an exception.
   */
  def value: Option[Either[Throwable, T]]

  /**
   * The result, if this Future has been completed with one; None otherwise.
   */
  def result: Option[T] = value match {
    case Some(Right(r)) => Some(r)
    case _              => None
  }

  /**
   * Waits for completion and returns the outcome; see `value` for the encoding.
   * The implementation in this file waits at most for the time left on the Future and
   * returns None when that runs out.
   */
  def awaitResult: Option[Either[Throwable, T]]

  /**
   * Returns the result of the Future if one is available within the specified time,
   * if the time left on the future is less than the specified time, the time left on the future will be used instead
   * of the specified time.
   * returns None if no result, Some(Right(t)) if a result, and Some(Left(error)) if there was an exception
   */
  def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]

  /**
   * The exception, if this Future has been completed with one; None otherwise.
   */
  def exception: Option[Throwable] = value match {
    case Some(Left(error)) => Some(error)
    case _                 => None
  }

  /**
   * Registers a function that is invoked with this Future when it is completed.
   * In the implementation in this file the function is invoked immediately, on the
   * calling thread, if the Future is already completed.
   */
  def onComplete(func: Future[T] => Unit): Future[T]

  /**
   * Returns the current result, throws the exception if one has been raised, else returns None
   */
  def resultOrException: Option[T] = value match {
    case Some(Left(error)) => throw error
    case Some(Right(r))    => Some(r)
    case None              => None
  }

  /* Java API */
  def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete((f: Future[T]) => proc(f))
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
/**
 * A Future that can be completed by client code, either with a result or with an
 * exception.
 */
trait CompletableFuture[T] extends Future[T] {

  /**
   * Completes this Future with the given outcome: Left for a failure, Right for a result.
   */
  def complete(value: Either[Throwable, T]): CompletableFuture[T]

  /**
   * Completes this Future with the given result.
   */
  def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))

  /**
   * Completes this Future with the given exception.
   */
  def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))

  /**
   * Completes this Future with the current outcome of `other`. If `other` has not been
   * completed yet, nothing happens: no listener is registered on it.
   */
  def completeWith(other: Future[T]): CompletableFuture[T] = other.value match {
    case Some(outcome) => complete(outcome)
    case None          => this
  }
}
|
|
|
|
|
|
2010-03-01 22:03:17 +01:00
|
|
|
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
2010-04-23 20:46:58 +02:00
|
|
|
/**
 * CompletableFuture implementation guarded by a ReentrantLock, with a Condition that
 * is signalled on completion.
 *
 * @param timeout the timeout in milliseconds, counted from the creation of the instance
 */
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
  import TimeUnit.{MILLISECONDS => TIME_UNIT}

  /**
   * Creates a Future with a timeout of 0 milliseconds, i.e. the timed await operations
   * only succeed if the Future is already completed.
   */
  def this() = this(0)

  val timeoutInNanos = TIME_UNIT.toNanos(timeout)
  private val _startTimeInNanos = currentTimeInNanos
  private val _lock = new ReentrantLock
  private val _signal = _lock.newCondition
  // Both fields below are only read and written while holding _lock.
  private var _value: Option[Either[Throwable, T]] = None
  private var _listeners: List[Future[T] => Unit] = Nil

  /**
   * Waits on the condition until a value is present or `nanosLeft` has been used up.
   * Must be called with _lock held. An InterruptedException does not abort the wait;
   * the wait is resumed with the time that remains.
   *
   * @return true if the Future is completed when the wait ends
   */
  @scala.annotation.tailrec
  private def awaitUnsafe(nanosLeft: Long): Boolean = {
    if (!_value.isDefined && nanosLeft > 0) {
      val start = currentTimeInNanos
      awaitUnsafe(try {
        _signal.awaitNanos(nanosLeft) // returns an estimate of the time still remaining
      } catch {
        case e: InterruptedException =>
          nanosLeft - (currentTimeInNanos - start)
      })
    } else {
      _value.isDefined
    }
  }

  /**
   * Waits for completion for at most the time left on this Future and returns the
   * outcome, or None if the Future is still not completed by then.
   */
  def awaitResult: Option[Either[Throwable, T]] = {
    _lock.lock()
    try {
      awaitUnsafe(timeLeftInNanos)
      _value
    } finally {
      _lock.unlock()
    }
  }

  /**
   * Waits for completion for at most the specified time, or the time left on this
   * Future if that is shorter, and returns the outcome (None if still not completed).
   */
  def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = {
    _lock.lock()
    try {
      awaitUnsafe(unit.toNanos(time).min(timeLeftInNanos))
      _value
    } finally {
      _lock.unlock()
    }
  }

  /**
   * Blocks until this Future is completed, for at most the time left on it.
   * Throws FutureTimeoutException if the Future is not completed in time.
   */
  def await: DefaultCompletableFuture[T] = {
    _lock.lock()
    try {
      if (awaitUnsafe(timeLeftInNanos))
        this
      else
        throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
    } finally {
      _lock.unlock()
    }
  }

  /**
   * Blocks until this Future is completed, disregarding the timeout.
   * An InterruptedException raised while waiting propagates to the caller.
   */
  def awaitBlocking: DefaultCompletableFuture[T] = {
    _lock.lock()
    try {
      while (!_value.isDefined) {
        _signal.await
      }
      this
    } finally {
      _lock.unlock()
    }
  }

  /**
   * True when the timeout has elapsed since this Future was created.
   */
  def isExpired: Boolean = {
    _lock.lock()
    try {
      timeLeftInNanos <= 0
    } finally {
      _lock.unlock()
    }
  }

  /**
   * The current outcome, or None if this Future is not completed yet.
   */
  def value: Option[Either[Throwable, T]] = {
    _lock.lock()
    try {
      _value
    } finally {
      _lock.unlock()
    }
  }

  /**
   * Completes this Future with the given outcome unless it is already completed, in
   * which case the call has no effect on the stored value. Waiting threads are
   * signalled, and the listeners registered so far are invoked on the calling thread,
   * outside the lock, most recently registered first.
   */
  def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
    _lock.lock()
    val notifyTheseListeners = try {
      if (!_value.isDefined) {
        _value = Some(value)
        val all = _listeners
        _listeners = Nil
        all
      } else Nil
    } finally {
      _signal.signalAll
      _lock.unlock()
    }

    if (notifyTheseListeners.nonEmpty)
      notifyTheseListeners foreach notify

    this
  }

  /**
   * Registers a completion listener. If this Future is already completed the listener
   * is invoked right away on the calling thread (outside the lock); otherwise it is
   * stored and invoked by `complete`.
   */
  def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
    _lock.lock()
    val notifyNow = try {
      if (!_value.isDefined) {
        _listeners ::= func
        false
      }
      else
        true
    } finally {
      _lock.unlock()
    }

    if (notifyNow)
      notify(func)

    this
  }

  // Invokes a single listener with this Future; exceptions it throws are not handled here.
  private def notify(func: Future[T] => Unit): Unit = {
    func(this)
  }

  // Time remaining before the timeout elapses; zero or negative once expired.
  private def timeLeftInNanos: Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)

  // Monotonic clock reading. Only differences between two readings are meaningful,
  // which is all this class uses; unlike the wall clock it is unaffected by
  // adjustments of the system time.
  private def currentTimeInNanos: Long = System.nanoTime
}
|