diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index cf8695d35f..ec6aab48be 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -3,51 +3,43 @@ package akka.dispatch import akka.actor.{ Props, LocalActorRef, Actor } import akka.testkit.AkkaSpec import akka.util.Duration +import akka.util.duration._ import akka.testkit.DefaultTimeout +import com.typesafe.config.Config + +object PriorityDispatcherSpec { + val config = """ + unbounded-prio-dispatcher { + mailboxType = "akka.dispatch.PriorityDispatcherSpec$Unbounded" + } + bounded-prio-dispatcher { + mailboxType = "akka.dispatch.PriorityDispatcherSpec$Bounded" + } + """ + + class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({ + case i: Int ⇒ i //Reverse order + case 'Result ⇒ Int.MaxValue + }: Any ⇒ Int)) + + class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({ + case i: Int ⇒ i //Reverse order + case 'Result ⇒ Int.MaxValue + }: Any ⇒ Int), 1000, 10 seconds) + +} @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout { +class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) with DefaultTimeout { "A PriorityDispatcher" must { "Order it's messages according to the specified comparator using an unbounded mailbox" in { - - // FIXME #1458: how should we make it easy to configure prio mailbox? val dispatcherKey = "unbounded-prio-dispatcher" - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) { - val instance = { - val mailboxType = UnboundedPriorityMailbox(PriorityGenerator({ - case i: Int ⇒ i //Reverse order - case 'Result ⇒ Int.MaxValue - }: Any ⇒ Int)) - - system.dispatchers.newDispatcher(dispatcherKey, 5, mailboxType).build - } - - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(dispatcherKey, dispatcherConfigurator) - testOrdering(dispatcherKey) } "Order it's messages according to the specified comparator using a bounded mailbox" in { - - // FIXME #1458: how should we make it easy to configure prio mailbox? val dispatcherKey = "bounded-prio-dispatcher" - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) { - val instance = { - val mailboxType = BoundedPriorityMailbox(PriorityGenerator({ - case i: Int ⇒ i //Reverse order - case 'Result ⇒ Int.MaxValue - }: Any ⇒ Int), 1000, system.settings.MailboxPushTimeout) - - system.dispatchers.newDispatcher(dispatcherKey, 5, mailboxType).build - } - - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(dispatcherKey, dispatcherConfigurator) - testOrdering(dispatcherKey) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index c5d9831096..8c860fe013 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -112,8 +112,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } } - // FIXME #1458: Not sure if we should have this, but needed it temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec - def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = { + // FIXME #1458: Not sure if we should have this, but needed it temporary for ActorModelSpec and DispatcherDocSpec + private[akka] def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = { dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator) } @@ -131,35 +131,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc ConfigFactory.parseMap(Map("id" -> id).asJava) } - // FIXME #1458: Remove these newDispatcher methods, but still need them temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec - /** - * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - *

- * Has a fluent builder interface for configuring its semantics. - */ - def newDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, name, settings.DispatcherThroughput, - settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) - - /** - * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - *

- * Has a fluent builder interface for configuring its semantics. - */ - def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(prerequisites, name, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType, - config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) - - /** - * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - *

- * Has a fluent builder interface for configuring its semantics. - */ - def newDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(prerequisites, name, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) - val MailboxType: MailboxType = if (settings.MailboxCapacity < 1) UnboundedMailbox() else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout) diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 436c2b0677..fc76c36a14 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -14,15 +14,18 @@ import akka.dispatch.MessageDispatcher; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.actor.Actors; -import akka.dispatch.PriorityGenerator; -import akka.dispatch.UnboundedPriorityMailbox; -import akka.dispatch.MessageDispatcherConfigurator; -import akka.dispatch.DispatcherPrerequisites; import akka.event.Logging; import akka.event.LoggingAdapter; //#imports-prio +//#imports-prio-mailbox +import akka.dispatch.PriorityGenerator; +import akka.dispatch.UnboundedPriorityMailbox; +import com.typesafe.config.Config; + +//#imports-prio-mailbox + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -73,34 +76,6 @@ public class DispatcherDocTestBase { @Test public void priorityDispatcher() throws Exception { //#prio-dispatcher - final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important - @Override - public int gen(Object message) { - if (message.equals("highpriority")) - return 0; // 'highpriority messages should be treated first if possible - else if (message.equals("lowpriority")) - return 100; // 'lowpriority messages should be treated last if possible - else if (message.equals(Actors.poisonPill())) - return 1000; // PoisonPill when no other left - else - return 50; // We default to 50 - } - }; - - // FIXME #1458: how should we make it easy to configure prio mailbox? - // We create a new Priority dispatcher and seed it with the priority generator - final String dispatcherKey = "prio-dispatcher"; - MessageDispatcherConfigurator dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers() - .defaultDispatcherConfig(), system.dispatchers().prerequisites()) { - private final MessageDispatcher instance = system.dispatchers() - .newDispatcher(dispatcherKey, 5, new UnboundedPriorityMailbox(generator)).build(); - - @Override - public MessageDispatcher dispatcher() { - return instance; - } - }; - system.dispatchers().register(dispatcherKey, dispatcherConfigurator); ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes new Props().withCreator(new UntypedActorFactory() { @@ -123,7 +98,7 @@ public class DispatcherDocTestBase { } }; } - }).withDispatcher(dispatcherKey)); + }).withDispatcher("prio-dispatcher-java")); /* Logs: @@ -143,4 +118,27 @@ public class DispatcherDocTestBase { Thread.sleep(100); } } + + //#prio-mailbox + public static class PrioMailbox extends UnboundedPriorityMailbox { + + static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important + @Override + public int gen(Object message) { + if (message.equals("highpriority")) + return 0; // 'highpriority messages should be treated first if possible + else if (message.equals("lowpriority")) + return 100; // 'lowpriority messages should be treated last if possible + else if (message.equals(Actors.poisonPill())) + return 1000; // PoisonPill when no other left + else + return 50; // We default to 50 + } + }; + + public PrioMailbox(Config config) { + super(generator); + } + } + //#prio-mailbox } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 901cf6ff56..29eba1769c 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -118,7 +118,19 @@ Sometimes it's useful to be able to specify priority order of messages, that is an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a ``akka.dispatch.PriorityGenerator`` (recommended). -Creating a Dispatcher using PriorityGenerator: +Creating a Dispatcher with a mailbox using PriorityGenerator: + +Config: + +.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-dispatcher-config-java + +Priority mailbox: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java + :include: imports-prio-mailbox,prio-mailbox + +Usage: .. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java :include: imports-prio,prio-dispatcher diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 2ce292a882..2b576fe479 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -6,10 +6,8 @@ package akka.docs.dispatcher import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec -import akka.dispatch.PriorityGenerator import akka.actor.Props import akka.actor.Actor -import akka.dispatch.UnboundedPriorityMailbox import akka.event.Logging import akka.event.LoggingAdapter import akka.util.duration._ @@ -56,8 +54,36 @@ object DispatcherDocSpec { type = BalancingDispatcher } //#my-balancing-config + + //#prio-dispatcher-config + prio-dispatcher { + mailboxType = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox" + } + //#prio-dispatcher-config + + //#prio-dispatcher-config-java + prio-dispatcher-java { + mailboxType = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox" + } + //#prio-dispatcher-config-java """ + //#prio-mailbox + import akka.dispatch.PriorityGenerator + import akka.dispatch.UnboundedPriorityMailbox + import com.typesafe.config.Config + + val generator = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important + case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible + case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible + case PoisonPill ⇒ 1000 // PoisonPill when no other left + case otherwise ⇒ 50 // We default to 50 + } + + // We create a new Priority dispatcher and seed it with the priority generator + class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(generator) + //#prio-mailbox + class MyActor extends Actor { def receive = { case x ⇒ @@ -90,21 +116,6 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { "defining priority dispatcher" in { //#prio-dispatcher - val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important - case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible - case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible - case PoisonPill ⇒ 1000 // PoisonPill when no other left - case otherwise ⇒ 50 // We default to 50 - } - - // FIXME #1458: how should we make it easy to configure prio mailbox? - // We create a new Priority dispatcher and seed it with the priority generator - val dispatcherKey = "prio-dispatcher" - val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) { - val instance = system.dispatchers.newDispatcher(dispatcherKey, 5, UnboundedPriorityMailbox(gen)).build - override def dispatcher(): MessageDispatcher = instance - } - system.dispatchers.register(dispatcherKey, dispatcherConfigurator) val a = system.actorOf( // We create a new Actor that just prints out what it processes Props(new Actor { @@ -122,7 +133,7 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { def receive = { case x ⇒ log.info(x.toString) } - }).withDispatcher(dispatcherKey)) + }).withDispatcher("prio-dispatcher")) /* Logs: diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index c33bc3e629..5f38b84641 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -118,9 +118,22 @@ Sometimes it's useful to be able to specify priority order of messages, that is an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a ``akka.dispatch.PriorityGenerator`` (recommended). -Creating a Dispatcher using PriorityGenerator: +Creating a Dispatcher with a mailbox using PriorityGenerator: -.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher +Config: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-dispatcher-config + +Priority mailbox: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-mailbox + +Usage: + +.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala + :include: prio-dispatcher Work-sharing event-based ^^^^^^^^^^^^^^^^^^^^^^^^^