=doc #3639 Show complete custom mailbox example.
This commit is contained in:
parent
e8bc604280
commit
4e1100f84d
8 changed files with 237 additions and 78 deletions
|
|
@ -3,6 +3,15 @@
|
||||||
*/
|
*/
|
||||||
package docs.dispatcher;
|
package docs.dispatcher;
|
||||||
|
|
||||||
|
import akka.dispatch.RequiresMessageQueue;
|
||||||
|
import akka.testkit.AkkaSpec;
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
import docs.actor.MyBoundedUntypedActor;
|
||||||
|
import docs.actor.MyUntypedActor;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import scala.concurrent.ExecutionContext;
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
import akka.actor.*;
|
import akka.actor.*;
|
||||||
//#imports
|
//#imports
|
||||||
|
|
@ -18,35 +27,14 @@ import akka.dispatch.PriorityGenerator;
|
||||||
import akka.dispatch.UnboundedPriorityMailbox;
|
import akka.dispatch.UnboundedPriorityMailbox;
|
||||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||||
import akka.testkit.JavaTestKit;
|
import akka.testkit.JavaTestKit;
|
||||||
import akka.testkit.TestKit;
|
|
||||||
import com.typesafe.config.Config;
|
import com.typesafe.config.Config;
|
||||||
|
|
||||||
//#imports-prio-mailbox
|
//#imports-prio-mailbox
|
||||||
|
|
||||||
//#imports-custom
|
|
||||||
import akka.dispatch.Envelope;
|
|
||||||
import akka.dispatch.MessageQueue;
|
|
||||||
import akka.dispatch.MailboxType;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
|
|
||||||
//#imports-custom
|
|
||||||
|
|
||||||
//#imports-required-mailbox
|
//#imports-required-mailbox
|
||||||
|
|
||||||
//#imports-required-mailbox
|
//#imports-required-mailbox
|
||||||
|
|
||||||
import docs.actor.MyBoundedUntypedActor;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import scala.Option;
|
|
||||||
import scala.concurrent.ExecutionContext;
|
|
||||||
|
|
||||||
import com.typesafe.config.ConfigFactory;
|
|
||||||
|
|
||||||
import docs.actor.MyUntypedActor;
|
|
||||||
import akka.testkit.AkkaSpec;
|
|
||||||
|
|
||||||
public class DispatcherDocTest {
|
public class DispatcherDocTest {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
|
|
@ -185,34 +173,29 @@ public class DispatcherDocTest {
|
||||||
}
|
}
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void requiredMailboxDispatcher() throws Exception {
|
||||||
|
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
|
||||||
|
.withDispatcher("custom-dispatcher"));
|
||||||
|
}
|
||||||
|
|
||||||
static
|
static
|
||||||
//#mailbox-implementation-example
|
//#require-mailbox-on-actor
|
||||||
public class MyUnboundedMailbox implements MailboxType {
|
public class MySpecialActor extends UntypedActor implements
|
||||||
|
RequiresMessageQueue<MyUnboundedJMessageQueueSemantics> {
|
||||||
|
//#require-mailbox-on-actor
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object message) throws Exception {
|
||||||
|
unhandled(message);
|
||||||
|
}
|
||||||
|
//#require-mailbox-on-actor
|
||||||
|
// ...
|
||||||
|
}
|
||||||
|
//#require-mailbox-on-actor
|
||||||
|
|
||||||
// This constructor signature must exist, it will be called by Akka
|
@Test
|
||||||
public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
|
public void requiredMailboxActor() throws Exception {
|
||||||
// put your initialization code here
|
ActorRef myActor = system.actorOf(Props.create(MySpecialActor.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
// The create method is called to create the MessageQueue
|
|
||||||
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
|
|
||||||
return new MessageQueue() {
|
|
||||||
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
|
|
||||||
|
|
||||||
// these must be implemented; queue used as example
|
|
||||||
public void enqueue(ActorRef receiver, Envelope handle) {
|
|
||||||
queue.offer(handle);
|
|
||||||
}
|
|
||||||
public Envelope dequeue() { return queue.poll(); }
|
|
||||||
public int numberOfMessages() { return queue.size(); }
|
|
||||||
public boolean hasMessages() { return !queue.isEmpty(); }
|
|
||||||
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
|
|
||||||
for (Envelope handle: queue) {
|
|
||||||
deadLetters.enqueue(owner, handle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#mailbox-implementation-example
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package docs.dispatcher;
|
||||||
|
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.dispatch.Envelope;
|
||||||
|
import akka.dispatch.MailboxType;
|
||||||
|
import akka.dispatch.MessageQueue;
|
||||||
|
import akka.dispatch.ProducesMessageQueue;
|
||||||
|
import com.typesafe.config.Config;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.Queue;
|
||||||
|
import scala.Option;
|
||||||
|
|
||||||
|
public class MyUnboundedJMailbox implements MailboxType,
|
||||||
|
ProducesMessageQueue<MyUnboundedJMailbox.MyMessageQueue> {
|
||||||
|
|
||||||
|
// This is the MessageQueue implementation
|
||||||
|
public static class MyMessageQueue implements MessageQueue,
|
||||||
|
MyUnboundedJMessageQueueSemantics {
|
||||||
|
private final Queue<Envelope> queue =
|
||||||
|
new ConcurrentLinkedQueue<Envelope>();
|
||||||
|
|
||||||
|
// these must be implemented; queue used as example
|
||||||
|
public void enqueue(ActorRef receiver, Envelope handle) {
|
||||||
|
queue.offer(handle);
|
||||||
|
}
|
||||||
|
public Envelope dequeue() { return queue.poll(); }
|
||||||
|
public int numberOfMessages() { return queue.size(); }
|
||||||
|
public boolean hasMessages() { return !queue.isEmpty(); }
|
||||||
|
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
|
||||||
|
for (Envelope handle: queue) {
|
||||||
|
deadLetters.enqueue(owner, handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This constructor signature must exist, it will be called by Akka
|
||||||
|
public MyUnboundedJMailbox(ActorSystem.Settings settings, Config config) {
|
||||||
|
// put your initialization code here
|
||||||
|
}
|
||||||
|
|
||||||
|
// The create method is called to create the MessageQueue
|
||||||
|
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
|
||||||
|
return new MyMessageQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.dispatcher;
|
||||||
|
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
// Marker interface used for mailbox requirements mapping
|
||||||
|
public interface MyUnboundedJMessageQueueSemantics {
|
||||||
|
}
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
|
@ -197,9 +197,9 @@ Creating your own Mailbox type
|
||||||
|
|
||||||
An example is worth a thousand quacks:
|
An example is worth a thousand quacks:
|
||||||
|
|
||||||
.. includecode:: code/docs/dispatcher/DispatcherDocTest.java#imports-custom
|
.. includecode:: code/docs/dispatcher/MyUnboundedJMailbox.java#mailbox-implementation-example
|
||||||
|
|
||||||
.. includecode:: code/docs/dispatcher/DispatcherDocTest.java#mailbox-implementation-example
|
.. includecode:: code/docs/dispatcher/MyUnboundedJMessageQueueSemantics.java#mailbox-implementation-example
|
||||||
|
|
||||||
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
|
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
|
||||||
configuration, or the mailbox configuration.
|
configuration, or the mailbox configuration.
|
||||||
|
|
@ -214,6 +214,16 @@ configuration, or the mailbox configuration.
|
||||||
this mailbox type; the mailbox type will be instantiated once for each
|
this mailbox type; the mailbox type will be instantiated once for each
|
||||||
dispatcher or mailbox setting using it.
|
dispatcher or mailbox setting using it.
|
||||||
|
|
||||||
|
You can also use the mailbox as a requirement on the dispatcher like this:
|
||||||
|
|
||||||
|
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#custom-mailbox-config-java
|
||||||
|
|
||||||
|
|
||||||
|
Or by defining the requirement on your actor class like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/dispatcher/DispatcherDocTest.java#require-mailbox-on-actor
|
||||||
|
|
||||||
|
|
||||||
Special Semantics of ``system.actorOf``
|
Special Semantics of ``system.actorOf``
|
||||||
=======================================
|
=======================================
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.event.LoggingAdapter
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import docs.dispatcher.DispatcherDocSpec.MyBoundedActor
|
import docs.dispatcher.DispatcherDocSpec.MyBoundedActor
|
||||||
|
import akka.dispatch.RequiresMessageQueue
|
||||||
|
|
||||||
object DispatcherDocSpec {
|
object DispatcherDocSpec {
|
||||||
val javaConfig = """
|
val javaConfig = """
|
||||||
|
|
@ -29,6 +30,22 @@ object DispatcherDocSpec {
|
||||||
//Other mailbox configuration goes here
|
//Other mailbox configuration goes here
|
||||||
}
|
}
|
||||||
//#prio-mailbox-config-java
|
//#prio-mailbox-config-java
|
||||||
|
|
||||||
|
//#custom-mailbox-config-java
|
||||||
|
custom-dispatcher {
|
||||||
|
mailbox-requirement =
|
||||||
|
"docs.dispatcher.MyUnboundedJMessageQueueSemantics"
|
||||||
|
}
|
||||||
|
|
||||||
|
akka.actor.mailbox.requirements {
|
||||||
|
"docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
|
||||||
|
custom-dispatcher-mailbox
|
||||||
|
}
|
||||||
|
|
||||||
|
custom-dispatcher-mailbox {
|
||||||
|
mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
|
||||||
|
}
|
||||||
|
//#custom-mailbox-config-java
|
||||||
"""
|
"""
|
||||||
|
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -154,6 +171,21 @@ object DispatcherDocSpec {
|
||||||
}
|
}
|
||||||
//#required-mailbox-config
|
//#required-mailbox-config
|
||||||
|
|
||||||
|
//#custom-mailbox-config
|
||||||
|
custom-dispatcher {
|
||||||
|
mailbox-requirement =
|
||||||
|
"docs.dispatcher.MyUnboundedMessageQueueSemantics"
|
||||||
|
}
|
||||||
|
|
||||||
|
akka.actor.mailbox.requirements {
|
||||||
|
"docs.dispatcher.MyUnboundedMessageQueueSemantics" =
|
||||||
|
custom-dispatcher-mailbox
|
||||||
|
}
|
||||||
|
|
||||||
|
custom-dispatcher-mailbox {
|
||||||
|
mailbox-type = "docs.dispatcher.MyUnboundedMailbox"
|
||||||
|
}
|
||||||
|
//#custom-mailbox-config
|
||||||
"""
|
"""
|
||||||
|
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
|
|
@ -195,33 +227,22 @@ object DispatcherDocSpec {
|
||||||
with RequiresMessageQueue[BoundedMessageQueueSemantics]
|
with RequiresMessageQueue[BoundedMessageQueueSemantics]
|
||||||
//#required-mailbox-class
|
//#required-mailbox-class
|
||||||
|
|
||||||
//#mailbox-implementation-example
|
//#require-mailbox-on-actor
|
||||||
class MyUnboundedMailbox extends akka.dispatch.MailboxType {
|
class MySpecialActor extends Actor
|
||||||
import akka.actor.{ ActorRef, ActorSystem }
|
with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
|
||||||
import com.typesafe.config.Config
|
//#require-mailbox-on-actor
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
def receive = {
|
||||||
import akka.dispatch.{
|
case _ ⇒
|
||||||
Envelope,
|
|
||||||
MessageQueue,
|
|
||||||
UnboundedQueueBasedMessageQueue
|
|
||||||
}
|
}
|
||||||
|
//#require-mailbox-on-actor
|
||||||
// This constructor signature must exist, it will be called by Akka
|
// ...
|
||||||
def this(settings: ActorSystem.Settings, config: Config) = this()
|
|
||||||
|
|
||||||
// The create method is called to create the MessageQueue
|
|
||||||
final override def create(owner: Option[ActorRef],
|
|
||||||
system: Option[ActorSystem]): MessageQueue =
|
|
||||||
new UnboundedQueueBasedMessageQueue {
|
|
||||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
|
||||||
}
|
}
|
||||||
}
|
//#require-mailbox-on-actor
|
||||||
//#mailbox-implementation-example
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
|
|
||||||
import DispatcherDocSpec.MyActor
|
import DispatcherDocSpec._
|
||||||
|
|
||||||
"defining dispatcher in config" in {
|
"defining dispatcher in config" in {
|
||||||
val context = system
|
val context = system
|
||||||
|
|
@ -325,4 +346,13 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
||||||
val dispatcher = system.dispatchers.lookup("my-balancing-dispatcher")
|
val dispatcher = system.dispatchers.lookup("my-balancing-dispatcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"require custom mailbox on dispatcher" in {
|
||||||
|
val myActor = system.actorOf(Props[MyActor].withDispatcher(
|
||||||
|
"custom-dispatcher"))
|
||||||
|
}
|
||||||
|
|
||||||
|
"require custom mailbox on actor" in {
|
||||||
|
val myActor = system.actorOf(Props[MySpecialActor])
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.dispatcher
|
||||||
|
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
import akka.actor.ActorRef
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.dispatch.Envelope
|
||||||
|
import akka.dispatch.MailboxType
|
||||||
|
import akka.dispatch.MessageQueue
|
||||||
|
import akka.dispatch.ProducesMessageQueue
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
|
import scala.Option
|
||||||
|
|
||||||
|
// Marker trait used for mailbox requirements mapping
|
||||||
|
trait MyUnboundedMessageQueueSemantics
|
||||||
|
|
||||||
|
object MyUnboundedMailbox {
|
||||||
|
// This is the MessageQueue implementation
|
||||||
|
class MyMessageQueue extends MessageQueue
|
||||||
|
with MyUnboundedMessageQueueSemantics {
|
||||||
|
|
||||||
|
private final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||||
|
|
||||||
|
// these must be implemented; queue used as example
|
||||||
|
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
|
||||||
|
queue.offer(handle)
|
||||||
|
def dequeue(): Envelope = queue.poll()
|
||||||
|
def numberOfMessages: Int = queue.size
|
||||||
|
def hasMessages: Boolean = !queue.isEmpty
|
||||||
|
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
|
||||||
|
while (hasMessages) {
|
||||||
|
deadLetters.enqueue(owner, dequeue())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is the Mailbox implementation
|
||||||
|
class MyUnboundedMailbox extends MailboxType
|
||||||
|
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
|
||||||
|
|
||||||
|
import MyUnboundedMailbox._
|
||||||
|
|
||||||
|
// This constructor signature must exist, it will be called by Akka
|
||||||
|
def this(settings: ActorSystem.Settings, config: Config) = {
|
||||||
|
// put your initialization code here
|
||||||
|
this()
|
||||||
|
}
|
||||||
|
|
||||||
|
// The create method is called to create the MessageQueue
|
||||||
|
final override def create(owner: Option[ActorRef],
|
||||||
|
system: Option[ActorSystem]): MessageQueue =
|
||||||
|
new MyMessageQueue()
|
||||||
|
}
|
||||||
|
//#mailbox-implementation-example
|
||||||
|
|
@ -197,7 +197,7 @@ Creating your own Mailbox type
|
||||||
|
|
||||||
An example is worth a thousand quacks:
|
An example is worth a thousand quacks:
|
||||||
|
|
||||||
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example
|
.. includecode:: ../scala/code/docs/dispatcher/MyUnboundedMailbox.scala#mailbox-implementation-example
|
||||||
|
|
||||||
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
|
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
|
||||||
configuration, or the mailbox configuration.
|
configuration, or the mailbox configuration.
|
||||||
|
|
@ -212,6 +212,15 @@ configuration, or the mailbox configuration.
|
||||||
this mailbox type; the mailbox type will be instantiated once for each
|
this mailbox type; the mailbox type will be instantiated once for each
|
||||||
dispatcher or mailbox setting using it.
|
dispatcher or mailbox setting using it.
|
||||||
|
|
||||||
|
You can also use the mailbox as a requirement on the dispatcher like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/dispatcher/DispatcherDocSpec.scala#custom-mailbox-config-java
|
||||||
|
|
||||||
|
|
||||||
|
Or by defining the requirement on your actor class like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/dispatcher/DispatcherDocSpec.scala#require-mailbox-on-actor
|
||||||
|
|
||||||
|
|
||||||
Special Semantics of ``system.actorOf``
|
Special Semantics of ``system.actorOf``
|
||||||
=======================================
|
=======================================
|
||||||
|
|
|
||||||
|
|
@ -34,11 +34,17 @@ public class AkkaJUnitActorSystemResource extends ExternalResource {
|
||||||
private final Config config;
|
private final Config config;
|
||||||
|
|
||||||
private ActorSystem createSystem(String name, Config config) {
|
private ActorSystem createSystem(String name, Config config) {
|
||||||
|
try {
|
||||||
if (config == null)
|
if (config == null)
|
||||||
return ActorSystem.create(name);
|
return ActorSystem.create(name);
|
||||||
else
|
else
|
||||||
return ActorSystem.create(name, config);
|
return ActorSystem.create(name, config);
|
||||||
}
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public AkkaJUnitActorSystemResource(String name, Config config) {
|
public AkkaJUnitActorSystemResource(String name, Config config) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue