diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 72cab081a2..0f326410c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -16,7 +16,9 @@ import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILL import java.util.concurrent.atomic. {AtomicBoolean} import java.lang.{Iterable => JIterable} import java.util.{LinkedList => JLinkedList} +import scala.collection.mutable.Stack import annotation.tailrec +import util.DynamicVariable class FutureTimeoutException(message: String) extends AkkaException(message) @@ -271,6 +273,8 @@ object Future { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) + + private[akka] val callbacksPendingExecution = new DynamicVariable[Option[Stack[() => Unit]]](None) } sealed trait Future[+T] { @@ -672,8 +676,29 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - if (notifyTheseListeners.nonEmpty) - notifyTheseListeners.reverse foreach notify + if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation + @tailrec def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { + if (rest.nonEmpty) { + notifyCompleted(rest.head) + while (callbacks.nonEmpty) { callbacks.pop().apply() } + runCallbacks(rest.tail, callbacks) + } + } + + val pending = Future.callbacksPendingExecution.value + if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) + pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute + val doNotify = notifyCompleted _ //Hoist closure to avoid garbage + notifyTheseListeners foreach doNotify + }) + } else { + try { + val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks + Future.callbacksPendingExecution.value = Some(callbacks) // Specify the callback aggregator + runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated + } finally { Future.callbacksPendingExecution.value = None } // Ensure cleanup + } + } this } @@ -691,12 +716,12 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - if (notifyNow) notify(func) + if (notifyNow) notifyCompleted(func) this } - private def notify(func: Future[T] => Unit) { + private def notifyCompleted(func: Future[T] => Unit) { try { func(this) } catch {