Rewrite dispatcher usage in ActorModelSpec. See #1563

* Configured dispatcher configurators instead of programatically created and registered configurators
* Removed Dispatchers.register
This commit is contained in:
Patrik Nordwall 2012-01-01 16:30:33 +01:00
parent 5ec89a4888
commit 774584642e
3 changed files with 112 additions and 87 deletions

View file

@ -8,6 +8,7 @@ import akka.testkit._
import akka.dispatch._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import akka.util.Switch
import java.rmi.RemoteException
@ -18,7 +19,6 @@ import akka.actor.ActorSystem
import akka.util.duration._
import akka.event.Logging.Error
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger
import akka.util.Duration
object ActorModelSpec {
@ -231,20 +231,15 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
import ActorModelSpec._
// FIXME Remove these settings as part of ticket #1563
val DispatcherThroughput = system.settings.config.getInt("akka.actor.default-dispatcher.throughput")
val DispatcherDefaultShutdown = Duration(system.settings.config.getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout"), TimeUnit.MILLISECONDS)
val DispatcherThroughputDeadlineTime = Duration(system.settings.config.getNanoseconds("akka.actor.default-dispatcher.throughput-deadline-time"), TimeUnit.NANOSECONDS)
def newTestActor(dispatcher: String) = system.actorOf(Props[DispatcherActor].withDispatcher(dispatcher))
protected def registerInterceptedDispatcher(): MessageDispatcherInterceptor
protected def interceptedDispatcher(): MessageDispatcherInterceptor
protected def dispatcherType: String
"A " + dispatcherType must {
"must dynamically handle its own life cycle" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
assertDispatcher(dispatcher)(stops = 0)
val a = newTestActor(dispatcher.id)
assertDispatcher(dispatcher)(stops = 0)
@ -274,7 +269,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
}
"process messages one at a time" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val start, oneAtATime = new CountDownLatch(1)
val a = newTestActor(dispatcher.id)
@ -293,7 +288,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
}
"handle queueing from multiple threads" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val counter = new CountDownLatch(200)
val a = newTestActor(dispatcher.id)
@ -324,7 +319,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
}
"not process messages for a suspended actor" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id).asInstanceOf[LocalActorRef]
val done = new CountDownLatch(1)
a.suspend
@ -343,7 +338,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
}
"handle waves of actors" in {
val dispatcher = registerInterceptedDispatcher()
val dispatcher = interceptedDispatcher()
val props = Props[DispatcherActor].withDispatcher(dispatcher.id)
def flood(num: Int) {
@ -389,7 +384,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
"continue to process messages when a thread gets interrupted" in {
filterEvents(EventFilter[InterruptedException](), EventFilter[akka.event.Logging.EventHandlerException]()) {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
implicit val timeout = Timeout(5 seconds)
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
@ -410,7 +405,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
"continue to process messages when exception is thrown" in {
filterEvents(EventFilter[IndexOutOfBoundsException](), EventFilter[RemoteException]()) {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
@ -431,14 +426,38 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
}
object DispatcherModelSpec {
val config = """
dispatcher {
type = Dispatcher
}
boss {
type = PinnedDispatcher
}
import ActorModelSpec._
val config = {
"""
boss {
type = PinnedDispatcher
}
""" +
// use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
(for (n 1 to 30) yield """
test-dispatcher-%s {
type = "akka.actor.dispatch.DispatcherModelSpec$MessageDispatcherInterceptorConfigurator"
}""".format(n)).mkString
}
class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = {
configureThreadPool(config,
threadPoolConfig new Dispatcher(prerequisites,
config.getString("name"),
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build
}
override def dispatcher(): MessageDispatcher = instance
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -447,28 +466,16 @@ class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) {
val dispatcherCount = new AtomicInteger()
override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each invocation, since the MessageDispatcherInterceptor holds state
val id = "dispatcher-" + dispatcherCount.incrementAndGet()
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatchers.prerequisites) {
val instance = {
ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatchers.prerequisites, id, id, DispatcherThroughput,
DispatcherThroughputDeadlineTime, UnboundedMailbox(), config,
DispatcherDefaultShutdown) with MessageDispatcherInterceptor,
ThreadPoolConfig()).build
}
override def dispatcher(): MessageDispatcher = instance
}
system.dispatchers.register(id, dispatcherConfigurator)
system.dispatchers.lookup(id).asInstanceOf[MessageDispatcherInterceptor]
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Dispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher.id)
@ -492,14 +499,40 @@ class DispatcherModelSpec extends ActorModelSpec(DispatcherModelSpec.config) {
}
object BalancingDispatcherModelSpec {
val config = """
dispatcher {
type = BalancingDispatcher
}
boss {
type = PinnedDispatcher
}
import ActorModelSpec._
// TODO check why throughput=1 here? (came from old test)
val config = {
"""
boss {
type = PinnedDispatcher
}
""" +
// use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
(for (n 1 to 30) yield """
test-balancing-dispatcher-%s {
type = "akka.actor.dispatch.BalancingDispatcherModelSpec$BalancingMessageDispatcherInterceptorConfigurator"
throughput=1
}""".format(n)).mkString
}
class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = {
configureThreadPool(config,
threadPoolConfig new BalancingDispatcher(prerequisites,
config.getString("name"),
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
threadPoolConfig,
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor).build
}
override def dispatcher(): MessageDispatcher = instance
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -508,29 +541,16 @@ class BalancingDispatcherModelSpec extends ActorModelSpec(BalancingDispatcherMod
val dispatcherCount = new AtomicInteger()
override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each invocation, since the MessageDispatcherInterceptor holds state
val id = "dispatcher-" + dispatcherCount.incrementAndGet()
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.settings.config.getConfig("dispatcher"), system.dispatchers.prerequisites) {
val instance = {
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(system.dispatchers.prerequisites, id, id, 1, // TODO check why 1 here? (came from old test)
DispatcherThroughputDeadlineTime, UnboundedMailbox(),
config, DispatcherDefaultShutdown) with MessageDispatcherInterceptor,
ThreadPoolConfig()).build
}
override def dispatcher(): MessageDispatcher = instance
}
system.dispatchers.register(id, dispatcherConfigurator)
system.dispatchers.lookup(id).asInstanceOf[MessageDispatcherInterceptor]
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-balancing-dispatcher-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Balancing Dispatcher"
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = registerInterceptedDispatcher()
implicit val dispatcher = interceptedDispatcher()
val aStart, aStop, bParallel = new CountDownLatch(1)
val a, b = newTestActor(dispatcher.id)

View file

@ -3,21 +3,41 @@
*/
package akka.testkit
import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import org.junit.{ After, Test }
import akka.actor.dispatch.ActorModelSpec
import com.typesafe.config.Config
import akka.dispatch.DispatcherPrerequisites
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageDispatcherConfigurator
object CallingThreadDispatcherModelSpec {
val config = """
boss {
type = PinnedDispatcher
}
import ActorModelSpec._
val config = {
"""
boss {
type = PinnedDispatcher
}
""" +
// use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
(for (n 1 to 30) yield """
test-calling-thread-%s {
type = "akka.testkit.CallingThreadDispatcherModelSpec$CallingThreadDispatcherInterceptorConfigurator"
}""".format(n)).mkString
}
class CallingThreadDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor {
override def id: String = config.getString("id")
}
override def dispatcher(): MessageDispatcher = instance
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -26,17 +46,9 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec(CallingThreadDispa
val dispatcherCount = new AtomicInteger()
override def registerInterceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each invocation, since the MessageDispatcherInterceptor holds state
val dispatcherId = "test-calling-thread" + dispatcherCount.incrementAndGet()
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) {
val instance = new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor {
override def id: String = dispatcherId
}
override def dispatcher(): MessageDispatcher = instance
}
system.dispatchers.register(dispatcherId, dispatcherConfigurator)
system.dispatchers.lookup(dispatcherId).asInstanceOf[MessageDispatcherInterceptor]
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-calling-thread-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Calling Thread Dispatcher"

View file

@ -57,13 +57,11 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
*/
def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
// FIXME: Configurators registered here are are not removed, see ticket #1494
private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
/**
* Returns a dispatcher as specified in configuration, or if not defined it uses
* the default dispatcher. The same dispatcher instance is returned for subsequent
* lookups.
* the default dispatcher.
*/
def lookup(id: String): MessageDispatcher = lookupConfigurator(id).dispatcher()
@ -93,11 +91,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
}
}
// FIXME #1563: Remove this method when dispatcher usage is rewritten in ActorModelSpec and CallingThreadDispatcherModelSpec
private[akka] def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = {
dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator)
}
private def config(id: String): Config = {
import scala.collection.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)