Merge pull request #1160 from akka/wip-2761-java-durable-mailbox-master-patriknw
Java docs and sample of durable mailbox, see #2761
This commit is contained in:
commit
18935f2f49
14 changed files with 266 additions and 15 deletions
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox
|
||||
|
||||
import akka.actor.mailbox.DurableMailboxSpec
|
||||
|
||||
object MyDurableMailboxDocSpec {
|
||||
val config = """
|
||||
MyStorage-dispatcher {
|
||||
mailbox-type = docs.actor.mailbox.MyDurableMailboxType
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
class MyDurableMailboxDocSpec extends DurableMailboxSpec("MyStorage", MyDurableMailboxDocSpec.config) {
|
||||
override def atStartup() {
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
}
|
||||
|
||||
"MyDurableMailbox (Java)" must {
|
||||
"deliver a message" in {
|
||||
val actor = createMailboxTestActor()
|
||||
implicit val sender = testActor
|
||||
actor ! "hello"
|
||||
expectMsg("hello")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox;
|
||||
|
||||
//#custom-mailbox-type
|
||||
import scala.Option;
|
||||
import com.typesafe.config.Config;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ExtendedActorSystem;
|
||||
import akka.dispatch.MailboxType;
|
||||
import akka.dispatch.MessageQueue;
|
||||
|
||||
public class MyDurableMailboxType implements MailboxType {
|
||||
|
||||
public MyDurableMailboxType(ActorSystem.Settings settings, Config config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageQueue create(Option<ActorRef> owner,
|
||||
Option<ActorSystem> system) {
|
||||
if (owner.isEmpty())
|
||||
throw new IllegalArgumentException("requires an owner " +
|
||||
"(i.e. does not work with BalancingDispatcher)");
|
||||
return new MyDurableMessageQueue(owner.get(),
|
||||
(ExtendedActorSystem) system.get());
|
||||
}
|
||||
}
|
||||
//#custom-mailbox-type
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox;
|
||||
|
||||
//#durable-message-queue
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ExtendedActorSystem;
|
||||
import akka.actor.mailbox.DurableMessageQueueWithSerialization;
|
||||
import akka.dispatch.Envelope;
|
||||
import akka.dispatch.MessageQueue;
|
||||
import akka.pattern.CircuitBreaker;
|
||||
|
||||
public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization {
|
||||
|
||||
public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) {
|
||||
super(owner, system);
|
||||
}
|
||||
|
||||
private final QueueStorage storage = new QueueStorage();
|
||||
// A real-world implementation would use configuration to set the last
|
||||
// three parameters below
|
||||
private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), 5,
|
||||
Duration.create(30, "seconds"), Duration.create(1, "minute"));
|
||||
|
||||
@Override
|
||||
public void enqueue(ActorRef receiver, final Envelope envelope) {
|
||||
breaker.callWithSyncCircuitBreaker(new Callable<Object>() {
|
||||
@Override
|
||||
public Object call() {
|
||||
byte[] data = serialize(envelope);
|
||||
storage.push(data);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Envelope dequeue() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Envelope>() {
|
||||
@Override
|
||||
public Envelope call() {
|
||||
byte[] data = storage.pull();
|
||||
if (data == null)
|
||||
return null;
|
||||
else
|
||||
return deserialize(data);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMessages() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() {
|
||||
return !storage.isEmpty();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfMessages() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Integer>() {
|
||||
@Override
|
||||
public Integer call() {
|
||||
return storage.size();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the mailbox is disposed.
|
||||
* An ordinary mailbox would send remaining messages to deadLetters,
|
||||
* but the purpose of a durable mailbox is to continue
|
||||
* with the same message queue when the actor is started again.
|
||||
*/
|
||||
@Override
|
||||
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {}
|
||||
|
||||
//#dummy-queue-storage
|
||||
// dummy
|
||||
private static class QueueStorage {
|
||||
private final ConcurrentLinkedQueue<byte[]> queue =
|
||||
new ConcurrentLinkedQueue<byte[]>();
|
||||
public void push(byte[] data) { queue.offer(data); }
|
||||
public byte[] pull() { return queue.poll(); }
|
||||
public boolean isEmpty() { return queue.isEmpty(); }
|
||||
public int size() { return queue.size(); }
|
||||
}
|
||||
//#dummy-queue-storage
|
||||
}
|
||||
//#durable-message-queue
|
||||
|
|
@ -175,7 +175,7 @@ Akka comes shipped with a number of default mailbox implementations:
|
|||
|
||||
- Bounded: Yes
|
||||
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes`.
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes-java`.
|
||||
|
||||
Mailbox configuration examples
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
|
|||
91
akka-docs/rst/java/durable-mailbox.rst
Normal file
91
akka-docs/rst/java/durable-mailbox.rst
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
|
||||
.. _durable-mailboxes-java:
|
||||
|
||||
##########################
|
||||
Durable Mailboxes (Java)
|
||||
##########################
|
||||
|
||||
|
||||
Overview
|
||||
========
|
||||
|
||||
A durable mailbox is a mailbox which stores the messages on durable storage.
|
||||
What this means in practice is that if there are pending messages in the actor's
|
||||
mailbox when the node of the actor resides on crashes, then when you restart the
|
||||
node, the actor will be able to continue processing as if nothing had happened;
|
||||
with all pending messages still in its mailbox.
|
||||
|
||||
You configure durable mailboxes through the dispatcher. The actor is oblivious
|
||||
to which type of mailbox it is using.
|
||||
|
||||
This gives you an excellent way of creating bulkheads in your application, where
|
||||
groups of actors sharing the same dispatcher also share the same backing
|
||||
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
|
||||
|
||||
One basic file based durable mailbox is provided by Akka out-of-the-box.
|
||||
Other implementations can easily be added. Some are available as separate community
|
||||
Open Source projects, such as:
|
||||
|
||||
* `AMQP Durable Mailbox <https://github.com/drexin/akka-amqp-mailbox>`_
|
||||
|
||||
|
||||
A durable mailbox is like any other mailbox not likely to be transactional. It's possible
|
||||
if the actor crashes after receiving a message, but before completing processing of
|
||||
it, that the message could be lost.
|
||||
|
||||
.. warning::
|
||||
|
||||
A durable mailbox typically doesn't work with blocking message send, i.e. the message
|
||||
send operations that are relying on futures; ``ask``. If the node
|
||||
has crashed and then restarted, the thread that was blocked waiting for the
|
||||
reply is gone and there is no way we can deliver the message.
|
||||
|
||||
|
||||
File-based durable mailbox
|
||||
==========================
|
||||
|
||||
This mailbox is backed by a journaling transaction log on the local file
|
||||
system. It is the simplest to use since it does not require an extra
|
||||
infrastructure piece to administer, but it is usually sufficient and just what
|
||||
you need.
|
||||
|
||||
In the configuration of the dispatcher you specify the fully qualified class name
|
||||
of the mailbox:
|
||||
|
||||
.. includecode:: ../scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: dispatcher-config
|
||||
|
||||
Here is an example of how to create an actor with a durable dispatcher:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTestBase.java
|
||||
:include: imports,dispatcher-config-use
|
||||
|
||||
You can also configure and tune the file-based durable mailbox. This is done in
|
||||
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
|
||||
|
||||
.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
How to implement a durable mailbox
|
||||
==================================
|
||||
|
||||
Here is an example of how to implement a custom durable mailbox. Essentially it consists of
|
||||
a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
|
||||
|
||||
The envelope contains the message sent to the actor, and information about sender. It is the
|
||||
envelope that needs to be stored. As a help utility you can extend DurableMessageQueueWithSerialization
|
||||
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
|
||||
mechanism. This optional and you may store the envelope data in any way you like. Durable
|
||||
mailboxes are an excellent fit for usage of circuit breakers. These are described in the
|
||||
:ref:`circuit-breaker` documentation.
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/MyDurableMailboxType.java
|
||||
:include: custom-mailbox-type
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/MyDurableMessageQueue.java
|
||||
:include: durable-message-queue
|
||||
:exclude: dummy-queue-storage
|
||||
|
||||
For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk,
|
||||
and ZooKeeper, which can be found in Akka git repository tag
|
||||
`v2.0.1 <https://github.com/akka/akka/tree/v2.0.1/akka-durable-mailboxes>`_.
|
||||
|
|
@ -22,9 +22,10 @@ Java API
|
|||
transactors
|
||||
io
|
||||
fsm
|
||||
testing
|
||||
extending-akka
|
||||
zeromq
|
||||
microkernel
|
||||
camel
|
||||
testing
|
||||
durable-mailbox
|
||||
howto
|
||||
|
|
|
|||
|
|
@ -4,5 +4,4 @@ Modules
|
|||
.. toctree::
|
||||
:maxdepth: 2
|
||||
|
||||
durable-mailbox
|
||||
http
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem)
|
|||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
|
||||
|
||||
val storage = new QueueStorage
|
||||
// A real-world implmentation would use configuration to set the last
|
||||
// A real-world implementation would use configuration to set the last
|
||||
// three parameters below
|
||||
val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute)
|
||||
|
||||
|
|
@ -177,7 +177,7 @@ Akka comes shipped with a number of default mailbox implementations:
|
|||
|
||||
- Bounded: Yes
|
||||
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes`.
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes-scala`.
|
||||
|
||||
Mailbox configuration examples
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
|
||||
.. _durable-mailboxes:
|
||||
.. _durable-mailboxes-scala:
|
||||
|
||||
###################
|
||||
Durable Mailboxes
|
||||
###################
|
||||
###########################
|
||||
Durable Mailboxes (Scala)
|
||||
###########################
|
||||
|
||||
|
||||
Overview
|
||||
|
|
@ -55,16 +55,11 @@ of the mailbox:
|
|||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: dispatcher-config
|
||||
|
||||
Here is an example of how to create an actor with a durable dispatcher, in Scala:
|
||||
Here is an example of how to create an actor with a durable dispatcher:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: imports,dispatcher-config-use
|
||||
|
||||
Corresponding example in Java:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTestBase.java
|
||||
:include: imports,dispatcher-config-use
|
||||
|
||||
You can also configure and tune the file-based durable mailbox. This is done in
|
||||
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
|
||||
|
||||
|
|
@ -29,4 +29,5 @@ Scala API
|
|||
zeromq
|
||||
microkernel
|
||||
camel
|
||||
durable-mailbox
|
||||
howto
|
||||
|
|
|
|||
|
|
@ -22,6 +22,13 @@ abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActo
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API
|
||||
* DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages)
|
||||
*/
|
||||
abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization
|
||||
|
||||
/**
|
||||
* DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality
|
||||
* to serialize and deserialize Envelopes (messages)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue