Merge pull request #319 from jboner/wip-futureblocking-√
Removing the erronous execution context and added Java API
This commit is contained in:
commit
eb02d461c7
4 changed files with 35 additions and 6 deletions
|
|
@ -860,7 +860,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
|
|
||||||
val l1, l2 = new TestLatch
|
val l1, l2 = new TestLatch
|
||||||
val complex = Future() map { _ ⇒
|
val complex = Future() map { _ ⇒
|
||||||
Future.blocking(system.dispatcher)
|
Future.blocking()
|
||||||
val nested = Future(())
|
val nested = Future(())
|
||||||
nested foreach (_ ⇒ l1.open())
|
nested foreach (_ ⇒ l1.open())
|
||||||
Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
|
Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
|
||||||
|
|
|
||||||
|
|
@ -151,6 +151,26 @@ object Futures {
|
||||||
for (r ← fr; b ← fb) yield { r add b; r }
|
for (r ← fr; b ← fb) yield { r add b; r }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signals that the current thread of execution will potentially engage
|
||||||
|
* in blocking calls after the call to this method, giving the system a
|
||||||
|
* chance to spawn new threads, reuse old threads or otherwise, to prevent
|
||||||
|
* starvation and/or unfairness.
|
||||||
|
*
|
||||||
|
* Assures that any Future tasks initiated in the current thread will be
|
||||||
|
* executed asynchronously, including any tasks currently queued to be
|
||||||
|
* executed in the current thread. This is needed if the current task may
|
||||||
|
* block, causing delays in executing the remaining tasks which in some
|
||||||
|
* cases may cause a deadlock.
|
||||||
|
*
|
||||||
|
* Usage: Call this method in a callback (map, flatMap etc also count) to a Future,
|
||||||
|
* if you will be doing blocking in the callback.
|
||||||
|
*
|
||||||
|
* Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def blocking(): Unit = Future.blocking()
|
||||||
}
|
}
|
||||||
|
|
||||||
object Future {
|
object Future {
|
||||||
|
|
@ -317,17 +337,22 @@ object Future {
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
def blocking(implicit executor: ExecutionContext): Unit =
|
def blocking(): Unit =
|
||||||
_taskStack.get match {
|
_taskStack.get match {
|
||||||
case stack if (stack ne null) && stack.nonEmpty ⇒
|
case stack if (stack ne null) && stack.nonEmpty ⇒
|
||||||
|
val executionContext = _executionContext.get match {
|
||||||
|
case null ⇒ throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.")
|
||||||
|
case some ⇒ some
|
||||||
|
}
|
||||||
val tasks = stack.elems
|
val tasks = stack.elems
|
||||||
stack.clear()
|
stack.clear()
|
||||||
_taskStack.remove()
|
_taskStack.remove()
|
||||||
dispatchTask(() ⇒ _taskStack.get.elems = tasks, true)
|
dispatchTask(() ⇒ _taskStack.get.elems = tasks, true)(executionContext)
|
||||||
case _ ⇒ _taskStack.remove()
|
case _ ⇒ _taskStack.remove()
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _taskStack = new ThreadLocal[Stack[() ⇒ Unit]]()
|
private val _taskStack = new ThreadLocal[Stack[() ⇒ Unit]]()
|
||||||
|
private val _executionContext = new ThreadLocal[ExecutionContext]()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal API, do not call
|
* Internal API, do not call
|
||||||
|
|
@ -339,7 +364,7 @@ object Future {
|
||||||
new Runnable {
|
new Runnable {
|
||||||
def run =
|
def run =
|
||||||
try {
|
try {
|
||||||
|
_executionContext set executor
|
||||||
val taskStack = Stack.empty[() ⇒ Unit]
|
val taskStack = Stack.empty[() ⇒ Unit]
|
||||||
taskStack push task
|
taskStack push task
|
||||||
_taskStack set taskStack
|
_taskStack set taskStack
|
||||||
|
|
@ -352,7 +377,10 @@ object Future {
|
||||||
case NonFatal(e) ⇒ executor.reportFailure(e)
|
case NonFatal(e) ⇒ executor.reportFailure(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally { _taskStack.remove() }
|
} finally {
|
||||||
|
_executionContext.remove()
|
||||||
|
_taskStack.remove()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -381,6 +381,7 @@ public class FutureDocTestBase {
|
||||||
@Test public void useOnSuccessOnFailureAndOnComplete() {
|
@Test public void useOnSuccessOnFailureAndOnComplete() {
|
||||||
{
|
{
|
||||||
Future<String> future = Futures.successful("foo", system.dispatcher());
|
Future<String> future = Futures.successful("foo", system.dispatcher());
|
||||||
|
|
||||||
//#onSuccess
|
//#onSuccess
|
||||||
future.onSuccess(new OnSuccess<String>() {
|
future.onSuccess(new OnSuccess<String>() {
|
||||||
public void onSuccess(String result) {
|
public void onSuccess(String result) {
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ class NettySettings(config: Config, val systemName: String) {
|
||||||
case value ⇒ value
|
case value ⇒ value
|
||||||
}
|
}
|
||||||
|
|
||||||
@deprecated("WARNING: This should only be used by professionals.")
|
@deprecated("WARNING: This should only be used by professionals.", "2.0")
|
||||||
val PortSelector = getInt("port")
|
val PortSelector = getInt("port")
|
||||||
|
|
||||||
val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
|
val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue