diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 6163123632..1a8930437e 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -82,9 +82,35 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))) behave like futureWithResult(_(future, result)) } + + "have different ECs" in { + def namedCtx(n: String) = ExecutionContexts.fromExecutorService( + Executors.newSingleThreadExecutor(new ThreadFactory { + def newThread(r: Runnable) = new Thread(r, n) + })) + + val A = namedCtx("A") + val B = namedCtx("B") + + // create a promise with ctx A + val p = Promise[String]()(A) + + // I would expect that any callback from p + // is executed in the context of p + val result = p map { _ + Thread.currentThread().getName() } + + p.completeWith(Future { "Hi " }(B)) + try { + Await.result(result, timeout.duration) must be === "Hi A" + } finally { + A.asInstanceOf[ExecutorService].shutdown() + B.asInstanceOf[ExecutorService].shutdown() + } + } } "A Future" when { + "awaiting a result" that { "is not completed" must { behave like emptyFuture { test ⇒ diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d6f4751d6e..d9e179a160 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.{ ExecutionException, Callable, TimeoutException } import java.util.concurrent.atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import akka.pattern.AskTimeoutException +import util.DynamicVariable object Await { @@ -341,7 +342,7 @@ object Future { def blocking(): Unit = _taskStack.get match { case stack if (stack ne null) && stack.nonEmpty ⇒ - val executionContext = _executionContext.get match { + val executionContext = _executionContext.value match { case null ⇒ throw new IllegalStateException("'blocking' needs to be invoked inside a Future callback.") case some ⇒ some } @@ -353,33 +354,33 @@ object Future { } private val _taskStack = new ThreadLocal[Stack[() ⇒ Unit]]() - private val _executionContext = new ThreadLocal[ExecutionContext]() + private val _executionContext = new DynamicVariable[ExecutionContext](null) /** * Internal API, do not call */ private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit = _taskStack.get match { - case stack if (stack ne null) && !force ⇒ stack push task + case stack if (stack ne null) && (executor eq _executionContext.value) && !force ⇒ stack push task case _ ⇒ executor.execute( new Runnable { def run = try { - _executionContext set executor - val taskStack = Stack.empty[() ⇒ Unit] - taskStack push task - _taskStack set taskStack + _executionContext.withValue(executor) { + val taskStack = Stack.empty[() ⇒ Unit] + taskStack push task + _taskStack set taskStack - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case NonFatal(e) ⇒ executor.reportFailure(e) + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case NonFatal(e) ⇒ executor.reportFailure(e) + } } } } finally { - _executionContext.remove() _taskStack.remove() } })