Merge pull request #17462 from akka/wip-bc-2.3.11-patriknw

=pro Check BC against 2.3.11
This commit is contained in:
Patrik Nordwall 2015-05-13 10:09:44 +02:00
commit 6ebb8b3627
4 changed files with 45 additions and 8 deletions

View file

@ -3,8 +3,10 @@ package akka.dispatch
import java.util.concurrent.{ ExecutorService, Executor, Executors }
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import scala.concurrent.duration._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
import akka.util.SerializedSuspendableExecutionContext
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -81,6 +83,29 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
}
Await.ready(latch, timeout.duration)
}
"work with tasks that use blocking{} multiple times" in {
system.dispatcher.isInstanceOf[BatchingExecutor] should be(true)
import system.dispatcher
val f = Future(()).flatMap { _
// this needs to be within an OnCompleteRunnable so that things are added to the batch
val p = Future.successful(42)
// we need the callback list to be non-empty when the blocking{} call is executing
p.onComplete { _ () }
val r = p.map { _
// trigger the resubmitUnbatched() call
blocking { () }
// make sure that the other task runs to completion before continuing
Thread.sleep(500)
// now try again to blockOn()
blocking { () }
}
p.onComplete { _ () }
r
}
Await.result(f, 3.seconds) should be(())
}
}
"A SerializedSuspendableExecutionContext" must {

View file

@ -78,22 +78,23 @@ private[akka] trait BatchingExecutor extends Executor {
}
}
private[this] val _blockContext = new ThreadLocal[BlockContext]()
private[this] final class BlockableBatch extends AbstractBatch with BlockContext {
private var parentBlockContext: BlockContext = _
// this method runs in the delegate ExecutionContext's thread
override final def run(): Unit = {
require(_tasksLocal.get eq null)
_tasksLocal set this // Install ourselves as the current batch
val prevBlockContext = BlockContext.current
val prevBlockContext = _blockContext.get
_blockContext.set(BlockContext.current)
BlockContext.withBlockContext(this) {
parentBlockContext = prevBlockContext
try processBatch(this) catch {
case t: Throwable
resubmitUnbatched()
throw t
} finally {
_tasksLocal.remove()
parentBlockContext = null
_blockContext.set(prevBlockContext)
}
}
}
@ -102,8 +103,7 @@ private[akka] trait BatchingExecutor extends Executor {
// if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
resubmitUnbatched()
// now delegate the blocking to the previous BC
require(parentBlockContext ne null)
parentBlockContext.blockOn(thunk)
_blockContext.get.blockOn(thunk)
}
}

View file

@ -360,7 +360,7 @@ object AkkaBuild extends Build {
def akkaPreviousArtifact(id: String): Def.Initialize[Option[sbt.ModuleID]] = Def.setting {
if (enableMiMa) {
val version: String = "2.3.10" // FIXME
val version: String = "2.3.11" // FIXME verify all 2.3.x versions
val fullId = crossVersion.value match {
case _ : CrossVersion.Binary => id + "_" + scalaBinaryVersion.value
case _ : CrossVersion.Full => id + "_" + scalaVersion.value

View file

@ -515,7 +515,19 @@ object MiMa extends AutoPlugin {
FilterAnyProblem("akka.remote.PhiAccrualFailureDetector$State"),
FilterAnyProblem("akka.cluster.ClusterDomainEventPublisher"),
FilterAnyProblem("akka.cluster.InternalClusterAction"),
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublishCurrentClusterState")
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.InternalClusterAction$PublishCurrentClusterState"),
// issue #16327 compared to 2.3.10
// synthetic method akka$dispatch$BatchingExecutor$BlockableBatch$$parentBlockContext_=(scala.concurrent.BlockContext)Unit in class akka.dispatch.BatchingExecutor#BlockableBatch does not have a correspondent in new version
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor#BlockableBatch.akka$dispatch$BatchingExecutor$BlockableBatch$$parentBlockContext_="),
// synthetic method akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_=(java.lang.ThreadLocal)Unit in trait akka.dispatch.BatchingExecutor does not have a correspondent in old version
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor.akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_="),
// synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in trait akka.dispatch.BatchingExecutor does not have a correspondent in old version
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.BatchingExecutor.akka$dispatch$BatchingExecutor$$_blockContext"),
// issue #16327 compared to 2.3.11
// synthetic method akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_=(java.lang.ThreadLocal)Unit in class akka.dispatch.MessageDispatcher does not have a correspondent in new version
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$_setter_$akka$dispatch$BatchingExecutor$$_blockContext_="),
// synthetic method akka$dispatch$BatchingExecutor$$_blockContext()java.lang.ThreadLocal in class akka.dispatch.MessageDispatcher does not have a correspondent in new version
ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$$_blockContext")
)
}