From f1a1755772e49e2c2cdce80c401dda22158cbd01 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 9 Sep 2010 12:51:47 +0200 Subject: [PATCH] Removing Reactor based dispatchers and closing #428 --- ...actReactorBasedEventDrivenDispatcher.scala | 52 ------ .../src/main/scala/dispatch/Dispatchers.scala | 25 +-- ...sedSingleThreadEventDrivenDispatcher.scala | 62 ------ ...BasedThreadPoolEventDrivenDispatcher.scala | 176 ------------------ .../test/scala/dispatch/DispatchersSpec.scala | 4 - ...ThreadEventDrivenDispatcherActorSpec.scala | 71 ------- ...adPoolEventDrivenDispatcherActorSpec.scala | 66 ------- .../actor/supervisor/SupervisorMiscSpec.scala | 2 +- .../akka/spring/akka-0.10.xsd | 2 - .../akka/spring/akka-1.0-SNAPSHOT.xsd | 2 - .../scala/AkkaSpringConfigurationTags.scala | 2 - .../main/scala/DispatcherFactoryBean.scala | 2 - .../src/main/scala/DispatcherParser.scala | 3 +- .../src/test/resources/dispatcher-config.xml | 7 - .../DispatcherBeanDefinitionParserTest.scala | 14 +- .../scala/DispatcherSpringFeatureTest.scala | 13 -- 16 files changed, 6 insertions(+), 497 deletions(-) delete mode 100644 akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala delete mode 100644 akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala delete mode 100644 akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala delete mode 100644 akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala delete mode 100644 akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala diff --git a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala deleted file mode 100644 index 24c566b48c..0000000000 --- a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import java.util.{LinkedList, Queue, List} -import java.util.HashMap - -import se.scalablesolutions.akka.actor.{Actor, ActorRef} - -abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { - @volatile protected var active: Boolean = false - protected val queue = new ReactiveMessageQueue(name) - protected var selectorThread: Thread = _ - protected val guard = new Object - - def dispatch(invocation: MessageInvocation) = queue.append(invocation) - - def shutdown = if (active) { - log.debug("Shutting down %s", toString) - active = false - selectorThread.interrupt - doShutdown - } - - /** - * Subclass callback. Override if additional shutdown behavior is needed. - */ - protected def doShutdown = {} -} - -class ReactiveMessageQueue(name: String) extends MessageQueue { - private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation] - @volatile private var interrupted = false - - def append(handle: MessageInvocation) = queue.synchronized { - queue.offer(handle) - queue.notifyAll - } - - def read(destination: List[MessageInvocation]) = queue.synchronized { - while (queue.isEmpty && !interrupted) queue.wait - if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove) - else interrupted = false - } - - def interrupt = queue.synchronized { - interrupted = true - queue.notifyAll - } -} diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index f33d3b1b24..5237f0b8dd 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -61,10 +61,6 @@ object Dispatchers extends Logging { } } - object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global") - - object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global") - /** * Creates an event-driven dispatcher based on the excellent HawtDispatch library. *

@@ -132,18 +128,6 @@ object Dispatchers extends Logging { */ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxCapacity: Int) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxCapacity) - /** - * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - *

- * Has a fluent builder interface for configuring its semantics. - */ - def newReactorBasedThreadPoolEventDrivenDispatcher(name: String) = new ReactorBasedThreadPoolEventDrivenDispatcher(name) - - /** - * Creates a reactor-based event-driven dispatcher serving multiple (millions) of actors through a single thread. - */ - def newReactorBasedSingleThreadEventDrivenDispatcher(name: String) = new ReactorBasedSingleThreadEventDrivenDispatcher(name) - /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher @@ -156,9 +140,8 @@ object Dispatchers extends Logging { * * default-dispatcher { * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - * # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - * # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, - * # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt + * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, + * # Hawt, GlobalExecutorBasedEventDriven, GlobalHawt * keep-alive-ms = 60000 # Keep alive time for threads * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) @@ -197,13 +180,9 @@ object Dispatchers extends Logging { } val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { - case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name) case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig) - case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig) case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) - case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher - case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher case "GlobalHawt" => globalHawtDispatcher diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala deleted file mode 100644 index c698b22c15..0000000000 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -/** - * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf]. - * See also this article: [http://today.java.net/cs/user/print/a/350]. - * - * Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. - */ -package se.scalablesolutions.akka.dispatch - -import java.util.{LinkedList, List} -import se.scalablesolutions.akka.actor.ActorRef - -class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) - extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) { - - def start = if (!active) { - log.debug("Starting up %s", toString) - active = true - val messageDemultiplexer = new Demultiplexer(queue) - selectorThread = new Thread(name) { - override def run = { - while (active) { - try { - messageDemultiplexer.select - } catch { case e: InterruptedException => active = false } - val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations - val iter = selectedInvocations.iterator - while (iter.hasNext) { - val invocation = iter.next - val invoker = invocation.receiver - if (invoker ne null) invoker invoke invocation - iter.remove - } - } - } - } - selectorThread.start - } - - def mailboxSize(a: ActorRef) = 0 - - def isShutdown = !active - - override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]" - - class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { - - private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation] - - def select = messageQueue.read(selectedQueue) - - def acquireSelectedInvocations: List[MessageInvocation] = selectedQueue - - def releaseSelectedInvocations = throw new UnsupportedOperationException("Demultiplexer can't release its queue") - - def wakeUp = throw new UnsupportedOperationException("Demultiplexer can't be woken up") - } -} - diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala deleted file mode 100644 index 684f737c07..0000000000 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.dispatch - -import java.util.concurrent.locks.ReentrantLock - -import java.util.{HashSet, HashMap, LinkedList, List} -import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} - -/** - * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
- * See also this article: [http://today.java.net/cs/user/print/a/350]. - *

- * - * Default settings are: - *

- *   - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
- *   - NR_START_THREADS = 16
- *   - NR_MAX_THREADS = 128
- *   - KEEP_ALIVE_TIME = 60000L // one minute
- * 
- *

- * - * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. - * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. - *

- * - * Scala API. - *

- * Example usage: - *

- *   val dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name")
- *   dispatcher
- *     .withNewThreadPoolWithBoundedBlockingQueue(100)
- *     .setCorePoolSize(16)
- *     .setMaxPoolSize(128)
- *     .setKeepAliveTimeInMillis(60000)
- *     .setRejectionPolicy(new CallerRunsPolicy)
- *     .buildThreadPool
- * 
- *

- * - * Java API. - *

- * Example usage: - *

- *   ReactorBasedThreadPoolEventDrivenDispatcher dispatcher = new ReactorBasedThreadPoolEventDrivenDispatcher("name");
- *   dispatcher
- *     .withNewThreadPoolWithBoundedBlockingQueue(100)
- *     .setCorePoolSize(16)
- *     .setMaxPoolSize(128)
- *     .setKeepAliveTimeInMillis(60000)
- *     .setRejectionPolicy(new CallerRunsPolicy())
- *     .buildThreadPool();
- * 
- *

- * - * But the preferred way of creating dispatchers is to use - * the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object. - * - * @author Jonas Bonér - */ -class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit) - extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name) - with ThreadPoolBuilder { - - def this(_name: String) = this(_name,_ => ()) - - private var fair = true - private val busyActors = new HashSet[AnyRef] - private val messageDemultiplexer = new Demultiplexer(queue) - - // build default thread pool - init - - def start = if (!active) { - log.debug("Starting up %s", toString) - active = true - - /** - * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa - * [http://code.google.com/p/actorom/]. - */ - selectorThread = new Thread(name) { - override def run = { - while (active) { - try { - try { - messageDemultiplexer.select - } catch { case e: InterruptedException => active = false } - process(messageDemultiplexer.acquireSelectedInvocations) - } finally { - messageDemultiplexer.releaseSelectedInvocations - } - } - } - }; - selectorThread.start - } - - override protected def doShutdown = executor.shutdownNow - - private def process(selectedInvocations: List[MessageInvocation]) = synchronized { - var nrOfBusyMessages = 0 - val totalNrOfActors = uuids.size - val totalNrOfBusyActors = busyActors.size - val invocations = selectedInvocations.iterator - while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) { - val invocation = invocations.next - if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]") - if (!busyActors.contains(invocation.receiver)) { - val invoker = invocation.receiver - if (invoker eq null) throw new IllegalActorStateException( - "Message invoker for invocation [" + invocation + "] is null") - resume(invocation.receiver) - invocations.remove - executor.execute(new Runnable() { - def run = { - invoker.invoke(invocation) - suspend(invocation.receiver) - messageDemultiplexer.wakeUp - } - }) - } else nrOfBusyMessages += 1 - } - } - - private def resume(actor: AnyRef) = synchronized { - busyActors.add(actor) - } - - private def suspend(actor: AnyRef) = synchronized { - busyActors.remove(actor) - } - - private def passFairnessCheck(nrOfBusyMessages: Int) = { - if (fair) true - else nrOfBusyMessages < 100 - } - - def mailboxSize(a: ActorRef) = 0 - - def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( - "Can't build a new thread pool for a dispatcher that is already up and running") - - override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]" - - class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { - private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation] - private val selectedInvocationsLock = new ReentrantLock - - def select = try { - selectedInvocationsLock.lock - messageQueue.read(selectedInvocations) - } finally { - selectedInvocationsLock.unlock - } - - def acquireSelectedInvocations: List[MessageInvocation] = { - selectedInvocationsLock.lock - selectedInvocations - } - - def releaseSelectedInvocations = selectedInvocationsLock.unlock - - def wakeUp = messageQueue.interrupt - } - - private[akka] def init = { - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - config(this) - buildThreadPool - } -} diff --git a/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala b/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala index bb548b9251..81fd933cda 100644 --- a/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala +++ b/akka-actor/src/test/scala/dispatch/DispatchersSpec.scala @@ -28,13 +28,9 @@ object DispatchersSpec { def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map( - "ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher], "ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher], "ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher], - "ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher], "Hawt" -> ofType[HawtDispatcher], - "GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher), - "GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher), "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher), "GlobalHawt" -> instance(globalHawtDispatcher) ) diff --git a/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala deleted file mode 100644 index de9b912bf5..0000000000 --- a/akka-actor/src/test/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcherActorSpec.scala +++ /dev/null @@ -1,71 +0,0 @@ -package se.scalablesolutions.akka.actor.dispatch - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.Test - -import se.scalablesolutions.akka.dispatch.Dispatchers -import se.scalablesolutions.akka.actor.Actor -import Actor._ - -object ReactorBasedSingleThreadEventDrivenDispatcherActorSpec { - class TestActor extends Actor { - self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(self.uuid) - - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") - } - } - - object OneWayTestActor { - val oneWay = new CountDownLatch(1) - } - class OneWayTestActor extends Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid) - def receive = { - case "OneWay" => OneWayTestActor.oneWay.countDown - } - } -} - -class ReactorBasedSingleThreadEventDrivenDispatcherActorSpec extends JUnitSuite { - import ReactorBasedSingleThreadEventDrivenDispatcherActorSpec._ - - private val unit = TimeUnit.MILLISECONDS - - @Test def shouldSendOneWay = { - val actor = actorOf[OneWayTestActor].start - val result = actor ! "OneWay" - assert(OneWayTestActor.oneWay.await(1, TimeUnit.SECONDS)) - actor.stop - } - - @Test def shouldSendReplySync = { - val actor = actorOf[TestActor].start - val result = (actor !! ("Hello", 10000)).as[String].get - assert("World" === result) - actor.stop - } - - @Test def shouldSendReplyAsync = { - val actor = actorOf[TestActor].start - val result = actor !! "Hello" - assert("World" === result.get.asInstanceOf[String]) - actor.stop - } - - @Test def shouldSendReceiveException = { - val actor = actorOf[TestActor].start - try { - actor !! "Failure" - fail("Should have thrown an exception") - } catch { - case e => - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop - } -} diff --git a/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala deleted file mode 100644 index 4001df8f56..0000000000 --- a/akka-actor/src/test/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcherActorSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -package se.scalablesolutions.akka.actor.dispatch - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import org.scalatest.junit.JUnitSuite -import org.junit.Test - -import se.scalablesolutions.akka.dispatch.Dispatchers -import se.scalablesolutions.akka.actor.Actor -import Actor._ - -object ReactorBasedThreadPoolEventDrivenDispatcherActorSpec { - class TestActor extends Actor { - self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid) - def receive = { - case "Hello" => - self.reply("World") - case "Failure" => - throw new RuntimeException("Expected exception; to test fault-tolerance") - } - } -} - -class ReactorBasedThreadPoolEventDrivenDispatcherActorSpec extends JUnitSuite { - import ReactorBasedThreadPoolEventDrivenDispatcherActorSpec._ - - private val unit = TimeUnit.MILLISECONDS - - @Test def shouldSendOneWay { - val oneWay = new CountDownLatch(1) - val actor = actorOf(new Actor { - self.dispatcher = Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(self.uuid) - def receive = { - case "OneWay" => oneWay.countDown - } - }).start - val result = actor ! "OneWay" - assert(oneWay.await(1, TimeUnit.SECONDS)) - actor.stop - } - - @Test def shouldSendReplySync = { - val actor = actorOf[TestActor].start - val result = (actor !! ("Hello", 10000)).as[String].get - assert("World" === result) - actor.stop - } - - @Test def shouldSendReplyAsync = { - val actor = actorOf[TestActor].start - val result = actor !! "Hello" - assert("World" === result.get.asInstanceOf[String]) - actor.stop - } - - @Test def shouldSendReceiveException = { - val actor = actorOf[TestActor].start - try { - actor !! "Failure" - fail("Should have thrown an exception") - } catch { - case e => - assert("Expected exception; to test fault-tolerance" === e.getMessage()) - } - actor.stop - } -} diff --git a/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala index 3d048684cd..26fdb6e1ef 100644 --- a/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala @@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start val actor2 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test") + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) override def postRestart(cause: Throwable) {countDownLatch.countDown} protected def receive = { diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd index 2a42ec0900..e66090fe16 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-0.10.xsd @@ -39,8 +39,6 @@ - - diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd index c3d7608bee..cf3c8ffafc 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -39,8 +39,6 @@ - - diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 2743d772da..4037f2c3ba 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -98,8 +98,6 @@ object AkkaSpringConfigurationTags { // dispatcher types val EXECUTOR_BASED_EVENT_DRIVEN = "executor-based-event-driven" val EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING = "executor-based-event-driven-work-stealing" - val REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN = "reactor-based-thread-pool-event-driven" - val REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN = "reactor-based-single-thread-event-driven" val THREAD_BASED = "thread-based" val HAWT = "hawt" diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index 5986b5a697..4d13fa6814 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -26,8 +26,6 @@ object DispatcherFactoryBean { var dispatcher = properties.dispatcherType match { case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name) case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name) - case REACTOR_BASED_THREAD_POOL_EVENT_DRIVEN => Dispatchers.newReactorBasedThreadPoolEventDrivenDispatcher(properties.name) - case REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN => Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher(properties.name) case THREAD_BASED => if (!actorRef.isDefined) { throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.") } else { diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala index e9f10e1328..4eaa4e05a7 100644 --- a/akka-spring/src/main/scala/DispatcherParser.scala +++ b/akka-spring/src/main/scala/DispatcherParser.scala @@ -48,8 +48,7 @@ trait DispatcherParser extends BeanParser { val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); if (threadPoolElement != null) { - if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || - properties.dispatcherType == THREAD_BASED) { + if (properties.dispatcherType == THREAD_BASED) { throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") } val threadPoolProperties = parseThreadPool(threadPoolElement) diff --git a/akka-spring/src/test/resources/dispatcher-config.xml b/akka-spring/src/test/resources/dispatcher-config.xml index 37c33516e0..728917c6c8 100644 --- a/akka-spring/src/test/resources/dispatcher-config.xml +++ b/akka-spring/src/test/resources/dispatcher-config.xml @@ -66,13 +66,6 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> - - - - - - - diff --git a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala index 83c179e29a..9dfb5bce94 100644 --- a/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala +++ b/akka-spring/src/test/scala/DispatcherBeanDefinitionParserTest.scala @@ -57,7 +57,7 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { it("should be able to parse the dispatcher with a thread pool configuration") { val xml = val props = parser.parseDispatcher(dom(xml).getDocumentElement); assert(props != null) - assert(props.dispatcherType == "reactor-based-thread-pool-event-driven") + assert(props.dispatcherType == "executor-based-event-driven") assert(props.name == "myDispatcher") assert(props.threadPool.corePoolSize == 2) assert(props.threadPool.maxPoolSize == 10) @@ -86,16 +86,6 @@ class DispatcherBeanDefinitionParserTest extends Spec with ShouldMatchers { evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException] } - it("should throw IllegalArgumentException when configuring a single thread dispatcher with a thread pool") { - val xml = - - - evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException] - } - - it("should throw IllegalArgumentException when configuring a thread based dispatcher without TypedActor or UntypedActor") { val xml = evaluating {parser.parseDispatcher(dom(xml).getDocumentElement)} should produce[IllegalArgumentException] diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index 0165808ce9..5a8eb1c9c2 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -96,19 +96,6 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]]) } - scenario("get a reactor-based-thread-pool-event-driven-dispatcher with synchronous-queue from context") { - val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") - val dispatcher = context.getBean("reactor-based-thread-pool-event-driven-dispatcher").asInstanceOf[ReactorBasedThreadPoolEventDrivenDispatcher] - val executor = getThreadPoolExecutorAndAssert(dispatcher) - assert(executor.getQueue().isInstanceOf[SynchronousQueue[Runnable]]) - } - - scenario("get a reactor-based-single-thread-event-driven-dispatcher with synchronous-queue from context") { - val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") - val dispatcher = context.getBean("reactor-based-single-thread-event-driven-dispatcher").asInstanceOf[ReactorBasedSingleThreadEventDrivenDispatcher] - assert(dispatcher != null) - } - scenario("get a executor-based-event-driven-work-stealing-dispatcher from context") { val context = new ClassPathXmlApplicationContext("/dispatcher-config.xml") val dispatcher = context.getBean("executor-based-event-driven-work-stealing-dispatcher").asInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]