diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 20d7b2e1bf..9debbd053c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -8,6 +8,7 @@ import akka.testkit._ import akka.dispatch._ import akka.util.Timeout import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } import akka.util.Switch import java.rmi.RemoteException @@ -18,7 +19,6 @@ import akka.actor.ActorSystem import akka.util.duration._ import akka.event.Logging.Error import com.typesafe.config.Config -import java.util.concurrent.atomic.AtomicInteger import akka.util.Duration object ActorModelSpec { @@ -231,20 +231,15 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa import ActorModelSpec._ - // FIXME Remove these settings as part of ticket #1563 - val DispatcherThroughput = system.settings.config.getInt("akka.actor.default-dispatcher.throughput") - val DispatcherDefaultShutdown = Duration(system.settings.config.getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout"), TimeUnit.MILLISECONDS) - val DispatcherThroughputDeadlineTime = Duration(system.settings.config.getNanoseconds("akka.actor.default-dispatcher.throughput-deadline-time"), TimeUnit.NANOSECONDS) - def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher)) - protected def registerInterceptedDispatcher(): MessageDispatcherInterceptor + protected def interceptedDispatcher(): MessageDispatcherInterceptor protected def dispatcherType: String "A " + dispatcherType must { "must dynamically handle its own life cycle" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() assertDispatcher(dispatcher)(stops = 0) val a = newTestActor(dispatcher.id) assertDispatcher(dispatcher)(stops = 0) @@ -274,7 +269,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } "process messages one at a time" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val start, oneAtATime = new CountDownLatch(1) val a = newTestActor(dispatcher.id) @@ -293,7 +288,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } "handle queueing from multiple threads" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val counter = new CountDownLatch(200) val a = newTestActor(dispatcher.id) @@ -324,7 +319,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } "not process messages for a suspended actor" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef] val done = new CountDownLatch(1) a.suspend @@ -343,7 +338,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } "handle waves of actors" in { - val dispatcher = registerInterceptedDispatcher() + val dispatcher = interceptedDispatcher() val props = Props[DispatcherActor].withDispatcher(dispatcher.id) def flood(num: Int) { @@ -389,7 +384,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "continue to process messages when a thread gets interrupted" in { filterEvents(EventFilter[InterruptedException](), EventFilter[akka.event.Logging.EventHandlerException]()) { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() implicit val timeout = Timeout(5 seconds) val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") @@ -410,7 +405,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa "continue to process messages when exception is thrown" in { filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val a = newTestActor(dispatcher.id) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") @@ -431,14 +426,38 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } object DispatcherModelSpec { - val config = """ - dispatcher { - type = Dispatcher - } - boss { - type = PinnedDispatcher - } + import ActorModelSpec._ + + val config = { """ + boss { + type = PinnedDispatcher + } + """ + + // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state + (for (n ← 1 to 30) yield """ + test-dispatcher-%s { + type = "akka.actor.dispatch.DispatcherModelSpec$MessageDispatcherInterceptorConfigurator" + }""".format(n)).mkString + } + + class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + private val instance: MessageDispatcher = { + configureThreadPool(config, + threadPoolConfig ⇒ new Dispatcher(prerequisites, + config.getString("name"), + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + threadPoolConfig, + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build + } + + override def dispatcher(): MessageDispatcher = instance + } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -447,28 +466,16 @@ class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) { val dispatcherCount = new AtomicInteger() - override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new id for each invocation, since the MessageDispatcherInterceptor holds state - val id = "dispatcher-" + dispatcherCount.incrementAndGet() - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatchers.prerequisites) { - val instance = { - ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(system.dispatchers.prerequisites, id, id, DispatcherThroughput, - DispatcherThroughputDeadlineTime, UnboundedMailbox(), config, - DispatcherDefaultShutdown) with MessageDispatcherInterceptor, - ThreadPoolConfig()).build - } - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(id, dispatcherConfigurator) - system.dispatchers.lookup(id).asInstanceOf[MessageDispatcherInterceptor] + override def interceptedDispatcher(): MessageDispatcherInterceptor = { + // use new id for each test, since the MessageDispatcherInterceptor holds state + system.dispatchers.lookup("test-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Dispatcher" "A " + dispatcherType must { "process messages in parallel" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val aStart, aStop, bParallel = new CountDownLatch(1) val a, b = newTestActor(dispatcher.id) @@ -492,14 +499,40 @@ class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) { } object BalancingDispatcherModelSpec { - val config = """ - dispatcher { - type = BalancingDispatcher - } - boss { - type = PinnedDispatcher - } + import ActorModelSpec._ + + // TODO check why throughput=1 here? (came from old test) + val config = { """ + boss { + type = PinnedDispatcher + } + """ + + // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state + (for (n ← 1 to 30) yield """ + test-balancing-dispatcher-%s { + type = "akka.actor.dispatch.BalancingDispatcherModelSpec$BalancingMessageDispatcherInterceptorConfigurator" + throughput=1 + }""".format(n)).mkString + } + + class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + private val instance: MessageDispatcher = { + configureThreadPool(config, + threadPoolConfig ⇒ new BalancingDispatcher(prerequisites, + config.getString("name"), + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + threadPoolConfig, + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build + } + + override def dispatcher(): MessageDispatcher = instance + } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -508,29 +541,16 @@ class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherMod val dispatcherCount = new AtomicInteger() - override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new id for each invocation, since the MessageDispatcherInterceptor holds state - val id = "dispatcher-" + dispatcherCount.incrementAndGet() - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatchers.prerequisites) { - val instance = { - ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(system.dispatchers.prerequisites, id, id, 1, // TODO check why 1 here? (came from old test) - DispatcherThroughputDeadlineTime, UnboundedMailbox(), - config, DispatcherDefaultShutdown) with MessageDispatcherInterceptor, - ThreadPoolConfig()).build - } - - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(id, dispatcherConfigurator) - system.dispatchers.lookup(id).asInstanceOf[MessageDispatcherInterceptor] + override def interceptedDispatcher(): MessageDispatcherInterceptor = { + // use new id for each test, since the MessageDispatcherInterceptor holds state + system.dispatchers.lookup("test-balancing-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Balancing Dispatcher" "A " + dispatcherType must { "process messages in parallel" in { - implicit val dispatcher = registerInterceptedDispatcher() + implicit val dispatcher = interceptedDispatcher() val aStart, aStop, bParallel = new CountDownLatch(1) val a, b = newTestActor(dispatcher.id) diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index efaf28aaf1..800148a187 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -3,21 +3,41 @@ */ package akka.testkit -import akka.actor.dispatch.ActorModelSpec -import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger -import org.junit.{ After, Test } +import akka.actor.dispatch.ActorModelSpec import com.typesafe.config.Config import akka.dispatch.DispatcherPrerequisites import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcherConfigurator object CallingThreadDispatcherModelSpec { - val config = """ - boss { - type = PinnedDispatcher - } + import ActorModelSpec._ + + val config = { """ + boss { + type = PinnedDispatcher + } + """ + + // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state + (for (n ← 1 to 30) yield """ + test-calling-thread-%s { + type = "akka.testkit.CallingThreadDispatcherModelSpec$CallingThreadDispatcherInterceptorConfigurator" + }""".format(n)).mkString + } + + class CallingThreadDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + private val instance: MessageDispatcher = + new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { + override def id: String = config.getString("id") + } + + override def dispatcher(): MessageDispatcher = instance + + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -26,17 +46,9 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec(CallingThreadDispa val dispatcherCount = new AtomicInteger() - override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = { - // use new id for each invocation, since the MessageDispatcherInterceptor holds state - val dispatcherId = "test-calling-thread" + dispatcherCount.incrementAndGet() - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) { - val instance = new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { - override def id: String = dispatcherId - } - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(dispatcherId, dispatcherConfigurator) - system.dispatchers.lookup(dispatcherId).asInstanceOf[MessageDispatcherInterceptor] + override def interceptedDispatcher(): MessageDispatcherInterceptor = { + // use new id for each test, since the MessageDispatcherInterceptor holds state + system.dispatchers.lookup("test-calling-thread-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor] } override def dispatcherType = "Calling Thread Dispatcher" diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index db200d09d6..491d79a63b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -57,13 +57,11 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId) - // FIXME: Configurators registered here are are not removed, see ticket #1494 private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] /** * Returns a dispatcher as specified in configuration, or if not defined it uses - * the default dispatcher. The same dispatcher instance is returned for subsequent - * lookups. + * the default dispatcher. */ def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher() @@ -93,11 +91,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } } - // FIXME #1563: Remove this method when dispatcher usage is rewritten in ActorModelSpec and CallingThreadDispatcherModelSpec - private[akka] def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = { - dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator) - } - private def config(id: String): Config = { import scala.collection.JavaConverters._ def simpleName = id.substring(id.lastIndexOf('.') + 1)