Removing the erronous execution context and added Java API

This commit is contained in:
Viktor Klang 2012-02-10 08:20:36 +01:00
parent 51a218b87f
commit d910eeae69
4 changed files with 35 additions and 6 deletions

View file

@ -860,7 +860,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val l1, l2 = new TestLatch
val complex = Future() map { _
Future.blocking(system.dispatcher)
Future.blocking()
val nested = Future(())
nested foreach (_ l1.open())
Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed

View file

@ -151,6 +151,26 @@ object Futures {
for (r fr; b fb) yield { r add b; r }
}
}
/**
* Signals that the current thread of execution will potentially engage
* in blocking calls after the call to this method, giving the system a
* chance to spawn new threads, reuse old threads or otherwise, to prevent
* starvation and/or unfairness.
*
* Assures that any Future tasks initiated in the current thread will be
* executed asynchronously, including any tasks currently queued to be
* executed in the current thread. This is needed if the current task may
* block, causing delays in executing the remaining tasks which in some
* cases may cause a deadlock.
*
* Usage: Call this method in a callback (map, flatMap etc also count) to a Future,
* if you will be doing blocking in the callback.
*
* Note: Calling 'Await.result(future)' or 'Await.ready(future)' will automatically trigger this method.
*
*/
def blocking(): Unit = Future.blocking()
}
object Future {
@ -317,17 +337,22 @@ object Future {
* }
* </pre>
*/
def blocking(implicit executor: ExecutionContext): Unit =
def blocking(): Unit =
_taskStack.get match {
case stack if (stack ne null) && stack.nonEmpty
val executionContext = _executionContext.get match {
case null throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.")
case some some
}
val tasks = stack.elems
stack.clear()
_taskStack.remove()
dispatchTask(() _taskStack.get.elems = tasks, true)
dispatchTask(() _taskStack.get.elems = tasks, true)(executionContext)
case _ _taskStack.remove()
}
private val _taskStack = new ThreadLocal[Stack[() Unit]]()
private val _executionContext = new ThreadLocal[ExecutionContext]()
/**
* Internal API, do not call
@ -339,7 +364,7 @@ object Future {
new Runnable {
def run =
try {
_executionContext set executor
val taskStack = Stack.empty[() Unit]
taskStack push task
_taskStack set taskStack
@ -352,7 +377,10 @@ object Future {
case NonFatal(e) executor.reportFailure(e)
}
}
} finally { _taskStack.remove() }
} finally {
_executionContext.remove()
_taskStack.remove()
}
})
}

View file

@ -381,6 +381,7 @@ public class FutureDocTestBase {
@Test public void useOnSuccessOnFailureAndOnComplete() {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess
future.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {

View file

@ -40,7 +40,7 @@ class NettySettings(config: Config, val systemName: String) {
case value value
}
@deprecated("WARNING: This should only be used by professionals.")
@deprecated("WARNING: This should only be used by professionals.", "2.0")
val PortSelector = getInt("port")
val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)