diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f4f304606c..60a16218c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -136,68 +136,48 @@ sealed trait Future[T] { def awaitBlocking : Future[T] - def isCompleted: Boolean + def isCompleted: Boolean = value.isDefined def isExpired: Boolean def timeoutInNanos: Long - def result: Option[T] + def value: Option[Either[Throwable, T]] + + def result: Option[T] = value flatMap (_.right.toOption) /** * Returns the result of the Future if one is available within the specified time, * if the time left on the future is less than the specified time, the time left on the future will be used instead * of the specified time. - * returns None if no result, Some(Left(t)) if a result, and Some(Right(error)) if there was an exception + * returns None if no result, Some(Right(t)) if a result, and Some(Left(error)) if there was an exception */ - def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] - def exception: Option[Throwable] + def exception: Option[Throwable] = value flatMap (_.left.toOption) def onComplete(func: Future[T] => Unit): Future[T] /** * Returns the current result, throws the exception is one has been raised, else returns None */ - def resultOrException: Option[T] = resultWithin(0, TimeUnit.MILLISECONDS) match { - case None => None - case Some(Left(t)) => Some(t) - case Some(Right(t)) => throw t - } + def resultOrException: Option[T] = value map (_.fold(t => throw t, identity)) /* Java API */ def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) - def map[O](f: (T) => O): Future[O] = { - val wrapped = this - new Future[O] { - def await = { wrapped.await; this } - def awaitBlocking = { wrapped.awaitBlocking; this } - def isCompleted = wrapped.isCompleted - def isExpired = wrapped.isExpired - def timeoutInNanos = wrapped.timeoutInNanos - def result: Option[O] = { wrapped.result map f } - def exception: Option[Throwable] = wrapped.exception - def resultWithin(time: Long, unit: TimeUnit): Option[Either[O,Throwable]] = wrapped.resultWithin(time, unit) match { - case None => None - case Some(Left(t)) => Some(Left(f(t))) - case Some(Right(t)) => Some(Right(t)) - } - def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this } - } - } } trait CompletableFuture[T] extends Future[T] { - def completeWithResult(result: T): CompletableFuture[T] - def completeWithException(exception: Throwable): CompletableFuture[T] + def completeWithValue(value: Either[Throwable, T]): CompletableFuture[T] + def completeWithResult(result: T): CompletableFuture[T] = completeWithValue(Right(result)) + def completeWithException(exception: Throwable): CompletableFuture[T] = completeWithValue(Left(exception)) def completeWith(other: Future[T]): CompletableFuture[T] = { - val result = other.result - val exception = other.exception - if (result.isDefined) completeWithResult(result.get) - else if (exception.isDefined) completeWithException(exception.get) - //else TODO how to handle this case? - this + val value = other.value + if (value.isDefined) + completeWithValue(value.get) + else + this } } @@ -211,15 +191,13 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock private val _signal = _lock.newCondition - private var _completed: Boolean = _ - private var _result: Option[T] = None - private var _exception: Option[Throwable] = None + private var _value: Option[Either[Throwable, T]] = None private var _listeners: List[Future[T] => Unit] = Nil - def resultWithin(time: Long, unit: TimeUnit): Option[Either[T,Throwable]] = try { + def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = try { _lock.lock var wait = unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) - while (!_completed && wait > 0) { + while (!_value.isDefined && wait > 0) { val start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) @@ -228,10 +206,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { wait = wait - (currentTimeInNanos - start) } } - if(_completed) { - if (_result.isDefined) Some(Left(_result.get)) - else Some(Right(_exception.get)) - } else None + _value } finally { _lock.unlock } @@ -239,7 +214,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def await = try { _lock.lock var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) - while (!_completed && wait > 0) { + while (!_value.isDefined && wait > 0) { val start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) @@ -256,7 +231,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def awaitBlocking = try { _lock.lock - while (!_completed) { + while (!_value.isDefined) { _signal.await } this @@ -264,13 +239,6 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def isCompleted: Boolean = try { - _lock.lock - _completed - } finally { - _lock.unlock - } - def isExpired: Boolean = try { _lock.lock timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0 @@ -278,47 +246,18 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def result: Option[T] = try { + def value: Option[Either[Throwable, T]] = try { _lock.lock - _result + _value } finally { _lock.unlock } - def exception: Option[Throwable] = try { - _lock.lock - _exception - } finally { - _lock.unlock - } - - def completeWithResult(result: T): DefaultCompletableFuture[T] = { + def completeWithValue(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { val notifyTheseListeners = try { _lock.lock - if (!_completed) { - _completed = true - _result = Some(result) - val all = _listeners - _listeners = Nil - all - } else Nil - } finally { - _signal.signalAll - _lock.unlock - } - - if (notifyTheseListeners.nonEmpty) - notifyTheseListeners foreach notify - - this - } - - def completeWithException(exception: Throwable): DefaultCompletableFuture[T] = { - val notifyTheseListeners = try { - _lock.lock - if (!_completed) { - _completed = true - _exception = Some(exception) + if (!_value.isDefined) { + _value = Some(value) val all = _listeners _listeners = Nil all @@ -337,7 +276,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { val notifyNow = try { _lock.lock - if (!_completed) { + if (!_value.isDefined) { _listeners ::= func false } diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index f7906106af..4aa25d3f05 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -117,20 +117,6 @@ class FutureSpec extends JUnitSuite { actor2.stop } - @Test def shouldFutureMapBeDeferred { - val latch = new StandardLatch - val actor1 = actorOf(new TestDelayActor(latch)).start - - val mappedFuture = (actor1.!!![String]("Hello")).map(x => 5) - assert(mappedFuture.isCompleted === false) - assert(mappedFuture.isExpired === false) - latch.open - mappedFuture.await - assert(mappedFuture.isCompleted === true) - assert(mappedFuture.isExpired === false) - assert(mappedFuture.result === Some(5)) - } - @Test def shouldFuturesAwaitMapHandleEmptySequence { assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) } @@ -211,9 +197,9 @@ class FutureSpec extends JUnitSuite { def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) - val done = result collect { case Some(Left(x)) => x } + val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } - val errors = result collect { case Some(Right(t)) => t } + val errors = result collect { case Some(Left(t)) => t } assert(done.size === 5) assert(undone.size === 5) assert(errors.size === 0)