From 331f3e64691f2a6d7f389a72e0a520c3534d71da Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Mar 2011 17:20:35 +0100 Subject: [PATCH] Renaming resultWithin to valueWithin, awaitResult to awaitValue to aling the naming, and then deprecating the blocking methods in Futures --- .../src/main/scala/akka/dispatch/Future.scala | 80 ++++++++++++------- .../test/scala/akka/dispatch/FutureSpec.scala | 2 +- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f30a06d4ef..0b833f47b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -19,51 +19,42 @@ class FutureTimeoutException(message: String) extends AkkaException(message) object Futures { + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T]): Future[T] = Future(body.call) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long): Future[T] = Future(body.call, timeout) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + /** + * Java API, equivalent to Future.apply + */ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call, timeout)(dispatcher) - /** - * (Blocking!) - */ - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await - /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { - val futureResult = new DefaultCompletableFuture[Any](timeout) + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { + val futureResult = new DefaultCompletableFuture[T](timeout) - val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]]) + val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _) for(f <- futures) f onComplete completeFirst futureResult } - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException - /** * A non-blocking fold over the specified futures. * The fold is performed on the thread where the last future is completed, @@ -139,6 +130,35 @@ object Futures { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) + + //Deprecations + + + /** + * (Blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)") + def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) + + /** + * Returns the First Future that is completed (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await") + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await + + + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }") + def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException") + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException } object Future { @@ -206,7 +226,7 @@ sealed trait Future[+T] { * * Equivalent to calling future.await.value. */ - def awaitResult: Option[Either[Throwable, T]] + def awaitValue: Option[Either[Throwable, T]] /** * Returns the result of the Future if one is available within the specified @@ -215,7 +235,7 @@ sealed trait Future[+T] { * returns None if no result, Some(Right(t)) if a result, or * Some(Left(error)) if there was an exception */ - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] /** * Returns the contained exception of this Future if it exists. @@ -431,7 +451,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def awaitResult: Option[Either[Throwable, T]] = { + def awaitValue: Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)) @@ -441,7 +461,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = { _lock.lock try { awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))) @@ -536,8 +556,8 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte def complete(value: Either[Throwable, T]): CompletableFuture[T] = this def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } - def awaitResult: Option[Either[Throwable, T]] = value - def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value + def awaitValue: Option[Either[Throwable, T]] = value + def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value def await : Future[T] = this def awaitBlocking : Future[T] = this def isExpired: Boolean = true diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index dfd94b40b5..9e47ecdbe2 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -284,7 +284,7 @@ class FutureSpec extends JUnitSuite { } def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if(idx >= 5) 5000 else 0 )) } - val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS) + val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS) val done = result collect { case Some(Right(x)) => x } val undone = result collect { case None => None } val errors = result collect { case Some(Left(t)) => t }