Merge branch '661-derekjw'

This commit is contained in:
Derek Williams 2011-02-13 20:01:40 -07:00
commit 7437209ff4
2 changed files with 81 additions and 143 deletions

View file

@ -11,6 +11,7 @@ import akka.routing.Dispatcher
import java.util.concurrent.locks.ReentrantLock
import akka.japi.Procedure
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit}
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
import akka.actor.Actor
import annotation.tailrec
import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger}
@ -136,127 +137,124 @@ sealed trait Future[T] {
def awaitBlocking : Future[T]
def isCompleted: Boolean
final def isCompleted: Boolean = value.isDefined
def isExpired: Boolean
def timeoutInNanos: Long
def result: Option[T]
def value: Option[Either[Throwable, T]]
final def result: Option[T] = {
val v = value
if (v.isDefined) v.get.right.toOption
else None
}
def awaitResult: Option[Either[Throwable, T]]
/**
* Returns the result of the Future if one is available within the specified time,
* if the time left on the future is less than the specified time, the time left on the future will be used instead
* of the specified time.
* returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception
* returns None if no result, Some(Right(t)) if a result, and Some(Left(error)) if there was an exception
*/
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]]
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]
def exception: Option[Throwable]
final def exception: Option[Throwable] = {
val v = value
if (v.isDefined) v.get.left.toOption
else None
}
def onComplete(func: Future[T] => Unit): Future[T]
/**
* Returns the current result, throws the exception is one has been raised, else returns None
*/
def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match {
case None => None
case Some(Left(t)) => Some(t)
case Some(Right(t)) => throw t
final def resultOrException: Option[T] = {
val v = value
if (v.isDefined) {
val r = v.get
if (r.isLeft) throw r.left.get
else r.right.toOption
} else None
}
/* Java API */
def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
def map[O](f: (T) => O): Future[O] = {
val wrapped = this
new Future[O] {
def await = { wrapped.await; this }
def awaitBlocking = { wrapped.awaitBlocking; this }
def isCompleted = wrapped.isCompleted
def isExpired = wrapped.isExpired
def timeoutInNanos = wrapped.timeoutInNanos
def result: Option[O] = { wrapped.result map f }
def exception: Option[Throwable] = wrapped.exception
def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match {
case None => None
case Some(Left(t)) => Some(Left(f(t)))
case Some(Right(t)) => Some(Right(t))
}
def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this }
}
}
}
trait CompletableFuture[T] extends Future[T] {
def completeWithResult(result: T): CompletableFuture[T]
def completeWithException(exception: Throwable): CompletableFuture[T]
def completeWith(other: Future[T]): CompletableFuture[T] = {
val result = other.result
val exception = other.exception
if (result.isDefined) completeWithResult(result.get)
else if (exception.isDefined) completeWithException(exception.get)
//else TODO how to handle this case?
this
def complete(value: Either[Throwable, T]): CompletableFuture[T]
final def completeWithResult(result: T): CompletableFuture[T] = complete(Right(result))
final def completeWithException(exception: Throwable): CompletableFuture[T] = complete(Left(exception))
final def completeWith(other: Future[T]): CompletableFuture[T] = {
val v = other.value
if (v.isDefined) complete(v.get)
else this
}
}
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
import TimeUnit.{MILLISECONDS => TIME_UNIT}
class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {
def this() = this(0)
def this() = this(0, MILLIS)
val timeoutInNanos = TIME_UNIT.toNanos(timeout)
def this(timeout: Long) = this(timeout, MILLIS)
val timeoutInNanos = timeunit.toNanos(timeout)
private val _startTimeInNanos = currentTimeInNanos
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _completed: Boolean = _
private var _result: Option[T] = None
private var _exception: Option[Throwable] = None
private var _value: Option[Either[Throwable, T]] = None
private var _listeners: List[Future[T] => Unit] = Nil
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try {
_lock.lock
var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
while (!_completed && wait > 0) {
@tailrec
private def awaitUnsafe(wait: Long): Boolean = {
if (_value.isEmpty && wait > 0) {
val start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
awaitUnsafe(try {
_signal.awaitNanos(wait)
} catch {
case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start)
}
wait - (currentTimeInNanos - start)
})
} else {
_value.isDefined
}
if(_completed) {
if (_result.isDefined) Some(Left(_result.get))
else Some(Right(_exception.get))
} else None
}
def awaitResult: Option[Either[Throwable, T]] = try {
_lock.lock
awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
_value
} finally {
_lock.unlock
}
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try {
_lock.lock
awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
_value
} finally {
_lock.unlock
}
def await = try {
_lock.lock
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) {
val start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")
} catch {
case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start)
}
}
this
if (awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
this
else
throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
} finally {
_lock.unlock
}
def awaitBlocking = try {
_lock.lock
while (!_completed) {
while (_value.isEmpty) {
_signal.await
}
this
@ -264,61 +262,20 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
_lock.unlock
}
def isCompleted: Boolean = try {
def isExpired: Boolean = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
def value: Option[Either[Throwable, T]] = try {
_lock.lock
_completed
_value
} finally {
_lock.unlock
}
def isExpired: Boolean = try {
_lock.lock
timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
} finally {
_lock.unlock
}
def result: Option[T] = try {
_lock.lock
_result
} finally {
_lock.unlock
}
def exception: Option[Throwable] = try {
_lock.lock
_exception
} finally {
_lock.unlock
}
def completeWithResult(result: T): DefaultCompletableFuture[T] = {
def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
val notifyTheseListeners = try {
_lock.lock
if (!_completed) {
_completed = true
_result = Some(result)
val all = _listeners
_listeners = Nil
all
} else Nil
} finally {
_signal.signalAll
_lock.unlock
}
if (notifyTheseListeners.nonEmpty)
notifyTheseListeners foreach notify
this
}
def completeWithException(exception: Throwable): DefaultCompletableFuture[T] = {
val notifyTheseListeners = try {
_lock.lock
if (!_completed) {
_completed = true
_exception = Some(exception)
if (_value.isEmpty) {
_value = Some(value)
val all = _listeners
_listeners = Nil
all
@ -335,21 +292,16 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
}
def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
val notifyNow = try {
if (try {
_lock.lock
if (!_completed) {
if (_value.isEmpty) {
_listeners ::= func
false
}
else
true
else true
} finally {
_lock.unlock
}
if (notifyNow)
notify(func)
}) notify(func)
this
}
@ -357,5 +309,5 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
func(this)
}
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
}

View file

@ -117,20 +117,6 @@ class FutureSpec extends JUnitSuite {
actor2.stop
}
@Test def shouldFutureMapBeDeferred {
val latch = new StandardLatch
val actor1 = actorOf(new TestDelayActor(latch)).start
val mappedFuture = (actor1.!!![String]("Hello")).map(x => 5)
assert(mappedFuture.isCompleted === false)
assert(mappedFuture.isExpired === false)
latch.open
mappedFuture.await
assert(mappedFuture.isCompleted === true)
assert(mappedFuture.isExpired === false)
assert(mappedFuture.result === Some(5))
}
@Test def shouldFuturesAwaitMapHandleEmptySequence {
assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil)
}
@ -211,9 +197,9 @@ class FutureSpec extends JUnitSuite {
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) }
val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS)
val done = result collect { case Some(Left(x)) => x }
val done = result collect { case Some(Right(x)) => x }
val undone = result collect { case None => None }
val errors = result collect { case Some(Right(t)) => t }
val errors = result collect { case Some(Left(t)) => t }
assert(done.size === 5)
assert(undone.size === 5)
assert(errors.size === 0)