Refining signatures on fold and reduce
This commit is contained in:
parent
ad26903410
commit
35457a4ecc
1 changed files with 7 additions and 4 deletions
|
|
@ -75,7 +75,7 @@ object Futures {
|
|||
* the result will be the first failure of any of the futures, or any failure in the actual fold,
|
||||
* or the result of the fold.
|
||||
*/
|
||||
def fold[R,T](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
|
||||
def fold[T,R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
|
||||
val result = new DefaultCompletableFuture[R](timeout)
|
||||
val results = new ConcurrentLinkedQueue[T]()
|
||||
val waitingFor = new AtomicInteger(futures.size)
|
||||
|
|
@ -104,13 +104,16 @@ object Futures {
|
|||
/**
|
||||
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
|
||||
*/
|
||||
def reduce[T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (T,T) => T): Future[T] = {
|
||||
val result = new DefaultCompletableFuture[T](timeout)
|
||||
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R,T) => T): Future[R] = {
|
||||
if (futures.isEmpty)
|
||||
throw new UnsupportedOperationException("empty reduce left")
|
||||
|
||||
val result = new DefaultCompletableFuture[R](timeout)
|
||||
val seedFound = new AtomicBoolean(false)
|
||||
val seedFold = (f: Future[T]) => {
|
||||
if (seedFound.compareAndSet(false, true)){ //Only the first completed should trigger the fold
|
||||
if (f.exception.isDefined) result completeWithException f.exception.get //If the seed is a failure, we're done here
|
||||
else (fold[T,T](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed
|
||||
else (fold[T,R](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed
|
||||
}
|
||||
() //Returns Unit
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue