2009-05-25 14:48:43 +02:00
|
|
|
/**
|
2009-12-27 16:01:53 +01:00
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-05-25 14:48:43 +02:00
|
|
|
*/
|
|
|
|
|
|
2009-10-08 19:01:04 +02:00
|
|
|
package se.scalablesolutions.akka.dispatch
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
import se.scalablesolutions.akka.AkkaException
|
2010-10-12 13:18:15 +02:00
|
|
|
import se.scalablesolutions.akka.actor.Actor.spawn
|
2010-08-19 07:01:09 +02:00
|
|
|
import java.util.concurrent.locks.ReentrantLock
|
2009-05-25 14:48:43 +02:00
|
|
|
import java.util.concurrent.TimeUnit
|
2010-10-12 13:18:15 +02:00
|
|
|
import se.scalablesolutions.akka.routing.Dispatcher
|
2009-05-25 14:48:43 +02:00
|
|
|
|
2010-08-19 07:01:09 +02:00
|
|
|
class FutureTimeoutException(message: String) extends AkkaException(message)
|
2009-06-10 20:04:33 +02:00
|
|
|
|
2010-02-23 10:05:47 +01:00
|
|
|
object Futures {
|
|
|
|
|
|
2010-03-01 22:03:17 +01:00
|
|
|
/**
|
2010-05-12 07:40:08 +02:00
|
|
|
* Module with utility methods for working with Futures.
|
2010-03-01 22:03:17 +01:00
|
|
|
* <pre>
|
|
|
|
|
* val future = Futures.future(1000) {
|
|
|
|
|
* ... // do stuff
|
|
|
|
|
* }
|
|
|
|
|
* </pre>
|
2010-10-12 13:18:15 +02:00
|
|
|
*/
|
|
|
|
|
def future[T](timeout: Long,
|
|
|
|
|
dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
|
|
|
|
|
(body: => T): Future[T] = {
|
|
|
|
|
val f = new DefaultCompletableFuture[T](timeout)
|
|
|
|
|
|
|
|
|
|
spawn({
|
|
|
|
|
try { f completeWithResult body }
|
|
|
|
|
catch { case e => f completeWithException e}
|
|
|
|
|
})(dispatcher)
|
|
|
|
|
|
|
|
|
|
f
|
2010-03-01 22:03:17 +01:00
|
|
|
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
|
2010-03-01 22:03:17 +01:00
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
def awaitOne(futures: List[Future[_]]): Future[_] = {
|
|
|
|
|
var future: Option[Future[_]] = None
|
2010-02-23 10:05:47 +01:00
|
|
|
do {
|
|
|
|
|
future = futures.find(_.isCompleted)
|
|
|
|
|
} while (future.isEmpty)
|
|
|
|
|
future.get
|
|
|
|
|
}
|
|
|
|
|
|
2010-10-15 17:24:15 +02:00
|
|
|
/**
|
|
|
|
|
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
|
|
|
|
|
*/
|
|
|
|
|
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
|
|
|
|
|
in map { f => fun(f.await) }
|
|
|
|
|
|
2010-02-23 10:05:47 +01:00
|
|
|
/*
|
2010-04-23 20:46:58 +02:00
|
|
|
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
|
2010-02-23 10:05:47 +01:00
|
|
|
import Actor.Sender.Self
|
|
|
|
|
import Actor.{spawn, actor}
|
2010-05-21 20:08:49 +02:00
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
case class Result(res: Option[T])
|
|
|
|
|
val handOff = new SynchronousQueue[Option[T]]
|
2010-02-23 10:05:47 +01:00
|
|
|
spawn {
|
|
|
|
|
try {
|
|
|
|
|
println("f1 await")
|
|
|
|
|
f1.await
|
|
|
|
|
println("f1 offer")
|
|
|
|
|
handOff.offer(f1.result)
|
|
|
|
|
} catch {case _ => {}}
|
|
|
|
|
}
|
|
|
|
|
spawn {
|
|
|
|
|
try {
|
|
|
|
|
println("f2 await")
|
|
|
|
|
f2.await
|
|
|
|
|
println("f2 offer")
|
|
|
|
|
println("f2 offer: " + f2.result)
|
|
|
|
|
handOff.offer(f2.result)
|
|
|
|
|
} catch {case _ => {}}
|
|
|
|
|
}
|
|
|
|
|
Thread.sleep(100)
|
|
|
|
|
handOff.take
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
sealed trait Future[T] {
|
2010-04-25 18:07:27 +02:00
|
|
|
def await : Future[T]
|
|
|
|
|
def awaitBlocking : Future[T]
|
2009-05-25 14:48:43 +02:00
|
|
|
def isCompleted: Boolean
|
|
|
|
|
def isExpired: Boolean
|
|
|
|
|
def timeoutInNanos: Long
|
2010-04-23 20:46:58 +02:00
|
|
|
def result: Option[T]
|
2010-08-12 09:46:42 +02:00
|
|
|
def exception: Option[Throwable]
|
2010-10-15 17:24:15 +02:00
|
|
|
def map[O](f: (T) => O): Future[O] = {
|
|
|
|
|
val wrapped = this
|
|
|
|
|
new Future[O] {
|
|
|
|
|
def await = { wrapped.await; this }
|
|
|
|
|
def awaitBlocking = { wrapped.awaitBlocking; this }
|
|
|
|
|
def isCompleted = wrapped.isCompleted
|
|
|
|
|
def isExpired = wrapped.isExpired
|
|
|
|
|
def timeoutInNanos = wrapped.timeoutInNanos
|
|
|
|
|
def result: Option[O] = { wrapped.result map f }
|
|
|
|
|
def exception: Option[Throwable] = wrapped.exception
|
|
|
|
|
}
|
|
|
|
|
}
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
trait CompletableFuture[T] extends Future[T] {
|
|
|
|
|
def completeWithResult(result: T)
|
2010-08-12 09:46:42 +02:00
|
|
|
def completeWithException(exception: Throwable)
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|
|
|
|
|
|
2010-03-01 22:03:17 +01:00
|
|
|
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
2010-04-23 20:46:58 +02:00
|
|
|
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
2009-06-05 22:08:53 +02:00
|
|
|
private val TIME_UNIT = TimeUnit.MILLISECONDS
|
|
|
|
|
def this() = this(0)
|
|
|
|
|
|
|
|
|
|
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
|
2009-05-25 14:48:43 +02:00
|
|
|
private val _startTimeInNanos = currentTimeInNanos
|
|
|
|
|
private val _lock = new ReentrantLock
|
|
|
|
|
private val _signal = _lock.newCondition
|
|
|
|
|
private var _completed: Boolean = _
|
2010-04-23 20:46:58 +02:00
|
|
|
private var _result: Option[T] = None
|
2010-08-12 09:46:42 +02:00
|
|
|
private var _exception: Option[Throwable] = None
|
2009-06-21 14:08:43 +02:00
|
|
|
|
2009-06-29 15:01:20 +02:00
|
|
|
def await = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
2009-06-05 22:08:53 +02:00
|
|
|
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
|
2009-05-25 14:48:43 +02:00
|
|
|
while (!_completed && wait > 0) {
|
|
|
|
|
var start = currentTimeInNanos
|
|
|
|
|
try {
|
|
|
|
|
wait = _signal.awaitNanos(wait)
|
2010-02-23 10:05:47 +01:00
|
|
|
if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
|
2009-05-25 14:48:43 +02:00
|
|
|
} catch {
|
|
|
|
|
case e: InterruptedException =>
|
2009-06-05 22:08:53 +02:00
|
|
|
wait = wait - (currentTimeInNanos - start)
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|
|
|
|
|
}
|
2010-04-25 18:07:27 +02:00
|
|
|
this
|
2009-05-25 14:48:43 +02:00
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2009-06-29 15:01:20 +02:00
|
|
|
def awaitBlocking = try {
|
2009-06-05 22:08:53 +02:00
|
|
|
_lock.lock
|
|
|
|
|
while (!_completed) {
|
|
|
|
|
_signal.await
|
|
|
|
|
}
|
2010-04-25 18:07:27 +02:00
|
|
|
this
|
2009-06-05 22:08:53 +02:00
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2009-06-25 13:07:58 +02:00
|
|
|
def isCompleted: Boolean = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
|
|
|
|
_completed
|
|
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2009-06-25 13:07:58 +02:00
|
|
|
def isExpired: Boolean = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
2009-06-05 22:08:53 +02:00
|
|
|
timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
|
2009-05-25 14:48:43 +02:00
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
def result: Option[T] = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
|
|
|
|
_result
|
|
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2010-08-12 09:46:42 +02:00
|
|
|
def exception: Option[Throwable] = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
|
|
|
|
_exception
|
|
|
|
|
} finally {
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2010-04-23 20:46:58 +02:00
|
|
|
def completeWithResult(result: T) = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
|
|
|
|
if (!_completed) {
|
|
|
|
|
_completed = true
|
2009-06-05 22:08:53 +02:00
|
|
|
_result = Some(result)
|
2010-09-16 15:58:46 +02:00
|
|
|
onComplete(result)
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
_signal.signalAll
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2010-08-12 09:46:42 +02:00
|
|
|
def completeWithException(exception: Throwable) = try {
|
2009-05-25 14:48:43 +02:00
|
|
|
_lock.lock
|
|
|
|
|
if (!_completed) {
|
|
|
|
|
_completed = true
|
2010-08-12 09:46:42 +02:00
|
|
|
_exception = Some(exception)
|
2010-09-16 15:58:46 +02:00
|
|
|
onCompleteException(exception)
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
_signal.signalAll
|
|
|
|
|
_lock.unlock
|
|
|
|
|
}
|
|
|
|
|
|
2009-06-05 22:08:53 +02:00
|
|
|
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
|
2010-09-16 15:58:46 +02:00
|
|
|
protected def onComplete(result: T) {}
|
|
|
|
|
protected def onCompleteException(exception: Throwable) {}
|
2009-05-25 14:48:43 +02:00
|
|
|
}
|