Renaming resultWithin to valueWithin, awaitResult to awaitValue to aling the naming, and then deprecating the blocking methods in Futures
This commit is contained in:
parent
f63024bc38
commit
331f3e6469
2 changed files with 51 additions and 31 deletions
|
|
@ -19,51 +19,42 @@ class FutureTimeoutException(message: String) extends AkkaException(message)
|
|||
|
||||
object Futures {
|
||||
|
||||
/**
|
||||
* Java API, equivalent to Future.apply
|
||||
*/
|
||||
def future[T](body: Callable[T]): Future[T] =
|
||||
Future(body.call)
|
||||
|
||||
/**
|
||||
* Java API, equivalent to Future.apply
|
||||
*/
|
||||
def future[T](body: Callable[T], timeout: Long): Future[T] =
|
||||
Future(body.call, timeout)
|
||||
|
||||
/**
|
||||
* Java API, equivalent to Future.apply
|
||||
*/
|
||||
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
|
||||
Future(body.call)(dispatcher)
|
||||
|
||||
/**
|
||||
* Java API, equivalent to Future.apply
|
||||
*/
|
||||
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
|
||||
Future(body.call, timeout)(dispatcher)
|
||||
|
||||
/**
|
||||
* (Blocking!)
|
||||
*/
|
||||
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
|
||||
|
||||
/**
|
||||
* Returns the First Future that is completed (blocking!)
|
||||
*/
|
||||
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures, timeout).await
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf(futures: Iterable[Future[_]], timeout: Long = Long.MaxValue): Future[_] = {
|
||||
val futureResult = new DefaultCompletableFuture[Any](timeout)
|
||||
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
|
||||
val futureResult = new DefaultCompletableFuture[T](timeout)
|
||||
|
||||
val completeFirst: Future[_] => Unit = f => futureResult.completeWith(f.asInstanceOf[Future[Any]])
|
||||
val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _)
|
||||
for(f <- futures) f onComplete completeFirst
|
||||
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
|
||||
*/
|
||||
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
|
||||
in map { f => fun(f.await) }
|
||||
|
||||
/**
|
||||
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
|
||||
*/
|
||||
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException
|
||||
|
||||
/**
|
||||
* A non-blocking fold over the specified futures.
|
||||
* The fold is performed on the thread where the last future is completed,
|
||||
|
|
@ -139,6 +130,35 @@ object Futures {
|
|||
val fb = fn(a.asInstanceOf[A])
|
||||
for (r <- fr; b <-fb) yield (r += b)
|
||||
}.map(_.result)
|
||||
|
||||
//Deprecations
|
||||
|
||||
|
||||
/**
|
||||
* (Blocking!)
|
||||
*/
|
||||
@deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)")
|
||||
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
|
||||
|
||||
/**
|
||||
* Returns the First Future that is completed (blocking!)
|
||||
*/
|
||||
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await")
|
||||
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await
|
||||
|
||||
|
||||
/**
|
||||
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
|
||||
*/
|
||||
@deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }")
|
||||
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
|
||||
in map { f => fun(f.await) }
|
||||
|
||||
/**
|
||||
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
|
||||
*/
|
||||
@deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException")
|
||||
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1,f2)).await.resultOrException
|
||||
}
|
||||
|
||||
object Future {
|
||||
|
|
@ -206,7 +226,7 @@ sealed trait Future[+T] {
|
|||
*
|
||||
* Equivalent to calling future.await.value.
|
||||
*/
|
||||
def awaitResult: Option[Either[Throwable, T]]
|
||||
def awaitValue: Option[Either[Throwable, T]]
|
||||
|
||||
/**
|
||||
* Returns the result of the Future if one is available within the specified
|
||||
|
|
@ -215,7 +235,7 @@ sealed trait Future[+T] {
|
|||
* returns None if no result, Some(Right(t)) if a result, or
|
||||
* Some(Left(error)) if there was an exception
|
||||
*/
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]
|
||||
def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]]
|
||||
|
||||
/**
|
||||
* Returns the contained exception of this Future if it exists.
|
||||
|
|
@ -431,7 +451,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
}
|
||||
}
|
||||
|
||||
def awaitResult: Option[Either[Throwable, T]] = {
|
||||
def awaitValue: Option[Either[Throwable, T]] = {
|
||||
_lock.lock
|
||||
try {
|
||||
awaitUnsafe(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos))
|
||||
|
|
@ -441,7 +461,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
|||
}
|
||||
}
|
||||
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = {
|
||||
def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = {
|
||||
_lock.lock
|
||||
try {
|
||||
awaitUnsafe(unit.toNanos(time).min(timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)))
|
||||
|
|
@ -536,8 +556,8 @@ sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) exte
|
|||
|
||||
def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
|
||||
def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this }
|
||||
def awaitResult: Option[Either[Throwable, T]] = value
|
||||
def resultWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value
|
||||
def awaitValue: Option[Either[Throwable, T]] = value
|
||||
def valueWithin(time: Long, unit: TimeUnit): Option[Either[Throwable, T]] = value
|
||||
def await : Future[T] = this
|
||||
def awaitBlocking : Future[T] = this
|
||||
def isExpired: Boolean = true
|
||||
|
|
|
|||
|
|
@ -284,7 +284,7 @@ class FutureSpec extends JUnitSuite {
|
|||
}
|
||||
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!! 5000 else 0 )) }
|
||||
val result = for(f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS)
|
||||
val result = for(f <- futures) yield f.valueWithin(2, TimeUnit.SECONDS)
|
||||
val done = result collect { case Some(Right(x)) => x }
|
||||
val undone = result collect { case None => None }
|
||||
val errors = result collect { case Some(Left(t)) => t }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue