diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 4b985e0881..feb79d3c5c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -75,7 +75,7 @@ object Futures { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[R,T](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { + def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { val result = new DefaultCompletableFuture[R](timeout) val results = new ConcurrentLinkedQueue[T]() val waitingFor = new AtomicInteger(futures.size) @@ -104,13 +104,16 @@ object Futures { /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (T,T) => T): Future[T] = { - val result = new DefaultCompletableFuture[T](timeout) + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = { + if (futures.isEmpty) + throw new UnsupportedOperationException("empty reduce left") + + val result = new DefaultCompletableFuture[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold = (f: Future[T]) => { if (seedFound.compareAndSet(false, true)){ //Only the first completed should trigger the fold if (f.exception.isDefined) result completeWithException f.exception.get //If the seed is a failure, we're done here - else (fold[T,T](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed + else (fold[T,R](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed } () //Returns Unit }