diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala new file mode 100644 index 0000000000..71804d6ede --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorThroughputBenchmark.scala @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit.TestProbe +import java.io.File +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations.Scope + +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +class PersistentActorThroughputBenchmark { + + val config = PersistenceSpec.config("leveldb", "benchmark") + + lazy val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + var system: ActorSystem = _ + + var probe: TestProbe = _ + var actor: ActorRef = _ + var persist1EventProcessor: ActorRef = _ + var persist1CommandProcessor: ActorRef = _ + var persistAsync1EventProcessor: ActorRef = _ + var persistAsync1QuickReplyEventProcessor: ActorRef = _ + + val data10k = (1 to 10000).toArray + + @Setup + def setup() { + system = ActorSystem("test", config) + + probe = TestProbe()(system) + + storageLocations.foreach(FileUtils.deleteDirectory) + actor = system.actorOf(Props(classOf[BaselineActor], data10k.last), "a-1") + persist1CommandProcessor = system.actorOf(Props(classOf[Persist1EventPersistentActor], data10k.last), "p-1") + persist1EventProcessor = system.actorOf(Props(classOf[Persist1EventPersistentActor], data10k.last), "ep-1") + persistAsync1EventProcessor = system.actorOf(Props(classOf[PersistAsync1EventPersistentActor], data10k.last), "epa-1") + persistAsync1QuickReplyEventProcessor = system.actorOf(Props(classOf[PersistAsync1EventQuickReplyPersistentActor], data10k.last), "epa-2") + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + + storageLocations.foreach(FileUtils.deleteDirectory) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_normalActor_reply_baseline() { + for (i <- data10k) actor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persist_reply() { + for (i <- data10k) persist1EventProcessor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_commandPersist_reply() { + for (i <- data10k) persist1CommandProcessor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_reply() { + for (i <- data10k) persistAsync1EventProcessor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } + + @GenerateMicroBenchmark + @OperationsPerInvocation(10000) + def tell_persistAsync_replyRightOnCommandReceive() { + for (i <- data10k) persistAsync1QuickReplyEventProcessor.tell(i, probe.ref) + + probe.expectMsg(Evt(data10k.last)) + } +} + +final case class Evt(i: Int) + +class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => persist(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } + } + override def receiveRecover = { + case _ => // do nothing + } + +} +class Persist1CommandProcessor(respondAfter: Int) extends Processor { + override def receive = { + case n: Int => if (n == respondAfter) sender() ! Evt(n) + } +} + +class PersistAsync1EventPersistentActor(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + persistAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } + } + override def receiveRecover = { + case _ => // do nothing + } +} + +class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends PersistentActor { + override def receiveCommand = { + case n: Int => + val e = Evt(n) + if (n == respondAfter) sender() ! e + persistAsync(e)(identity) + } + override def receiveRecover = { + case _ => // do nothing + } +} + +/** only as a "the best we could possibly get" baseline, does not persist anything */ +class BaselineActor(respondAfter: Int) extends Actor { + override def receive = { + case n: Int => if (n == respondAfter) sender() ! Evt(n) + } +} \ No newline at end of file diff --git a/akka-contrib/docs/cluster-sharding.rst b/akka-contrib/docs/cluster-sharding.rst index 924f70a673..eaa681830b 100644 --- a/akka-contrib/docs/cluster-sharding.rst +++ b/akka-contrib/docs/cluster-sharding.rst @@ -29,7 +29,7 @@ This is how an entry actor may look like: .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-actor -The above actor uses event sourcing and the support provided in ``UntypedEventsourcedProcessor`` to store its state. +The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state. It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover its state if it is valuable. @@ -73,7 +73,7 @@ This is how an entry actor may look like: .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-actor -The above actor uses event sourcing and the support provided in ``EventsourcedProcessor`` to store its state. +The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state. It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover its state if it is valuable. diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 18db79b17a..fb47bfe818 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -32,7 +32,7 @@ import akka.cluster.ClusterEvent.MemberUp import akka.cluster.Member import akka.cluster.MemberStatus import akka.pattern.ask -import akka.persistence.EventsourcedProcessor +import akka.persistence.PersistentActor import akka.cluster.ClusterEvent.ClusterDomainEvent import akka.persistence.SnapshotOffer import akka.persistence.SaveSnapshotSuccess @@ -1183,7 +1183,7 @@ object ShardCoordinator { */ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) - extends EventsourcedProcessor with ActorLogging { + extends PersistentActor with ActorLogging { import ShardCoordinator._ import ShardCoordinator.Internal._ import ShardRegion.ShardId diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index a4aa2b2522..25e89bf060 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -11,7 +11,7 @@ import akka.actor.Identify import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.Cluster -import akka.persistence.EventsourcedProcessor +import akka.persistence.PersistentActor import akka.persistence.Persistence import akka.persistence.journal.leveldb.SharedLeveldbJournal import akka.persistence.journal.leveldb.SharedLeveldbStore @@ -72,7 +72,7 @@ object ClusterShardingSpec extends MultiNodeConfig { case object Stop final case class CounterChanged(delta: Int) - class Counter extends EventsourcedProcessor { + class Counter extends PersistentActor { import ShardRegion.Passivate context.setReceiveTimeout(120.seconds) diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java index 0147d978c0..8427ce2f4d 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java @@ -13,7 +13,7 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.ReceiveTimeout; import akka.japi.Procedure; -import akka.persistence.UntypedEventsourcedProcessor; +import akka.persistence.UntypedPersistentActor; // Doc code, compile only public class ClusterShardingTest { @@ -78,7 +78,7 @@ public class ClusterShardingTest { } static//#counter-actor - public class Counter extends UntypedEventsourcedProcessor { + public class Counter extends UntypedPersistentActor { public static enum CounterOp { INCREMENT, DECREMENT diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 2e296ce950..f1d7e5bbf1 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -4,13 +4,17 @@ package docs.persistence; -import java.util.concurrent.TimeUnit; - -import scala.Option; -import scala.concurrent.duration.Duration; -import akka.actor.*; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; import akka.japi.Procedure; import akka.persistence.*; +import scala.Option; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + import static java.util.Arrays.asList; public class PersistenceDocTest { @@ -353,11 +357,11 @@ public class PersistenceDocTest { static Object o8 = new Object() { //#reliable-event-delivery - class MyEventsourcedProcessor extends UntypedEventsourcedProcessor { + class MyPersistentActor extends UntypedPersistentActor { private ActorRef destination; private ActorRef channel; - public MyEventsourcedProcessor(ActorRef destination) { + public MyPersistentActor(ActorRef destination) { this.destination = destination; this.channel = getContext().actorOf(Channel.props(), "channel"); } @@ -390,6 +394,53 @@ public class PersistenceDocTest { }; static Object o9 = new Object() { + //#persist-async + class MyPersistentActor extends UntypedPersistentActor { + + @Override + public void onReceiveRecover(Object msg) { + // handle recovery here + } + + @Override + public void onReceiveCommand(Object msg) { + sender().tell(msg, getSelf()); + + persistAsync(String.format("evt-%s-1", msg), new Procedure(){ + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }); + persistAsync(String.format("evt-%s-2", msg), new Procedure(){ + @Override + public void apply(String event) throws Exception { + sender().tell(event, self()); + } + }); + } + } + //#persist-async + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#view-update + final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class)); + processor.tell("a", null); + processor.tell("b", null); + + // possible order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-b-1 + // evt-b-2 + //#view-update + } + }; + + static Object o10 = new Object() { //#view class MyView extends UntypedView { @Override diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 955e7d114d..b70f5847f0 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -442,13 +442,13 @@ also process commands that do not change application state, such as query comman .. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html -Akka persistence supports event sourcing with the ``AbstractEventsourcedProcessor`` abstract class (which implements +Akka persistence supports event sourcing with the ``AbstractPersistentActor`` abstract class (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle ``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an -``AbstractEventsourcedProcessor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is +``AbstractEventsPersistentActordefined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -473,7 +473,7 @@ calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples in Java with Lambdas `_. -It contains instructions on how to run the ``EventsourcedExample``. +It contains instructions on how to run the ``PersistentActorExample``. .. note:: @@ -518,9 +518,9 @@ Applications that want to have more explicit control over batch writes and batch size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written isolated from other batches. ``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. -``PersistentBatch`` messages, for example, are used internally by an ``AbstractEventsourcedProcessor`` to ensure atomic +``PersistentBatch`` messages, for example, are used internally by an ``AbstractEventsourcedPersistentActor atomic writes of events. All events that are persisted in context of a single command are written as a single batch to the -journal (even if ``persist`` is called multiple times per command). The recovery of an ``AbstractEventsourcedProcessor`` +journal (even if ``persist`` is called multiple times per command). The recovery of an ``AbstractPersistentActor`` will therefore never be done partially (with only a subset of events persisted by a single command). Confirmation and deletion operations performed by :ref:`channels-java-lambda` are also batched. The maximum diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index fe256b071b..55f8aa5c5d 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -48,9 +48,14 @@ Akka persistence is a separate jar file. Make sure that you have the following d Architecture ============ -* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal - before its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed - to that processor, so that it can recover internal state from these messages. +* *Processor* (deprecated, use *PersistentActor* instead): A processor is a persistent, stateful actor. Messages sent + to a processor are written to a journal before its ``onReceive`` method is called. When a processor is started or + restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages. + +* *PersistentActor*: Is a persistent, stateful actor. It is able to persist events to a journal and can react to + them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors. + When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can + recover internal state from these messages. * *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's @@ -79,6 +84,12 @@ Architecture Processors ========== +.. warning:: + ``Processor`` is deprecated. Instead the current ``PersistentActor`` will be extended to provide equivalent + functionality if required (by introducing the ``persistAsync`` method). + For details see `Relaxed local consistency requirements and high throughput use-cases`_ as well as the discussion + and pull requests related to this `issue on Github `_. + A processor can be implemented by extending the abstract ``UntypedProcessor`` class and implementing the ``onReceive`` method. @@ -451,6 +462,11 @@ use the ``deleteSnapshots`` method. Event sourcing ============== +.. note:: + The ``PersistentActor`` introduced in this section was previously known as ``EventsourcedProcessor``, + which was a subset of the ``PersistentActor``. Migrating your code to use persistent actors instead is + very simple and is explained in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. + In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command @@ -468,13 +484,13 @@ also process commands that do not change application state, such as query comman .. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html -Akka persistence supports event sourcing with the abstract ``UntypedEventsourcedProcessor`` class (which implements +Akka persistence supports event sourcing with the abstract ``UntypedPersistentActor`` class (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle ``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an -``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is +``UntypedPersistentActor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -499,7 +515,7 @@ calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Java `_. -It contains instructions on how to run the ``EventsourcedExample``. +It contains instructions on how to run the ``PersistentActorExample``. .. note:: @@ -508,6 +524,29 @@ It contains instructions on how to run the ``EventsourcedExample``. recovery you need to take special care to perform the same state transitions with ``become`` and ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. +.. _persist-async-java: + +Relaxed local consistency requirements and high throughput use-cases +-------------------------------------------------------------------- + +If faced with Relaxed local consistency requirements and high throughput demands sometimes ``PersistentActor`` and it's +``persist`` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all +Events related to a given Command are processed in order to start processing the next Command. While this abstraction is +very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may +want to process commands as fast as you can, assuming that Event will eventually be persisted and handled properly in +the background and retroactively reacting to persistence failures if needed. + +The ``persistAsync`` method provides a tool for implementing high-throughput processors. It will *not* +stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks. + +In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. +The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.). + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#persist-async + +.. note:: + In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away, + and handle them in the callback. Reliable event delivery ----------------------- @@ -543,9 +582,9 @@ Applications that want to have more explicit control over batch writes and batch size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written isolated from other batches. ``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. -``PersistentBatch`` messages, for example, are used internally by an ``UntypedEventsourcedProcessor`` to ensure atomic +``PersistentBatch`` messages, for example, are used internally by an ``UntypedPersistentActor`` to ensure atomic writes of events. All events that are persisted in context of a single command are written as a single batch to the -journal (even if ``persist`` is called multiple times per command). The recovery of an ``UntypedEventsourcedProcessor`` +journal (even if ``persist`` is called multiple times per command). The recovery of an ``UntypedPersistentActor`` will therefore never be done partially (with only a subset of events persisted by a single command). Confirmation and deletion operations performed by :ref:`channels-java` are also batched. The maximum confirmation diff --git a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst index a5e5cf02e0..879c73e13b 100644 --- a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst @@ -1,8 +1,8 @@ .. _migration-eventsourced-2.3: -####################################################### - Migration Guide Eventsourced to Akka Persistence 2.3.x -####################################################### +###################################################### +Migration Guide Eventsourced to Akka Persistence 2.3.x +###################################################### General notes ============= diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst new file mode 100644 index 0000000000..937a378daa --- /dev/null +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -0,0 +1,67 @@ +.. _migration-guide-persistence-experimental-2.3.x-2.4.x: + +##################################################### +Migration Guide Akka Persistence (experimental) 2.3.3 +##################################################### + +**Akka Persistence** is an **experimental module**, which means that neither Binary Compatibility nor API stability +is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback +before we freeze the APIs in a major release. + +Renamed EventsourcedProcessor to PersistentActor +================================================ +``EventsourcedProcessor`` is now deprecated and replaced by ``PersistentActor`` which provides the same (and more) API. +Migrating to ``2.4.x`` is as simple as changing all your classes to extending ``PersistentActor``. + +Replace all classes like:: + + class DeprecatedProcessor extends EventsourcedProcessor { /*...*/ } + +To extend ``PersistentActor``:: + + class NewPersistentProcessor extends PersistentActor { /*...*/ } + +No other API changes are required for this migration. + +Removed Processor in favour of extending PersistentActor with persistAsync +========================================================================== + +The ``Processor`` is now deprecated since ``2.3.4`` and will be removed in ``2.4.x``. +It's semantics replicated in ``PersistentActor`` in the form of an additional ``persist`` method: ``persistAsync``. + +In essence, the difference betwen ``persist`` and ``persistAsync`` is that the former will stash all incomming commands +until all persist callbacks have been processed, whereas the latter does not stash any commands. The new ``persistAsync`` +should be used in cases of low consistency yet high responsiveness requirements, the Actor can keep processing incomming +commands, even though not all previous events have been handled. + +When these ``persist`` and ``persistAsync`` are used together in the same ``PersistentActor``, the ``persist`` +logic will win over the async version so that all guarantees concerning persist still hold. This will however lower +the throughput + +Now deprecated code using Processor:: + + class OldProcessor extends Processor { + def receive = { + case Persistent(cmd) => sender() ! cmd + } + } + +Replacement code, with the same semantics, using PersistentActor:: + + class NewProcessor extends PersistentActor { + def receiveCommand = { + case cmd => + persistAsync(cmd) { e => sender() ! e } + } + + def receiveEvent = { + case _ => // logic for handling replay + } + } + +It is worth pointing out that using ``sender()`` inside the persistAsync callback block is **valid**, and does *not* suffer +any of the problems Futures have when closing over the sender reference. + +Using the``PersistentActor`` instead of ``Processor`` also shifts the responsibility of deciding if a message should be persisted +to the receiver instead of the sender of the message. Previously, using ``Processor``, clients would have to wrap messages as ``Persistent(cmd)`` +manually, as well as have to be aware of the receiver being a ``Processor``, which didn't play well with transparency of the ActorRefs in general. diff --git a/akka-docs/rst/project/migration-guides.rst b/akka-docs/rst/project/migration-guides.rst index 93f36e63c1..2dae824d3c 100644 --- a/akka-docs/rst/project/migration-guides.rst +++ b/akka-docs/rst/project/migration-guides.rst @@ -11,4 +11,5 @@ Migration Guides migration-guide-2.1.x-2.2.x migration-guide-2.2.x-2.3.x migration-guide-2.3.x-2.4.x + migration-guide-persistence-experimental-2.3.x-2.4.x migration-guide-eventsourced-2.3.x diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 1292d2a517..7b132e43b9 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -7,7 +7,7 @@ package docs.persistence import scala.concurrent.duration._ import scala.language.postfixOps -import akka.actor.{ Actor, ActorSystem } +import akka.actor.{ Props, Actor, ActorSystem } import akka.persistence._ trait PersistenceDocSpec { @@ -314,7 +314,7 @@ trait PersistenceDocSpec { import akka.actor.ActorRef //#reliable-event-delivery - class MyEventsourcedProcessor(destination: ActorRef) extends EventsourcedProcessor { + class MyPersistentActor(destination: ActorRef) extends PersistentActor { val channel = context.actorOf(Channel.props("channel")) def handleEvent(event: String) = { @@ -337,6 +337,42 @@ trait PersistenceDocSpec { } //#reliable-event-delivery } + + new AnyRef { + import akka.actor.ActorRef + + val processor = system.actorOf(Props[MyPersistentActor]()) + + //#persist-async + class MyPersistentActor extends PersistentActor { + + def receiveRecover: Receive = { + case _ => // handle recovery here + } + + def receiveCommand: Receive = { + case c: String => { + sender() ! c + persistAsync(s"evt-$c-1") { e => sender() ! e } + persistAsync(s"evt-$c-2") { e => sender() ! e } + } + } + } + + // usage + processor ! "a" + processor ! "b" + + // possible order of received messages: + // a + // b + // evt-a-1 + // evt-a-2 + // evt-b-1 + // evt-b-2 + + //#persist-async + } new AnyRef { import akka.actor.Props diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 9b4d1c35ce..dd3871b015 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -36,9 +36,14 @@ Akka persistence is a separate jar file. Make sure that you have the following d Architecture ============ -* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal - before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed - to that processor, so that it can recover internal state from these messages. +* *Processor* (deprecated, use *PersistentActor* instead): A processor is a persistent, stateful actor. Messages sent + to a processor are written to a journal before its ``onReceive`` method is called. When a processor is started or + restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages. + +* *PersistentActor*: Is a persistent, stateful actor. It is able to persist events to a journal and can react to + them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors. + When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can + recover internal state from these messages. * *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's @@ -67,6 +72,12 @@ Architecture Processors ========== +.. warning:: + ``Processor`` is deprecated. Instead the current ``PersistentActor`` will be extended to provide equivalent + functionality if required (by introducing the ``persistAsync`` method). + For details see `Relaxed local consistency requirements and high throughput use-cases`_ as well as the discussion + and pull requests related to this `issue on Github `_. + A processor can be implemented by extending the ``Processor`` trait and implementing the ``receive`` method. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#definition @@ -463,6 +474,11 @@ use the ``deleteSnapshots`` method. Event sourcing ============== +.. note:: + The ``PersistentActor`` introduced in this section was previously known as ``EventsourcedProcessor`` + which was a subset of the ``PersistentActor``. Migrating your code to use persistent actors instead is + very simple and is explained in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`. + In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command @@ -480,12 +496,12 @@ also process commands that do not change application state, such as query comman .. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html -Akka persistence supports event sourcing with the ``EventsourcedProcessor`` trait (which implements event sourcing +Akka persistence supports event sourcing with the ``PersistentActor`` trait (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this trait does not handle ``Persistent`` messages -directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor`` +directly but uses the ``persist`` method to persist and handle events. The behavior of an ``PersistentActor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala#persistent-actor-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -510,7 +526,7 @@ calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Scala `_. -It contains instructions on how to run the ``EventsourcedExample``. +It contains instructions on how to run the ``PersistentActorExample``. .. note:: @@ -519,6 +535,34 @@ It contains instructions on how to run the ``EventsourcedExample``. recovery you need to take special care to perform the same state transitions with ``become`` and ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. +.. _persist-async-scala: + +Relaxed local consistency requirements and high throughput use-cases +-------------------------------------------------------------------- + +If faced with Relaxed local consistency requirements and high throughput demands sometimes ``PersistentActor`` and it's +``persist`` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all +Events related to a given Command are processed in order to start processing the next Command. While this abstraction is +very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may +want to process commands as fast as you can, assuming that Event will eventually be persisted and handled properly in +the background and retroactively reacting to persistence failures if needed. + +The ``persistAsync`` method provides a tool for implementing high-throughput processors. It will *not* +stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks. + +In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. +The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.). + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persist-async + +Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like" +semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where this decision +was made by the sender. + + +.. note:: + In order to implement the "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming + messages right away, and handle them in the callback. Reliable event delivery ----------------------- @@ -556,9 +600,9 @@ Applications that want to have more explicit control over batch writes and batch size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written isolated from other batches. ``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. -``PersistentBatch`` messages, for example, are used internally by an ``EventsourcedProcessor`` to ensure atomic +``PersistentBatch`` messages, for example, are used internally by an ``PersistentActor`` to ensure atomic writes of events. All events that are persisted in context of a single command are written as a single batch to the -journal (even if ``persist`` is called multiple times per command). The recovery of an ``EventsourcedProcessor`` +journal (even if ``persist`` is called multiple times per command). The recovery of an ``PersistentActor`` will therefore never be done partially (with only a subset of events persisted by a single command). Confirmation and deletion operations performed by :ref:`channels` are also batched. The maximum confirmation diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 2f6cb2dcb8..c3190283bd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -18,6 +18,8 @@ import akka.actor.AbstractActor * Event sourcing mixin for a [[Processor]]. */ private[persistence] trait Eventsourced extends Processor { + // TODO consolidate these traits as PersistentActor #15230 + /** * Processor recovery state. Waits for recovery completion and then changes to * `processingCommands` @@ -56,22 +58,44 @@ private[persistence] trait Eventsourced extends Processor { throw new UnsupportedOperationException("Persistent command batches not supported") case _: PersistentRepr ⇒ throw new UnsupportedOperationException("Persistent commands not supported") + case WriteMessageSuccess(p) ⇒ + withCurrentPersistent(p)(p ⇒ persistInvocations.get(0).handler(p.payload)) + onWriteComplete() + case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) + case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) case _ ⇒ doAroundReceive(receive, message) } private def doAroundReceive(receive: Receive, message: Any): Unit = { Eventsourced.super.aroundReceive(receive, LoopMessageSuccess(message)) - if (!persistInvocations.isEmpty) { + + if (pendingStashingPersistInvocations > 0) { currentState = persistingEvents + } + + if (persistentEventBatch.nonEmpty) { Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse)) - persistInvocations = persistInvocations.reverse persistentEventBatch = Nil } else { processorStash.unstash() } } + private def onWriteComplete(): Unit = { + persistInvocations.remove(0) + + val nextIsStashing = !persistInvocations.isEmpty && persistInvocations.get(0).isInstanceOf[StashingPersistInvocation] + + if (nextIsStashing) { + currentState = persistingEvents + } + + if (persistInvocations.isEmpty) { + processorStash.unstash() + } + } + } /** @@ -86,29 +110,38 @@ private[persistence] trait Eventsourced extends Processor { case _: ConfirmablePersistent ⇒ processorStash.stash() case PersistentBatch(b) ⇒ - b.foreach(p ⇒ deleteMessage(p.sequenceNr, true)) + b.foreach(p ⇒ deleteMessage(p.sequenceNr, permanent = true)) throw new UnsupportedOperationException("Persistent command batches not supported") case p: PersistentRepr ⇒ - deleteMessage(p.sequenceNr, true) + deleteMessage(p.sequenceNr, permanent = true) throw new UnsupportedOperationException("Persistent commands not supported") case WriteMessageSuccess(p) ⇒ - withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload)) - onWriteComplete() + val invocation = persistInvocations.get(0) + withCurrentPersistent(p)(p ⇒ invocation.handler(p.payload)) + onWriteComplete(invocation) case e @ WriteMessageFailure(p, _) ⇒ Eventsourced.super.aroundReceive(receive, message) // stops actor by default - onWriteComplete() - case s @ WriteMessagesSuccess ⇒ Eventsourced.super.aroundReceive(receive, s) - case f: WriteMessagesFailure ⇒ Eventsourced.super.aroundReceive(receive, f) - case other ⇒ processorStash.stash() + onWriteComplete(persistInvocations.get(0)) + case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s) + case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f) + case other ⇒ processorStash.stash() } - def onWriteComplete(): Unit = { - persistInvocations = persistInvocations.tail - if (persistInvocations.isEmpty) { + private def onWriteComplete(invocation: PersistInvocation): Unit = { + if (invocation.isInstanceOf[StashingPersistInvocation]) { + // enables an early return to `processingCommands`, because if this counter hits `0`, + // we know the remaining persistInvocations are all `persistAsync` created, which + // means we can go back to processing commands also - and these callbacks will be called as soon as possible + pendingStashingPersistInvocations -= 1 + } + persistInvocations.remove(0) + + if (persistInvocations.isEmpty || pendingStashingPersistInvocations == 0) { currentState = processingCommands processorStash.unstash() } } + } /** @@ -126,7 +159,18 @@ private[persistence] trait Eventsourced extends Processor { receiveRecover(f) } - private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil + sealed trait PersistInvocation { + def handler: Any ⇒ Unit + } + /** forces processor to stash incoming commands untill all these invocations are handled */ + final case class StashingPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation + /** does not force the processor to stash commands */ + final case class AsyncPersistInvocation(evt: Any, handler: Any ⇒ Unit) extends PersistInvocation + + /** Used instead of iterating `persistInvocations` in order to check if safe to revert to processing commands */ + private var pendingStashingPersistInvocations: Long = 0 + /** Holds user-supplied callbacks for persist/persistAsync calls */ + private val persistInvocations = new java.util.LinkedList[PersistInvocation]() // we only append / isEmpty / get(0) on it private var persistentEventBatch: List[PersistentRepr] = Nil private var currentState: State = recovering @@ -151,11 +195,12 @@ private[persistence] trait Eventsourced extends Processor { * If persistence of an event fails, the processor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * - * @param event event to be persisted. + * @param event event to be persisted * @param handler handler for each persisted `event` */ final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { - persistInvocations = (event, handler.asInstanceOf[Any ⇒ Unit]) :: persistInvocations + pendingStashingPersistInvocations += 1 + persistInvocations addLast StashingPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) persistentEventBatch = PersistentRepr(event) :: persistentEventBatch } @@ -164,12 +209,47 @@ private[persistence] trait Eventsourced extends Processor { * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, * except that `events` are persisted atomically with this method. * - * @param events events to be persisted. + * @param events events to be persisted * @param handler handler for each persisted `events` */ final def persist[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = events.foreach(persist(_)(handler)) + /** + * Asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. + * + * Unlike `persist` the processor will continue to receive incomming commands between the + * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of + * of persist should be used when you favor throughput over the "command-2 only processed after + * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. + * + * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[receiveCommand]]. + * + * @param event event to be persisted + * @param handler handler for each persisted `event` + */ + final def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + persistInvocations addLast AsyncPersistInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + persistentEventBatch = PersistentRepr(event) :: persistentEventBatch + } + + /** + * Asynchronously persists `events` in specified order. This is equivalent to calling + * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + final def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + events.foreach(persistAsync(_)(handler)) + /** * Recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -235,13 +315,31 @@ private[persistence] trait Eventsourced extends Processor { /** * An event sourced processor. */ +@deprecated("EventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") trait EventsourcedProcessor extends Processor with Eventsourced { + // TODO remove Processor #15230 def receive = receiveCommand } +/** + * An persistent Actor - can be used to implement command or event sourcing. + */ +// TODO remove EventsourcedProcessor / Processor #15230 +trait PersistentActor extends EventsourcedProcessor + +/** + * Java API: an persistent actor - can be used to implement command or event sourcing. + */ +abstract class UntypedPersistentActor extends UntypedEventsourcedProcessor +/** + * Java API: an persistent actor - can be used to implement command or event sourcing. + */ +abstract class AbstractPersistentActor extends AbstractEventsourcedProcessor + /** * Java API: an event sourced processor. */ +@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { final def onReceive(message: Any) = onReceiveCommand(message) @@ -289,6 +387,39 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit = persist(Util.immutableSeq(events))(event ⇒ handler(event)) + /** + * JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. + * + * Unlike `persist` the processor will continue to receive incomming commands between the + * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of + * of persist should be used when you favor throughput over the "command-2 only processed after + * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. + * + * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[receiveCommand]]. + * + * @param event event to be persisted + * @param handler handler for each persisted `event` + */ + final def persistAsync[A](event: A)(handler: Procedure[A]): Unit = + super[Eventsourced].persistAsync(event)(event ⇒ handler(event)) + + /** + * JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling + * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + final def persistAsync[A](events: JIterable[A])(handler: A ⇒ Unit): Unit = + super[Eventsourced].persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -322,6 +453,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is * thrown by the processor. */ +@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor { /** * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the @@ -359,9 +491,37 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit = persist(Util.immutableSeq(events))(event ⇒ handler(event)) + /** + * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. + * + * Unlike `persist` the processor will continue to receive incomming commands between the + * call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of + * of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[receiveCommand]]. + * + * @param event event to be persisted + * @param handler handler for each persisted `event` + */ + final def persistAsync[A](event: A, handler: Procedure[A]): Unit = + persistAsync(event)(event ⇒ handler(event)) + + /** + * Java API: asynchronously persists `events` in specified order. This is equivalent to calling + * `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted + * @param handler handler for each persisted `events` + */ + final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit = + persistAsync(Util.immutableSeq(events))(event ⇒ handler(event)) + override def receive = super[EventsourcedProcessor].receive override def receive(receive: Receive): Unit = { throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand") } -} \ No newline at end of file +} diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index d55ee11325..030554cfd2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -66,7 +66,7 @@ private[persistence] object JournalProtocol { * Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor * before all subsequent [[WriteMessageSuccess]] replies. */ - case object WriteMessagesSuccess + case object WriteMessagesSuccessful /** * Reply message to a failed [[WriteMessages]] request. This reply is sent to the requestor @@ -74,7 +74,7 @@ private[persistence] object JournalProtocol { * * @param cause failure cause. */ - final case class WriteMessagesFailure(cause: Throwable) + final case class WriteMessagesFailed(cause: Throwable) /** * Reply message to a successful [[WriteMessages]] request. For each contained [[PersistentRepr]] message diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 2a7bac5c19..22cdb268b4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -17,6 +17,9 @@ import akka.persistence.serialization.Message /** * Persistent message. */ +@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") sealed abstract class Persistent { /** * This persistent message's payload. @@ -38,6 +41,9 @@ sealed abstract class Persistent { def withPayload(payload: Any): Persistent } +@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") object Persistent { /** * Java API: creates a new persistent message. Must only be used outside processors. @@ -65,6 +71,9 @@ object Persistent { * @param payload payload of the new persistent message. * @param currentPersistentMessage optional current persistent message, defaults to `None`. */ + @deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent = currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentRepr(payload)) @@ -79,6 +88,9 @@ object Persistent { * Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Channel * destinations that receive messages of this type can confirm their receipt by calling [[confirm]]. */ +@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") sealed abstract class ConfirmablePersistent extends Persistent { /** * Called by [[Channel]] and [[PersistentChannel]] destinations to confirm the receipt of a @@ -93,6 +105,9 @@ sealed abstract class ConfirmablePersistent extends Persistent { def redeliveries: Int } +@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") object ConfirmablePersistent { /** * [[ConfirmablePersistent]] extractor. @@ -106,7 +121,12 @@ object ConfirmablePersistent { * journal. The processor receives the written messages individually as [[Persistent]] messages. * During recovery, they are also replayed individually. */ -final case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message { +@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " + + "which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.", + since = "2.3.4") +case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message { + // todo while we want to remove Persistent() from user-land, the batch may (probably?) become private[akka] to remain for journal internals #15230 + /** * INTERNAL API. */ @@ -151,6 +171,8 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ * @see [[journal.AsyncRecovery]] */ trait PersistentRepr extends Persistent with PersistentId with Message { + // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 + import scala.collection.JavaConverters._ /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index e647f7473b..add054bd53 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -265,7 +265,7 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { super.aroundReceive(receive, message) message match { - case WriteMessagesSuccess | WriteMessagesFailure(_) ⇒ + case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ // activate reader after to reduce delivery latency reader ! RequestsWritten case _ ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index eb41391f0d..6882eacdbe 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -51,7 +51,10 @@ import akka.dispatch._ * @see [[Recover]] * @see [[PersistentBatch]] */ +@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") trait Processor extends Actor with Recovery { + // todo remove Processor in favor of PersistentActor #15230 + import JournalProtocol._ /** @@ -96,7 +99,7 @@ trait Processor extends Actor with Recovery { throw new ActorKilledException(errorMsg) } case LoopMessageSuccess(m) ⇒ process(receive, m) - case WriteMessagesSuccess | WriteMessagesFailure(_) ⇒ + case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ if (processorBatch.isEmpty) batching = false else journalBatch() case p: PersistentRepr ⇒ addToBatch(p) @@ -113,7 +116,7 @@ trait Processor extends Actor with Recovery { } def addToBatch(p: PersistentRepr): Unit = - processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender) + processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) def addToBatch(pb: PersistentBatch): Unit = pb.persistentReprList.foreach(addToBatch) @@ -386,6 +389,7 @@ final case class RecoveryException(message: String, cause: Throwable) extends Ak * @see [[Recover]] * @see [[PersistentBatch]] */ +@deprecated("UntypedProcessor will be removed. Instead extend `akka.persistence.UntypedPersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") abstract class UntypedProcessor extends UntypedActor with Processor /** @@ -440,4 +444,5 @@ abstract class UntypedProcessor extends UntypedActor with Processor * @see [[Recover]] * @see [[PersistentBatch]] */ +@deprecated("AbstractProcessor will be removed. Instead extend `akka.persistence.AbstractPersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4") abstract class AbstractProcessor extends AbstractActor with Processor diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 06886b3a68..138bd46ada 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -36,7 +36,8 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { def updateLastSequenceNr(value: Long): Unit = _lastSequenceNr = value - protected def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { + /** INTERNAL API */ + private[akka] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { _currentPersistent = persistent updateLastSequenceNr(persistent) body(persistent) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 68e1570b24..fc1a9daccf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -35,10 +35,10 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { } asyncWriteMessages(persistentBatch.map(_.prepareWrite())) onComplete { case Success(_) ⇒ - resequencer ! Desequenced(WriteMessagesSuccess, cctr, processor, self) + resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self) resequence(WriteMessageSuccess(_)) case Failure(e) ⇒ - resequencer ! Desequenced(WriteMessagesFailure(e), cctr, processor, self) + resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self) resequence(WriteMessageFailure(_, e)) } resequencerCounter += persistentBatch.length + 1 diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index dea109ea71..4ddd433a37 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -26,10 +26,10 @@ trait SyncWriteJournal extends Actor with AsyncRecovery { case WriteMessages(persistentBatch, processor) ⇒ Try(writeMessages(persistentBatch.map(_.prepareWrite()))) match { case Success(_) ⇒ - processor ! WriteMessagesSuccess + processor ! WriteMessagesSuccessful persistentBatch.foreach(p ⇒ processor.tell(WriteMessageSuccess(p), p.sender)) case Failure(e) ⇒ - processor ! WriteMessagesFailure(e) + processor ! WriteMessagesFailed(e) persistentBatch.foreach(p ⇒ processor tell (WriteMessageFailure(p, e), p.sender)) throw e } diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index ed9491d200..90498d29e3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -87,7 +87,7 @@ object PerformanceSpec { } } - class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { + class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with PersistentActor { val receiveRecover: Receive = { case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") } @@ -100,7 +100,7 @@ object PerformanceSpec { } } - class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { + class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with PersistentActor { val receiveRecover: Receive = { case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") } @@ -141,7 +141,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor } } - def stressEventsourcedProcessor(failAt: Option[Long]): Unit = { + def stressPersistentActor(failAt: Option[Long]): Unit = { val processor = namedProcessor[EventsourcedTestProcessor] failAt foreach { processor ! FailAt(_) } 1 to warmupCycles foreach { i ⇒ processor ! s"msg${i}" } @@ -153,7 +153,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor } } - def stressStashingEventsourcedProcessor(): Unit = { + def stressStashingPersistentActor(): Unit = { val processor = namedProcessor[StashingEventsourcedTestProcessor] 1 to warmupCycles foreach { i ⇒ processor ! "b" } processor ! StartMeasure @@ -195,13 +195,13 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor "An event sourced processor" should { "have some reasonable throughput" in { - stressEventsourcedProcessor(None) + stressPersistentActor(None) } "have some reasonable throughput under failure conditions" in { - stressEventsourcedProcessor(Some(warmupCycles + loadCycles / 10)) + stressPersistentActor(Some(warmupCycles + loadCycles / 10)) } "have some reasonable throughput with stashing and unstashing every 3rd command" in { - stressStashingEventsourcedProcessor() + stressStashingPersistentActor() } } diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala similarity index 61% rename from akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index f11518646e..7ed797c071 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -11,12 +11,14 @@ import akka.actor._ import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.testkit.EventFilter import akka.testkit.TestProbe +import java.util.concurrent.atomic.AtomicInteger +import scala.util.Random -object EventsourcedSpec { +object PersistentActorSpec { final case class Cmd(data: Any) final case class Evt(data: Any) - abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with EventsourcedProcessor { + abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with PersistentActor { var events: List[Any] = Nil val updateState: Receive = { @@ -25,7 +27,7 @@ object EventsourcedSpec { val commonBehavior: Receive = { case "boom" ⇒ throw new TestException("boom") - case GetState ⇒ sender ! events.reverse + case GetState ⇒ sender() ! events.reverse } def receiveRecover = updateState @@ -122,7 +124,7 @@ object EventsourcedSpec { } } - class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) { + class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExampleProcessor(name) { override def receiveRecover = super.receiveRecover orElse { case SnapshotOffer(_, events: List[_]) ⇒ probe ! "offered" @@ -141,7 +143,7 @@ object EventsourcedSpec { } } - class SnapshottingBecomingEventsourcedProcessor(name: String, probe: ActorRef) extends SnapshottingEventsourcedProcessor(name, probe) { + class SnapshottingBecomingPersistentActor(name: String, probe: ActorRef) extends SnapshottingPersistentActor(name, probe) { val becomingRecover: Receive = { case msg: SnapshotOffer ⇒ context.become(becomingCommand) @@ -160,17 +162,17 @@ object EventsourcedSpec { class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) { val receiveCommand: Receive = { - case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender ! evt.data) - case p: Persistent ⇒ sender ! p // not expected + case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender() ! evt.data) + case p: Persistent ⇒ sender() ! p // not expected } } class UserStashProcessor(name: String) extends ExampleProcessor(name) { var stashed = false val receiveCommand: Receive = { - case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender ! "a" - case Cmd("b") ⇒ persist(Evt("b"))(evt ⇒ sender ! evt.data) - case Cmd("c") ⇒ unstashAll(); sender ! "c" + case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender() ! "a" + case Cmd("b") ⇒ persist(Evt("b"))(evt ⇒ sender() ! evt.data) + case Cmd("c") ⇒ unstashAll(); sender() ! "c" } } @@ -194,6 +196,110 @@ object EventsourcedSpec { case other ⇒ stash() } } + class AsyncPersistProcessor(name: String) extends ExampleProcessor(name) { + var counter = 0 + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + sender() ! data + persistAsync(Evt(s"$data-${incCounter()}")) { evt ⇒ + sender() ! evt.data + } + } + + private def incCounter(): Int = { + counter += 1 + counter + } + } + class AsyncPersistThreeTimesProcessor(name: String) extends ExampleProcessor(name) { + var counter = 0 + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + sender() ! data + + 1 to 3 foreach { i ⇒ + persistAsync(Evt(s"$data-${incCounter()}")) { evt ⇒ + sender() ! ("a" + evt.data.toString.drop(1)) // c-1 => a-1, as in "ack" + } + } + } + + private def incCounter(): Int = { + counter += 1 + counter + } + } + class AsyncPersistSameEventTwiceProcessor(name: String) extends ExampleProcessor(name) { + + // atomic because used from inside the *async* callbacks + val sendMsgCounter = new AtomicInteger() + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + sender() ! data + val event = Evt(data) + + persistAsync(event) { evt ⇒ + // be way slower, in order to be overtaken by the other callback + Thread.sleep(300) + sender() ! s"${evt.data}-a-${sendMsgCounter.incrementAndGet()}" + } + persistAsync(event) { evt ⇒ sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" } + } + } + class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExampleProcessor(name) { + + var counter = 0 + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + sender() ! data + + persist(Evt(data + "-e1")) { evt ⇒ + sender() ! s"${evt.data}-${incCounter()}" + } + + // this should be happily executed + persistAsync(Evt(data + "-ea2")) { evt ⇒ + sender() ! s"${evt.data}-${incCounter()}" + } + + persist(Evt(data + "-e3")) { evt ⇒ + sender() ! s"${evt.data}-${incCounter()}" + } + } + + private def incCounter(): Int = { + counter += 1 + counter + } + } + class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExampleProcessor(name) { + + var sendMsgCounter = 0 + + val start = System.currentTimeMillis() + def time = s" ${System.currentTimeMillis() - start}ms" + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ + sender() ! data + + persist(Evt(data + "-e1")) { evt ⇒ + sender() ! s"${evt.data}-${incCounter()}" + } + + persistAsync(Evt(data + "-ea2")) { evt ⇒ + sender() ! s"${evt.data}-${incCounter()}" + } + } + + def incCounter() = { + sendMsgCounter += 1 + sendMsgCounter + } + } class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) { val receiveCommand: Receive = commonBehavior orElse { @@ -218,13 +324,13 @@ object EventsourcedSpec { class AnyValEventProcessor(name: String) extends ExampleProcessor(name) { val receiveCommand: Receive = { - case Cmd("a") ⇒ persist(5)(evt ⇒ sender ! evt) + case Cmd("a") ⇒ persist(5)(evt ⇒ sender() ! evt) } } } -abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { - import EventsourcedSpec._ +abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { + import PersistentActorSpec._ override protected def beforeEach() { super.beforeEach() @@ -235,7 +341,7 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe expectMsg(List("a-1", "a-2")) } - "An eventsourced processor" must { + "A persistent actor" must { "recover from persisted events" in { val processor = namedProcessor[Behavior1Processor] processor ! GetState @@ -306,7 +412,7 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32")) } "support snapshotting" in { - val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) processor1 ! Cmd("b") processor1 ! "snap" processor1 ! Cmd("c") @@ -314,13 +420,13 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val processor2 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + val processor2 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) expectMsg("offered") processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } "support context.become during recovery" in { - val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) processor1 ! Cmd("b") processor1 ! "snap" processor1 ! Cmd("c") @@ -328,14 +434,14 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val processor2 = system.actorOf(Props(classOf[SnapshottingBecomingEventsourcedProcessor], name, testActor)) + val processor2 = system.actorOf(Props(classOf[SnapshottingBecomingPersistentActor], name, testActor)) expectMsg("offered") expectMsg("I am becoming") processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } "support confirmable persistent" in { - val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) processor1 ! Cmd("b") processor1 ! "snap" processor1 ! ConfirmablePersistentImpl(Cmd("c"), 4711, "some-id", false, 0, Seq.empty, null, null, null) @@ -343,7 +449,7 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor1 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) - val processor2 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + val processor2 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) expectMsg("offered") processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) @@ -356,9 +462,9 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor.tell(Persistent("not allowed"), probe.ref) } - processor.tell(Cmd("a"), probe.ref) - processor.tell(Cmd("a"), probe.ref) - processor.tell(Cmd("a"), probe.ref) + processor.tell(Cmd("w"), probe.ref) + processor.tell(Cmd("w"), probe.ref) + processor.tell(Cmd("w"), probe.ref) EventFilter[UnsupportedOperationException](occurrences = 1) intercept { processor.tell(Persistent("not allowed when persisting"), probe.ref) } @@ -401,8 +507,105 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor ! Cmd("a") expectMsg(5) } + "be able to opt-out from stashing messages until all events have been processed" in { + val processor = namedProcessor[AsyncPersistProcessor] + processor ! Cmd("x") + processor ! Cmd("y") + expectMsg("x") + expectMsg("y") // "y" command was processed before event persisted + expectMsg("x-1") + expectMsg("y-2") + } + "support multiple persistAsync calls for one command, and execute them 'when possible', not hindering command processing" in { + val processor = namedProcessor[AsyncPersistThreeTimesProcessor] + val commands = 1 to 10 map { i ⇒ Cmd(s"c-$i") } + + commands foreach { i ⇒ + Thread.sleep(Random.nextInt(10)) + processor ! i + } + + val all: Seq[String] = this.receiveN(40).asInstanceOf[Seq[String]] // each command = 1 reply + 3 event-replies + + val replies = all.filter(r ⇒ r.count(_ == '-') == 1) + replies should equal(commands.map(_.data)) + + val expectedAcks = (3 to 32) map { i ⇒ s"a-${i / 3}-${i - 2}" } + val acks = all.filter(r ⇒ r.count(_ == '-') == 2) + acks should equal(expectedAcks) + } + "reply to the original sender() of a command, even when using persistAsync" in { + // sanity check, the setting of sender() for PersistentRepl is handled by Processor currently + // but as we want to remove it soon, keeping the explicit test here. + val processor = namedProcessor[AsyncPersistThreeTimesProcessor] + + val commands = 1 to 10 map { i ⇒ Cmd(s"c-$i") } + val probes = Vector.fill(10)(TestProbe()) + + (probes zip commands) foreach { + case (p, c) ⇒ + processor.tell(c, p.ref) + } + + val ackClass = classOf[String] + within(3.seconds) { + probes foreach { _.expectMsgAllClassOf(ackClass, ackClass, ackClass) } + } + } + "support the same event being asyncPersist'ed multiple times" in { + val processor = namedProcessor[AsyncPersistSameEventTwiceProcessor] + processor ! Cmd("x") + expectMsg("x") + + expectMsg("x-a-1") + expectMsg("x-b-2") + expectNoMsg(100.millis) + } + "support a mix of persist calls (sync, async, sync) and persist calls in expected order" in { + val processor = namedProcessor[AsyncPersistAndPersistMixedSyncAsyncSyncProcessor] + processor ! Cmd("a") + processor ! Cmd("b") + processor ! Cmd("c") + expectMsg("a") + expectMsg("a-e1-1") // persist + expectMsg("a-ea2-2") // persistAsync, but ordering enforced by sync persist below + expectMsg("a-e3-3") // persist + expectMsg("b") + expectMsg("b-e1-4") + expectMsg("b-ea2-5") + expectMsg("b-e3-6") + expectMsg("c") + expectMsg("c-e1-7") + expectMsg("c-ea2-8") + expectMsg("c-e3-9") + + expectNoMsg(100.millis) + } + "support a mix of persist calls (sync, async) and persist calls" in { + val processor = namedProcessor[AsyncPersistAndPersistMixedSyncAsyncProcessor] + processor ! Cmd("a") + processor ! Cmd("b") + processor ! Cmd("c") + expectMsg("a") + expectMsg("a-e1-1") // persist, must be before next command + + var expectInAnyOrder1 = Set("b", "a-ea2-2") + expectInAnyOrder1 -= expectMsgAnyOf(expectInAnyOrder1.toList: _*) // ea2 is persistAsync, b (command) can processed before it + expectMsgAnyOf(expectInAnyOrder1.toList: _*) + + expectMsg("b-e1-3") // persist, must be before next command + + var expectInAnyOrder2 = Set("c", "b-ea2-4") + expectInAnyOrder2 -= expectMsgAnyOf(expectInAnyOrder2.toList: _*) // ea2 is persistAsync, b (command) can processed before it + expectMsgAnyOf(expectInAnyOrder2.toList: _*) + + expectMsg("c-e1-5") + expectMsg("c-ea2-6") + + expectNoMsg(100.millis) + } } } -class LeveldbEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("leveldb", "LeveldbEventsourcedSpec")) -class InmemEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("inmem", "InmemEventsourcedSpec")) +class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbEventsourcedSpec")) +class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemEventsourcedSpec")) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala index c9822d5df5..2059031ce2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala @@ -47,7 +47,7 @@ object ProcessorChannelSpec { } } - class ResendingEventsourcedProcessor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor { + class ResendingPersistentActor(name: String, channelProps: Props, destination: ActorRef) extends NamedProcessor(name) with PersistentActor { val channel = context.actorOf(channelProps) var events: List[String] = Nil @@ -167,10 +167,10 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit } } - "An eventsourced processor that uses a channel" can { + "A persistent actor that uses a channel" can { "reliably deliver events" in { val probe = TestProbe() - val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", testResendingChannelProps, probe.ref)) + val ep = system.actorOf(Props(classOf[ResendingPersistentActor], "rep", testResendingChannelProps, probe.ref)) ep ! "cmd" diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java similarity index 93% rename from akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java index a903e30447..81302af637 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/PersistentActorExample.java @@ -4,13 +4,13 @@ package sample.persistence; -//#eventsourced-example +//#persistent-actor-example import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; -import akka.persistence.AbstractEventsourcedProcessor; +import akka.persistence.AbstractPersistentActor; import akka.persistence.SnapshotOffer; import scala.PartialFunction; import scala.runtime.BoxedUnit; @@ -73,7 +73,7 @@ class ExampleState implements Serializable { } } -class ExampleProcessor extends AbstractEventsourcedProcessor { +class ExampleProcessor extends AbstractPersistentActor { private ExampleState state = new ExampleState(); public int getNumEvents() { @@ -104,9 +104,9 @@ class ExampleProcessor extends AbstractEventsourcedProcessor { match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build(); } } -//#eventsourced-example +//#persistent-actor-example -public class EventsourcedExample { +public class PersistentActorExample { public static void main(String... args) throws Exception { final ActorSystem system = ActorSystem.create("example"); final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java8"); diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java index 79bf884046..310fef14c4 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java @@ -11,8 +11,6 @@ import akka.japi.pf.ReceiveBuilder; import akka.persistence.AbstractProcessor; import akka.persistence.Persistent; import scala.Option; -import scala.PartialFunction; -import scala.runtime.BoxedUnit; import java.util.ArrayList; diff --git a/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html b/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html index c15a18d423..4ad57d4772 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html +++ b/akka-samples/akka-sample-persistence-java-lambda/tutorial/index.html @@ -72,7 +72,7 @@ current processor state to stdout.

To run this example, go to the Run tab, and run the application main class -sample.persistence.EventsourcedExample several times. +sample.persistence.PersistentActorExample several times.

diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java similarity index 90% rename from akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java index 5f3e51d00b..0a279debaf 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/PersistentActorExample.java @@ -1,13 +1,16 @@ package sample.persistence; -//#eventsourced-example +//#persistent-actor-example +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.Procedure; +import akka.persistence.SnapshotOffer; +import akka.persistence.UntypedPersistentActor; + import java.io.Serializable; import java.util.ArrayList; -import akka.actor.*; -import akka.japi.Procedure; -import akka.persistence.*; - import static java.util.Arrays.asList; class Cmd implements Serializable { @@ -63,7 +66,7 @@ class ExampleState implements Serializable { } } -class ExampleProcessor extends UntypedEventsourcedProcessor { +class ExampleProcessor extends UntypedPersistentActor { private ExampleState state = new ExampleState(); public int getNumEvents() { @@ -100,9 +103,9 @@ class ExampleProcessor extends UntypedEventsourcedProcessor { } } } -//#eventsourced-example +//#persistent-actor-example -public class EventsourcedExample { +public class PersistentActorExample { public static void main(String... args) throws Exception { final ActorSystem system = ActorSystem.create("example"); final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java"); diff --git a/akka-samples/akka-sample-persistence-java/tutorial/index.html b/akka-samples/akka-sample-persistence-java/tutorial/index.html index d60d4e8d80..67b02252f1 100644 --- a/akka-samples/akka-sample-persistence-java/tutorial/index.html +++ b/akka-samples/akka-sample-persistence-java/tutorial/index.html @@ -72,7 +72,7 @@ current processor state to stdout.

To run this example, go to the Run tab, and run the application main class -sample.persistence.EventsourcedExample several times. +sample.persistence.PersistentActorExample several times.

diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala similarity index 89% rename from akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala index 180210af9c..0165e5bc22 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/PersistentActorExample.scala @@ -1,6 +1,6 @@ package sample.persistence -//#eventsourced-example +//#persistent-actor-example import akka.actor._ import akka.persistence._ @@ -13,7 +13,7 @@ final case class ExampleState(events: List[String] = Nil) { override def toString: String = events.reverse.toString } -class ExampleProcessor extends EventsourcedProcessor { +class ExampleProcessor extends PersistentActor { var state = ExampleState() def updateState(event: Evt): Unit = @@ -39,9 +39,9 @@ class ExampleProcessor extends EventsourcedProcessor { } } -//#eventsourced-example +//#persistent-actor-example -object EventsourcedExample extends App { +object PersistentActorExample extends App { val system = ActorSystem("example") val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala") diff --git a/akka-samples/akka-sample-persistence-scala/tutorial/index.html b/akka-samples/akka-sample-persistence-scala/tutorial/index.html index bb28189590..c9a2b56e25 100644 --- a/akka-samples/akka-sample-persistence-scala/tutorial/index.html +++ b/akka-samples/akka-sample-persistence-scala/tutorial/index.html @@ -90,7 +90,7 @@ current processor state to stdout.

To run this example, go to the Run tab, and run the application main class -sample.persistence.EventsourcedExample several times. +sample.persistence.PersistentActorExample several times.

diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index cb30abf776..93a94d7595 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -478,7 +478,6 @@ object AkkaBuild extends Build { validatePullRequest <<= validatePullRequest.dependsOn(test in Test), // add reportBinaryIssues to validatePullRequest on minor version maintenance branch validatePullRequest <<= validatePullRequest.dependsOn(reportBinaryIssues) - ) ++ mavenLocalResolverSettings ++ JUnitFileReporting.settings ++ StatsDMetrics.settings @@ -488,9 +487,9 @@ object AkkaBuild extends Build { val validatePullRequestTask = validatePullRequest := () lazy val mimaIgnoredProblems = { - import com.typesafe.tools.mima.core._ - Seq( - // add filters here, see release-2.2 branch + import com.typesafe.tools.mima.core._ + Seq( + // add filters here, see release-2.2 branch ) }