diff --git a/akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocTest.scala b/akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.scala similarity index 100% rename from akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocTest.scala rename to akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.scala diff --git a/akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTestBase.java similarity index 100% rename from akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocTestBase.java rename to akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTestBase.java diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala new file mode 100644 index 0000000000..cc5e944b98 --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package docs.actor.mailbox + +import akka.actor.mailbox.DurableMailboxSpec + +object MyDurableMailboxDocSpec { + val config = """ + MyStorage-dispatcher { + mailbox-type = docs.actor.mailbox.MyDurableMailboxType + } + """ +} + +class MyDurableMailboxDocSpec extends DurableMailboxSpec("MyStorage", MyDurableMailboxDocSpec.config) { + override def atStartup() { + } + + override def afterTermination() { + } + + "MyDurableMailbox (Java)" must { + "deliver a message" in { + val actor = createMailboxTestActor() + implicit val sender = testActor + actor ! "hello" + expectMsg("hello") + } + } +} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java new file mode 100644 index 0000000000..e9b18d3a4f --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package docs.actor.mailbox; + +//#custom-mailbox-type +import scala.Option; +import com.typesafe.config.Config; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.dispatch.MailboxType; +import akka.dispatch.MessageQueue; + +public class MyDurableMailboxType implements MailboxType { + + public MyDurableMailboxType(ActorSystem.Settings settings, Config config) { + } + + @Override + public MessageQueue create(Option owner, + Option system) { + if (owner.isEmpty()) + throw new IllegalArgumentException("requires an owner " + + "(i.e. does not work with BalancingDispatcher)"); + return new MyDurableMessageQueue(owner.get(), + (ExtendedActorSystem) system.get()); + } +} +//#custom-mailbox-type \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java new file mode 100644 index 0000000000..0e878573f0 --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package docs.actor.mailbox; + +//#durable-message-queue +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Callable; +import scala.concurrent.duration.Duration; +import akka.actor.ActorRef; +import akka.actor.ExtendedActorSystem; +import akka.actor.mailbox.DurableMessageQueueWithSerialization; +import akka.dispatch.Envelope; +import akka.dispatch.MessageQueue; +import akka.pattern.CircuitBreaker; + +public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization { + + public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) { + super(owner, system); + } + + private final QueueStorage storage = new QueueStorage(); + // A real-world implementation would use configuration to set the last + // three parameters below + private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), 5, + Duration.create(30, "seconds"), Duration.create(1, "minute")); + + @Override + public void enqueue(ActorRef receiver, final Envelope envelope) { + breaker.callWithSyncCircuitBreaker(new Callable() { + @Override + public Object call() { + byte[] data = serialize(envelope); + storage.push(data); + return null; + } + }); + } + + @Override + public Envelope dequeue() { + return breaker.callWithSyncCircuitBreaker(new Callable() { + @Override + public Envelope call() { + byte[] data = storage.pull(); + if (data == null) + return null; + else + return deserialize(data); + } + }); + } + + @Override + public boolean hasMessages() { + return breaker.callWithSyncCircuitBreaker(new Callable() { + @Override + public Boolean call() { + return !storage.isEmpty(); + } + }); + } + + @Override + public int numberOfMessages() { + return breaker.callWithSyncCircuitBreaker(new Callable() { + @Override + public Integer call() { + return storage.size(); + } + }); + } + + /** + * Called when the mailbox is disposed. + * An ordinary mailbox would send remaining messages to deadLetters, + * but the purpose of a durable mailbox is to continue + * with the same message queue when the actor is started again. + */ + @Override + public void cleanUp(ActorRef owner, MessageQueue deadLetters) {} + + //#dummy-queue-storage + // dummy + private static class QueueStorage { + private final ConcurrentLinkedQueue queue = + new ConcurrentLinkedQueue(); + public void push(byte[] data) { queue.offer(data); } + public byte[] pull() { return queue.poll(); } + public boolean isEmpty() { return queue.isEmpty(); } + public int size() { return queue.size(); } + } + //#dummy-queue-storage +} +//#durable-message-queue \ No newline at end of file diff --git a/akka-docs/rst/java/dispatchers.rst b/akka-docs/rst/java/dispatchers.rst index 0be567ea7f..fb23bd6ef1 100644 --- a/akka-docs/rst/java/dispatchers.rst +++ b/akka-docs/rst/java/dispatchers.rst @@ -175,7 +175,7 @@ Akka comes shipped with a number of default mailbox implementations: - Bounded: Yes -* Durable mailboxes, see :ref:`durable-mailboxes`. +* Durable mailboxes, see :ref:`durable-mailboxes-java`. Mailbox configuration examples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/java/durable-mailbox.rst b/akka-docs/rst/java/durable-mailbox.rst new file mode 100644 index 0000000000..d2324cd9ef --- /dev/null +++ b/akka-docs/rst/java/durable-mailbox.rst @@ -0,0 +1,91 @@ + +.. _durable-mailboxes-java: + +########################## + Durable Mailboxes (Java) +########################## + + +Overview +======== + +A durable mailbox is a mailbox which stores the messages on durable storage. +What this means in practice is that if there are pending messages in the actor's +mailbox when the node of the actor resides on crashes, then when you restart the +node, the actor will be able to continue processing as if nothing had happened; +with all pending messages still in its mailbox. + +You configure durable mailboxes through the dispatcher. The actor is oblivious +to which type of mailbox it is using. + +This gives you an excellent way of creating bulkheads in your application, where +groups of actors sharing the same dispatcher also share the same backing +storage. Read more about that in the :ref:`dispatchers-scala` documentation. + +One basic file based durable mailbox is provided by Akka out-of-the-box. +Other implementations can easily be added. Some are available as separate community +Open Source projects, such as: + +* `AMQP Durable Mailbox `_ + + +A durable mailbox is like any other mailbox not likely to be transactional. It's possible +if the actor crashes after receiving a message, but before completing processing of +it, that the message could be lost. + +.. warning:: + + A durable mailbox typically doesn't work with blocking message send, i.e. the message + send operations that are relying on futures; ``ask``. If the node + has crashed and then restarted, the thread that was blocked waiting for the + reply is gone and there is no way we can deliver the message. + + +File-based durable mailbox +========================== + +This mailbox is backed by a journaling transaction log on the local file +system. It is the simplest to use since it does not require an extra +infrastructure piece to administer, but it is usually sufficient and just what +you need. + +In the configuration of the dispatcher you specify the fully qualified class name +of the mailbox: + +.. includecode:: ../scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: dispatcher-config + +Here is an example of how to create an actor with a durable dispatcher: + +.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTestBase.java + :include: imports,dispatcher-config-use + +You can also configure and tune the file-based durable mailbox. This is done in +the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. + +.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf + :language: none + +How to implement a durable mailbox +================================== + +Here is an example of how to implement a custom durable mailbox. Essentially it consists of +a configurator (MailboxType) and a queue implementation (DurableMessageQueue). + +The envelope contains the message sent to the actor, and information about sender. It is the +envelope that needs to be stored. As a help utility you can extend DurableMessageQueueWithSerialization +to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` +mechanism. This optional and you may store the envelope data in any way you like. Durable +mailboxes are an excellent fit for usage of circuit breakers. These are described in the +:ref:`circuit-breaker` documentation. + +.. includecode:: code/docs/actor/mailbox/MyDurableMailboxType.java + :include: custom-mailbox-type + +.. includecode:: code/docs/actor/mailbox/MyDurableMessageQueue.java + :include: durable-message-queue + :exclude: dummy-queue-storage + +For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, +and ZooKeeper, which can be found in Akka git repository tag +`v2.0.1 `_. \ No newline at end of file diff --git a/akka-docs/rst/java/index.rst b/akka-docs/rst/java/index.rst index 81404409cb..2d2274b4c6 100644 --- a/akka-docs/rst/java/index.rst +++ b/akka-docs/rst/java/index.rst @@ -22,9 +22,10 @@ Java API transactors io fsm + testing extending-akka zeromq microkernel camel - testing + durable-mailbox howto diff --git a/akka-docs/rst/modules/index.rst b/akka-docs/rst/modules/index.rst index 8dbf7533b7..90e332df06 100644 --- a/akka-docs/rst/modules/index.rst +++ b/akka-docs/rst/modules/index.rst @@ -4,5 +4,4 @@ Modules .. toctree:: :maxdepth: 2 - durable-mailbox http diff --git a/akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala similarity index 98% rename from akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala rename to akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala index 0b0c320a42..df8782790d 100644 --- a/akka-docs/rst/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -72,7 +72,7 @@ class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem) extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { val storage = new QueueStorage - // A real-world implmentation would use configuration to set the last + // A real-world implementation would use configuration to set the last // three parameters below val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute) diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index 3ca629b54c..593b67c97f 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -177,7 +177,7 @@ Akka comes shipped with a number of default mailbox implementations: - Bounded: Yes -* Durable mailboxes, see :ref:`durable-mailboxes`. +* Durable mailboxes, see :ref:`durable-mailboxes-scala`. Mailbox configuration examples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/modules/durable-mailbox.rst b/akka-docs/rst/scala/durable-mailbox.rst similarity index 94% rename from akka-docs/rst/modules/durable-mailbox.rst rename to akka-docs/rst/scala/durable-mailbox.rst index 7fa5aa2480..42b4069dbf 100644 --- a/akka-docs/rst/modules/durable-mailbox.rst +++ b/akka-docs/rst/scala/durable-mailbox.rst @@ -1,9 +1,9 @@ -.. _durable-mailboxes: +.. _durable-mailboxes-scala: -################### - Durable Mailboxes -################### +########################### + Durable Mailboxes (Scala) +########################### Overview @@ -55,16 +55,11 @@ of the mailbox: .. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala :include: dispatcher-config -Here is an example of how to create an actor with a durable dispatcher, in Scala: +Here is an example of how to create an actor with a durable dispatcher: .. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala :include: imports,dispatcher-config-use -Corresponding example in Java: - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTestBase.java - :include: imports,dispatcher-config-use - You can also configure and tune the file-based durable mailbox. This is done in the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. diff --git a/akka-docs/rst/scala/index.rst b/akka-docs/rst/scala/index.rst index c0ad9fe985..21cf66b551 100644 --- a/akka-docs/rst/scala/index.rst +++ b/akka-docs/rst/scala/index.rst @@ -29,4 +29,5 @@ Scala API zeromq microkernel camel + durable-mailbox howto diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 98170ff851..89f87bc7df 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -22,6 +22,13 @@ abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActo } +/** + * Java API + * DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages) + */ +abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem) + extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization + /** * DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality * to serialize and deserialize Envelopes (messages)