From 4bd1586b1e8642ea600f36cde3b12e9692d4f675 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Dec 2013 13:38:13 +0100 Subject: [PATCH] =dur #3664 Deprecate durable mailboxes --- akka-docs/rst/common/cluster.rst | 2 +- akka-docs/rst/general/configuration.rst | 4 - .../general/message-delivery-guarantees.rst | 2 +- akka-docs/rst/intro/getting-started.rst | 6 - .../actor/mailbox/DurableMailboxDocTest.java | 44 ------ .../mailbox/MyDurableMailboxDocSpec.scala | 31 ---- .../actor/mailbox/MyDurableMailboxType.java | 30 ---- .../actor/mailbox/MyDurableMessageQueue.java | 96 ------------ .../code/docs/testkit/TestKitDocTest.java | 1 - akka-docs/rst/java/durable-mailbox.rst | 91 ----------- akka-docs/rst/java/index-utilities.rst | 1 - akka-docs/rst/java/mailboxes.rst | 2 - akka-docs/rst/java/serialization.rst | 4 +- .../project/migration-guide-2.2.x-2.3.x.rst | 10 +- .../actor/mailbox/DurableMailboxDocSpec.scala | 143 ------------------ akka-docs/rst/scala/durable-mailbox.rst | 99 ------------ akka-docs/rst/scala/index-utilities.rst | 1 - akka-docs/rst/scala/mailboxes.rst | 2 - akka-docs/rst/scala/serialization.rst | 4 +- .../src/main/resources/reference.conf | 1 + .../mailbox/filebased/FileBasedMailbox.scala | 2 + .../filebased/FileBasedMailboxSettings.scala | 1 + .../filequeue/BrokenItemException.scala | 1 + .../mailbox/filebased/filequeue/Counter.scala | 1 + .../mailbox/filebased/filequeue/Journal.scala | 2 + .../filebased/filequeue/PersistentQueue.scala | 3 + .../mailbox/filebased/filequeue/QItem.scala | 2 + .../filebased/filequeue/QueueCollection.scala | 2 +- .../filebased/filequeue/tools/QDumper.scala | 2 + .../filebased/filequeue/tools/Util.scala | 1 + .../akka/actor/mailbox/DurableMailbox.scala | 4 + .../actor/mailbox/DurableMailboxSpec.scala | 3 + project/AkkaBuild.scala | 4 +- 33 files changed, 41 insertions(+), 561 deletions(-) delete mode 100644 akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.java delete mode 100644 akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala delete mode 100644 akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java delete mode 100644 akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java delete mode 100644 akka-docs/rst/java/durable-mailbox.rst delete mode 100644 akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala delete mode 100644 akka-docs/rst/scala/durable-mailbox.rst diff --git a/akka-docs/rst/common/cluster.rst b/akka-docs/rst/common/cluster.rst index c986fbb2d1..20ccc7088f 100644 --- a/akka-docs/rst/common/cluster.rst +++ b/akka-docs/rst/common/cluster.rst @@ -579,7 +579,7 @@ The default approach is to take options 2a, 3a, and 4a - allowing ``A`` on messages during the update transition. This assumes stateless actors that do not have a dependency on message ordering from any given source. -- If an actor has a distributed durable mailbox then nothing needs to be done, +- If an actor has persistent (durable) state then nothing needs to be done, other than migrating the actor. - If message ordering needs to be maintained during the update transition then diff --git a/akka-docs/rst/general/configuration.rst b/akka-docs/rst/general/configuration.rst index 4940dd14dc..70a69f3bda 100644 --- a/akka-docs/rst/general/configuration.rst +++ b/akka-docs/rst/general/configuration.rst @@ -403,8 +403,4 @@ akka-zeromq .. literalinclude:: ../../../akka-zeromq/src/main/resources/reference.conf :language: none -akka-file-mailbox -~~~~~~~~~~~~~~~~~ -.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf - :language: none diff --git a/akka-docs/rst/general/message-delivery-guarantees.rst b/akka-docs/rst/general/message-delivery-guarantees.rst index 48547703bc..1f256f9c9f 100644 --- a/akka-docs/rst/general/message-delivery-guarantees.rst +++ b/akka-docs/rst/general/message-delivery-guarantees.rst @@ -230,7 +230,7 @@ possibly non-exhaustive list of counter-indications is: deployed with Routers. - As mentioned above, the problem occurs anywhere a lock is involved during - enqueueing, which may also apply to custom mailboxes (or durable mailboxes). + enqueueing, which may also apply to custom mailboxes. This list has been compiled carefully, but other problematic scenarios may have escaped our analysis. diff --git a/akka-docs/rst/intro/getting-started.rst b/akka-docs/rst/intro/getting-started.rst index 6930d36936..9781feae2e 100644 --- a/akka-docs/rst/intro/getting-started.rst +++ b/akka-docs/rst/intro/getting-started.rst @@ -34,15 +34,9 @@ Akka is very modular and consists of several JARs containing different features. - ``akka-cluster`` – Cluster membership management, elastic routers. -- ``akka-file-mailbox`` – Akka durable mailbox (find more among community - projects) - - ``akka-kernel`` – Akka microkernel for running a bare-bones mini application server -- ``akka-mailboxes-common`` – common infrastructure for implementing durable - mailboxes - - ``akka-osgi`` – base bundle for using Akka in OSGi containers, containing the ``akka-actor`` classes diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.java b/akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.java deleted file mode 100644 index ddb38af393..0000000000 --- a/akka-docs/rst/java/code/docs/actor/mailbox/DurableMailboxDocTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package docs.actor.mailbox; - -//#imports -import akka.actor.Props; -import akka.actor.ActorRef; - -//#imports - -import akka.testkit.AkkaJUnitActorSystemResource; -import org.junit.ClassRule; -import org.junit.Test; - -import akka.testkit.AkkaSpec; -import com.typesafe.config.ConfigFactory; -import akka.actor.ActorSystem; -import akka.actor.UntypedActor; - -public class DurableMailboxDocTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("DurableMailboxDocTest", - ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf())); - - private final ActorSystem system = actorSystemResource.getSystem(); - - @Test - public void configDefinedDispatcher() { - //#dispatcher-config-use - ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class). - withDispatcher("my-dispatcher"), "myactor"); - //#dispatcher-config-use - myActor.tell("test", ActorRef.noSender()); - } - - public static class MyUntypedActor extends UntypedActor { - public void onReceive(Object message) { - } - } - -} diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala deleted file mode 100644 index cc5e944b98..0000000000 --- a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxDocSpec.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package docs.actor.mailbox - -import akka.actor.mailbox.DurableMailboxSpec - -object MyDurableMailboxDocSpec { - val config = """ - MyStorage-dispatcher { - mailbox-type = docs.actor.mailbox.MyDurableMailboxType - } - """ -} - -class MyDurableMailboxDocSpec extends DurableMailboxSpec("MyStorage", MyDurableMailboxDocSpec.config) { - override def atStartup() { - } - - override def afterTermination() { - } - - "MyDurableMailbox (Java)" must { - "deliver a message" in { - val actor = createMailboxTestActor() - implicit val sender = testActor - actor ! "hello" - expectMsg("hello") - } - } -} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java deleted file mode 100644 index e9b18d3a4f..0000000000 --- a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMailboxType.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package docs.actor.mailbox; - -//#custom-mailbox-type -import scala.Option; -import com.typesafe.config.Config; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.ExtendedActorSystem; -import akka.dispatch.MailboxType; -import akka.dispatch.MessageQueue; - -public class MyDurableMailboxType implements MailboxType { - - public MyDurableMailboxType(ActorSystem.Settings settings, Config config) { - } - - @Override - public MessageQueue create(Option owner, - Option system) { - if (owner.isEmpty()) - throw new IllegalArgumentException("requires an owner " + - "(i.e. does not work with BalancingDispatcher)"); - return new MyDurableMessageQueue(owner.get(), - (ExtendedActorSystem) system.get()); - } -} -//#custom-mailbox-type \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java b/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java deleted file mode 100644 index 934b70af20..0000000000 --- a/akka-docs/rst/java/code/docs/actor/mailbox/MyDurableMessageQueue.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package docs.actor.mailbox; - -//#durable-message-queue -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Callable; -import scala.concurrent.duration.Duration; -import akka.actor.ActorRef; -import akka.actor.ExtendedActorSystem; -import akka.actor.mailbox.DurableMessageQueueWithSerialization; -import akka.dispatch.Envelope; -import akka.dispatch.MessageQueue; -import akka.pattern.CircuitBreaker; - -public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization { - - public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) { - super(owner, system); - } - - private final QueueStorage storage = new QueueStorage(); - // A real-world implementation would use configuration to set the last - // three parameters below - private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), - 5, Duration.create(30, "seconds"), Duration.create(1, "minute")); - - @Override - public void enqueue(ActorRef receiver, final Envelope envelope) { - breaker.callWithSyncCircuitBreaker(new Callable() { - @Override - public Object call() { - byte[] data = serialize(envelope); - storage.push(data); - return null; - } - }); - } - - @Override - public Envelope dequeue() { - return breaker.callWithSyncCircuitBreaker(new Callable() { - @Override - public Envelope call() { - byte[] data = storage.pull(); - if (data == null) - return null; - else - return deserialize(data); - } - }); - } - - @Override - public boolean hasMessages() { - return breaker.callWithSyncCircuitBreaker(new Callable() { - @Override - public Boolean call() { - return !storage.isEmpty(); - } - }); - } - - @Override - public int numberOfMessages() { - return breaker.callWithSyncCircuitBreaker(new Callable() { - @Override - public Integer call() { - return storage.size(); - } - }); - } - - /** - * Called when the mailbox is disposed. - * An ordinary mailbox would send remaining messages to deadLetters, - * but the purpose of a durable mailbox is to continue - * with the same message queue when the actor is started again. - */ - @Override - public void cleanUp(ActorRef owner, MessageQueue deadLetters) {} - - //#dummy-queue-storage - // dummy - private static class QueueStorage { - private final ConcurrentLinkedQueue queue = - new ConcurrentLinkedQueue(); - public void push(byte[] data) { queue.offer(data); } - public byte[] pull() { return queue.poll(); } - public boolean isEmpty() { return queue.isEmpty(); } - public int size() { return queue.size(); } - } - //#dummy-queue-storage -} -//#durable-message-queue \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java index 9b4b8c0df4..83fbbdd0d1 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java @@ -6,7 +6,6 @@ package docs.testkit; import static org.junit.Assert.*; import akka.testkit.*; -import docs.actor.mailbox.DurableMailboxDocSpec; import org.junit.ClassRule; import org.junit.Test; diff --git a/akka-docs/rst/java/durable-mailbox.rst b/akka-docs/rst/java/durable-mailbox.rst deleted file mode 100644 index 1a5c38aadf..0000000000 --- a/akka-docs/rst/java/durable-mailbox.rst +++ /dev/null @@ -1,91 +0,0 @@ - -.. _durable-mailboxes-java: - -########################## - Durable Mailboxes -########################## - - -Overview -======== - -A durable mailbox is a mailbox which stores the messages on durable storage. -What this means in practice is that if there are pending messages in the actor's -mailbox when the node of the actor resides on crashes, then when you restart the -node, the actor will be able to continue processing as if nothing had happened; -with all pending messages still in its mailbox. - -You configure durable mailboxes through the dispatcher or the actor deployment (see -:ref:`mailboxes-java`). The actor is oblivious to which type of mailbox it is using. - -This gives you an excellent way of creating bulkheads in your application, where -groups of actors sharing the same dispatcher also share the same backing -storage. Read more about that in the :ref:`dispatchers-scala` documentation. - -One basic file based durable mailbox is provided by Akka out-of-the-box. -Other implementations can easily be added. Some are available as separate community -Open Source projects, such as: - -* `AMQP Durable Mailbox `_ - - -A durable mailbox is like any other mailbox not likely to be transactional. It's possible -if the actor crashes after receiving a message, but before completing processing of -it, that the message could be lost. - -.. warning:: - - A durable mailbox typically doesn't work with blocking message send, i.e. the message - send operations that are relying on futures; ``ask``. If the node - has crashed and then restarted, the thread that was blocked waiting for the - reply is gone and there is no way we can deliver the message. - - -File-based durable mailbox -========================== - -This mailbox is backed by a journaling transaction log on the local file -system. It is the simplest to use since it does not require an extra -infrastructure piece to administer, but it is usually sufficient and just what -you need. - -In the configuration of the dispatcher you specify the fully qualified class name -of the mailbox: - -.. includecode:: ../scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: dispatcher-config - -Here is an example of how to create an actor with a durable dispatcher: - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocTest.java - :include: imports,dispatcher-config-use - -You can also configure and tune the file-based durable mailbox. This is done in -the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. - -.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf - :language: none - -How to implement a durable mailbox -================================== - -Here is an example of how to implement a custom durable mailbox. Essentially it consists of -a configurator (MailboxType) and a queue implementation (DurableMessageQueue). - -The envelope contains the message sent to the actor, and information about sender. It is the -envelope that needs to be stored. As a help utility you can extend DurableMessageQueueWithSerialization -to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` -mechanism. This optional and you may store the envelope data in any way you like. Durable -mailboxes are an excellent fit for usage of circuit breakers. These are described in the -:ref:`circuit-breaker` documentation. - -.. includecode:: code/docs/actor/mailbox/MyDurableMailboxType.java - :include: custom-mailbox-type - -.. includecode:: code/docs/actor/mailbox/MyDurableMessageQueue.java - :include: durable-message-queue - :exclude: dummy-queue-storage - -For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, -and ZooKeeper, which can be found in Akka git repository tag -`v2.0.1 `_. \ No newline at end of file diff --git a/akka-docs/rst/java/index-utilities.rst b/akka-docs/rst/java/index-utilities.rst index 3132e60d58..38ea3d7bdc 100644 --- a/akka-docs/rst/java/index-utilities.rst +++ b/akka-docs/rst/java/index-utilities.rst @@ -10,5 +10,4 @@ Utilities ../common/duration ../common/circuitbreaker extending-akka - durable-mailbox microkernel diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index cb972ed011..9bce28664c 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -161,8 +161,6 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" -* Durable mailboxes, see :ref:`durable-mailboxes-java`. - Mailbox configuration examples ============================== diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index b04b8bfc50..269ef4c5e9 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -123,8 +123,8 @@ address which shall be the recipient of the serialized information. Use This assumes that serialization happens in the context of sending a message through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the +e.g. storing actor references outside of an actor application (database, etc.). +In this case, it is important to keep in mind that the address part of an actor’s path determines how that actor is communicated with. Storing a local actor path might be the right choice if the retrieval happens in the same logical context, but it is not enough when deserializing it on a diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst index 1427d9f44a..b07dec0311 100644 --- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst @@ -104,6 +104,14 @@ Dataflow is Deprecated Akka dataflow is superseded by `Scala Async `_. +Durable Mailboxes are Deprecated +================================ + +Durable mailboxes are superseded by ``akka-persistence``, which offers several +tools to support reliable messaging. + +Read more about ``akka-persistence`` in the :ref:`documentation for Scala ` and +:ref:`documentation for Java `. Removed Deprecated Features =========================== @@ -114,4 +122,4 @@ The following, previously deprecated, features have been removed: * `API changes to FSM and TestFSMRef `_ * DefaultScheduler superseded by LightArrayRevolverScheduler - + diff --git a/akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala deleted file mode 100644 index 84eeb14a52..0000000000 --- a/akka-docs/rst/scala/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package docs.actor.mailbox - -import language.postfixOps - -//#imports -import akka.actor.Props - -//#imports - -import org.scalatest.{ BeforeAndAfterAll, WordSpec } -import org.scalatest.matchers.MustMatchers -import akka.testkit.AkkaSpec -import akka.actor.{ Actor, ExtendedActorSystem } - -class MyActor extends Actor { - def receive = { - case x => - } -} - -object DurableMailboxDocSpec { - val config = """ - //#dispatcher-config - my-dispatcher { - mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType - } - //#dispatcher-config - """ -} - -class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { - - "configuration of dispatcher with durable mailbox" in { - //#dispatcher-config-use - val myActor = system.actorOf(Props[MyActor]. - withDispatcher("my-dispatcher"), name = "myactor") - //#dispatcher-config-use - } - -} - -//#custom-mailbox -import com.typesafe.config.Config -import akka.actor.ActorContext -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.dispatch.Envelope -import akka.dispatch.MailboxType -import akka.dispatch.MessageQueue -import akka.actor.mailbox.DurableMessageQueue -import akka.actor.mailbox.DurableMessageSerialization -import akka.pattern.CircuitBreaker -import scala.concurrent.duration._ - -class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config) - extends MailboxType { - - override def create(owner: Option[ActorRef], - system: Option[ActorSystem]): MessageQueue = - (owner zip system) headOption match { - case Some((o, s: ExtendedActorSystem)) => new MyMessageQueue(o, s) - case _ => - throw new IllegalArgumentException("requires an owner " + - "(i.e. does not work with BalancingDispatcher)") - } -} - -class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem) - extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { - - val storage = new QueueStorage - // A real-world implementation would use configuration to set the last - // three parameters below - val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute) - - def enqueue(receiver: ActorRef, envelope: Envelope): Unit = - breaker.withSyncCircuitBreaker { - val data: Array[Byte] = serialize(envelope) - storage.push(data) - } - - def dequeue(): Envelope = breaker.withSyncCircuitBreaker { - val data: Option[Array[Byte]] = storage.pull() - data.map(deserialize).orNull - } - - def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty } - - def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size } - - /** - * Called when the mailbox is disposed. - * An ordinary mailbox would send remaining messages to deadLetters, - * but the purpose of a durable mailbox is to continue - * with the same message queue when the actor is started again. - */ - def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () - -} -//#custom-mailbox - -// dummy -class QueueStorage { - import java.util.concurrent.ConcurrentLinkedQueue - val queue = new ConcurrentLinkedQueue[Array[Byte]] - def push(data: Array[Byte]): Unit = queue.offer(data) - def pull(): Option[Array[Byte]] = Option(queue.poll()) - def isEmpty: Boolean = queue.isEmpty - def size: Int = queue.size -} - -//#custom-mailbox-test -import akka.actor.mailbox.DurableMailboxSpec - -object MyMailboxSpec { - val config = """ - MyStorage-dispatcher { - mailbox-type = docs.actor.mailbox.MyMailboxType - } - """ -} - -class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) { - override def atStartup() { - } - - override def afterTermination() { - } - - "MyMailbox" must { - "deliver a message" in { - val actor = createMailboxTestActor() - implicit val sender = testActor - actor ! "hello" - expectMsg("hello") - } - - // add more tests - } -} diff --git a/akka-docs/rst/scala/durable-mailbox.rst b/akka-docs/rst/scala/durable-mailbox.rst deleted file mode 100644 index 2b3517ee9b..0000000000 --- a/akka-docs/rst/scala/durable-mailbox.rst +++ /dev/null @@ -1,99 +0,0 @@ - -.. _durable-mailboxes-scala: - -########################### - Durable Mailboxes -########################### - - -Overview -======== - -A durable mailbox is a mailbox which stores the messages on durable storage. -What this means in practice is that if there are pending messages in the actor's -mailbox when the node of the actor resides on crashes, then when you restart the -node, the actor will be able to continue processing as if nothing had happened; -with all pending messages still in its mailbox. - -You configure durable mailboxes through the dispatcher or the actor deployment (see -:ref:`mailboxes-scala`). The actor is oblivious to which type of mailbox it is using. - -This gives you an excellent way of creating bulkheads in your application, where -groups of actors sharing the same dispatcher also share the same backing -storage. Read more about that in the :ref:`dispatchers-scala` documentation. - -One basic file based durable mailbox is provided by Akka out-of-the-box. -Other implementations can easily be added. Some are available as separate community -Open Source projects, such as: - -* `AMQP Durable Mailbox `_ - - -A durable mailbox is like any other mailbox not likely to be transactional. It's possible -if the actor crashes after receiving a message, but before completing processing of -it, that the message could be lost. - -.. warning:: - - A durable mailbox typically doesn't work with blocking message send, i.e. the message - send operations that are relying on futures; ``?`` or ``ask``. If the node - has crashed and then restarted, the thread that was blocked waiting for the - reply is gone and there is no way we can deliver the message. - - -File-based durable mailbox -========================== - -This mailbox is backed by a journaling transaction log on the local file -system. It is the simplest to use since it does not require an extra -infrastructure piece to administer, but it is usually sufficient and just what -you need. - -In the configuration of the dispatcher you specify the fully qualified class name -of the mailbox: - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: dispatcher-config - -Here is an example of how to create an actor with a durable dispatcher: - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: imports,dispatcher-config-use - -You can also configure and tune the file-based durable mailbox. This is done in -the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. - -.. literalinclude:: ../../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf - :language: none - -How to implement a durable mailbox -================================== - -Here is an example of how to implement a custom durable mailbox. Essentially it consists of -a configurator (MailboxType) and a queue implementation (DurableMessageQueue). - -The envelope contains the message sent to the actor, and information about sender. It is the -envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization -to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` -mechanism. This optional and you may store the envelope data in any way you like. Durable -mailboxes are an excellent fit for usage of circuit breakers. These are described in the -:ref:`circuit-breaker` documentation. - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: custom-mailbox - -To facilitate testing of a durable mailbox you may use ``DurableMailboxSpec`` as base class. -To use ``DurableMailboxDocSpec`` add this dependency:: - - "com.typesafe.akka" %% "akka-mailboxes-common" % - "@version@" classifier "test" @crossString@ - -It implements a few basic tests and helps you setup the a fixture. More tests can be -added in concrete subclass like this: - -.. includecode:: code/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: custom-mailbox-test - -For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, -and ZooKeeper, which can be found in Akka git repository tag -`v2.0.1 `_. diff --git a/akka-docs/rst/scala/index-utilities.rst b/akka-docs/rst/scala/index-utilities.rst index 3132e60d58..38ea3d7bdc 100644 --- a/akka-docs/rst/scala/index-utilities.rst +++ b/akka-docs/rst/scala/index-utilities.rst @@ -10,5 +10,4 @@ Utilities ../common/duration ../common/circuitbreaker extending-akka - durable-mailbox microkernel diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index 2d4d805f3c..8b86a9e15b 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -161,8 +161,6 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" -* Durable mailboxes, see :ref:`durable-mailboxes-scala`. - Mailbox configuration examples ============================== diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index c5240e4f14..4154cd7e7b 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -112,8 +112,8 @@ address which shall be the recipient of the serialized information. Use This assumes that serialization happens in the context of sending a message through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the +e.g. storing actor references outside of an actor application (database, etc.). +In this case, it is important to keep in mind that the address part of an actor’s path determines how that actor is communicated with. Storing a local actor path might be the right choice if the retrieval happens in the same logical context, but it is not enough when deserializing it on a diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf index 66f125e624..286f544c29 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf @@ -10,6 +10,7 @@ akka { actor { mailbox { + # deprecated, superseded by akka-persistence file-based { # directory below which this queue resides directory-path = "./_mb" diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala index 85fae3d4a5..91d26270d1 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailbox.scala @@ -14,6 +14,7 @@ import scala.util.control.NonFatal import akka.pattern.{ CircuitBreakerOpenException, CircuitBreaker } import scala.concurrent.duration.Duration +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { private val settings = new FileBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = (owner zip system).headOption match { @@ -22,6 +23,7 @@ class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) } } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization { // TODO Is it reasonable for all FileBasedMailboxes to have their own logger? diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala index b346906c35..7c73b83aa8 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/FileBasedMailboxSettings.scala @@ -9,6 +9,7 @@ import scala.concurrent.duration._ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.ActorSystem +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) extends DurableMailboxSettings { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala index 636cbfc4aa..c029c98451 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/BrokenItemException.scala @@ -19,4 +19,5 @@ package akka.actor.mailbox.filebased.filequeue import java.io.IOException +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala index c2de226d20..9d3133018f 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Counter.scala @@ -19,6 +19,7 @@ package akka.actor.mailbox.filebased.filequeue import java.util.concurrent.atomic.AtomicLong +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class Counter { private val value = new AtomicLong(0) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala index 6862ad9600..a15b78e4b9 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/Journal.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal // returned from journal replay sealed trait JournalItem +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object JournalItem { case class Add(item: QItem) extends JournalItem case object Remove extends JournalItem @@ -38,6 +39,7 @@ object JournalItem { /** * Codes for working with the journal file for a PersistentQueue. */ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class Journal(queuePath: String, syncJournal: ⇒ Boolean, log: LoggingAdapter) { private val queueFile = new File(queuePath) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala index 399ca74829..3f1582994b 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/PersistentQueue.scala @@ -25,6 +25,7 @@ import scala.annotation.tailrec import akka.actor.mailbox.filebased.FileBasedMailboxSettings // a config value that's backed by a global setting but may be locally overridden +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class OverlaySetting[T](base: ⇒ T) { @volatile private var local: Option[T] = None @@ -38,6 +39,7 @@ trait Prependable[T] { def prepend(t: T): Unit } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) { private case object ItemArrived @@ -448,6 +450,7 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F } } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object PersistentQueue { @volatile var maxItems: Int = Int.MaxValue diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala index 1d0eb543d6..8f0412b997 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QItem.scala @@ -19,6 +19,7 @@ package akka.actor.mailbox.filebased.filequeue import java.nio.{ ByteBuffer, ByteOrder } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) { def pack(): Array[Byte] = { val bytes = new Array[Byte](data.length + 16) @@ -31,6 +32,7 @@ case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) { } } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object QItem { def unpack(data: Array[Byte]): QItem = { val buffer = ByteBuffer.wrap(data) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala index 95550f6603..43112cd08a 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/QueueCollection.scala @@ -24,7 +24,7 @@ import akka.event.LoggingAdapter import akka.actor.mailbox.filebased.FileBasedMailboxSettings class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable") - +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) { private val path = new File(queueFolder) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala index f0944d0fe7..07c506dec2 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/QDumper.scala @@ -25,6 +25,7 @@ import akka.actor.mailbox.filebased.filequeue._ import akka.event.LoggingAdapter import akka.actor.ActorSystem +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") class QueueDumper(filename: String, log: LoggingAdapter) { var offset = 0L var operations = 0L @@ -110,6 +111,7 @@ class QueueDumper(filename: String, log: LoggingAdapter) { } } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object QDumper { val filenames = new mutable.ListBuffer[String] var quiet = false diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala index 5a6f492815..c8e46b49d3 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filebased/filequeue/tools/Util.scala @@ -17,6 +17,7 @@ package akka.actor.mailbox.filebased.filequeue.tools +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object Util { val KILOBYTE = 1024L val MEGABYTE = 1024 * KILOBYTE diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index c7a3c42433..e272fd3bca 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -13,6 +13,7 @@ private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActorSystem) extends MessageQueue { import DurableExecutableMailboxConfig._ @@ -26,6 +27,7 @@ abstract class DurableMessageQueue(val owner: ActorRef, val system: ExtendedActo * Java API * DurableMessageQueue with functionality to serialize and deserialize Envelopes (messages) */ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: ExtendedActorSystem) extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization @@ -33,6 +35,7 @@ abstract class DurableMessageQueueWithSerialization(_owner: ActorRef, _system: E * DurableMessageSerialization can be mixed into a DurableMessageQueue and adds functionality * to serialize and deserialize Envelopes (messages) */ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") trait DurableMessageSerialization { this: DurableMessageQueue ⇒ /** @@ -91,6 +94,7 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ * * where name=“my-durable-mailbox” in this example. */ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") trait DurableMailboxSettings { /** * A reference to the enclosing actor system. diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 097187a509..2ca2364327 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -21,6 +21,7 @@ import akka.dispatch.Mailbox import akka.testkit.TestKit import scala.concurrent.duration._ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object DurableMailboxSpecActorFactory { class MailboxTestActor extends Actor { @@ -37,6 +38,7 @@ object DurableMailboxSpecActorFactory { } +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") object DurableMailboxSpec { def fallbackConfig: Config = ConfigFactory.parseString(""" akka { @@ -54,6 +56,7 @@ object DurableMailboxSpec { * Subclass must define dispatcher in the supplied config for the specific backend. * The id of the dispatcher must be the same as the `-dispatcher`. */ +@deprecated("durable mailboxes are superseded by akka-persistence", "2.3") abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) extends TestKit(system) with WordSpecLike with MustMatchers with BeforeAndAfterAll { diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 6bf84f7f66..406ab712b8 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -618,8 +618,8 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels, - remote % "compile;test->test", cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries, + dependencies = Seq(actor, testkit % "test->test", channels, + remote % "compile;test->test", cluster, slf4j, agent, transactor, zeroMQ, camel, osgi, osgiAries, persistence % "compile;test->test"), settings = defaultSettings ++ docFormatSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst",