Improved priority mailbox usage, and documentation. See #1458
This commit is contained in:
parent
c4401f1ca8
commit
817c3c5ee7
6 changed files with 115 additions and 118 deletions
|
|
@ -3,51 +3,43 @@ package akka.dispatch
|
||||||
import akka.actor.{ Props, LocalActorRef, Actor }
|
import akka.actor.{ Props, LocalActorRef, Actor }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
|
import akka.util.duration._
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
|
object PriorityDispatcherSpec {
|
||||||
|
val config = """
|
||||||
|
unbounded-prio-dispatcher {
|
||||||
|
mailboxType = "akka.dispatch.PriorityDispatcherSpec$Unbounded"
|
||||||
|
}
|
||||||
|
bounded-prio-dispatcher {
|
||||||
|
mailboxType = "akka.dispatch.PriorityDispatcherSpec$Bounded"
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({
|
||||||
|
case i: Int ⇒ i //Reverse order
|
||||||
|
case 'Result ⇒ Int.MaxValue
|
||||||
|
}: Any ⇒ Int))
|
||||||
|
|
||||||
|
class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({
|
||||||
|
case i: Int ⇒ i //Reverse order
|
||||||
|
case 'Result ⇒ Int.MaxValue
|
||||||
|
}: Any ⇒ Int), 1000, 10 seconds)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout {
|
class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) with DefaultTimeout {
|
||||||
|
|
||||||
"A PriorityDispatcher" must {
|
"A PriorityDispatcher" must {
|
||||||
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
|
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
|
||||||
|
|
||||||
// FIXME #1458: how should we make it easy to configure prio mailbox?
|
|
||||||
val dispatcherKey = "unbounded-prio-dispatcher"
|
val dispatcherKey = "unbounded-prio-dispatcher"
|
||||||
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) {
|
|
||||||
val instance = {
|
|
||||||
val mailboxType = UnboundedPriorityMailbox(PriorityGenerator({
|
|
||||||
case i: Int ⇒ i //Reverse order
|
|
||||||
case 'Result ⇒ Int.MaxValue
|
|
||||||
}: Any ⇒ Int))
|
|
||||||
|
|
||||||
system.dispatchers.newDispatcher(dispatcherKey, 5, mailboxType).build
|
|
||||||
}
|
|
||||||
|
|
||||||
override def dispatcher(): MessageDispatcher = instance
|
|
||||||
}
|
|
||||||
system.dispatchers.register(dispatcherKey, dispatcherConfigurator)
|
|
||||||
|
|
||||||
testOrdering(dispatcherKey)
|
testOrdering(dispatcherKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
"Order it's messages according to the specified comparator using a bounded mailbox" in {
|
"Order it's messages according to the specified comparator using a bounded mailbox" in {
|
||||||
|
|
||||||
// FIXME #1458: how should we make it easy to configure prio mailbox?
|
|
||||||
val dispatcherKey = "bounded-prio-dispatcher"
|
val dispatcherKey = "bounded-prio-dispatcher"
|
||||||
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) {
|
|
||||||
val instance = {
|
|
||||||
val mailboxType = BoundedPriorityMailbox(PriorityGenerator({
|
|
||||||
case i: Int ⇒ i //Reverse order
|
|
||||||
case 'Result ⇒ Int.MaxValue
|
|
||||||
}: Any ⇒ Int), 1000, system.settings.MailboxPushTimeout)
|
|
||||||
|
|
||||||
system.dispatchers.newDispatcher(dispatcherKey, 5, mailboxType).build
|
|
||||||
}
|
|
||||||
|
|
||||||
override def dispatcher(): MessageDispatcher = instance
|
|
||||||
}
|
|
||||||
system.dispatchers.register(dispatcherKey, dispatcherConfigurator)
|
|
||||||
|
|
||||||
testOrdering(dispatcherKey)
|
testOrdering(dispatcherKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -112,8 +112,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME #1458: Not sure if we should have this, but needed it temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec
|
// FIXME #1458: Not sure if we should have this, but needed it temporary for ActorModelSpec and DispatcherDocSpec
|
||||||
def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = {
|
private[akka] def register(id: String, dispatcherConfigurator: MessageDispatcherConfigurator): Unit = {
|
||||||
dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator)
|
dispatcherConfigurators.putIfAbsent(id, dispatcherConfigurator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -131,35 +131,6 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
ConfigFactory.parseMap(Map("id" -> id).asJava)
|
ConfigFactory.parseMap(Map("id" -> id).asJava)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME #1458: Remove these newDispatcher methods, but still need them temporary for PriorityDispatcherSpec, ActorModelSpec and DispatcherDocSpec
|
|
||||||
/**
|
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
|
||||||
* <p/>
|
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
|
||||||
*/
|
|
||||||
def newDispatcher(name: String) =
|
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, name, settings.DispatcherThroughput,
|
|
||||||
settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
|
||||||
* <p/>
|
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
|
||||||
*/
|
|
||||||
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
|
||||||
new Dispatcher(prerequisites, name, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType,
|
|
||||||
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
|
||||||
* <p/>
|
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
|
||||||
*/
|
|
||||||
def newDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) =
|
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
|
||||||
new Dispatcher(prerequisites, name, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
|
||||||
|
|
||||||
val MailboxType: MailboxType =
|
val MailboxType: MailboxType =
|
||||||
if (settings.MailboxCapacity < 1) UnboundedMailbox()
|
if (settings.MailboxCapacity < 1) UnboundedMailbox()
|
||||||
else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout)
|
else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout)
|
||||||
|
|
|
||||||
|
|
@ -14,15 +14,18 @@ import akka.dispatch.MessageDispatcher;
|
||||||
import akka.actor.UntypedActor;
|
import akka.actor.UntypedActor;
|
||||||
import akka.actor.UntypedActorFactory;
|
import akka.actor.UntypedActorFactory;
|
||||||
import akka.actor.Actors;
|
import akka.actor.Actors;
|
||||||
import akka.dispatch.PriorityGenerator;
|
|
||||||
import akka.dispatch.UnboundedPriorityMailbox;
|
|
||||||
import akka.dispatch.MessageDispatcherConfigurator;
|
|
||||||
import akka.dispatch.DispatcherPrerequisites;
|
|
||||||
import akka.event.Logging;
|
import akka.event.Logging;
|
||||||
import akka.event.LoggingAdapter;
|
import akka.event.LoggingAdapter;
|
||||||
|
|
||||||
//#imports-prio
|
//#imports-prio
|
||||||
|
|
||||||
|
//#imports-prio-mailbox
|
||||||
|
import akka.dispatch.PriorityGenerator;
|
||||||
|
import akka.dispatch.UnboundedPriorityMailbox;
|
||||||
|
import com.typesafe.config.Config;
|
||||||
|
|
||||||
|
//#imports-prio-mailbox
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -73,34 +76,6 @@ public class DispatcherDocTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void priorityDispatcher() throws Exception {
|
public void priorityDispatcher() throws Exception {
|
||||||
//#prio-dispatcher
|
//#prio-dispatcher
|
||||||
final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
|
||||||
@Override
|
|
||||||
public int gen(Object message) {
|
|
||||||
if (message.equals("highpriority"))
|
|
||||||
return 0; // 'highpriority messages should be treated first if possible
|
|
||||||
else if (message.equals("lowpriority"))
|
|
||||||
return 100; // 'lowpriority messages should be treated last if possible
|
|
||||||
else if (message.equals(Actors.poisonPill()))
|
|
||||||
return 1000; // PoisonPill when no other left
|
|
||||||
else
|
|
||||||
return 50; // We default to 50
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// FIXME #1458: how should we make it easy to configure prio mailbox?
|
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
|
||||||
final String dispatcherKey = "prio-dispatcher";
|
|
||||||
MessageDispatcherConfigurator dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers()
|
|
||||||
.defaultDispatcherConfig(), system.dispatchers().prerequisites()) {
|
|
||||||
private final MessageDispatcher instance = system.dispatchers()
|
|
||||||
.newDispatcher(dispatcherKey, 5, new UnboundedPriorityMailbox(generator)).build();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MessageDispatcher dispatcher() {
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
system.dispatchers().register(dispatcherKey, dispatcherConfigurator);
|
|
||||||
|
|
||||||
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes
|
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes
|
||||||
new Props().withCreator(new UntypedActorFactory() {
|
new Props().withCreator(new UntypedActorFactory() {
|
||||||
|
|
@ -123,7 +98,7 @@ public class DispatcherDocTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}).withDispatcher(dispatcherKey));
|
}).withDispatcher("prio-dispatcher-java"));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Logs:
|
Logs:
|
||||||
|
|
@ -143,4 +118,27 @@ public class DispatcherDocTestBase {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#prio-mailbox
|
||||||
|
public static class PrioMailbox extends UnboundedPriorityMailbox {
|
||||||
|
|
||||||
|
static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
||||||
|
@Override
|
||||||
|
public int gen(Object message) {
|
||||||
|
if (message.equals("highpriority"))
|
||||||
|
return 0; // 'highpriority messages should be treated first if possible
|
||||||
|
else if (message.equals("lowpriority"))
|
||||||
|
return 100; // 'lowpriority messages should be treated last if possible
|
||||||
|
else if (message.equals(Actors.poisonPill()))
|
||||||
|
return 1000; // PoisonPill when no other left
|
||||||
|
else
|
||||||
|
return 50; // We default to 50
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public PrioMailbox(Config config) {
|
||||||
|
super(generator);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#prio-mailbox
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,19 @@ Sometimes it's useful to be able to specify priority order of messages, that is
|
||||||
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
|
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
|
||||||
``akka.dispatch.PriorityGenerator`` (recommended).
|
``akka.dispatch.PriorityGenerator`` (recommended).
|
||||||
|
|
||||||
Creating a Dispatcher using PriorityGenerator:
|
Creating a Dispatcher with a mailbox using PriorityGenerator:
|
||||||
|
|
||||||
|
Config:
|
||||||
|
|
||||||
|
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala
|
||||||
|
:include: prio-dispatcher-config-java
|
||||||
|
|
||||||
|
Priority mailbox:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
|
||||||
|
:include: imports-prio-mailbox,prio-mailbox
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
|
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
|
||||||
:include: imports-prio,prio-dispatcher
|
:include: imports-prio,prio-dispatcher
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,8 @@ package akka.docs.dispatcher
|
||||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.dispatch.PriorityGenerator
|
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.dispatch.UnboundedPriorityMailbox
|
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
|
|
@ -56,8 +54,36 @@ object DispatcherDocSpec {
|
||||||
type = BalancingDispatcher
|
type = BalancingDispatcher
|
||||||
}
|
}
|
||||||
//#my-balancing-config
|
//#my-balancing-config
|
||||||
|
|
||||||
|
//#prio-dispatcher-config
|
||||||
|
prio-dispatcher {
|
||||||
|
mailboxType = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox"
|
||||||
|
}
|
||||||
|
//#prio-dispatcher-config
|
||||||
|
|
||||||
|
//#prio-dispatcher-config-java
|
||||||
|
prio-dispatcher-java {
|
||||||
|
mailboxType = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox"
|
||||||
|
}
|
||||||
|
//#prio-dispatcher-config-java
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
//#prio-mailbox
|
||||||
|
import akka.dispatch.PriorityGenerator
|
||||||
|
import akka.dispatch.UnboundedPriorityMailbox
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
|
val generator = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||||
|
case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible
|
||||||
|
case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible
|
||||||
|
case PoisonPill ⇒ 1000 // PoisonPill when no other left
|
||||||
|
case otherwise ⇒ 50 // We default to 50
|
||||||
|
}
|
||||||
|
|
||||||
|
// We create a new Priority dispatcher and seed it with the priority generator
|
||||||
|
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(generator)
|
||||||
|
//#prio-mailbox
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case x ⇒
|
case x ⇒
|
||||||
|
|
@ -90,21 +116,6 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
|
|
||||||
"defining priority dispatcher" in {
|
"defining priority dispatcher" in {
|
||||||
//#prio-dispatcher
|
//#prio-dispatcher
|
||||||
val gen = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
|
||||||
case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible
|
|
||||||
case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible
|
|
||||||
case PoisonPill ⇒ 1000 // PoisonPill when no other left
|
|
||||||
case otherwise ⇒ 50 // We default to 50
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME #1458: how should we make it easy to configure prio mailbox?
|
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
|
||||||
val dispatcherKey = "prio-dispatcher"
|
|
||||||
val dispatcherConfigurator = new MessageDispatcherConfigurator(system.dispatchers.defaultDispatcherConfig, system.dispatchers.prerequisites) {
|
|
||||||
val instance = system.dispatchers.newDispatcher(dispatcherKey, 5, UnboundedPriorityMailbox(gen)).build
|
|
||||||
override def dispatcher(): MessageDispatcher = instance
|
|
||||||
}
|
|
||||||
system.dispatchers.register(dispatcherKey, dispatcherConfigurator)
|
|
||||||
|
|
||||||
val a = system.actorOf( // We create a new Actor that just prints out what it processes
|
val a = system.actorOf( // We create a new Actor that just prints out what it processes
|
||||||
Props(new Actor {
|
Props(new Actor {
|
||||||
|
|
@ -122,7 +133,7 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
def receive = {
|
def receive = {
|
||||||
case x ⇒ log.info(x.toString)
|
case x ⇒ log.info(x.toString)
|
||||||
}
|
}
|
||||||
}).withDispatcher(dispatcherKey))
|
}).withDispatcher("prio-dispatcher"))
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Logs:
|
Logs:
|
||||||
|
|
|
||||||
|
|
@ -118,9 +118,22 @@ Sometimes it's useful to be able to specify priority order of messages, that is
|
||||||
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
|
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
|
||||||
``akka.dispatch.PriorityGenerator`` (recommended).
|
``akka.dispatch.PriorityGenerator`` (recommended).
|
||||||
|
|
||||||
Creating a Dispatcher using PriorityGenerator:
|
Creating a Dispatcher with a mailbox using PriorityGenerator:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher
|
Config:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
|
||||||
|
:include: prio-dispatcher-config
|
||||||
|
|
||||||
|
Priority mailbox:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
|
||||||
|
:include: prio-mailbox
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
|
||||||
|
:include: prio-dispatcher
|
||||||
|
|
||||||
Work-sharing event-based
|
Work-sharing event-based
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue