=act #17530 fix BatchingExecutor vs. CallingThreadDispatcher

This commit is contained in:
Roland Kuhn 2015-05-21 12:49:53 +02:00
parent 53c5d974ea
commit b549ab9d45
2 changed files with 81 additions and 3 deletions

View file

@ -7,6 +7,11 @@ import scala.concurrent.duration._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
import akka.util.SerializedSuspendableExecutionContext
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
import akka.testkit.TestActorRef
import akka.actor.Props
import akka.actor.Actor
import akka.testkit.TestProbe
import akka.testkit.CallingThreadDispatcher
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -106,6 +111,79 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
}
Await.result(f, 3.seconds) should be(())
}
"work with tasks that block inside blocking" in {
system.dispatcher.isInstanceOf[BatchingExecutor] should be(true)
import system.dispatcher
val f = Future(()).flatMap { _
blocking {
blocking {
blocking {
Future.successful(42)
}
}
}
}
Await.result(f, 3.seconds) should be(42)
}
"work with same-thread executor plus blocking" in {
val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext
var x = 0
ec.execute(new Runnable {
override def run = {
ec.execute(new Runnable {
override def run = blocking {
x = 1
}
})
}
})
x should be(1)
}
"work with same-thread dispatcher plus blocking" in {
val a = TestActorRef(Props(new Actor {
def receive = {
case msg
blocking {
sender() ! msg
}
}
}))
val b = TestActorRef(Props(new Actor {
def receive = {
case msg a forward msg
}
}))
val p = TestProbe()
p.send(b, "hello")
p.expectMsg(0.seconds, "hello")
}
"work with same-thread dispatcher as executor with blocking" in {
abstract class RunBatch extends Runnable with Batchable {
override def isBatchable = true
}
val ec = system.dispatchers.lookup(CallingThreadDispatcher.Id)
var x = 0
ec.execute(new RunBatch {
override def run = {
// enqueue a task to the batch
ec.execute(new RunBatch {
override def run = blocking {
x = 1
}
})
// now run it
blocking {
()
}
}
})
x should be(1)
}
}
"A SerializedSuspendableExecutionContext" must {

View file

@ -85,8 +85,8 @@ private[akka] trait BatchingExecutor extends Executor {
override final def run(): Unit = {
require(_tasksLocal.get eq null)
_tasksLocal set this // Install ourselves as the current batch
val prevBlockContext = _blockContext.get
_blockContext.set(BlockContext.current)
val firstInvocation = _blockContext.get eq null
if (firstInvocation) _blockContext.set(BlockContext.current)
BlockContext.withBlockContext(this) {
try processBatch(this) catch {
case t: Throwable
@ -94,7 +94,7 @@ private[akka] trait BatchingExecutor extends Executor {
throw t
} finally {
_tasksLocal.remove()
_blockContext.set(prevBlockContext)
if (firstInvocation) _blockContext.remove()
}
}
}