From f6e142a58351b820406c806769225db128819c65 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Thu, 28 Apr 2011 20:53:45 -0600 Subject: [PATCH 1/4] prevent chain of callbacks from overflowing the stack --- .../src/main/scala/akka/dispatch/Future.scala | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 72cab081a2..c6d270324a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILL import java.util.concurrent.atomic. {AtomicBoolean} import java.lang.{Iterable => JIterable} import java.util.{LinkedList => JLinkedList} +import scala.collection.mutable.Stack import annotation.tailrec class FutureTimeoutException(message: String) extends AkkaException(message) @@ -271,6 +272,10 @@ object Future { val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <-fb) yield (r += b) }.map(_.result) + + private[akka] val callbacks = new ThreadLocal[Option[Stack[() => Unit]]]() { + override def initialValue = None + } } sealed trait Future[+T] { @@ -672,8 +677,30 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - if (notifyTheseListeners.nonEmpty) - notifyTheseListeners.reverse foreach notify + @tailrec + def addToCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { + if (rest.nonEmpty) { + callbacks.push(() => notify(rest.head)) + addToCallbacks(rest.tail, callbacks) + } + } + + if (notifyTheseListeners.nonEmpty) { + val optCallbacks = Future.callbacks.get + if (optCallbacks.isDefined) addToCallbacks(notifyTheseListeners, optCallbacks.get) + else { + try { + val callbacks = Stack[() => Unit]() + Future.callbacks.set(Some(callbacks)) + addToCallbacks(notifyTheseListeners, callbacks) + while (callbacks.nonEmpty) { + callbacks.pop().apply + } + } finally { + Future.callbacks.set(None) + } + } + } this } From ae481fc39a14423bd7a74138e0f150761c2fff61 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Thu, 28 Apr 2011 21:14:18 -0600 Subject: [PATCH 2/4] Avoid unneeded allocations --- .../src/main/scala/akka/dispatch/Future.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index c6d270324a..a925978b24 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -685,6 +685,14 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } + def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { + if (rest.nonEmpty) { + notify(rest.head) + while (callbacks.nonEmpty) { callbacks.pop().apply } + runCallbacks(rest.tail, callbacks) + } + } + if (notifyTheseListeners.nonEmpty) { val optCallbacks = Future.callbacks.get if (optCallbacks.isDefined) addToCallbacks(notifyTheseListeners, optCallbacks.get) @@ -692,10 +700,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com try { val callbacks = Stack[() => Unit]() Future.callbacks.set(Some(callbacks)) - addToCallbacks(notifyTheseListeners, callbacks) - while (callbacks.nonEmpty) { - callbacks.pop().apply - } + runCallbacks(notifyTheseListeners, callbacks) } finally { Future.callbacks.set(None) } From 2bfa5e5fc2c800b28d19b786bead5015af7d8948 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Thu, 28 Apr 2011 21:24:58 -0600 Subject: [PATCH 3/4] Add @tailrec check --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index a925978b24..d745f8ec4a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -685,6 +685,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } + @tailrec def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { if (rest.nonEmpty) { notify(rest.head) From 1fb228c06d5fdb67c22d59455357821f5a2290fb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 29 Apr 2011 13:22:39 +0200 Subject: [PATCH 4/4] Reducing object creation overhead --- .../src/main/scala/akka/dispatch/Future.scala | 56 ++++++++----------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d745f8ec4a..0f326410c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -18,6 +18,7 @@ import java.lang.{Iterable => JIterable} import java.util.{LinkedList => JLinkedList} import scala.collection.mutable.Stack import annotation.tailrec +import util.DynamicVariable class FutureTimeoutException(message: String) extends AkkaException(message) @@ -273,9 +274,7 @@ object Future { for (r <- fr; b <-fb) yield (r += b) }.map(_.result) - private[akka] val callbacks = new ThreadLocal[Option[Stack[() => Unit]]]() { - override def initialValue = None - } + private[akka] val callbacksPendingExecution = new DynamicVariable[Option[Stack[() => Unit]]](None) } sealed trait Future[+T] { @@ -677,35 +676,28 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - @tailrec - def addToCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { - if (rest.nonEmpty) { - callbacks.push(() => notify(rest.head)) - addToCallbacks(rest.tail, callbacks) - } - } - - @tailrec - def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { - if (rest.nonEmpty) { - notify(rest.head) - while (callbacks.nonEmpty) { callbacks.pop().apply } - runCallbacks(rest.tail, callbacks) - } - } - - if (notifyTheseListeners.nonEmpty) { - val optCallbacks = Future.callbacks.get - if (optCallbacks.isDefined) addToCallbacks(notifyTheseListeners, optCallbacks.get) - else { - try { - val callbacks = Stack[() => Unit]() - Future.callbacks.set(Some(callbacks)) - runCallbacks(notifyTheseListeners, callbacks) - } finally { - Future.callbacks.set(None) + if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation + @tailrec def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { + if (rest.nonEmpty) { + notifyCompleted(rest.head) + while (callbacks.nonEmpty) { callbacks.pop().apply() } + runCallbacks(rest.tail, callbacks) } } + + val pending = Future.callbacksPendingExecution.value + if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) + pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute + val doNotify = notifyCompleted _ //Hoist closure to avoid garbage + notifyTheseListeners foreach doNotify + }) + } else { + try { + val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks + Future.callbacksPendingExecution.value = Some(callbacks) // Specify the callback aggregator + runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated + } finally { Future.callbacksPendingExecution.value = None } // Ensure cleanup + } } this @@ -724,12 +716,12 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com _lock.unlock } - if (notifyNow) notify(func) + if (notifyNow) notifyCompleted(func) this } - private def notify(func: Future[T] => Unit) { + private def notifyCompleted(func: Future[T] => Unit) { try { func(this) } catch {