diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 417ee1e441..3b6b147ec5 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -831,6 +831,23 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { f4.await must be('completed) } + + "should not deadlock with nested await (ticket 1313)" in { + + val simple = Future() map (_ ⇒ (Future() map (_ ⇒ ())).get) + simple.await must be('completed) + + val latch = new StandardLatch + val complex = Future() map { _ ⇒ + val nested = Future() + nested.await + nested foreach (_ ⇒ latch.open) + Future.redispatchTasks + latch.await + } + complex.await must be('completed) + + } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 922aa9cf5c..a41463999b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -359,6 +359,31 @@ object Future { // TODO make variant of flow(timeout)(body) which does NOT break type inference + /** + * Send queued tasks back to the dispatcher to be executed. This is needed if the current + * task may block while waiting for something to happen in a queued task. + * + * Example: + *
+   * val latch = new StandardLatch
+   * val future = Future() map { _ ⇒
+   *   val nested = Future()
+   *   nested.await
+   *   nested foreach (_ ⇒ latch.open)
+   *   Future.redispatchTasks
+   *   latch.await
+   * }
+   * 
+ */ + def redispatchTasks()(implicit dispatcher: MessageDispatcher): Unit = + _taskStack.get match { + case Some(taskStack) if taskStack.nonEmpty ⇒ + val tasks = taskStack.elems + taskStack.clear() + dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, true) + case _ ⇒ // nothing to do + } + private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None } @@ -857,7 +882,9 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } - def await(atMost: Duration): this.type = { + def await(atMost: Duration): this.type = if (value.isDefined) this else { + Future.redispatchTasks() + val waitNanos = if (timeout.duration.isFinite && atMost.isFinite) atMost.toNanos min timeLeft()