Removing Reactor based dispatchers and closing #428
This commit is contained in:
parent
9c1cbfffaf
commit
f1a1755772
16 changed files with 6 additions and 497 deletions
|
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import java.util.{LinkedList, Queue, List}
|
||||
import java.util.HashMap
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
|
||||
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
|
||||
@volatile protected var active: Boolean = false
|
||||
protected val queue = new ReactiveMessageQueue(name)
|
||||
protected var selectorThread: Thread = _
|
||||
protected val guard = new Object
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
|
||||
|
||||
def shutdown = if (active) {
|
||||
log.debug("Shutting down %s", toString)
|
||||
active = false
|
||||
selectorThread.interrupt
|
||||
doShutdown
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass callback. Override if additional shutdown behavior is needed.
|
||||
*/
|
||||
protected def doShutdown = {}
|
||||
}
|
||||
|
||||
class ReactiveMessageQueue(name: String) extends MessageQueue {
|
||||
private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
|
||||
@volatile private var interrupted = false
|
||||
|
||||
def append(handle: MessageInvocation) = queue.synchronized {
|
||||
queue.offer(handle)
|
||||
queue.notifyAll
|
||||
}
|
||||
|
||||
def read(destination: List[MessageInvocation]) = queue.synchronized {
|
||||
while (queue.isEmpty && !interrupted) queue.wait
|
||||
if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
|
||||
else interrupted = false
|
||||
}
|
||||
|
||||
def interrupt = queue.synchronized {
|
||||
interrupted = true
|
||||
queue.notifyAll
|
||||
}
|
||||
}
|
||||
|
|
@ -61,10 +61,6 @@ object Dispatchers extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
|
||||
|
||||
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
|
||||
|
||||
/**
|
||||
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
|
||||
* <p/>
|
||||
|
|
@ -132,18 +128,6 @@ object Dispatchers extends Logging {
|
|||
*/
|
||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxCapacity: Int) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxCapacity)
|
||||
|
||||
/**
|
||||
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
* Has a fluent builder interface for configuring its semantics.
|
||||
*/
|
||||
def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name)
|
||||
|
||||
/**
|
||||
* Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread.
|
||||
*/
|
||||
def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name)
|
||||
|
||||
/**
|
||||
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
||||
* or else use the supplied default dispatcher
|
||||
|
|
@ -156,9 +140,8 @@ object Dispatchers extends Logging {
|
|||
*
|
||||
* default-dispatcher {
|
||||
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||
* # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
||||
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||
* # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||
* keep-alive-ms = 60000 # Keep alive time for threads
|
||||
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||
|
|
@ -197,13 +180,9 @@ object Dispatchers extends Logging {
|
|||
}
|
||||
|
||||
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
|
||||
case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name)
|
||||
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
|
||||
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig)
|
||||
case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig)
|
||||
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
|
||||
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
|
||||
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
|
||||
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
|
||||
case "GlobalHawt" => globalHawtDispatcher
|
||||
|
||||
|
|
|
|||
|
|
@ -1,62 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
/**
|
||||
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
|
||||
* See also this article: [http://today.java.net/cs/user/print/a/350].
|
||||
*
|
||||
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
||||
*/
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import java.util.{LinkedList, List}
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
|
||||
class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
|
||||
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
|
||||
|
||||
def start = if (!active) {
|
||||
log.debug("Starting up %s", toString)
|
||||
active = true
|
||||
val messageDemultiplexer = new Demultiplexer(queue)
|
||||
selectorThread = new Thread(name) {
|
||||
override def run = {
|
||||
while (active) {
|
||||
try {
|
||||
messageDemultiplexer.select
|
||||
} catch { case e: InterruptedException => active = false }
|
||||
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
|
||||
val iter = selectedInvocations.iterator
|
||||
while (iter.hasNext) {
|
||||
val invocation = iter.next
|
||||
val invoker = invocation.receiver
|
||||
if (invoker ne null) invoker invoke invocation
|
||||
iter.remove
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
selectorThread.start
|
||||
}
|
||||
|
||||
def mailboxSize(a: ActorRef) = 0
|
||||
|
||||
def isShutdown = !active
|
||||
|
||||
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"
|
||||
|
||||
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
|
||||
|
||||
private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]
|
||||
|
||||
def select = messageQueue.read(selectedQueue)
|
||||
|
||||
def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue
|
||||
|
||||
def releaseSelectedInvocations = throw new UnsupportedOperationException("Demultiplexer can't release its queue")
|
||||
|
||||
def wakeUp = throw new UnsupportedOperationException("Demultiplexer can't be woken up")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,176 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
import java.util.{HashSet, HashMap, LinkedList, List}
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
|
||||
|
||||
/**
|
||||
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
|
||||
* See also this article: [http://today.java.net/cs/user/print/a/350].
|
||||
* <p/>
|
||||
*
|
||||
* Default settings are:
|
||||
* <pre/>
|
||||
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||
* - NR_START_THREADS = 16
|
||||
* - NR_MAX_THREADS = 128
|
||||
* - KEEP_ALIVE_TIME = 60000L // one minute
|
||||
* </pre>
|
||||
* <p/>
|
||||
*
|
||||
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
|
||||
* There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
|
||||
* <p/>
|
||||
*
|
||||
* Scala API.
|
||||
* <p/>
|
||||
* Example usage:
|
||||
* <pre/>
|
||||
* val dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name")
|
||||
* dispatcher
|
||||
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
||||
* .setCorePoolSize(16)
|
||||
* .setMaxPoolSize(128)
|
||||
* .setKeepAliveTimeInMillis(60000)
|
||||
* .setRejectionPolicy(new CallerRunsPolicy)
|
||||
* .buildThreadPool
|
||||
* </pre>
|
||||
* <p/>
|
||||
*
|
||||
* Java API.
|
||||
* <p/>
|
||||
* Example usage:
|
||||
* <pre/>
|
||||
* ReactorBasedThreadPoolEventDrivenDispatcher dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name");
|
||||
* dispatcher
|
||||
* .withNewThreadPoolWithBoundedBlockingQueue(100)
|
||||
* .setCorePoolSize(16)
|
||||
* .setMaxPoolSize(128)
|
||||
* .setKeepAliveTimeInMillis(60000)
|
||||
* .setRejectionPolicy(new CallerRunsPolicy())
|
||||
* .buildThreadPool();
|
||||
* </pre>
|
||||
* <p/>
|
||||
*
|
||||
* But the preferred way of creating dispatchers is to use
|
||||
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit)
|
||||
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
|
||||
with ThreadPoolBuilder {
|
||||
|
||||
def this(_name: String) = this(_name,_ => ())
|
||||
|
||||
private var fair = true
|
||||
private val busyActors = new HashSet[AnyRef]
|
||||
private val messageDemultiplexer = new Demultiplexer(queue)
|
||||
|
||||
// build default thread pool
|
||||
init
|
||||
|
||||
def start = if (!active) {
|
||||
log.debug("Starting up %s", toString)
|
||||
active = true
|
||||
|
||||
/**
|
||||
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa
|
||||
* [http://code.google.com/p/actorom/].
|
||||
*/
|
||||
selectorThread = new Thread(name) {
|
||||
override def run = {
|
||||
while (active) {
|
||||
try {
|
||||
try {
|
||||
messageDemultiplexer.select
|
||||
} catch { case e: InterruptedException => active = false }
|
||||
process(messageDemultiplexer.acquireSelectedInvocations)
|
||||
} finally {
|
||||
messageDemultiplexer.releaseSelectedInvocations
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
selectorThread.start
|
||||
}
|
||||
|
||||
override protected def doShutdown = executor.shutdownNow
|
||||
|
||||
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
|
||||
var nrOfBusyMessages = 0
|
||||
val totalNrOfActors = uuids.size
|
||||
val totalNrOfBusyActors = busyActors.size
|
||||
val invocations = selectedInvocations.iterator
|
||||
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
|
||||
val invocation = invocations.next
|
||||
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
|
||||
if (!busyActors.contains(invocation.receiver)) {
|
||||
val invoker = invocation.receiver
|
||||
if (invoker eq null) throw new IllegalActorStateException(
|
||||
"Message invoker for invocation [" + invocation + "] is null")
|
||||
resume(invocation.receiver)
|
||||
invocations.remove
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
invoker.invoke(invocation)
|
||||
suspend(invocation.receiver)
|
||||
messageDemultiplexer.wakeUp
|
||||
}
|
||||
})
|
||||
} else nrOfBusyMessages += 1
|
||||
}
|
||||
}
|
||||
|
||||
private def resume(actor: AnyRef) = synchronized {
|
||||
busyActors.add(actor)
|
||||
}
|
||||
|
||||
private def suspend(actor: AnyRef) = synchronized {
|
||||
busyActors.remove(actor)
|
||||
}
|
||||
|
||||
private def passFairnessCheck(nrOfBusyMessages: Int) = {
|
||||
if (fair) true
|
||||
else nrOfBusyMessages < 100
|
||||
}
|
||||
|
||||
def mailboxSize(a: ActorRef) = 0
|
||||
|
||||
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
|
||||
"Can't build a new thread pool for a dispatcher that is already up and running")
|
||||
|
||||
override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]"
|
||||
|
||||
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
|
||||
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
|
||||
private val selectedInvocationsLock = new ReentrantLock
|
||||
|
||||
def select = try {
|
||||
selectedInvocationsLock.lock
|
||||
messageQueue.read(selectedInvocations)
|
||||
} finally {
|
||||
selectedInvocationsLock.unlock
|
||||
}
|
||||
|
||||
def acquireSelectedInvocations: List[MessageInvocation] = {
|
||||
selectedInvocationsLock.lock
|
||||
selectedInvocations
|
||||
}
|
||||
|
||||
def releaseSelectedInvocations = selectedInvocationsLock.unlock
|
||||
|
||||
def wakeUp = messageQueue.interrupt
|
||||
}
|
||||
|
||||
private[akka] def init = {
|
||||
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||
config(this)
|
||||
buildThreadPool
|
||||
}
|
||||
}
|
||||
|
|
@ -28,13 +28,9 @@ object DispatchersSpec {
|
|||
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
|
||||
|
||||
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
|
||||
"ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher],
|
||||
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
|
||||
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
|
||||
"ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher],
|
||||
"Hawt" -> ofType[HawtDispatcher],
|
||||
"GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
|
||||
"GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
|
||||
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
|
||||
"GlobalHawt" -> instance(globalHawtDispatcher)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,71 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor.dispatch
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
import Actor._
|
||||
|
||||
object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec {
|
||||
class TestActor extends Actor {
|
||||
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid)
|
||||
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
self.reply("World")
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
object OneWayTestActor {
|
||||
val oneWay = new CountDownLatch(1)
|
||||
}
|
||||
class OneWayTestActor extends Actor {
|
||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid)
|
||||
def receive = {
|
||||
case "OneWay" => OneWayTestActor.oneWay.countDown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||
import ReactorBasedSingleThreadEventDrivenDispatcherActorSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
@Test def shouldSendOneWay = {
|
||||
val actor = actorOf[OneWayTestActor].start
|
||||
val result = actor ! "OneWay"
|
||||
assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS))
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result = (actor !! ("Hello", 10000)).as[String].get
|
||||
assert("World" === result)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReplyAsync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result = actor !! "Hello"
|
||||
assert("World" === result.get.asInstanceOf[String])
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReceiveException = {
|
||||
val actor = actorOf[TestActor].start
|
||||
try {
|
||||
actor !! "Failure"
|
||||
fail("Should have thrown an exception")
|
||||
} catch {
|
||||
case e =>
|
||||
assert("Expected exception; to test fault-tolerance" === e.getMessage())
|
||||
}
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
|
|
@ -1,66 +0,0 @@
|
|||
package se.scalablesolutions.akka.actor.dispatch
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
import Actor._
|
||||
|
||||
object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec {
|
||||
class TestActor extends Actor {
|
||||
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
self.reply("World")
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||
import ReactorBasedThreadPoolEventDrivenDispatcherActorSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
@Test def shouldSendOneWay {
|
||||
val oneWay = new CountDownLatch(1)
|
||||
val actor = actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid)
|
||||
def receive = {
|
||||
case "OneWay" => oneWay.countDown
|
||||
}
|
||||
}).start
|
||||
val result = actor ! "OneWay"
|
||||
assert(oneWay.await(1, TimeUnit.SECONDS))
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReplySync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result = (actor !! ("Hello", 10000)).as[String].get
|
||||
assert("World" === result)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReplyAsync = {
|
||||
val actor = actorOf[TestActor].start
|
||||
val result = actor !! "Hello"
|
||||
assert("World" === result.get.asInstanceOf[String])
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldSendReceiveException = {
|
||||
val actor = actorOf[TestActor].start
|
||||
try {
|
||||
actor !! "Failure"
|
||||
fail("Should have thrown an exception")
|
||||
} catch {
|
||||
case e =>
|
||||
assert("Expected exception; to test fault-tolerance" === e.getMessage())
|
||||
}
|
||||
actor.stop
|
||||
}
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
|||
}).start
|
||||
|
||||
val actor2 = Actor.actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test")
|
||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||
|
||||
protected def receive = {
|
||||
|
|
|
|||
|
|
@ -39,8 +39,6 @@
|
|||
<xsd:restriction base="xsd:token">
|
||||
<xsd:enumeration value="executor-based-event-driven"/>
|
||||
<xsd:enumeration value="executor-based-event-driven-work-stealing"/>
|
||||
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
|
||||
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
|
||||
<xsd:enumeration value="thread-based"/>
|
||||
<xsd:enumeration value="hawt"/>
|
||||
</xsd:restriction>
|
||||
|
|
|
|||
|
|
@ -39,8 +39,6 @@
|
|||
<xsd:restriction base="xsd:token">
|
||||
<xsd:enumeration value="executor-based-event-driven"/>
|
||||
<xsd:enumeration value="executor-based-event-driven-work-stealing"/>
|
||||
<xsd:enumeration value="reactor-based-thread-pool-event-driven"/>
|
||||
<xsd:enumeration value="reactor-based-single-thread-event-driven"/>
|
||||
<xsd:enumeration value="thread-based"/>
|
||||
<xsd:enumeration value="hawt"/>
|
||||
</xsd:restriction>
|
||||
|
|
|
|||
|
|
@ -98,8 +98,6 @@ object AkkaSpringConfigurationTags {
|
|||
// dispatcher types
|
||||
val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven"
|
||||
val EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING = "executor-based-event-driven-work-stealing"
|
||||
val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven"
|
||||
val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven"
|
||||
val THREAD_BASED = "thread-based"
|
||||
val HAWT = "hawt"
|
||||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,6 @@ object DispatcherFactoryBean {
|
|||
var dispatcher = properties.dispatcherType match {
|
||||
case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name)
|
||||
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name)
|
||||
case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name)
|
||||
case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name)
|
||||
case THREAD_BASED => if (!actorRef.isDefined) {
|
||||
throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -48,8 +48,7 @@ trait DispatcherParser extends BeanParser {
|
|||
|
||||
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
|
||||
if (threadPoolElement != null) {
|
||||
if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
|
||||
properties.dispatcherType == THREAD_BASED) {
|
||||
if (properties.dispatcherType == THREAD_BASED) {
|
||||
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
|
||||
}
|
||||
val threadPoolProperties = parseThreadPool(threadPoolElement)
|
||||
|
|
|
|||
|
|
@ -66,13 +66,6 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
|||
<akka:thread-pool queue="synchronous-queue" fairness="true" />
|
||||
</akka:dispatcher>
|
||||
|
||||
<!-- reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue -->
|
||||
<akka:dispatcher id="reactor-based-thread-pool-event-driven-dispatcher" type="reactor-based-thread-pool-event-driven" name="myDispatcher">
|
||||
<akka:thread-pool queue="synchronous-queue" fairness="true" />
|
||||
</akka:dispatcher>
|
||||
|
||||
<akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher" type="reactor-based-single-thread-event-driven" name="myDispatcher" />
|
||||
|
||||
<!-- executor-based-event-driven-work-stealing-dispatcher -->
|
||||
<akka:dispatcher id="executor-based-event-driven-work-stealing-dispatcher" type="executor-based-event-driven-work-stealing" name="workStealingDispatcher" />
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
|
||||
it("should be able to parse the dispatcher with a thread pool configuration") {
|
||||
val xml = <akka:dispatcher id="dispatcher"
|
||||
type="reactor-based-thread-pool-event-driven"
|
||||
type="executor-based-event-driven"
|
||||
name="myDispatcher">
|
||||
<akka:thread-pool queue="linked-blocking-queue"
|
||||
capacity="50"
|
||||
|
|
@ -67,7 +67,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
</akka:dispatcher>
|
||||
val props = parser.parseDispatcher(dom(xml).getDocumentElement);
|
||||
assert(props != null)
|
||||
assert(props.dispatcherType == "reactor-based-thread-pool-event-driven")
|
||||
assert(props.dispatcherType == "executor-based-event-driven")
|
||||
assert(props.name == "myDispatcher")
|
||||
assert(props.threadPool.corePoolSize == 2)
|
||||
assert(props.threadPool.maxPoolSize == 10)
|
||||
|
|
@ -86,16 +86,6 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
|
||||
}
|
||||
|
||||
it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") {
|
||||
val xml = <akka:dispatcher id="reactor-based-single-thread-event-driven-dispatcher"
|
||||
type="reactor-based-single-thread-event-driven"
|
||||
name="myDispatcher">
|
||||
<akka:thread-pool queue="synchronous-queue" fairness="true"/>
|
||||
</akka:dispatcher>
|
||||
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
|
||||
}
|
||||
|
||||
|
||||
it("should throw IllegalArgumentException when configuring a thread based dispatcher without TypedActor or UntypedActor") {
|
||||
val xml = <akka:dispatcher id="dispatcher" type="thread-based" name="myDispatcher"/>
|
||||
evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException]
|
||||
|
|
|
|||
|
|
@ -96,19 +96,6 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
|
||||
}
|
||||
|
||||
scenario("get a reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue from context") {
|
||||
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
|
||||
val dispatcher = context.getBean("reactor-based-thread-pool-event-driven-dispatcher").asInstanceOf[ReactorBasedThreadPoolEventDrivenDispatcher]
|
||||
val executor = getThreadPoolExecutorAndAssert(dispatcher)
|
||||
assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]])
|
||||
}
|
||||
|
||||
scenario("get a reactor-based-single-thread-event-driven-dispatcher with synchronous-queue from context") {
|
||||
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
|
||||
val dispatcher = context.getBean("reactor-based-single-thread-event-driven-dispatcher").asInstanceOf[ReactorBasedSingleThreadEventDrivenDispatcher]
|
||||
assert(dispatcher != null)
|
||||
}
|
||||
|
||||
scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") {
|
||||
val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml")
|
||||
val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue