From b549ab9d456d3e8d21f6b8d36c9152f55b47833f Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 21 May 2015 12:49:53 +0200 Subject: [PATCH] =act #17530 fix BatchingExecutor vs. CallingThreadDispatcher --- .../akka/dispatch/ExecutionContextSpec.scala | 78 +++++++++++++++++++ .../akka/dispatch/BatchingExecutor.scala | 6 +- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index 87fdbde2b5..5765d67bb8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -7,6 +7,11 @@ import scala.concurrent.duration._ import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } import akka.util.SerializedSuspendableExecutionContext import akka.dispatch.ExecutionContexts.sameThreadExecutionContext +import akka.testkit.TestActorRef +import akka.actor.Props +import akka.actor.Actor +import akka.testkit.TestProbe +import akka.testkit.CallingThreadDispatcher @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { @@ -106,6 +111,79 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { } Await.result(f, 3.seconds) should be(()) } + + "work with tasks that block inside blocking" in { + system.dispatcher.isInstanceOf[BatchingExecutor] should be(true) + import system.dispatcher + + val f = Future(()).flatMap { _ ⇒ + blocking { + blocking { + blocking { + Future.successful(42) + } + } + } + } + Await.result(f, 3.seconds) should be(42) + } + + "work with same-thread executor plus blocking" in { + val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext + var x = 0 + ec.execute(new Runnable { + override def run = { + ec.execute(new Runnable { + override def run = blocking { + x = 1 + } + }) + } + }) + x should be(1) + } + + "work with same-thread dispatcher plus blocking" in { + val a = TestActorRef(Props(new Actor { + def receive = { + case msg ⇒ + blocking { + sender() ! msg + } + } + })) + val b = TestActorRef(Props(new Actor { + def receive = { + case msg ⇒ a forward msg + } + })) + val p = TestProbe() + p.send(b, "hello") + p.expectMsg(0.seconds, "hello") + } + + "work with same-thread dispatcher as executor with blocking" in { + abstract class RunBatch extends Runnable with Batchable { + override def isBatchable = true + } + val ec = system.dispatchers.lookup(CallingThreadDispatcher.Id) + var x = 0 + ec.execute(new RunBatch { + override def run = { + // enqueue a task to the batch + ec.execute(new RunBatch { + override def run = blocking { + x = 1 + } + }) + // now run it + blocking { + () + } + } + }) + x should be(1) + } } "A SerializedSuspendableExecutionContext" must { diff --git a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala index bd39c73c25..71b0d85661 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala @@ -85,8 +85,8 @@ private[akka] trait BatchingExecutor extends Executor { override final def run(): Unit = { require(_tasksLocal.get eq null) _tasksLocal set this // Install ourselves as the current batch - val prevBlockContext = _blockContext.get - _blockContext.set(BlockContext.current) + val firstInvocation = _blockContext.get eq null + if (firstInvocation) _blockContext.set(BlockContext.current) BlockContext.withBlockContext(this) { try processBatch(this) catch { case t: Throwable ⇒ @@ -94,7 +94,7 @@ private[akka] trait BatchingExecutor extends Executor { throw t } finally { _tasksLocal.remove() - _blockContext.set(prevBlockContext) + if (firstInvocation) _blockContext.remove() } } }