Merge branch 'future-stackoverflow'

This commit is contained in:
Viktor Klang 2011-04-29 14:49:30 +02:00
commit e4283828d8

View file

@ -16,7 +16,9 @@ import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILL
import java.util.concurrent.atomic. {AtomicBoolean}
import java.lang.{Iterable => JIterable}
import java.util.{LinkedList => JLinkedList}
import scala.collection.mutable.Stack
import annotation.tailrec
import util.DynamicVariable
class FutureTimeoutException(message: String) extends AkkaException(message)
@ -271,6 +273,8 @@ object Future {
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b)
}.map(_.result)
private[akka] val callbacksPendingExecution = new DynamicVariable[Option[Stack[() => Unit]]](None)
}
sealed trait Future[+T] {
@ -672,8 +676,29 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
_lock.unlock
}
if (notifyTheseListeners.nonEmpty)
notifyTheseListeners.reverse foreach notify
if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation
@tailrec def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) {
if (rest.nonEmpty) {
notifyCompleted(rest.head)
while (callbacks.nonEmpty) { callbacks.pop().apply() }
runCallbacks(rest.tail, callbacks)
}
}
val pending = Future.callbacksPendingExecution.value
if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow)
pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute
val doNotify = notifyCompleted _ //Hoist closure to avoid garbage
notifyTheseListeners foreach doNotify
})
} else {
try {
val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks
Future.callbacksPendingExecution.value = Some(callbacks) // Specify the callback aggregator
runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated
} finally { Future.callbacksPendingExecution.value = None } // Ensure cleanup
}
}
this
}
@ -691,12 +716,12 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
_lock.unlock
}
if (notifyNow) notify(func)
if (notifyNow) notifyCompleted(func)
this
}
private def notify(func: Future[T] => Unit) {
private def notifyCompleted(func: Future[T] => Unit) {
try {
func(this)
} catch {