=dur #3664 Deprecate durable mailboxes
This commit is contained in:
parent
a5b52e9bea
commit
4bd1586b1e
33 changed files with 41 additions and 561 deletions
|
|
@ -579,7 +579,7 @@ The default approach is to take options 2a, 3a, and 4a - allowing ``A`` on
|
|||
messages during the update transition. This assumes stateless actors that do not
|
||||
have a dependency on message ordering from any given source.
|
||||
|
||||
- If an actor has a distributed durable mailbox then nothing needs to be done,
|
||||
- If an actor has persistent (durable) state then nothing needs to be done,
|
||||
other than migrating the actor.
|
||||
|
||||
- If message ordering needs to be maintained during the update transition then
|
||||
|
|
|
|||
|
|
@ -403,8 +403,4 @@ akka-zeromq
|
|||
.. literalinclude:: ../../../akka-zeromq/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
akka-file-mailbox
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
|
|
|||
|
|
@ -230,7 +230,7 @@ possibly non-exhaustive list of counter-indications is:
|
|||
deployed with Routers.
|
||||
|
||||
- As mentioned above, the problem occurs anywhere a lock is involved during
|
||||
enqueueing, which may also apply to custom mailboxes (or durable mailboxes).
|
||||
enqueueing, which may also apply to custom mailboxes.
|
||||
|
||||
This list has been compiled carefully, but other problematic scenarios may have
|
||||
escaped our analysis.
|
||||
|
|
|
|||
|
|
@ -34,15 +34,9 @@ Akka is very modular and consists of several JARs containing different features.
|
|||
|
||||
- ``akka-cluster`` – Cluster membership management, elastic routers.
|
||||
|
||||
- ``akka-file-mailbox`` – Akka durable mailbox (find more among community
|
||||
projects)
|
||||
|
||||
- ``akka-kernel`` – Akka microkernel for running a bare-bones mini application
|
||||
server
|
||||
|
||||
- ``akka-mailboxes-common`` – common infrastructure for implementing durable
|
||||
mailboxes
|
||||
|
||||
- ``akka-osgi`` – base bundle for using Akka in OSGi containers, containing the
|
||||
``akka-actor`` classes
|
||||
|
||||
|
|
|
|||
|
|
@ -1,44 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox;
|
||||
|
||||
//#imports
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ActorRef;
|
||||
|
||||
//#imports
|
||||
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import akka.testkit.AkkaSpec;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.UntypedActor;
|
||||
|
||||
public class DurableMailboxDocTest {
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
new AkkaJUnitActorSystemResource("DurableMailboxDocTest",
|
||||
ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf()));
|
||||
|
||||
private final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
@Test
|
||||
public void configDefinedDispatcher() {
|
||||
//#dispatcher-config-use
|
||||
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class).
|
||||
withDispatcher("my-dispatcher"), "myactor");
|
||||
//#dispatcher-config-use
|
||||
myActor.tell("test", ActorRef.noSender());
|
||||
}
|
||||
|
||||
public static class MyUntypedActor extends UntypedActor {
|
||||
public void onReceive(Object message) {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox
|
||||
|
||||
import akka.actor.mailbox.DurableMailboxSpec
|
||||
|
||||
object MyDurableMailboxDocSpec {
|
||||
val config = """
|
||||
MyStorage-dispatcher {
|
||||
mailbox-type = docs.actor.mailbox.MyDurableMailboxType
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
class MyDurableMailboxDocSpec extends DurableMailboxSpec("MyStorage", MyDurableMailboxDocSpec.config) {
|
||||
override def atStartup() {
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
}
|
||||
|
||||
"MyDurableMailbox (Java)" must {
|
||||
"deliver a message" in {
|
||||
val actor = createMailboxTestActor()
|
||||
implicit val sender = testActor
|
||||
actor ! "hello"
|
||||
expectMsg("hello")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox;
|
||||
|
||||
//#custom-mailbox-type
|
||||
import scala.Option;
|
||||
import com.typesafe.config.Config;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ExtendedActorSystem;
|
||||
import akka.dispatch.MailboxType;
|
||||
import akka.dispatch.MessageQueue;
|
||||
|
||||
public class MyDurableMailboxType implements MailboxType {
|
||||
|
||||
public MyDurableMailboxType(ActorSystem.Settings settings, Config config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageQueue create(Option<ActorRef> owner,
|
||||
Option<ActorSystem> system) {
|
||||
if (owner.isEmpty())
|
||||
throw new IllegalArgumentException("requires an owner " +
|
||||
"(i.e. does not work with BalancingDispatcher)");
|
||||
return new MyDurableMessageQueue(owner.get(),
|
||||
(ExtendedActorSystem) system.get());
|
||||
}
|
||||
}
|
||||
//#custom-mailbox-type
|
||||
|
|
@ -1,96 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox;
|
||||
|
||||
//#durable-message-queue
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ExtendedActorSystem;
|
||||
import akka.actor.mailbox.DurableMessageQueueWithSerialization;
|
||||
import akka.dispatch.Envelope;
|
||||
import akka.dispatch.MessageQueue;
|
||||
import akka.pattern.CircuitBreaker;
|
||||
|
||||
public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization {
|
||||
|
||||
public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) {
|
||||
super(owner, system);
|
||||
}
|
||||
|
||||
private final QueueStorage storage = new QueueStorage();
|
||||
// A real-world implementation would use configuration to set the last
|
||||
// three parameters below
|
||||
private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(),
|
||||
5, Duration.create(30, "seconds"), Duration.create(1, "minute"));
|
||||
|
||||
@Override
|
||||
public void enqueue(ActorRef receiver, final Envelope envelope) {
|
||||
breaker.callWithSyncCircuitBreaker(new Callable<Object>() {
|
||||
@Override
|
||||
public Object call() {
|
||||
byte[] data = serialize(envelope);
|
||||
storage.push(data);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Envelope dequeue() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Envelope>() {
|
||||
@Override
|
||||
public Envelope call() {
|
||||
byte[] data = storage.pull();
|
||||
if (data == null)
|
||||
return null;
|
||||
else
|
||||
return deserialize(data);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasMessages() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() {
|
||||
return !storage.isEmpty();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfMessages() {
|
||||
return breaker.callWithSyncCircuitBreaker(new Callable<Integer>() {
|
||||
@Override
|
||||
public Integer call() {
|
||||
return storage.size();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the mailbox is disposed.
|
||||
* An ordinary mailbox would send remaining messages to deadLetters,
|
||||
* but the purpose of a durable mailbox is to continue
|
||||
* with the same message queue when the actor is started again.
|
||||
*/
|
||||
@Override
|
||||
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {}
|
||||
|
||||
//#dummy-queue-storage
|
||||
// dummy
|
||||
private static class QueueStorage {
|
||||
private final ConcurrentLinkedQueue<byte[]> queue =
|
||||
new ConcurrentLinkedQueue<byte[]>();
|
||||
public void push(byte[] data) { queue.offer(data); }
|
||||
public byte[] pull() { return queue.poll(); }
|
||||
public boolean isEmpty() { return queue.isEmpty(); }
|
||||
public int size() { return queue.size(); }
|
||||
}
|
||||
//#dummy-queue-storage
|
||||
}
|
||||
//#durable-message-queue
|
||||
|
|
@ -6,7 +6,6 @@ package docs.testkit;
|
|||
import static org.junit.Assert.*;
|
||||
|
||||
import akka.testkit.*;
|
||||
import docs.actor.mailbox.DurableMailboxDocSpec;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,91 +0,0 @@
|
|||
|
||||
.. _durable-mailboxes-java:
|
||||
|
||||
##########################
|
||||
Durable Mailboxes
|
||||
##########################
|
||||
|
||||
|
||||
Overview
|
||||
========
|
||||
|
||||
A durable mailbox is a mailbox which stores the messages on durable storage.
|
||||
What this means in practice is that if there are pending messages in the actor's
|
||||
mailbox when the node of the actor resides on crashes, then when you restart the
|
||||
node, the actor will be able to continue processing as if nothing had happened;
|
||||
with all pending messages still in its mailbox.
|
||||
|
||||
You configure durable mailboxes through the dispatcher or the actor deployment (see
|
||||
:ref:`mailboxes-java`). The actor is oblivious to which type of mailbox it is using.
|
||||
|
||||
This gives you an excellent way of creating bulkheads in your application, where
|
||||
groups of actors sharing the same dispatcher also share the same backing
|
||||
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
|
||||
|
||||
One basic file based durable mailbox is provided by Akka out-of-the-box.
|
||||
Other implementations can easily be added. Some are available as separate community
|
||||
Open Source projects, such as:
|
||||
|
||||
* `AMQP Durable Mailbox <https://github.com/drexin/akka-amqp-mailbox>`_
|
||||
|
||||
|
||||
A durable mailbox is like any other mailbox not likely to be transactional. It's possible
|
||||
if the actor crashes after receiving a message, but before completing processing of
|
||||
it, that the message could be lost.
|
||||
|
||||
.. warning::
|
||||
|
||||
A durable mailbox typically doesn't work with blocking message send, i.e. the message
|
||||
send operations that are relying on futures; ``ask``. If the node
|
||||
has crashed and then restarted, the thread that was blocked waiting for the
|
||||
reply is gone and there is no way we can deliver the message.
|
||||
|
||||
|
||||
File-based durable mailbox
|
||||
==========================
|
||||
|
||||
This mailbox is backed by a journaling transaction log on the local file
|
||||
system. It is the simplest to use since it does not require an extra
|
||||
infrastructure piece to administer, but it is usually sufficient and just what
|
||||
you need.
|
||||
|
||||
In the configuration of the dispatcher you specify the fully qualified class name
|
||||
of the mailbox:
|
||||
|
||||
.. includecode:: ../scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: dispatcher-config
|
||||
|
||||
Here is an example of how to create an actor with a durable dispatcher:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTest.java
|
||||
:include: imports,dispatcher-config-use
|
||||
|
||||
You can also configure and tune the file-based durable mailbox. This is done in
|
||||
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
|
||||
|
||||
.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
How to implement a durable mailbox
|
||||
==================================
|
||||
|
||||
Here is an example of how to implement a custom durable mailbox. Essentially it consists of
|
||||
a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
|
||||
|
||||
The envelope contains the message sent to the actor, and information about sender. It is the
|
||||
envelope that needs to be stored. As a help utility you can extend DurableMessageQueueWithSerialization
|
||||
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
|
||||
mechanism. This optional and you may store the envelope data in any way you like. Durable
|
||||
mailboxes are an excellent fit for usage of circuit breakers. These are described in the
|
||||
:ref:`circuit-breaker` documentation.
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/MyDurableMailboxType.java
|
||||
:include: custom-mailbox-type
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/MyDurableMessageQueue.java
|
||||
:include: durable-message-queue
|
||||
:exclude: dummy-queue-storage
|
||||
|
||||
For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk,
|
||||
and ZooKeeper, which can be found in Akka git repository tag
|
||||
`v2.0.1 <https://github.com/akka/akka/tree/v2.0.1/akka-durable-mailboxes>`_.
|
||||
|
|
@ -10,5 +10,4 @@ Utilities
|
|||
../common/duration
|
||||
../common/circuitbreaker
|
||||
extending-akka
|
||||
durable-mailbox
|
||||
microkernel
|
||||
|
|
|
|||
|
|
@ -161,8 +161,6 @@ Akka comes shipped with a number of mailbox implementations:
|
|||
|
||||
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
|
||||
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes-java`.
|
||||
|
||||
Mailbox configuration examples
|
||||
==============================
|
||||
|
||||
|
|
|
|||
|
|
@ -123,8 +123,8 @@ address which shall be the recipient of the serialized information. Use
|
|||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
e.g. storing actor references outside of an actor application (database, etc.).
|
||||
In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
|
|
|
|||
|
|
@ -104,6 +104,14 @@ Dataflow is Deprecated
|
|||
|
||||
Akka dataflow is superseded by `Scala Async <https://github.com/scala/async>`_.
|
||||
|
||||
Durable Mailboxes are Deprecated
|
||||
================================
|
||||
|
||||
Durable mailboxes are superseded by ``akka-persistence``, which offers several
|
||||
tools to support reliable messaging.
|
||||
|
||||
Read more about ``akka-persistence`` in the :ref:`documentation for Scala <persistence-scala>` and
|
||||
:ref:`documentation for Java <persistence-java>`.
|
||||
|
||||
Removed Deprecated Features
|
||||
===========================
|
||||
|
|
@ -114,4 +122,4 @@ The following, previously deprecated, features have been removed:
|
|||
* `API changes to FSM and TestFSMRef <http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#API_changes_to_FSM_and_TestFSMRef>`_
|
||||
* DefaultScheduler superseded by LightArrayRevolverScheduler
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,143 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.actor.mailbox
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
//#imports
|
||||
import akka.actor.Props
|
||||
|
||||
//#imports
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.{ Actor, ExtendedActorSystem }
|
||||
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case x =>
|
||||
}
|
||||
}
|
||||
|
||||
object DurableMailboxDocSpec {
|
||||
val config = """
|
||||
//#dispatcher-config
|
||||
my-dispatcher {
|
||||
mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
|
||||
}
|
||||
//#dispatcher-config
|
||||
"""
|
||||
}
|
||||
|
||||
class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
|
||||
|
||||
"configuration of dispatcher with durable mailbox" in {
|
||||
//#dispatcher-config-use
|
||||
val myActor = system.actorOf(Props[MyActor].
|
||||
withDispatcher("my-dispatcher"), name = "myactor")
|
||||
//#dispatcher-config-use
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#custom-mailbox
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.ActorContext
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.dispatch.Envelope
|
||||
import akka.dispatch.MailboxType
|
||||
import akka.dispatch.MessageQueue
|
||||
import akka.actor.mailbox.DurableMessageQueue
|
||||
import akka.actor.mailbox.DurableMessageSerialization
|
||||
import akka.pattern.CircuitBreaker
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
|
||||
extends MailboxType {
|
||||
|
||||
override def create(owner: Option[ActorRef],
|
||||
system: Option[ActorSystem]): MessageQueue =
|
||||
(owner zip system) headOption match {
|
||||
case Some((o, s: ExtendedActorSystem)) => new MyMessageQueue(o, s)
|
||||
case _ =>
|
||||
throw new IllegalArgumentException("requires an owner " +
|
||||
"(i.e. does not work with BalancingDispatcher)")
|
||||
}
|
||||
}
|
||||
|
||||
class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
|
||||
|
||||
val storage = new QueueStorage
|
||||
// A real-world implementation would use configuration to set the last
|
||||
// three parameters below
|
||||
val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute)
|
||||
|
||||
def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
|
||||
breaker.withSyncCircuitBreaker {
|
||||
val data: Array[Byte] = serialize(envelope)
|
||||
storage.push(data)
|
||||
}
|
||||
|
||||
def dequeue(): Envelope = breaker.withSyncCircuitBreaker {
|
||||
val data: Option[Array[Byte]] = storage.pull()
|
||||
data.map(deserialize).orNull
|
||||
}
|
||||
|
||||
def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty }
|
||||
|
||||
def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size }
|
||||
|
||||
/**
|
||||
* Called when the mailbox is disposed.
|
||||
* An ordinary mailbox would send remaining messages to deadLetters,
|
||||
* but the purpose of a durable mailbox is to continue
|
||||
* with the same message queue when the actor is started again.
|
||||
*/
|
||||
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()
|
||||
|
||||
}
|
||||
//#custom-mailbox
|
||||
|
||||
// dummy
|
||||
class QueueStorage {
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
val queue = new ConcurrentLinkedQueue[Array[Byte]]
|
||||
def push(data: Array[Byte]): Unit = queue.offer(data)
|
||||
def pull(): Option[Array[Byte]] = Option(queue.poll())
|
||||
def isEmpty: Boolean = queue.isEmpty
|
||||
def size: Int = queue.size
|
||||
}
|
||||
|
||||
//#custom-mailbox-test
|
||||
import akka.actor.mailbox.DurableMailboxSpec
|
||||
|
||||
object MyMailboxSpec {
|
||||
val config = """
|
||||
MyStorage-dispatcher {
|
||||
mailbox-type = docs.actor.mailbox.MyMailboxType
|
||||
}
|
||||
"""
|
||||
}
|
||||
|
||||
class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) {
|
||||
override def atStartup() {
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
}
|
||||
|
||||
"MyMailbox" must {
|
||||
"deliver a message" in {
|
||||
val actor = createMailboxTestActor()
|
||||
implicit val sender = testActor
|
||||
actor ! "hello"
|
||||
expectMsg("hello")
|
||||
}
|
||||
|
||||
// add more tests
|
||||
}
|
||||
}
|
||||
|
|
@ -1,99 +0,0 @@
|
|||
|
||||
.. _durable-mailboxes-scala:
|
||||
|
||||
###########################
|
||||
Durable Mailboxes
|
||||
###########################
|
||||
|
||||
|
||||
Overview
|
||||
========
|
||||
|
||||
A durable mailbox is a mailbox which stores the messages on durable storage.
|
||||
What this means in practice is that if there are pending messages in the actor's
|
||||
mailbox when the node of the actor resides on crashes, then when you restart the
|
||||
node, the actor will be able to continue processing as if nothing had happened;
|
||||
with all pending messages still in its mailbox.
|
||||
|
||||
You configure durable mailboxes through the dispatcher or the actor deployment (see
|
||||
:ref:`mailboxes-scala`). The actor is oblivious to which type of mailbox it is using.
|
||||
|
||||
This gives you an excellent way of creating bulkheads in your application, where
|
||||
groups of actors sharing the same dispatcher also share the same backing
|
||||
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
|
||||
|
||||
One basic file based durable mailbox is provided by Akka out-of-the-box.
|
||||
Other implementations can easily be added. Some are available as separate community
|
||||
Open Source projects, such as:
|
||||
|
||||
* `AMQP Durable Mailbox <https://github.com/drexin/akka-amqp-mailbox>`_
|
||||
|
||||
|
||||
A durable mailbox is like any other mailbox not likely to be transactional. It's possible
|
||||
if the actor crashes after receiving a message, but before completing processing of
|
||||
it, that the message could be lost.
|
||||
|
||||
.. warning::
|
||||
|
||||
A durable mailbox typically doesn't work with blocking message send, i.e. the message
|
||||
send operations that are relying on futures; ``?`` or ``ask``. If the node
|
||||
has crashed and then restarted, the thread that was blocked waiting for the
|
||||
reply is gone and there is no way we can deliver the message.
|
||||
|
||||
|
||||
File-based durable mailbox
|
||||
==========================
|
||||
|
||||
This mailbox is backed by a journaling transaction log on the local file
|
||||
system. It is the simplest to use since it does not require an extra
|
||||
infrastructure piece to administer, but it is usually sufficient and just what
|
||||
you need.
|
||||
|
||||
In the configuration of the dispatcher you specify the fully qualified class name
|
||||
of the mailbox:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: dispatcher-config
|
||||
|
||||
Here is an example of how to create an actor with a durable dispatcher:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: imports,dispatcher-config-use
|
||||
|
||||
You can also configure and tune the file-based durable mailbox. This is done in
|
||||
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
|
||||
|
||||
.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
How to implement a durable mailbox
|
||||
==================================
|
||||
|
||||
Here is an example of how to implement a custom durable mailbox. Essentially it consists of
|
||||
a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
|
||||
|
||||
The envelope contains the message sent to the actor, and information about sender. It is the
|
||||
envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization
|
||||
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
|
||||
mechanism. This optional and you may store the envelope data in any way you like. Durable
|
||||
mailboxes are an excellent fit for usage of circuit breakers. These are described in the
|
||||
:ref:`circuit-breaker` documentation.
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: custom-mailbox
|
||||
|
||||
To facilitate testing of a durable mailbox you may use ``DurableMailboxSpec`` as base class.
|
||||
To use ``DurableMailboxDocSpec`` add this dependency::
|
||||
|
||||
"com.typesafe.akka" %% "akka-mailboxes-common" %
|
||||
"@version@" classifier "test" @crossString@
|
||||
|
||||
It implements a few basic tests and helps you setup the a fixture. More tests can be
|
||||
added in concrete subclass like this:
|
||||
|
||||
.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala
|
||||
:include: custom-mailbox-test
|
||||
|
||||
For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk,
|
||||
and ZooKeeper, which can be found in Akka git repository tag
|
||||
`v2.0.1 <https://github.com/akka/akka/tree/v2.0.1/akka-durable-mailboxes>`_.
|
||||
|
|
@ -10,5 +10,4 @@ Utilities
|
|||
../common/duration
|
||||
../common/circuitbreaker
|
||||
extending-akka
|
||||
durable-mailbox
|
||||
microkernel
|
||||
|
|
|
|||
|
|
@ -161,8 +161,6 @@ Akka comes shipped with a number of mailbox implementations:
|
|||
|
||||
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
|
||||
|
||||
* Durable mailboxes, see :ref:`durable-mailboxes-scala`.
|
||||
|
||||
Mailbox configuration examples
|
||||
==============================
|
||||
|
||||
|
|
|
|||
|
|
@ -112,8 +112,8 @@ address which shall be the recipient of the serialized information. Use
|
|||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
e.g. storing actor references outside of an actor application (database, etc.).
|
||||
In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
akka {
|
||||
actor {
|
||||
mailbox {
|
||||
# deprecated, superseded by akka-persistence
|
||||
file-based {
|
||||
# directory below which this queue resides
|
||||
directory-path = "./_mb"
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import scala.util.control.NonFatal
|
|||
import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker }
|
||||
import scala.concurrent.duration.Duration
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||
private val settings = new FileBasedMailboxSettings(systemSettings, config)
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = (owner zip system).headOption match {
|
||||
|
|
@ -22,6 +23,7 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config)
|
|||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {
|
||||
// TODO Is it reasonable for all FileBasedMailboxes to have their own logger?
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import scala.concurrent.duration._
|
|||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||
extends DurableMailboxSettings {
|
||||
|
||||
|
|
|
|||
|
|
@ -19,4 +19,5 @@ package akka.actor.mailbox.filebased.filequeue
|
|||
|
||||
import java.io.IOException
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package akka.actor.mailbox.filebased.filequeue
|
|||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class Counter {
|
||||
private val value = new AtomicLong(0)
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
// returned from journal replay
|
||||
sealed trait JournalItem
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object JournalItem {
|
||||
case class Add(item: QItem) extends JournalItem
|
||||
case object Remove extends JournalItem
|
||||
|
|
@ -38,6 +39,7 @@ object JournalItem {
|
|||
/**
|
||||
* Codes for working with the journal file for a PersistentQueue.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) {
|
||||
|
||||
private val queueFile = new File(queuePath)
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import scala.annotation.tailrec
|
|||
import akka.actor.mailbox.filebased.FileBasedMailboxSettings
|
||||
|
||||
// a config value that's backed by a global setting but may be locally overridden
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class OverlaySetting[T](base: ⇒ T) {
|
||||
@volatile
|
||||
private var local: Option[T] = None
|
||||
|
|
@ -38,6 +39,7 @@ trait Prependable[T] {
|
|||
def prepend(t: T): Unit
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) {
|
||||
|
||||
private case object ItemArrived
|
||||
|
|
@ -448,6 +450,7 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
|
|||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object PersistentQueue {
|
||||
@volatile
|
||||
var maxItems: Int = Int.MaxValue
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package akka.actor.mailbox.filebased.filequeue
|
|||
|
||||
import java.nio.{ ByteBuffer, ByteOrder }
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
|
||||
def pack(): Array[Byte] = {
|
||||
val bytes = new Array[Byte](data.length + 16)
|
||||
|
|
@ -31,6 +32,7 @@ case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
|
|||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object QItem {
|
||||
def unpack(data: Array[Byte]): QItem = {
|
||||
val buffer = ByteBuffer.wrap(data)
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import akka.event.LoggingAdapter
|
|||
import akka.actor.mailbox.filebased.FileBasedMailboxSettings
|
||||
|
||||
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) {
|
||||
private val path = new File(queueFolder)
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import akka.actor.mailbox.filebased.filequeue._
|
|||
import akka.event.LoggingAdapter
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
class QueueDumper(filename: String, log: LoggingAdapter) {
|
||||
var offset = 0L
|
||||
var operations = 0L
|
||||
|
|
@ -110,6 +111,7 @@ class QueueDumper(filename: String, log: LoggingAdapter) {
|
|||
}
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object QDumper {
|
||||
val filenames = new mutable.ListBuffer[String]
|
||||
var quiet = false
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package akka.actor.mailbox.filebased.filequeue.tools
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object Util {
|
||||
val KILOBYTE = 1024L
|
||||
val MEGABYTE = 1024 * KILOBYTE
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ private[akka] object DurableExecutableMailboxConfig {
|
|||
val Name = "[\\.\\/\\$\\s]".r
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue {
|
||||
import DurableExecutableMailboxConfig._
|
||||
|
||||
|
|
@ -26,6 +27,7 @@ abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActo
|
|||
* Java API
|
||||
* DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages)
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem)
|
||||
extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization
|
||||
|
||||
|
|
@ -33,6 +35,7 @@ abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: E
|
|||
* DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality
|
||||
* to serialize and deserialize Envelopes (messages)
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
||||
|
||||
/**
|
||||
|
|
@ -91,6 +94,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
|
|||
*
|
||||
* where name=“my-durable-mailbox” in this example.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
trait DurableMailboxSettings {
|
||||
/**
|
||||
* A reference to the enclosing actor system.
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import akka.dispatch.Mailbox
|
|||
import akka.testkit.TestKit
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object DurableMailboxSpecActorFactory {
|
||||
|
||||
class MailboxTestActor extends Actor {
|
||||
|
|
@ -37,6 +38,7 @@ object DurableMailboxSpecActorFactory {
|
|||
|
||||
}
|
||||
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
object DurableMailboxSpec {
|
||||
def fallbackConfig: Config = ConfigFactory.parseString("""
|
||||
akka {
|
||||
|
|
@ -54,6 +56,7 @@ object DurableMailboxSpec {
|
|||
* Subclass must define dispatcher in the supplied config for the specific backend.
|
||||
* The id of the dispatcher must be the same as the `<backendName>-dispatcher`.
|
||||
*/
|
||||
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
|
||||
abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
|
||||
extends TestKit(system) with WordSpecLike with MustMatchers with BeforeAndAfterAll {
|
||||
|
||||
|
|
|
|||
|
|
@ -618,8 +618,8 @@ object AkkaBuild extends Build {
|
|||
lazy val docs = Project(
|
||||
id = "akka-docs",
|
||||
base = file("akka-docs"),
|
||||
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
|
||||
remote % "compile;test->test", cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries,
|
||||
dependencies = Seq(actor, testkit % "test->test", channels,
|
||||
remote % "compile;test->test", cluster, slf4j, agent, transactor, zeroMQ, camel, osgi, osgiAries,
|
||||
persistence % "compile;test->test"),
|
||||
settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
|
||||
sourceDirectory in Sphinx <<= baseDirectory / "rst",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue