Reducing overhead and locking involved in Futures.fold and Futures.reduce

This commit is contained in:
Viktor Klang 2011-03-18 01:21:26 +01:00
parent 8b8999cf32
commit 452a97db3f

View file

@ -76,24 +76,26 @@ object Futures {
} else {
val result = new DefaultCompletableFuture[R](timeout)
val results = new ConcurrentLinkedQueue[T]()
val waitingFor = new AtomicInteger(futures.size)
val allDone = futures.size
val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature?
if (f.exception.isDefined)
result completeWithException f.exception.get
else {
results add f.result.get
if (waitingFor.decrementAndGet == 0) { //Only one thread can get here
try {
val r = scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
results.clear //Do not retain the values since someone can hold onto the Future for a long time
result completeWithResult r
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
result completeWithException e
f.value.get match {
case r: Right[Throwable, T] =>
results add r.b
if (results.size == allDone) { //Only one thread can get here
try {
result completeWithResult scala.collection.JavaConversions.asScalaIterable(results).foldLeft(zero)(foldFun)
} catch {
case e: Exception =>
EventHandler notify EventHandler.Error(e, this)
result completeWithException e
} finally {
results.clear
}
}
}
case l: Left[Throwable, T] =>
result completeWithException l.a
results.clear
}
}
@ -112,11 +114,14 @@ object Futures {
val result = new DefaultCompletableFuture[R](timeout)
val seedFound = new AtomicBoolean(false)
val seedFold: Future[T] => Unit = f => {
if (seedFound.compareAndSet(false, true)){ //Only the first completed should trigger the fold
if (f.exception.isDefined) result completeWithException f.exception.get //If the seed is a failure, we're done here
else (fold[T,R](f.result.get, timeout)(futures.filterNot(_ eq f))(op)).onComplete(result.completeWith(_)) //Fold using the seed
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case r: Right[Throwable, T] =>
result.completeWith(fold[T,R](r.b, timeout)(futures.filterNot(_ eq f))(op))
case l: Left[Throwable, T] =>
result.completeWithException(l.a)
}
}
() //Without this Unit value, the compiler crashes
}
for(f <- futures) f onComplete seedFold //Attach the listener to the Futures
result