Send tasks back to the Dispatcher if Future.await is called. Fixes #1313
* Future.redispatchTasks() is a public method that can be manually called if a deadlock might occur due to queued tasks being executed synchronously.
This commit is contained in:
parent
fef4075b8a
commit
885fdfe2a8
2 changed files with 45 additions and 1 deletions
|
|
@ -831,6 +831,23 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
|
|||
|
||||
f4.await must be('completed)
|
||||
}
|
||||
|
||||
"should not deadlock with nested await (ticket 1313)" in {
|
||||
|
||||
val simple = Future() map (_ ⇒ (Future() map (_ ⇒ ())).get)
|
||||
simple.await must be('completed)
|
||||
|
||||
val latch = new StandardLatch
|
||||
val complex = Future() map { _ ⇒
|
||||
val nested = Future()
|
||||
nested.await
|
||||
nested foreach (_ ⇒ latch.open)
|
||||
Future.redispatchTasks
|
||||
latch.await
|
||||
}
|
||||
complex.await must be('completed)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -359,6 +359,31 @@ object Future {
|
|||
|
||||
// TODO make variant of flow(timeout)(body) which does NOT break type inference
|
||||
|
||||
/**
|
||||
* Send queued tasks back to the dispatcher to be executed. This is needed if the current
|
||||
* task may block while waiting for something to happen in a queued task.
|
||||
*
|
||||
* Example:
|
||||
* <pre>
|
||||
* val latch = new StandardLatch
|
||||
* val future = Future() map { _ ⇒
|
||||
* val nested = Future()
|
||||
* nested.await
|
||||
* nested foreach (_ ⇒ latch.open)
|
||||
* Future.redispatchTasks
|
||||
* latch.await
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
def redispatchTasks()(implicit dispatcher: MessageDispatcher): Unit =
|
||||
_taskStack.get match {
|
||||
case Some(taskStack) if taskStack.nonEmpty ⇒
|
||||
val tasks = taskStack.elems
|
||||
taskStack.clear()
|
||||
dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, true)
|
||||
case _ ⇒ // nothing to do
|
||||
}
|
||||
|
||||
private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
|
||||
override def initialValue = None
|
||||
}
|
||||
|
|
@ -857,7 +882,9 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
}
|
||||
}
|
||||
|
||||
def await(atMost: Duration): this.type = {
|
||||
def await(atMost: Duration): this.type = if (value.isDefined) this else {
|
||||
Future.redispatchTasks()
|
||||
|
||||
val waitNanos =
|
||||
if (timeout.duration.isFinite && atMost.isFinite)
|
||||
atMost.toNanos min timeLeft()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue