diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ControlAwareDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ControlAwareDispatcherSpec.scala new file mode 100644 index 0000000000..ba444b1671 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ControlAwareDispatcherSpec.scala @@ -0,0 +1,60 @@ +package akka.dispatch + +import akka.testkit.{ DefaultTimeout, AkkaSpec } +import akka.actor.{ Actor, Props } + +object ControlAwareDispatcherSpec { + val config = """ + unbounded-control-dispatcher { + mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" + } + bounded-control-dispatcher { + mailbox-type = "akka.dispatch.BoundedControlAwareMailbox" + } + """ + + case object ImportantMessage extends ControlMessage +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ControlAwareDispatcherSpec extends AkkaSpec(ControlAwareDispatcherSpec.config) with DefaultTimeout { + import ControlAwareDispatcherSpec.ImportantMessage + + "A ControlAwareDispatcher" must { + "deliver control messages first using an unbounded mailbox" in { + val dispatcherKey = "unbounded-control-dispatcher" + testControl(dispatcherKey) + } + + "deliver control messages first using a bounded mailbox" in { + val dispatcherKey = "bounded-control-dispatcher" + testControl(dispatcherKey) + } + } + + def testControl(dispatcherKey: String) = { + // It's important that the actor under test is not a top level actor + // with RepointableActorRef, since messages might be queued in + // UnstartedCell and the sent to the PriorityQueue and consumed immediately + // without the ordering taking place. + val actor = system.actorOf(Props(new Actor { + context.actorOf(Props(new Actor { + + self ! "test" + self ! "test2" + self ! ImportantMessage + + def receive = { + case x ⇒ testActor ! x + } + }).withDispatcher(dispatcherKey)) + + def receive = Actor.emptyBehavior + + })) + + expectMsg(ImportantMessage) + expectMsg("test") + expectMsg("test2") + } +} diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 3d07c1c3e8..a958e15751 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -194,6 +194,14 @@ class PriorityMailboxSpec extends MailboxSpec { } } +class ControlAwareMailboxSpec extends MailboxSpec { + lazy val name = "The control aware mailbox implementation" + def factory = { + case UnboundedMailbox() ⇒ new UnboundedControlAwareMailbox().create(None, None) + case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedControlAwareMailbox(capacity, pushTimeOut).create(None, None) + } +} + object CustomMailboxSpec { val config = """ my-dispatcher { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 1c0f37deae..ed76edc77e 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -377,6 +377,12 @@ akka { akka.actor.mailbox.bounded-deque-based "akka.dispatch.MultipleConsumerSemantics" = akka.actor.mailbox.unbounded-queue-based + "akka.dispatch.ControlAwareMessageQueueSemantics" = + akka.actor.mailbox.unbounded-control-aware-queue-based + "akka.dispatch.UnboundedControlAwareMessageQueueSemantics" = + akka.actor.mailbox.unbounded-control-aware-queue-based + "akka.dispatch.BoundedControlAwareMessageQueueSemantics" = + akka.actor.mailbox.bounded-control-aware-queue-based } unbounded-queue-based { @@ -406,6 +412,20 @@ akka { # com.typesafe.config.Config) parameters. mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" } + + unbounded-control-aware-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" + } + + bounded-control-aware-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public + # constructor with (akka.actor.ActorSystem.Settings, + # com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.BoundedControlAwareMailbox" + } } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index b6e481cdde..d1dfb1629e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -16,6 +16,9 @@ import scala.concurrent.duration.FiniteDuration import scala.annotation.tailrec import scala.util.control.NonFatal import com.typesafe.config.Config +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock + /** * INTERNAL API */ @@ -676,6 +679,153 @@ object BoundedDequeBasedMailbox { } } +/** + * ControlAwareMessageQueue handles messages that extend [[akka.dispatch.ControlMessage]] with priority. + */ +trait ControlAwareMessageQueueSemantics extends QueueBasedMessageQueue { + def controlQueue: Queue[Envelope] + def queue: Queue[Envelope] + + def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match { + case envelope @ Envelope(_: ControlMessage, _) ⇒ controlQueue add envelope + case envelope ⇒ queue add envelope + } + + def dequeue(): Envelope = { + val controlMsg = controlQueue.poll() + + if (controlMsg ne null) controlMsg + else queue.poll() + } + + override def numberOfMessages: Int = controlQueue.size() + queue.size() + + override def hasMessages: Boolean = !(queue.isEmpty && controlQueue.isEmpty) +} + +trait UnboundedControlAwareMessageQueueSemantics extends UnboundedMessageQueueSemantics with ControlAwareMessageQueueSemantics +trait BoundedControlAwareMessageQueueSemantics extends BoundedMessageQueueSemantics with ControlAwareMessageQueueSemantics + +/** + * Messages that extend this trait will be handled with priority by control aware mailboxes. + */ +trait ControlMessage + +/** + * UnboundedControlAwareMailbox is an unbounded MailboxType, that maintains two queues + * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority. + */ +final case class UnboundedControlAwareMailbox() extends MailboxType with ProducesMessageQueue[UnboundedControlAwareMailbox.MessageQueue] { + + // this constructor will be called via reflection when this mailbox type + // is used in the application config + def this(settings: ActorSystem.Settings, config: Config) = this() + + def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedControlAwareMailbox.MessageQueue +} + +object UnboundedControlAwareMailbox { + class MessageQueue extends ControlAwareMessageQueueSemantics with UnboundedMessageQueueSemantics { + val controlQueue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]() + val queue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]() + } +} + +/** + * BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues + * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority. + */ +final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] { + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), + config.getNanosDuration("mailbox-push-timeout-time")) + + def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new BoundedControlAwareMailbox.MessageQueue(capacity, pushTimeOut) +} + +object BoundedControlAwareMailbox { + class MessageQueue(val capacity: Int, val pushTimeOut: FiniteDuration) extends BoundedControlAwareMessageQueueSemantics { + + private final val size = new AtomicInteger(0) + private final val putLock = new ReentrantLock() + private final val notFull = putLock.newCondition() + + // no need to use blocking queues here, as blocking is being handled in `enqueueWithTimeout` + val controlQueue = new ConcurrentLinkedQueue[Envelope]() + val queue = new ConcurrentLinkedQueue[Envelope]() + + override def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match { + case envelope @ Envelope(_: ControlMessage, _) ⇒ enqueueWithTimeout(controlQueue, receiver, envelope) + case envelope ⇒ enqueueWithTimeout(queue, receiver, envelope) + } + + override def numberOfMessages: Int = size.get() + override def hasMessages: Boolean = numberOfMessages > 0 + + @tailrec + final override def dequeue(): Envelope = { + val count = size.get() + + // if both queues are empty return null + if (count > 0) { + // if there are messages try to fetch the current head + // or retry if other consumer dequeued in the mean time + if (size.compareAndSet(count, count - 1)) { + val item = super.dequeue() + + if (size.get < capacity) signalNotFull() + + item + } else { + dequeue() + } + } else { + null + } + } + + private def signalNotFull() { + putLock.lock() + + try { + notFull.signal() + } finally { + putLock.unlock() + } + } + + private final def enqueueWithTimeout(q: Queue[Envelope], receiver: ActorRef, envelope: Envelope) { + var remaining = pushTimeOut.toNanos + + putLock.lockInterruptibly() + val inserted = try { + var stop = false + while (size.get() == capacity && !stop) { + remaining = notFull.awaitNanos(remaining) + stop = remaining <= 0 + } + + if (stop) { + false + } else { + q.add(envelope) + val c = size.incrementAndGet() + + if (c < capacity) notFull.signal() + + true + } + } finally { + putLock.unlock() + } + + if (!inserted) { + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) + } + } + } +} + /** * Trait to signal that an Actor requires a certain type of message queue semantics. * diff --git a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java index c028c8f2d2..85aee84fd1 100644 --- a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java +++ b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java @@ -3,6 +3,7 @@ */ package docs.dispatcher; +import akka.dispatch.ControlMessage; import akka.dispatch.RequiresMessageQueue; import akka.testkit.AkkaSpec; import com.typesafe.config.ConfigFactory; @@ -150,6 +151,41 @@ public class DispatcherDocTest { probe.expectMsgClass(Terminated.class); } + @Test + public void controlAwareDispatcher() throws Exception { + JavaTestKit probe = new JavaTestKit(system); + //#control-aware-dispatcher + + class Demo extends UntypedActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + { + for (Object msg : new Object[] { "foo", "bar", new MyControlMessage(), + PoisonPill.getInstance() }) { + getSelf().tell(msg, getSelf()); + } + } + + public void onReceive(Object message) { + log.info(message.toString()); + } + } + + // We create a new Actor that just prints out what it processes + ActorRef myActor = system.actorOf(Props.create(Demo.class, this) + .withDispatcher("control-aware-dispatcher")); + + /* + Logs: + 'MyControlMessage + 'foo + 'bar + */ + //#control-aware-dispatcher + + probe.watch(myActor); + probe.expectMsgClass(Terminated.class); + } + static //#prio-mailbox public class MyPrioMailbox extends UnboundedPriorityMailbox { @@ -173,6 +209,11 @@ public class DispatcherDocTest { } //#prio-mailbox + static + //#control-aware-mailbox-messages + public class MyControlMessage implements ControlMessage {} + //#control-aware-mailbox-messages + @Test public void requiredMailboxDispatcher() throws Exception { ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class) diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index 3aebc64db6..fb0dc5ed33 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -161,9 +161,36 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" +* UnboundedControlAwareMailbox + + - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority + + - Backed by two ``java.util.concurrent.ConcurrentLinkedQueue`` + + - Blocking: No + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox" + +* BoundedControlAwareMailbox + + - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority + + - Backed by two ``java.util.concurrent.ConcurrentLinkedQueue`` and blocking on enqueue if capacity has been reached + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "akka.dispatch.BoundedControlAwareMailbox" + Mailbox configuration examples ============================== +PriorityMailbox +--------------- + How to create a PriorityMailbox: .. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#prio-mailbox @@ -189,6 +216,23 @@ Or code like this: .. includecode:: code/docs/dispatcher/DispatcherDocTest.java#defining-mailbox-in-code +ControlAwareMailbox +------------------- + +A ``ControlAwareMailbox`` can be very useful if an actor needs to be able to receive control messages +immediately no matter how many other messages are already in its mailbox. + +It can be configured like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#control-aware-mailbox-config + +Control messages need to extend the ``ControlMessage`` trait: + +.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#control-aware-mailbox-messages + +And then an example on how you would use it: + +.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#control-aware-dispatcher Creating your own Mailbox type ============================== diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index ec8d96fb6b..dceac101c6 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -175,6 +175,14 @@ object DispatcherDocSpec { mailbox-type = "docs.dispatcher.MyUnboundedMailbox" } //#custom-mailbox-config + + //#control-aware-mailbox-config + control-aware-dispatcher { + mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox" + //Other dispatcher configuration goes here + } + //#control-aware-mailbox-config + """ //#prio-mailbox @@ -202,6 +210,12 @@ object DispatcherDocSpec { }) //#prio-mailbox + //#control-aware-mailbox-messages + import akka.dispatch.ControlMessage + + case object MyControlMessage extends ControlMessage + //#control-aware-mailbox-messages + class MyActor extends Actor { def receive = { case x => @@ -331,6 +345,39 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { } } + "defining control aware dispatcher" in { + new AnyRef { + //#control-aware-dispatcher + + // We create a new Actor that just prints out what it processes + class Logger extends Actor { + val log: LoggingAdapter = Logging(context.system, this) + + self ! 'foo + self ! 'bar + self ! MyControlMessage + self ! PoisonPill + + def receive = { + case x => log.info(x.toString) + } + } + val a = system.actorOf(Props(classOf[Logger], this).withDispatcher( + "control-aware-dispatcher")) + + /* + * Logs: + * MyControlMessage + * 'foo + * 'bar + */ + //#control-aware-dispatcher + + watch(a) + expectMsgPF() { case Terminated(`a`) => () } + } + } + "require custom mailbox on dispatcher" in { val myActor = system.actorOf(Props[MyActor].withDispatcher( "custom-dispatcher")) diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index d96faba4b1..608c41ce82 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -161,9 +161,36 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" +* UnboundedControlAwareMailbox + + - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority + + - Backed by two ``java.util.concurrent.ConcurrentLinkedQueue`` + + - Blocking: No + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox" + +* BoundedControlAwareMailbox + + - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority + + - Backed by two ``java.util.concurrent.ConcurrentLinkedQueue`` and blocking on enqueue if capacity has been reached + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "akka.dispatch.BoundedControlAwareMailbox" + Mailbox configuration examples ============================== +PriorityMailbox +--------------- + How to create a PriorityMailbox: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#prio-mailbox @@ -189,6 +216,23 @@ Or code like this: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-mailbox-in-code +ControlAwareMailbox +------------------- + +A ``ControlAwareMailbox`` can be very useful if an actor needs to be able to receive control messages +immediately no matter how many other messages are already in its mailbox. + +It can be configured like this: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#control-aware-mailbox-config + +Control messages need to extend the ``ControlMessage`` trait: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#control-aware-mailbox-messages + +And then an example on how you would use it: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#control-aware-dispatcher Creating your own Mailbox type ==============================