diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index e12294a70d..bc60f7762f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -277,26 +277,6 @@ class FutureSpec extends JUnitSuite { Futures.reduce(List[Future[Int]]())(_ + _).await.resultOrException } - @Test def resultWithinShouldNotThrowExceptions { - val latch = new StandardLatch - - val actors = (1 to 10).toList map { _ => - actorOf(new Actor { - def receive = { case (add: Int, wait: Boolean, latch: StandardLatch) => if (wait) latch.await; self reply_? add } - }).start() - } - - def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx >= 5, latch)) } - val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS) - latch.open - val done = result collect { case Some(Right(x)) => x } - val undone = result collect { case None => None } - val errors = result collect { case Some(Left(t)) => t } - assert(done.size === 5) - assert(undone.size === 5) - assert(errors.size === 0) - } - @Test def receiveShouldExecuteOnComplete { val latch = new StandardLatch val actor = actorOf[TestActor].start() diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 863d9c1283..a2d5a63697 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,13 +7,13 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler import akka.actor.{Actor, Channel} -import akka.routing.Dispatcher +import akka.util.Duration import akka.japi.{ Procedure, Function => JFunc } import java.util.concurrent.locks.ReentrantLock import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable} import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS} -import java.util.concurrent.atomic. {AtomicBoolean, AtomicInteger} +import java.util.concurrent.atomic. {AtomicBoolean} import java.lang.{Iterable => JIterable} import java.util.{LinkedList => JLinkedList} import annotation.tailrec @@ -298,11 +298,20 @@ sealed trait Future[+T] { */ def await : Future[T] + /** + * Blocks the current thread until the Future has been completed or the + * timeout has expired. The timeout will be the least value of 'atMost' and the timeout + * supplied at the constructuion of this Future. + * In the case of the timeout expiring a FutureTimeoutException will be thrown. + */ + def await(atMost: Duration) : Future[T] + /** * Blocks the current thread until the Future has been completed. Use * caution with this method as it ignores the timeout and will block * indefinitely if the Future is never completed. */ + @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.") def awaitBlocking : Future[T] /** @@ -340,24 +349,6 @@ sealed trait Future[+T] { else None } - /** - * Waits for the completion of this Future, then returns the completed value. - * If the Future's timeout expires while waiting a FutureTimeoutException - * will be thrown. - * - * Equivalent to calling future.await.value. - */ - def awaitValue: Option[Either[Throwable, T]] - - /** - * Returns the result of the Future if one is available within the specified - * time, if the time left on the future is less than the specified time, the - * time left on the future will be used instead of the specified time. - * returns None if no result, Some(Right(t)) if a result, or - * Some(Left(error)) if there was an exception - */ - def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] - /** * Returns the contained exception of this Future if it exists. */ @@ -620,39 +611,25 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com * Must be called inside _lock.lock<->_lock.unlock */ @tailrec - private def awaitUnsafe(wait: Long): Boolean = { - if (_value.isEmpty && wait > 0) { + private def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (_value.isEmpty && waitTimeNanos > 0) { val start = currentTimeInNanos - val remaining = try { - _signal.awaitNanos(wait) + val remainingNanos = try { + _signal.awaitNanos(waitTimeNanos) } catch { case e: InterruptedException => - wait - (currentTimeInNanos - start) + waitTimeNanos - (currentTimeInNanos - start) } - awaitUnsafe(remaining) + awaitUnsafe(remainingNanos) } else { _value.isDefined } } - def awaitValue: Option[Either[Throwable, T]] = { + def await(atMost: Duration) = { _lock.lock - try { - awaitUnsafe(timeLeft()) - _value - } finally { - _lock.unlock - } - } - - def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { - _lock.lock - try { - awaitUnsafe(unit toNanos time min timeLeft()) - _value - } finally { - _lock.unlock - } + if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } def await = { @@ -741,8 +718,7 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte def complete(value: Either[Throwable, T]): CompletableFuture[T] = this def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } - def awaitValue: Option[Either[Throwable, T]] = value - def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def await(atMost: Duration): Future[T] = this def await : Future[T] = this def awaitBlocking : Future[T] = this def isExpired: Boolean = true