diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index f62e6de0ad..87fdbde2b5 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -3,8 +3,10 @@ package akka.dispatch import java.util.concurrent.{ ExecutorService, Executor, Executors } import java.util.concurrent.atomic.AtomicInteger import scala.concurrent._ +import scala.concurrent.duration._ import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } import akka.util.SerializedSuspendableExecutionContext +import akka.dispatch.ExecutionContexts.sameThreadExecutionContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { @@ -81,6 +83,29 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { } Await.ready(latch, timeout.duration) } + + "work with tasks that use blocking{} multiple times" in { + system.dispatcher.isInstanceOf[BatchingExecutor] should be(true) + import system.dispatcher + + val f = Future(()).flatMap { _ ⇒ + // this needs to be within an OnCompleteRunnable so that things are added to the batch + val p = Future.successful(42) + // we need the callback list to be non-empty when the blocking{} call is executing + p.onComplete { _ ⇒ () } + val r = p.map { _ ⇒ + // trigger the resubmitUnbatched() call + blocking { () } + // make sure that the other task runs to completion before continuing + Thread.sleep(500) + // now try again to blockOn() + blocking { () } + } + p.onComplete { _ ⇒ () } + r + } + Await.result(f, 3.seconds) should be(()) + } } "A SerializedSuspendableExecutionContext" must { diff --git a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala index adf87d047c..bd39c73c25 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala @@ -78,22 +78,23 @@ private[akka] trait BatchingExecutor extends Executor { } } + private[this] val _blockContext = new ThreadLocal[BlockContext]() + private[this] final class BlockableBatch extends AbstractBatch with BlockContext { - private var parentBlockContext: BlockContext = _ // this method runs in the delegate ExecutionContext's thread override final def run(): Unit = { require(_tasksLocal.get eq null) _tasksLocal set this // Install ourselves as the current batch - val prevBlockContext = BlockContext.current + val prevBlockContext = _blockContext.get + _blockContext.set(BlockContext.current) BlockContext.withBlockContext(this) { - parentBlockContext = prevBlockContext try processBatch(this) catch { case t: Throwable ⇒ resubmitUnbatched() throw t } finally { _tasksLocal.remove() - parentBlockContext = null + _blockContext.set(prevBlockContext) } } } @@ -102,8 +103,7 @@ private[akka] trait BatchingExecutor extends Executor { // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock. resubmitUnbatched() // now delegate the blocking to the previous BC - require(parentBlockContext ne null) - parentBlockContext.blockOn(thunk) + _blockContext.get.blockOn(thunk) } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index c7bd2ea88a..f0dbe5af37 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -360,7 +360,7 @@ object AkkaBuild extends Build { def akkaPreviousArtifact(id: String): Def.Initialize[Option[sbt.ModuleID]] = Def.setting { if (enableMiMa) { - val version: String = "2.3.10" // FIXME + val version: String = "2.3.11" // FIXME verify all 2.3.x versions val fullId = crossVersion.value match { case _ : CrossVersion.Binary => id + "_" + scalaBinaryVersion.value case _ : CrossVersion.Full => id + "_" + scalaVersion.value diff --git a/project/MiMa.scala b/project/MiMa.scala index 06ea21a344..7917d78224 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -515,7 +515,19 @@ object MiMa extends AutoPlugin { FilterAnyProblem("akka.remote.PhiAccrualFailureDetector$State"), FilterAnyProblem("akka.cluster.ClusterDomainEventPublisher"), FilterAnyProblem("akka.cluster.InternalClusterAction"), - ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublishCurrentClusterState") + ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublishCurrentClusterState"), + // issue #16327 compared to 2.3.10 + // synthetic method akka$dispatch$BatchingExecutor$BlockableBatch$$parentBlockContext_=(scala.concurrent.BlockContext)Unit in class akka.dispatch.BatchingExecutor#BlockableBatch does not have a correspondent in new version + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor#BlockableBatch.akka$dispatch$BatchingExecutor$BlockableBatch$$parentBlockContext_="), + // synthetic method akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_=(java.lang.ThreadLocal)Unit in trait akka.dispatch.BatchingExecutor does not have a correspondent in old version + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor.akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_="), + // synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in trait akka.dispatch.BatchingExecutor does not have a correspondent in old version + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor.akka$dispatch$BatchingExecutor$$_blockContext"), + // issue #16327 compared to 2.3.11 + // synthetic method akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_=(java.lang.ThreadLocal)Unit in class akka.dispatch.MessageDispatcher does not have a correspondent in new version + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_="), + // synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in class akka.dispatch.MessageDispatcher does not have a correspondent in new version + ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$$_blockContext") ) }