From 452a97db3fc32f002d3f2f933fa4dff0ede900f5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 Mar 2011 01:21:26 +0100 Subject: [PATCH] Reducing overhead and locking involved in Futures.fold and Futures.reduce --- .../src/main/scala/akka/dispatch/Future.scala | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 7b8440fb85..52a51831e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -76,24 +76,26 @@ object Futures { } else { val result = new DefaultCompletableFuture[R](timeout) val results = new ConcurrentLinkedQueue[T]() - val waitingFor = new AtomicInteger(futures.size) + val allDone = futures.size val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature? - if (f.exception.isDefined) - result completeWithException f.exception.get - else { - results add f.result.get - if (waitingFor.decrementAndGet == 0) { //Only one thread can get here - try { - val r = scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) - results.clear //Do not retain the values since someone can hold onto the Future for a long time - result completeWithResult r - } catch { - case e: Exception => - EventHandler notify EventHandler.Error(e, this) - result completeWithException e + f.value.get match { + case r: Right[Throwable, T] => + results add r.b + if (results.size == allDone) { //Only one thread can get here + try { + result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun) + } catch { + case e: Exception => + EventHandler notify EventHandler.Error(e, this) + result completeWithException e + } finally { + results.clear + } } - } + case l: Left[Throwable, T] => + result completeWithException l.a + results.clear } } @@ -112,11 +114,14 @@ object Futures { val result = new DefaultCompletableFuture[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold: Future[T] => Unit = f => { - if (seedFound.compareAndSet(false, true)){ //Only the first completed should trigger the fold - if (f.exception.isDefined) result completeWithException f.exception.get //If the seed is a failure, we're done here - else (fold[T,R](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed + if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold + f.value.get match { + case r: Right[Throwable, T] => + result.completeWith(fold[T,R](r.b, timeout)(futures.filterNot(_ eq f))(op)) + case l: Left[Throwable, T] => + result.completeWithException(l.a) + } } - () //Without this Unit value, the compiler crashes } for(f <- futures) f onComplete seedFold //Attach the listener to the Futures result