Fixing ticket #645 adding support for resultWithin on Future

This commit is contained in:
Viktor Klang 2011-02-07 18:59:49 +01:00
parent 4b9621da87
commit bc423fcc76
2 changed files with 58 additions and 12 deletions

View file

@ -44,7 +44,7 @@ object Futures {
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
* Returns the First Future that is completed (blocking!)
* Returns the First Future that is completed (blocking!)
*/
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await
@ -141,21 +141,25 @@ sealed trait Future[T] {
def result: Option[T]
/**
* Returns the result of the Future if one is available within the specified time,
* if the time left on the future is less than the specified time, the time left on the future will be used instead
* of the specified time.
* returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception
*/
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]]
def exception: Option[Throwable]
def onComplete(func: Future[T] => Unit): Future[T]
/**
* Returns the current result, throws the exception is one has been raised, else returns None
* Returns the current result, throws the exception is one has been raised, else returns None
*/
def resultOrException: Option[T] = {
val r = result
if (r.isDefined) result
else {
val problem = exception
if (problem.isDefined) throw problem.get
else None
}
def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match {
case None => None
case Some(Left(t)) => Some(t)
case Some(Right(t)) => throw t
}
/* Java API */
@ -171,6 +175,11 @@ sealed trait Future[T] {
def timeoutInNanos = wrapped.timeoutInNanos
def result: Option[O] = { wrapped.result map f }
def exception: Option[Throwable] = wrapped.exception
def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match {
case None => None
case Some(Left(t)) => Some(Left(f(t)))
case Some(Right(t)) => Some(Right(t))
}
def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this }
}
}
@ -203,11 +212,31 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
private var _exception: Option[Throwable] = None
private var _listeners: List[Future[T] => Unit] = Nil
def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try {
_lock.lock
var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
while (!_completed && wait > 0) {
val start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
} catch {
case e: InterruptedException =>
wait = wait - (currentTimeInNanos - start)
}
}
if(_completed) {
if (_result.isDefined) Some(Left(_result.get))
else Some(Right(_exception.get))
} else None
} finally {
_lock.unlock
}
def await = try {
_lock.lock
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) {
var start = currentTimeInNanos
val start = currentTimeInNanos
try {
wait = _signal.awaitNanos(wait)
if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds")

View file

@ -4,8 +4,8 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import Actor._
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.CountDownLatch
import akka.dispatch. {Future, Futures}
import java.util.concurrent. {TimeUnit, CountDownLatch}
object FutureSpec {
class TestActor extends Actor {
@ -201,4 +201,21 @@ class FutureSpec extends JUnitSuite {
@Test(expected = classOf[UnsupportedOperationException]) def shouldReduceThrowIAEOnEmptyInput {
Futures.reduce(List[Future[Int]]())(_ + _).await.resultOrException
}
@Test def resultWithinShouldNotThrowExceptions {
val actors = (1 to 10).toList map { _ =>
actorOf(new Actor {
def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add }
}).start
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) }
val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS)
val done = result collect { case Some(Left(x)) => x }
val undone = result collect { case None => None }
val errors = result collect { case Some(Right(t)) => t }
assert(done.size === 5)
assert(undone.size === 5)
assert(errors.size === 0)
}
}