diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e002ff330d..cf760dbc35 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -44,7 +44,7 @@ object Futures { def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) /** - * Returns the First Future that is completed (blocking!) + * Returns the First Future that is completed (blocking!) */ def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await @@ -141,21 +141,25 @@ sealed trait Future[T] { def result: Option[T] + /** + * Returns the result of the Future if one is available within the specified time, + * if the time left on the future is less than the specified time, the time left on the future will be used instead + * of the specified time. + * returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception + */ + def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] + def exception: Option[Throwable] def onComplete(func: Future[T] => Unit): Future[T] /** - * Returns the current result, throws the exception is one has been raised, else returns None + * Returns the current result, throws the exception is one has been raised, else returns None */ - def resultOrException: Option[T] = { - val r = result - if (r.isDefined) result - else { - val problem = exception - if (problem.isDefined) throw problem.get - else None - } + def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match { + case None => None + case Some(Left(t)) => Some(t) + case Some(Right(t)) => throw t } /* Java API */ @@ -171,6 +175,11 @@ sealed trait Future[T] { def timeoutInNanos = wrapped.timeoutInNanos def result: Option[O] = { wrapped.result map f } def exception: Option[Throwable] = wrapped.exception + def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match { + case None => None + case Some(Left(t)) => Some(Left(f(t))) + case Some(Right(t)) => Some(Right(t)) + } def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this } } } @@ -203,11 +212,31 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private var _exception: Option[Throwable] = None private var _listeners: List[Future[T] => Unit] = Nil + def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try { + _lock.lock + var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) + while (!_completed && wait > 0) { + val start = currentTimeInNanos + try { + wait = _signal.awaitNanos(wait) + } catch { + case e: InterruptedException => + wait = wait - (currentTimeInNanos - start) + } + } + if(_completed) { + if (_result.isDefined) Some(Left(_result.get)) + else Some(Right(_exception.get)) + } else None + } finally { + _lock.unlock + } + def await = try { _lock.lock var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) while (!_completed && wait > 0) { - var start = currentTimeInNanos + val start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 9ccdb6aba2..f7906106af 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -4,8 +4,8 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.CountDownLatch import akka.dispatch. {Future, Futures} +import java.util.concurrent. {TimeUnit, CountDownLatch} object FutureSpec { class TestActor extends Actor { @@ -201,4 +201,21 @@ class FutureSpec extends JUnitSuite { @Test(expected = classOf[UnsupportedOperationException]) def shouldReduceThrowIAEOnEmptyInput { Futures.reduce(List[Future[Int]]())(_ + _).await.resultOrException } + + @Test def resultWithinShouldNotThrowExceptions { + val actors = (1 to 10).toList map { _ => + actorOf(new Actor { + def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add } + }).start + } + + def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } + val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) + val done = result collect { case Some(Left(x)) => x } + val undone = result collect { case None => None } + val errors = result collect { case Some(Right(t)) => t } + assert(done.size === 5) + assert(undone.size === 5) + assert(errors.size === 0) + } }