Only aggregating callbacks for the same EC

This commit is contained in:
Viktor Klang 2012-02-16 17:04:03 +01:00
parent 1cd34fae5b
commit 258d710dab
2 changed files with 41 additions and 14 deletions

View file

@ -82,9 +82,35 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result)))
behave like futureWithResult(_(future, result))
}
"have different ECs" in {
def namedCtx(n: String) = ExecutionContexts.fromExecutorService(
Executors.newSingleThreadExecutor(new ThreadFactory {
def newThread(r: Runnable) = new Thread(r, n)
}))
val A = namedCtx("A")
val B = namedCtx("B")
// create a promise with ctx A
val p = Promise[String]()(A)
// I would expect that any callback from p
// is executed in the context of p
val result = p map { _ + Thread.currentThread().getName() }
p.completeWith(Future { "Hi " }(B))
try {
Await.result(result, timeout.duration) must be === "Hi A"
} finally {
A.asInstanceOf[ExecutorService].shutdown()
B.asInstanceOf[ExecutorService].shutdown()
}
}
}
"A Future" when {
"awaiting a result" that {
"is not completed" must {
behave like emptyFuture { test

View file

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.{ ExecutionException, Callable, TimeoutException }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater }
import akka.pattern.AskTimeoutException
import util.DynamicVariable
object Await {
@ -341,7 +342,7 @@ object Future {
def blocking(): Unit =
_taskStack.get match {
case stack if (stack ne null) && stack.nonEmpty
val executionContext = _executionContext.get match {
val executionContext = _executionContext.value match {
case null throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.")
case some some
}
@ -353,33 +354,33 @@ object Future {
}
private val _taskStack = new ThreadLocal[Stack[() Unit]]()
private val _executionContext = new ThreadLocal[ExecutionContext]()
private val _executionContext = new DynamicVariable[ExecutionContext](null)
/**
* Internal API, do not call
*/
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case stack if (stack ne null) && !force stack push task
case stack if (stack ne null) && (executor eq _executionContext.value) && !force stack push task
case _ executor.execute(
new Runnable {
def run =
try {
_executionContext set executor
val taskStack = Stack.empty[() Unit]
taskStack push task
_taskStack set taskStack
_executionContext.withValue(executor) {
val taskStack = Stack.empty[() Unit]
taskStack push task
_taskStack set taskStack
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case NonFatal(e) executor.reportFailure(e)
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case NonFatal(e) executor.reportFailure(e)
}
}
}
} finally {
_executionContext.remove()
_taskStack.remove()
}
})